Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.sense-lab.ai/llms.txt

Use this file to discover all available pages before exploring further.

The Python SDK provides the AgentMemory class — the primary interface for reading, writing, and managing agent memory.

Installation

pip install amfs
Set the connection env vars and the SDK auto-detects the HTTP adapter:
export AMFS_HTTP_URL="https://amfs.sense-lab.ai"
export AMFS_API_KEY="<your-api-key>"

Creating an Instance

from amfs import AgentMemory

mem = AgentMemory(agent_id="my-agent")

With Custom Configuration

from pathlib import Path

mem = AgentMemory(
    agent_id="my-agent",
    config_path=Path("./custom-amfs.yaml"),
    ttl_sweep_interval=60.0,
    decay_half_life_days=30.0,
)

With a Pre-configured Adapter

from amfs_filesystem import FilesystemAdapter

adapter = FilesystemAdapter(root=Path(".amfs"), namespace="staging")
mem = AgentMemory(agent_id="my-agent", adapter=adapter)

With an Embedder

from amfs_core.embedder import EmbedderABC

class MyEmbedder(EmbedderABC):
    def embed(self, text: str) -> list[float]:
        # Return embedding vector
        ...
    def embed_value(self, value) -> list[float]:
        return self.embed(str(value))

mem = AgentMemory(agent_id="my-agent", embedder=MyEmbedder())

With an Importance Evaluator

from amfs_core.importance import ImportanceEvaluator

class MyEvaluator(ImportanceEvaluator):
    def evaluate(self, entity_path, key, value):
        score = 0.8 if "critical" in str(value).lower() else 0.3
        return score, {"criticality": score}

mem = AgentMemory(agent_id="my-agent", importance_evaluator=MyEvaluator())
Every write() call will automatically score the entry and set importance_score and importance_dimensions. If the evaluator raises, the write proceeds without scoring.

Full Constructor Signature

AgentMemory(
    agent_id: str,
    *,
    session_id: str | None = None,
    config_path: Path | None = None,
    adapter: AdapterABC | None = None,
    ttl_sweep_interval: float | None = None,
    decay_half_life_days: float | None = None,
    embedder: EmbedderABC | None = None,
    conflict_policy: ConflictPolicy = ConflictPolicy.LAST_WRITE_WINS,
    on_conflict: Callable | None = None,
    importance_evaluator: Any | None = None,
)

Core Operations

Write

entry = mem.write(
    "checkout-service",              # entity_path
    "retry-pattern",                 # key
    {"max_retries": 3},              # value (any JSON-serializable data)
    confidence=0.85,                 # optional, default 1.0
    pattern_refs=["retry"],          # optional cross-references
    memory_type=MemoryType.FACT,     # optional: fact (default), belief, or experience
    shared=True,                     # optional, default True — visible to other agents
    artifact_refs=None,              # optional list of ArtifactRef
    branch=None,                     # optional, defaults to "main"
)
Write with a TTL (time-to-live):
from datetime import datetime, timedelta, timezone

mem.write(
    "svc", "temp-flag", {"active": True},
    ttl_at=datetime.now(timezone.utc) + timedelta(hours=24),
)

Read

entry = mem.read("checkout-service", "retry-pattern")

if entry:
    print(entry.value)
    print(entry.version)
    print(entry.confidence)
With minimum confidence filter:
entry = mem.read("svc", "pattern", min_confidence=0.5)
Read from a specific branch:
entry = mem.read("svc", "pattern", branch="experiment-1")

List

# All entries
entries = mem.list()

# Entries for a specific entity
entries = mem.list("checkout-service")

# Include superseded versions
entries = mem.list("checkout-service", include_superseded=True)

# From a specific branch
entries = mem.list("checkout-service", branch="experiment-1")
results = mem.search(entity_path="checkout-service", min_confidence=0.5)
Full search options:
results = mem.search(
    query="retry strategy",              # text query (full-text via adapter)
    entity_path="checkout-service",      # filter by entity
    entity_paths=["svc-a", "svc-b"],    # multi-scope search
    min_confidence=0.5,                  # minimum confidence
    max_confidence=1.0,                  # maximum confidence
    agent_id="review-agent",             # filter by author
    since=datetime(2026, 3, 1),          # filter by time
    pattern_ref="retry-pattern",         # filter by pattern reference
    sort_by="confidence",                # "confidence" | "recency" | "version"
    limit=100,                           # max results
    depth=3,                             # tier depth (1=HOT, 2=HOT+WARM, 3=all)
)
Progressive retrieval with depth — search only high-priority tiers for fast, high-signal results:
hot_only = mem.search(query="retry strategy", depth=1)   # Hot tier
hot_warm = mem.search(query="retry strategy", depth=2)   # Hot + Warm
all_tiers = mem.search(query="retry strategy")            # All (default)
Composite recall scoring (blends semantic similarity, recency, and confidence):
from amfs import RecallConfig

scored = mem.search(
    query="how do we handle retries?",
    recall_config=RecallConfig(semantic_weight=0.5, recency_weight=0.3, confidence_weight=0.2),
)
for item in scored:
    print(f"{item.entry.key} — score={item.score:.3f}")
Semantic scoring requires an embedder. Without one, the semantic component is 0.0.

Stats

stats = mem.stats()
print(f"Total entries: {stats.total_entries}")
print(f"Total entities: {stats.total_entities}")
print(f"Total agents: {stats.total_agents}")
print(f"Confidence avg: {stats.confidence_avg}")
print(f"Outcome-linked: {stats.outcome_linked_count}")

Outcomes

Recording Outcomes

from amfs import OutcomeType

# With explicit causal keys
updated = mem.commit_outcome(
    outcome_ref="INC-1042",
    outcome_type=OutcomeType.CRITICAL_FAILURE,
    causal_entry_keys=["checkout-service/retry-pattern"],
)

# With auto-causal linking (uses everything read in this session)
updated = mem.commit_outcome(
    outcome_ref="DEP-300",
    outcome_type=OutcomeType.SUCCESS,
)

# With additional options
updated = mem.commit_outcome(
    outcome_ref="DEP-500",
    outcome_type=OutcomeType.SUCCESS,
    causal_confidence=0.9,
    decision_summary="Deployed after CI passed and PagerDuty clear",
)

Outcome Types

OutcomeType.CRITICAL_FAILURE  # × 1.15
OutcomeType.FAILURE           # × 1.10
OutcomeType.MINOR_FAILURE     # × 1.08
OutcomeType.SUCCESS           # × 0.97

Memory Types

Classify entries to control decay behavior:
from amfs import MemoryType

# Facts (default) — objective knowledge, standard decay
mem.write("svc", "config", {"pool_size": 10}, memory_type=MemoryType.FACT)

# Beliefs — subjective inferences, decay 2× faster
mem.write("svc", "hypothesis", "Likely an N+1 query issue", memory_type=MemoryType.BELIEF)

# Experiences — action logs, decay 1.5× slower
mem.write("svc", "action-log", "Added index on user_id", memory_type=MemoryType.EXPERIENCE)

Agent-Scoped Memory

The SDK distinguishes between shared memory and agent-scoped memory. Use recall, my_entries, and read_from for explicit agent-level operations.

Recall

Read only entries written by this agent — what this brain knows from direct experience:
mine = mem.recall("checkout-service", "retry-pattern")
Unlike read(), which returns the latest version by any agent, recall() falls back through history to find the most recent version authored by this agent.

My Entries

List everything this agent has written:
all_mine = mem.my_entries("checkout-service")

Read From Another Agent

Explicitly read from another agent’s memory with tracked knowledge transfer:
theirs = mem.read_from("deploy-agent", "checkout-service", "deploy-config")
This creates a graph edge (learned_from) and logs cross-agent read events on both sides for full traceability.

Cross-Agent Reads

See which other agents’ memory this agent has consumed:
reads = mem.cross_agent_reads()
# {'deploy-agent': [{'entity_path': 'checkout-service', 'key': 'retry-pattern', 'read_count': 3}]}

# Convenience: just the agent IDs
agents = mem.agents_i_read_from()
# ['deploy-agent', 'review-agent']

Scopes

A MemoryScope provides a focused view bound to a single entity path, reducing boilerplate:
from amfs import MemoryScope

checkout = mem.scope("checkout-service")

# All operations are scoped
checkout.write("retry-pattern", {"max_retries": 5}, confidence=0.9)
entry = checkout.read("retry-pattern")
all_entries = checkout.list()
history = checkout.history("retry-pattern")
scope_info = checkout.info()

# Readonly scopes prevent accidental writes
readonly = mem.scope("checkout-service", readonly=True)
readonly.write("key", "value")  # raises PermissionError

Scope Utilities

# List all entity paths that contain entries
scopes = mem.list_scopes()

# Get summary info about a scope
info = mem.info("checkout-service")
# ScopeInfo(path, entry_count, avg_confidence, keys, oldest, newest)

# Render entity paths as an indented tree
print(mem.tree())
# myapp (5)
#   auth (2)
#   checkout-service (3)

History (Temporal Queries)

Retrieve the full version history of an entry with optional time filtering:
from datetime import datetime, timedelta, timezone

# All versions
versions = mem.history("checkout-service", "retry-pattern")
for v in versions:
    print(f"v{v.version} — confidence: {v.confidence}{v.provenance.written_at}")

# Versions from the last 7 days
since = datetime.now(timezone.utc) - timedelta(days=7)
recent = mem.history("checkout-service", "retry-pattern", since=since)

Explainability

Inspect the causal chain — which entries were read during the current session and how they connect to outcomes:
chain = mem.explain()
print(chain["session_id"])
print(chain["causal_entries"])     # full entry details for each causal key
print(chain["external_contexts"])  # tool/API inputs that informed the decision
Filter by outcome reference:
chain = mem.explain(outcome_ref="INC-1042")

Decision Traces

When you call commit_outcome(), AMFS snapshots the full decision trace — every entry that was read, every context that was recorded, and every query that was made. The resulting trace is persisted and can be retrieved later.

Getting the Trace from an Outcome

mem.record_context("ci-check", "All tests green", source="GitHub Actions")
entry = mem.read("checkout-service", "retry-pattern")

updated = mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)

# The trace is available after commit
trace = mem._last_trace
print(f"Trace ID: {trace.id}")
print(f"Causal entries: {len(trace.causal_entries)}")
print(f"External contexts: {len(trace.external_contexts)}")
print(f"Session duration: {trace.session_duration_ms}ms")

Browsing Past Traces

# List recent traces (via adapter)
traces = mem.adapter.list_traces(limit=10)
for t in traces:
    print(f"{t.id}{t.agent_id}{t.outcome_ref} ({t.outcome_type})")

# Get a specific trace
trace = mem.adapter.get_trace("ddbcefff-901a-4fa6-...")
print(trace.decision_summary)
for entry in trace.causal_entries:
    print(f"  Read: {entry.entity_path}/{entry.key} (v{entry.version})")

Filtering Traces

traces = mem.adapter.list_traces(
    entity_path="checkout-service",
    agent_id="deploy-agent",
    outcome_type="success",
    limit=5,
)

Tool Context

When agents call external tools or APIs, there are two ways to capture that context in AMFS depending on your needs.

Record in the Causal Chain (Lightweight)

Use record_context() to add external inputs to the causal chain without writing to storage. This makes explain() return a complete decision trace:
entry = mem.read("checkout-service", "retry-pattern")

mem.record_context(
    "pagerduty-incidents",
    "3 SEV-1 incidents in the last 24h for checkout-service",
    source="PagerDuty API",
)
mem.record_context(
    "git-log",
    "15 commits since last deploy, 3 touching retry logic",
    source="git",
)

mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)

chain = mem.explain()
print(chain["causal_entries"])     # AMFS entries that were read
print(chain["external_contexts"])  # tool/API inputs that informed the decision

Persist for Other Agents (Durable)

Use MemoryType.EXPERIENCE with a TTL to store tool results so downstream agents can retrieve them:
from datetime import datetime, timedelta, timezone

mem.write(
    "checkout-service",
    "tool-result-pagerduty",
    {"incidents": 3, "sev1": True, "last_24h": True},
    memory_type=MemoryType.EXPERIENCE,
    ttl_at=datetime.now(timezone.utc) + timedelta(hours=1),
)
The next agent reads it with mem.read("checkout-service", "tool-result-pagerduty") instead of re-calling the API.

Timeline

View the event log for this agent — every read, write, outcome, and cross-agent interaction:
events = mem.timeline(limit=50)
for event in events:
    print(f"[{event.event_type}] {event.summary}")

# Filter by event type
writes = mem.timeline(event_type="WRITE", limit=20)

# Filter by time
from datetime import datetime, timedelta, timezone
recent = mem.timeline(since=datetime.now(timezone.utc) - timedelta(hours=24))

Watch

Get real-time notifications when entries change:
def on_change(entry):
    print(f"{entry.key} updated to v{entry.version}")

handle = mem.watch("checkout-service", on_change)

# Stop watching
handle.cancel()
Watch behavior depends on the adapter. The filesystem adapter supports real-time watches. The HTTP adapter currently logs a warning and returns a no-op handle — use the MCP server’s streaming capabilities for remote real-time updates.

Transactions

Group multiple writes into an atomic commit:
with mem.transaction("update retry configs") as tx:
    tx.write("checkout-service", "retry-pattern", {"max_retries": 5})
    tx.write("checkout-service", "timeout-config", {"timeout_ms": 3000})
    tx.write("payment-service", "retry-pattern", {"max_retries": 2})
    print(tx.pending_count)  # 3
# All writes committed atomically on exit

# Access commit info after
print(tx.commit.id)
print(len(tx.entries))

Integrity Verification

Verify content hashes and integrity chains for stored entries:
result = mem.verify("checkout-service")
# Or verify all: mem.verify()

print(result["total_checked"])
print(result["valid"])
print(result["corrupted"])     # list of corrupted entry info
print(result["chain_breaks"])  # list of chain break info

Commit Log & DAG

Inspect the commit history and navigate the commit DAG:
# Get recent commits on the current branch
commits = mem.commit_log(limit=10)
for c in commits:
    print(f"{c.id}{c.message} ({len(c.entries)} entries)")

# Get a single commit
commit = mem.get_commit("abc123...")

# Find common ancestor of two commits
ancestor = mem.common_ancestor(commit_a_id, commit_b_id)

Diff & Patch

Compute structural diffs between entry versions:
# Diff between the two most recent versions
diff = mem.diff("checkout-service", "retry-pattern")
print(diff)

# Diff against a specific old version
diff = mem.diff("checkout-service", "retry-pattern", old_version=1)

# Create a serializable patch
patch = mem.create_patch("checkout-service", "retry-pattern")
# Or from a specific source version
patch = mem.create_patch("checkout-service", "retry-pattern", source_version=2)

Agent Binding

Profile

Set a profile for this agent (description, defaults, tags):
mem.set_profile(
    description="Handles deployment decisions for checkout-service",
    default_branch="main",
    auto_context_paths=["checkout-service", "payment-service"],
    tags=["deploy", "infra"],
)

Capabilities

Declare capabilities so other agents can discover you:
mem.declare_capability(
    "deployment-analysis",
    description="Analyzes deployment readiness based on CI, PagerDuty, and recent changes",
    entity_paths=["checkout-service", "payment-service"],
)

Contracts

Set memory contracts that define expectations for entries:
mem.set_contracts([
    {
        "entity_path": "checkout-service",
        "key_pattern": "retry-*",
        "min_confidence": 0.5,
        "max_confidence": 1.0,
        "required_fields": ["max_retries"],
        "ttl_required": False,
        "description": "Retry configuration entries",
    }
])

Discovery

Find other agents by capability or entity path:
agents = mem.discover_agents(capability="deployment-analysis")
agents = mem.discover_agents(entity_path="checkout-service")

Briefing (Memory Cortex)

When the Memory Cortex is running, agents can retrieve pre-compiled knowledge digests ranked by relevance. This is how agents consume the “brain brief” — compiled summaries of entities, other agents, and external sources — without having to search through raw memory entries.

Basic Usage

mem = AgentMemory(agent_id="deploy-agent", adapter=adapter)

# Get a ranked briefing of compiled knowledge
briefs = mem.briefing(entity_path="myapp/checkout-service", limit=5)

for digest in briefs:
    print(digest.digest_type)   # "entity", "agent_brief", "source", or "connection_map"
    print(digest.scope)         # the entity path, agent ID, or source ID
    print(digest.summary)       # structured summary (varies by digest type)
    print(digest.entry_count)   # number of source entries compiled
    print(digest.compiled_at)   # when the digest was last compiled

Parameters

ParameterTypeDescription
entity_pathstr | NoneFocus on digests relevant to this entity
agent_idstr | NoneFocus on digests relevant to this agent
limitintMaximum number of digests to return (default: 10)
branchstr | NoneBranch to read digests from (defaults to active branch)

Digest Types

TypeScopeWhat It Contains
entityEntity path (e.g. myapp/checkout-service)Summary of all knowledge about an entity — key count, average confidence, top keys, narrative
agent_briefAgent ID (e.g. deploy-agent)Summary of an agent’s knowledge and activity — entries written, entities touched, outcomes
sourceSource ID (e.g. github)Summary of external data from a connector — events ingested, entities touched
connection_mapCross-entity scopeCross-entity relationships (Pro)

Resolution Order

The briefing method tries multiple backends in order:
  1. Adapter-native briefing() — e.g. HttpAdapter proxies to the server
  2. amfs_cortex.BriefingService — full Cortex scoring (if amfs-cortex is installed)
  3. Inline scoring via adapter list_digests() — fallback when Cortex is not installed but adapter has digest access
If none are available, briefing() returns an empty list — your agent code can safely call it without checking.

Knowledge Graph

The knowledge graph builds automatically as agents write, commit outcomes, and learn from each other. Pattern refs and causal chains create edges. You can traverse it directly:
edges = mem.graph_neighbors(
    "checkout-service/retry-pattern",
    direction="both",
    depth=2,
    min_confidence=0.5,
    limit=50,
)
for edge in edges:
    print(f"{edge.source_entity} --{edge.relation}--> {edge.target_entity}")
ParameterTypeDescription
entitystrStarting entity to explore
relationstr | NoneFilter by relation type (e.g. "references", "informed", "learned_from", "co_occurs_with")
directionstr"outgoing", "incoming", or "both"
depthintTraversal depth (1 = direct neighbors, >1 for multi-hop)
min_confidencefloatMinimum edge confidence (default: 0.0)
limitintMaximum results (default: 50)
Multi-hop traversal (depth > 1) requires the Postgres adapter. The Filesystem and S3 adapters return an empty list for graph methods.

If you configure an embedder, you can search by meaning:
results = mem.semantic_search(
    "how do we handle retries?",
    entity_path="checkout-service",
    limit=10,
    min_confidence=0.5,
    min_similarity=0.3,
)
for entry, score in results:
    print(f"{entry.key} (similarity: {score:.3f})")
ParameterTypeDescription
textstrThe query text to search by meaning
entity_pathstr | NoneFilter to a specific entity
min_confidencefloatMinimum entry confidence (default: 0.0)
limitintMaximum results (default: 10)
min_similarityfloatMinimum cosine similarity threshold (default: 0.0)

Context Manager

Use AgentMemory as a context manager for automatic cleanup:
with AgentMemory(agent_id="my-agent") as mem:
    mem.write("svc", "key", "value")
    entry = mem.read("svc", "key")
# Watchers, TTL sweepers, and background threads are cleaned up

Connecting to AMFS

Set two env vars and the SDK connects automatically — no adapter setup needed:
export AMFS_HTTP_URL="https://amfs.sense-lab.ai"
export AMFS_API_KEY="<your-api-key>"
from amfs import AgentMemory

mem = AgentMemory(agent_id="my-agent")
mem.write("checkout-service", "retry-pattern", {"max_retries": 3})
See the Connect to AMFS guide for details.

Rooms & Collaboration

The SDK includes room-based collaboration APIs for multi-agent knowledge sharing. These require an HTTP adapter connection.
# List rooms available to this agent
rooms = mem.my_rooms()

# Join a room
mem.room_join("room-123")

# Get room details
info = mem.room_info("room-123")

# Get recent activity
updates = mem.room_updates("room-123", since="2026-05-01T00:00:00Z")

# Post a discussion message
mem.room_discuss(
    "room-123",
    "I've updated the retry config based on recent incidents",
    message_type="update",
    addressed_to="deploy-agent",
)

# Read discussions
messages = mem.room_discussions("room-123", limit=20)

# Leave a room (snapshots are preserved)
mem.room_leave("room-123")

Negotiation

Rooms support structured negotiation sessions between agents:
# Create a negotiation session
session = mem.negotiate_create(
    "room-123",
    "Retry Config Agreement",
    description="Agree on shared retry parameters",
    max_rounds=5,
)

# Make a proposal
mem.negotiate_propose(
    "room-123", session["id"],
    action="propose",
    proposal={"max_retries": 3, "backoff_ms": 1000},
    rationale="Based on P99 latency data from last week",
)

# Respond to a proposal
mem.negotiate_respond(
    "room-123", session["id"],
    action="accept",
    rationale="Aligns with our SLA requirements",
)

# Check negotiation status
status = mem.negotiate_status("room-123", session["id"])
Room and negotiation methods require an HTTP adapter that exposes the rooms API. Set AMFS_HTTP_URL to enable these features.

Conflict Handling

Handle concurrent writes to the same key:
from amfs import ConflictPolicy

# Raise an error on conflict
mem = AgentMemory(
    agent_id="my-agent",
    conflict_policy=ConflictPolicy.RAISE,
)

# Custom conflict resolution
def merge(existing, incoming, value):
    return {**existing.value, **value}

mem = AgentMemory(
    agent_id="my-agent",
    on_conflict=merge,
)

Snapshots

Export and import the full state of your memory:
from amfs_core.snapshot import SnapshotExporter, SnapshotImporter

# Export
exporter = SnapshotExporter(mem.adapter)
exporter.export("backup.json")

# Import into a different adapter
from amfs_filesystem import FilesystemAdapter
target = FilesystemAdapter(root=Path("/new/.amfs"), namespace="restored")
importer = SnapshotImporter(target)
importer.restore("backup.json")

Exports

from amfs import (
    # Classes
    AgentMemory,
    MemoryScope,

    # Models
    MemoryEntry,
    MemoryStats,
    MemoryType,
    OutcomeType,
    OutcomeRecord,
    Provenance,
    ProvenanceTier,
    DecisionTrace,
    Event,
    RecallConfig,
    ScoredEntry,
    SearchQuery,
    SemanticQuery,
    SessionMetadata,
    ConflictPolicy,
    QualityIssue,
    QualityReport,

    # Abstract base
    EmbedderABC,
)

Properties

PropertyTypeDescription
agent_idstrThis agent’s identifier
session_idstrCurrent session identifier
namespacestrActive namespace
adapterAdapterABCThe underlying storage adapter
session_metadataSessionMetadata | NoneGet/set session metadata (model, client, platform)
read_loglist[str]Entry keys read during this session