The Python SDK provides the AgentMemory class — the primary interface for reading, writing, and managing agent memory.

Installation

pip install amfs
Set the connection env vars and the SDK auto-detects the HTTP adapter:
export AMFS_HTTP_URL="https://amfs-login.sense-lab.ai"
export AMFS_API_KEY="<your-api-key>"

Creating an Instance

from amfs import AgentMemory

mem = AgentMemory(agent_id="my-agent")

With Custom Configuration

from pathlib import Path

mem = AgentMemory(
    agent_id="my-agent",
    config_path=Path("./custom-amfs.yaml"),
    ttl_sweep_interval=60.0,
    decay_half_life_days=30.0,
)

With a Pre-configured Adapter

from pathlib import Path

from amfs import AgentMemory
from amfs_filesystem import FilesystemAdapter

adapter = FilesystemAdapter(root=Path(".amfs"), namespace="staging")
mem = AgentMemory(agent_id="my-agent", adapter=adapter)

With an Importance Evaluator

from amfs_core.importance import ImportanceEvaluator

class MyEvaluator(ImportanceEvaluator):
    def evaluate(self, entity_path, key, value):
        score = 0.8 if "critical" in str(value).lower() else 0.3
        return score, {"criticality": score}

mem = AgentMemory(agent_id="my-agent", importance_evaluator=MyEvaluator())
Every write() call will automatically score the entry and set importance_score and importance_dimensions. If the evaluator raises, the write proceeds without scoring.
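The fail-open behavior described above can be pictured with a small stand-alone sketch. It does not use the SDK: score_or_skip is a hypothetical helper, and the dictionary it returns only mimics the importance_score and importance_dimensions fields named above. How the SDK handles this internally may differ.

```python
from typing import Any, Callable, Optional, Tuple

Evaluator = Callable[[str, str, Any], Tuple[float, dict]]


def score_or_skip(evaluator: Optional[Evaluator], entity_path: str, key: str, value: Any) -> dict:
    """Hypothetical helper: build an entry dict and score it when possible."""
    entry = {
        "entity_path": entity_path,
        "key": key,
        "value": value,
        "importance_score": None,
        "importance_dimensions": None,
    }
    if evaluator is None:
        return entry
    try:
        score, dimensions = evaluator(entity_path, key, value)
    except Exception:
        # Fail open: the entry is still returned, just without a score.
        return entry
    entry["importance_score"] = score
    entry["importance_dimensions"] = dimensions
    return entry


def keyword_evaluator(entity_path, key, value):
    # Same scoring rule as MyEvaluator.evaluate above.
    score = 0.8 if "critical" in str(value).lower() else 0.3
    return score, {"criticality": score}


def broken_evaluator(entity_path, key, value):
    raise RuntimeError("scoring backend unavailable")


scored = score_or_skip(keyword_evaluator, "svc", "note", "CRITICAL outage")
unscored = score_or_skip(broken_evaluator, "svc", "note", "CRITICAL outage")
print(scored["importance_score"], unscored["importance_score"])
```

In this sketch a failing evaluator costs only the score, never the write itself, which matches the behavior the paragraph above describes.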

Core Operations

Write

from amfs import MemoryType

entry = mem.write(
    "checkout-service",          # entity_path
    "retry-pattern",             # key
    {"max_retries": 3},          # value (any JSON-serializable data)
    confidence=0.85,             # optional, default 1.0
    pattern_refs=["retry"],      # optional cross-references
    memory_type=MemoryType.FACT, # optional: fact (default), belief, or experience
)
Write with a TTL (time-to-live):
from datetime import datetime, timedelta, timezone

mem.write(
    "svc", "temp-flag", {"active": True},
    ttl_at=datetime.now(timezone.utc) + timedelta(hours=24),
)

Read

entry = mem.read("checkout-service", "retry-pattern")

if entry:
    print(entry.value)
    print(entry.version)
    print(entry.confidence)
With minimum confidence filter:
entry = mem.read("svc", "pattern", min_confidence=0.5)

List

# All entries
entries = mem.list()

# Entries for a specific entity
entries = mem.list("checkout-service")

# Include superseded versions
entries = mem.list("checkout-service", include_superseded=True)

Search

results = mem.search(entity_path="checkout-service", min_confidence=0.5)
Progressive retrieval with depth — search only high-priority tiers for fast, high-signal results:
hot_only = mem.search(query="retry strategy", depth=1)   # Hot tier
hot_warm = mem.search(query="retry strategy", depth=2)   # Hot + Warm
all_tiers = mem.search(query="retry strategy")            # All (default)
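One way to picture the depth parameter is as a cut-off over an ordered list of tiers. The sketch below is stand-alone and does not call the SDK. The tier name "cold" is an assumption, since the text above names only Hot and Warm, and filter_by_depth is a hypothetical helper.

```python
# Tiers ordered from highest to lowest priority.
# "cold" is an assumed name for whatever lies beyond Hot and Warm.
TIER_ORDER = ["hot", "warm", "cold"]


def tiers_for_depth(depth=None):
    """Return the tiers a search would cover for a given depth (None = all)."""
    if depth is None:
        return list(TIER_ORDER)
    return TIER_ORDER[:depth]


def filter_by_depth(entries, depth=None):
    """Keep only entries whose tier falls inside the requested depth."""
    allowed = set(tiers_for_depth(depth))
    return [e for e in entries if e["tier"] in allowed]


entries = [
    {"key": "retry-pattern", "tier": "hot"},
    {"key": "old-runbook", "tier": "warm"},
    {"key": "2019-postmortem", "tier": "cold"},
]

hot_only = filter_by_depth(entries, depth=1)
hot_warm = filter_by_depth(entries, depth=2)
all_tiers = filter_by_depth(entries)
print(len(hot_only), len(hot_warm), len(all_tiers))
```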
Composite recall scoring (blends semantic similarity, recency, and confidence):
from amfs_core.models import RecallConfig

scored = mem.search(
    query="how do we handle retries?",
    recall_config=RecallConfig(semantic_weight=0.5, recency_weight=0.3, confidence_weight=0.2),
)
for item in scored:
    print(f"{item.entry.key} — score={item.score:.3f}")
Semantic scoring requires an embedder. Without one, the semantic component is 0.0.
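As a rough illustration of how such a blend might work, the sketch below assumes the composite score is a plain weighted sum of three components, each in the range 0 to 1. The source does not state the exact formula, so treat composite_score and the Weights class as hypothetical stand-ins for RecallConfig.

```python
from dataclasses import dataclass


@dataclass
class Weights:
    """Hypothetical stand-in mirroring the RecallConfig fields used above."""
    semantic_weight: float = 0.5
    recency_weight: float = 0.3
    confidence_weight: float = 0.2


def composite_score(similarity: float, recency: float, confidence: float, w: Weights) -> float:
    """Assumed formula: a linear blend of the three components."""
    return (
        w.semantic_weight * similarity
        + w.recency_weight * recency
        + w.confidence_weight * confidence
    )


w = Weights()
# With an embedder: the semantic component contributes.
with_embedder = composite_score(similarity=0.9, recency=0.5, confidence=0.8, w=w)
# Without an embedder: the semantic component is 0.0, as noted above.
without_embedder = composite_score(similarity=0.0, recency=0.5, confidence=0.8, w=w)
print(f"{with_embedder:.2f} {without_embedder:.2f}")
```

Under this assumption, running without an embedder caps the achievable score at recency_weight + confidence_weight, so it may be worth shifting weight away from semantic_weight in that setup.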

Stats

stats = mem.stats()
print(f"Total entries: {stats.total_entries}")
print(f"Total outcomes: {stats.total_outcomes}")

Outcomes

Recording Outcomes

from amfs import OutcomeType

# With explicit causal keys
updated = mem.commit_outcome(
    outcome_ref="INC-1042",
    outcome_type=OutcomeType.CRITICAL_FAILURE,
    causal_entry_keys=["checkout-service/retry-pattern"],
)

# With auto-causal linking (uses everything read in this session)
updated = mem.commit_outcome(
    outcome_ref="DEP-300",
    outcome_type=OutcomeType.SUCCESS,
)

Outcome Types

OutcomeType.CRITICAL_FAILURE  # × 1.15
OutcomeType.FAILURE           # × 1.10
OutcomeType.MINOR_FAILURE     # × 1.08
OutcomeType.SUCCESS           # × 0.97
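The list above gives only the multipliers. It does not say here which field they scale or whether the result is clamped. Purely as arithmetic, and assuming each committed outcome multiplies some per-entry numeric score, repeated outcomes would compound as follows. The names apply_outcomes and MULTIPLIER are hypothetical.

```python
# Multipliers copied from the list above; the string keys are illustrative.
MULTIPLIER = {
    "critical_failure": 1.15,
    "failure": 1.10,
    "minor_failure": 1.08,
    "success": 0.97,
}


def apply_outcomes(score: float, outcomes: list) -> float:
    """Assumed behavior: each outcome multiplies the running score."""
    for outcome in outcomes:
        score *= MULTIPLIER[outcome]
    return score


after_two_failures = apply_outcomes(0.5, ["failure", "failure"])    # 0.5 * 1.10 * 1.10
after_mixed = apply_outcomes(0.5, ["critical_failure", "success"])  # 0.5 * 1.15 * 0.97
print(round(after_two_failures, 4), round(after_mixed, 4))
```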

Memory Types

Classify entries to control decay behavior:
from amfs import MemoryType

# Facts (default) — objective knowledge, standard decay
mem.write("svc", "config", {"pool_size": 10}, memory_type=MemoryType.FACT)

# Beliefs — subjective inferences, decay 2× faster
mem.write("svc", "hypothesis", "Likely an N+1 query issue", memory_type=MemoryType.BELIEF)

# Experiences — action logs, decay 1.5× slower
mem.write("svc", "action-log", "Added index on user_id", memory_type=MemoryType.EXPERIENCE)
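To make the relative decay speeds concrete, the sketch below assumes exponential half-life decay applied to a numeric value such as confidence, with the 30-day half-life from the earlier decay_half_life_days example. Neither the decay curve nor the field being decayed is confirmed by the text above, and decayed is a hypothetical helper.

```python
# Relative decay speed per memory type, from the comments above:
# beliefs decay 2x faster, experiences 1.5x slower than facts.
DECAY_SPEED = {"fact": 1.0, "belief": 2.0, "experience": 1.0 / 1.5}


def decayed(value: float, age_days: float, memory_type: str, half_life_days: float = 30.0) -> float:
    """Assumed model: value halves once per effective half-life."""
    effective_half_life = half_life_days / DECAY_SPEED[memory_type]
    return value * 0.5 ** (age_days / effective_half_life)


fact_30 = decayed(1.0, 30, "fact")              # one half-life of 30 days
belief_30 = decayed(1.0, 30, "belief")          # two half-lives of 15 days
experience_45 = decayed(1.0, 45, "experience")  # one half-life of 45 days
print(round(fact_30, 3), round(belief_30, 3), round(experience_45, 3))
```

Under these assumptions a belief written a month ago carries a quarter of its original weight, while an experience needs 45 days to lose half.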

History (Temporal Queries)

Retrieve the full version history of an entry with optional time filtering:
from datetime import datetime, timedelta, timezone

# All versions
versions = mem.history("checkout-service", "retry-pattern")
for v in versions:
    print(f"v{v.version}: confidence={v.confidence}, written_at={v.provenance.written_at}")

# Versions from the last 7 days
since = datetime.now(timezone.utc) - timedelta(days=7)
recent = mem.history("checkout-service", "retry-pattern", since=since)

Explainability

Inspect the causal chain — which entries were read during the current session and how they connect to outcomes:
chain = mem.explain()
print(chain["session_id"])
print(chain["causal_keys"])   # list of entity_path/key pairs that were read
print(chain["entries"])       # full entry details for each causal key
Filter by outcome reference:
chain = mem.explain(outcome_ref="INC-1042")

Decision Traces

When you call commit_outcome(), AMFS snapshots the full decision trace — every entry that was read, every context that was recorded, and every query that was made. The resulting trace is persisted and can be retrieved later.

Getting the trace from an outcome

mem.record_context("ci-check", "All tests green", source="GitHub Actions")
entry = mem.read("checkout-service", "retry-pattern")

updated = mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)

# The trace is attached to the outcome
trace = mem._last_trace
print(f"Trace ID: {trace.id}")
print(f"Causal entries: {len(trace.causal_entries)}")
print(f"External contexts: {len(trace.external_contexts)}")

Browsing past traces

# List recent traces
traces = mem._adapter.list_traces(limit=10)
for t in traces:
    print(f"{t['id']}  {t['agent_id']}  {t['outcome_ref']} ({t['outcome_type']})")

# Get a specific trace
trace = mem._adapter.get_trace("ddbcefff-901a-4fa6-...")
print(trace.decision_summary)
print(f"Session duration: {trace.session_duration_ms}ms")
for entry in trace.causal_entries:
    print(f"  Read: {entry.entity_path}/{entry.key} (v{entry.version})")

Filtering traces

traces = mem._adapter.list_traces(
    entity_path="checkout-service",
    agent_id="deploy-agent",
    outcome_type="success",
    limit=5,
)

Tool Context

When agents call external tools or APIs, there are two ways to capture that context in AMFS depending on your needs.

Record in the causal chain (lightweight)

Use record_context() to add external inputs to the causal chain without writing to storage. This makes explain() return a complete decision trace:
entry = mem.read("checkout-service", "retry-pattern")

mem.record_context(
    "pagerduty-incidents",
    "3 SEV-1 incidents in the last 24h for checkout-service",
    source="PagerDuty API",
)
mem.record_context(
    "git-log",
    "15 commits since last deploy, 3 touching retry logic",
    source="git",
)

mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)

chain = mem.explain()
print(chain["causal_entries"])     # AMFS entries that were read
print(chain["external_contexts"])  # tool/API inputs that informed the decision

Persist for other agents (durable)

Use MemoryType.EXPERIENCE with a TTL to store tool results so downstream agents can retrieve them:
from datetime import datetime, timedelta, timezone

mem.write(
    "checkout-service",
    "tool-result-pagerduty",
    {"incidents": 3, "sev1": True, "last_24h": True},
    memory_type=MemoryType.EXPERIENCE,
    ttl_at=datetime.now(timezone.utc) + timedelta(hours=1),
)
The next agent reads it with mem.read("checkout-service", "tool-result-pagerduty") instead of re-calling the API.
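The read-before-calling pattern can be sketched without the SDK. TinyStore below is a hypothetical in-memory stand-in, not the AMFS API, and get_or_fetch is an illustrative helper. The sketch assumes that an entry past its ttl_at is treated as absent.

```python
from datetime import datetime, timedelta, timezone


class TinyStore:
    """Hypothetical in-memory stand-in for a shared memory store."""

    def __init__(self):
        self._data = {}

    def write(self, entity_path, key, value, ttl_at=None):
        self._data[(entity_path, key)] = (value, ttl_at)

    def read(self, entity_path, key, now):
        item = self._data.get((entity_path, key))
        if item is None:
            return None
        value, ttl_at = item
        if ttl_at is not None and now >= ttl_at:
            return None  # expired entries are treated as missing
        return value


def get_or_fetch(store, entity_path, key, fetch, now, ttl=timedelta(hours=1)):
    """Return the stored tool result if still fresh, else call the tool and store it."""
    cached = store.read(entity_path, key, now)
    if cached is not None:
        return cached
    value = fetch()
    store.write(entity_path, key, value, ttl_at=now + ttl)
    return value


calls = []


def fetch_incidents():
    calls.append("api-call")  # count how often the "API" is actually hit
    return {"incidents": 3, "sev1": True}


store = TinyStore()
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
args = (store, "checkout-service", "tool-result-pagerduty", fetch_incidents)
first = get_or_fetch(*args, now=t0)                          # miss: calls the API
second = get_or_fetch(*args, now=t0 + timedelta(minutes=30))  # hit: reuses the result
third = get_or_fetch(*args, now=t0 + timedelta(hours=2))      # expired: calls again
print(len(calls))
```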

Watch

Get real-time notifications when entries change:
def on_change(entry):
    print(f"{entry.key} updated to v{entry.version}")

handle = mem.watch("checkout-service", on_change)

# Stop watching
handle.cancel()

Briefing (Memory Cortex)

When the Memory Cortex is running, agents can retrieve pre-compiled knowledge digests ranked by relevance. This is how agents consume the “brain brief” — compiled summaries of entities, other agents, and external sources — without having to search through raw memory entries.

Basic Usage

mem = AgentMemory(agent_id="deploy-agent", adapter=adapter)

# Get a ranked briefing of compiled knowledge
briefs = mem.briefing(entity_path="myapp/checkout-service", limit=5)

for digest in briefs:
    print(digest.digest_type)   # "entity", "agent_brief", "source", or "connection_map"
    print(digest.scope)         # the entity path, agent ID, or source ID
    print(digest.summary)       # structured summary (varies by digest type)
    print(digest.entry_count)   # number of source entries compiled
    print(digest.compiled_at)   # when the digest was last compiled

Parameters

Parameter     Type          Description
entity_path   str | None    Focus on digests relevant to this entity
agent_id      str | None    Focus on digests relevant to this agent
limit         int           Maximum number of digests to return (default: 10)

Digest Types

Type             Scope                                        What It Contains
entity           Entity path (e.g. myapp/checkout-service)    Summary of all knowledge about an entity: key count, average confidence, top keys, narrative
agent_brief      Agent ID (e.g. deploy-agent)                 Summary of an agent’s knowledge and activity: entries written, entities touched, outcomes
source           Source ID (e.g. github)                      Summary of external data from a connector: events ingested, entities touched
connection_map   Cross-entity scope                           Cross-entity relationships (Pro)

Workflow Integration

Call briefing() at the start of a task to get context before making decisions:
with AgentMemory(agent_id="deploy-agent", adapter=adapter) as mem:
    # Step 1: Get a briefing on what you need to know
    briefs = mem.briefing(entity_path="myapp/checkout-service", limit=5)
    for digest in briefs:
        print(f"[{digest.digest_type}] {digest.scope}: {digest.summary.get('narrative', '')}")

    # Step 2: Read specific entries based on the briefing
    entry = mem.read("myapp/checkout-service", "retry-pattern")

    # Step 3: Do your work, record context
    mem.record_context("ci-pipeline", "All tests passing, deploy ready", source="GitHub Actions")

    # Step 4: Commit the outcome
    mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)
If the Cortex is not running, briefing() returns an empty list — your agent code can safely call it without checking.

Snapshots

Export and import the full state of your memory:
from amfs_core.snapshot import SnapshotExporter, SnapshotImporter

# Export
exporter = SnapshotExporter(mem.adapter)
exporter.export("backup.json")

# Import into a different adapter
from pathlib import Path

from amfs_filesystem import FilesystemAdapter
target = FilesystemAdapter(root=Path("/new/.amfs"), namespace="restored")
importer = SnapshotImporter(target)
importer.restore("backup.json")

Knowledge Graph

The knowledge graph builds automatically as agents write, commit outcomes, and learn from each other. You can also traverse it directly:
edges = mem.graph_neighbors(
    "checkout-service/retry-pattern",
    direction="both",
    depth=2,
    min_confidence=0.5,
)
for edge in edges:
    print(f"{edge.source_entity} --{edge.relation}--> {edge.target_entity}")

Parameter    Description
entity       Starting entity to explore
relation     Filter by relation type (e.g. "references", "informed")
direction    "outgoing", "incoming", or "both"
depth        Traversal depth (1 = direct neighbors, >1 for multi-hop)
Multi-hop traversal (depth > 1) requires the Postgres adapter. The Filesystem and S3 adapters return an empty list for graph methods.
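The difference between direct neighbors and multi-hop traversal can be illustrated with a breadth-first walk over a plain edge list. This is a stand-alone sketch, not the adapter's implementation: neighbors is a hypothetical function, and the edges are made-up (source, relation, target) tuples.

```python
from collections import deque


def neighbors(edges, start, depth=1, direction="both"):
    """Collect edges reachable from start within depth hops (breadth-first)."""
    seen = {start}
    frontier = deque([(start, 0)])
    found = []
    while frontier:
        node, hops = frontier.popleft()
        if hops == depth:
            continue  # do not expand beyond the requested depth
        for edge in edges:
            source, _relation, target = edge
            if direction in ("outgoing", "both") and source == node:
                nxt = target
            elif direction in ("incoming", "both") and target == node:
                nxt = source
            else:
                continue
            if edge not in found:
                found.append(edge)
            if nxt not in seen:
                seen.add(nxt)
                frontier.append((nxt, hops + 1))
    return found


edges = [
    ("a", "references", "b"),
    ("b", "informed", "c"),
    ("d", "references", "a"),
    ("c", "references", "e"),
]

direct = neighbors(edges, "a", depth=1)                          # edges touching "a"
two_hop = neighbors(edges, "a", depth=2)                         # also edges one step further out
outgoing_only = neighbors(edges, "a", depth=2, direction="outgoing")
print(len(direct), len(two_hop), len(outgoing_only))
```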

Semantic Search

If you configure an embedder, you can search by meaning:
results = mem.semantic_search("how do we handle retries?", top_k=5)
for entry, score in results:
    print(f"{entry.key} (similarity: {score:.3f})")
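Conceptually, searching by meaning ranks entries by how close their embedding vectors are to the query's vector. The sketch below assumes cosine similarity and uses tiny hand-written vectors in place of real embeddings. The metric the SDK actually uses is not stated above, and cosine and top_k are illustrative helpers.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, vectors, k):
    """Return the k (key, similarity) pairs most similar to the query."""
    scored = [(key, cosine(query_vec, vec)) for key, vec in vectors.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy 3-dimensional "embeddings"; real ones come from an embedder.
vectors = {
    "retry-pattern": [0.9, 0.1, 0.0],
    "pool-size": [0.0, 0.2, 0.9],
    "backoff": [0.7, 0.3, 0.1],
}
query = [1.0, 0.0, 0.0]

results = top_k(query, vectors, k=2)
for key, score in results:
    print(f"{key} (similarity: {score:.3f})")
```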

Context Manager

Use AgentMemory as a context manager for automatic cleanup:
with AgentMemory(agent_id="my-agent") as mem:
    mem.write("svc", "key", "value")
    entry = mem.read("svc", "key")
# Watchers, TTL sweepers, and background threads are cleaned up

Connecting to AMFS

Set two env vars and the SDK connects automatically — no adapter setup needed:
export AMFS_HTTP_URL="https://amfs-login.sense-lab.ai"
export AMFS_API_KEY="<your-api-key>"
from amfs import AgentMemory

mem = AgentMemory(agent_id="my-agent")
mem.write("checkout-service", "retry-pattern", {"max_retries": 3})
See the Connect to AMFS guide for details.

Conflict Handling

Handle concurrent writes to the same key:
from amfs_core.models import ConflictPolicy

# Raise an error on conflict
mem = AgentMemory(
    agent_id="my-agent",
    conflict_policy=ConflictPolicy.RAISE,
)

# Custom conflict resolution
def merge(existing, incoming, value):
    return {**existing.value, **value}

mem = AgentMemory(
    agent_id="my-agent",
    on_conflict=merge,
)
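To see what the merge callback above produces, it can be run on its own with stand-in objects. SimpleNamespace is used here in place of real entry objects, and the meaning of the three arguments (existing entry, incoming entry, incoming value) is inferred from their names.

```python
from types import SimpleNamespace


def merge(existing, incoming, value):
    # Shallow merge: keys in the incoming value override existing ones.
    return {**existing.value, **value}


existing = SimpleNamespace(value={"max_retries": 3, "backoff_ms": 200})
incoming_value = {"max_retries": 5}
incoming = SimpleNamespace(value=incoming_value)

merged = merge(existing, incoming, incoming_value)
print(merged)
```

The merge is shallow, so a nested dictionary in the incoming value replaces the existing one wholesale. The callback also assumes both values are dictionaries; a string or list value would raise a TypeError during unpacking.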