Documentation Index
Fetch the complete documentation index at: https://docs.sense-lab.ai/llms.txt
Use this file to discover all available pages before exploring further.
The Python SDK provides the AgentMemory class — the primary interface for reading, writing, and managing agent memory.
Installation
Set the connection env vars and the SDK auto-detects the HTTP adapter:
export AMFS_HTTP_URL="https://amfs.sense-lab.ai"
export AMFS_API_KEY="<your-api-key>"
Creating an Instance
from amfs import AgentMemory
mem = AgentMemory(agent_id="my-agent")
With Custom Configuration
from pathlib import Path
mem = AgentMemory(
agent_id="my-agent",
config_path=Path("./custom-amfs.yaml"),
ttl_sweep_interval=60.0,
decay_half_life_days=30.0,
)
from amfs_filesystem import FilesystemAdapter
adapter = FilesystemAdapter(root=Path(".amfs"), namespace="staging")
mem = AgentMemory(agent_id="my-agent", adapter=adapter)
With an Embedder
from amfs_core.embedder import EmbedderABC
class MyEmbedder(EmbedderABC):
def embed(self, text: str) -> list[float]:
# Return embedding vector
...
def embed_value(self, value) -> list[float]:
return self.embed(str(value))
mem = AgentMemory(agent_id="my-agent", embedder=MyEmbedder())
With an Importance Evaluator
from amfs_core.importance import ImportanceEvaluator
class MyEvaluator(ImportanceEvaluator):
def evaluate(self, entity_path, key, value):
score = 0.8 if "critical" in str(value).lower() else 0.3
return score, {"criticality": score}
mem = AgentMemory(agent_id="my-agent", importance_evaluator=MyEvaluator())
Every write() call will automatically score the entry and set importance_score and importance_dimensions. If the evaluator raises, the write proceeds without scoring.
Full Constructor Signature
AgentMemory(
agent_id: str,
*,
session_id: str | None = None,
config_path: Path | None = None,
adapter: AdapterABC | None = None,
ttl_sweep_interval: float | None = None,
decay_half_life_days: float | None = None,
embedder: EmbedderABC | None = None,
conflict_policy: ConflictPolicy = ConflictPolicy.LAST_WRITE_WINS,
on_conflict: Callable | None = None,
importance_evaluator: Any | None = None,
)
Core Operations
Write
entry = mem.write(
"checkout-service", # entity_path
"retry-pattern", # key
{"max_retries": 3}, # value (any JSON-serializable data)
confidence=0.85, # optional, default 1.0
pattern_refs=["retry"], # optional cross-references
memory_type=MemoryType.FACT, # optional: fact (default), belief, or experience
shared=True, # optional, default True — visible to other agents
artifact_refs=None, # optional list of ArtifactRef
branch=None, # optional, defaults to "main"
)
Write with a TTL (time-to-live):
from datetime import datetime, timedelta, timezone
mem.write(
"svc", "temp-flag", {"active": True},
ttl_at=datetime.now(timezone.utc) + timedelta(hours=24),
)
Read
entry = mem.read("checkout-service", "retry-pattern")
if entry:
print(entry.value)
print(entry.version)
print(entry.confidence)
With minimum confidence filter:
entry = mem.read("svc", "pattern", min_confidence=0.5)
Read from a specific branch:
entry = mem.read("svc", "pattern", branch="experiment-1")
List
# All entries
entries = mem.list()
# Entries for a specific entity
entries = mem.list("checkout-service")
# Include superseded versions
entries = mem.list("checkout-service", include_superseded=True)
# From a specific branch
entries = mem.list("checkout-service", branch="experiment-1")
Search
results = mem.search(entity_path="checkout-service", min_confidence=0.5)
Full search options:
results = mem.search(
query="retry strategy", # text query (full-text via adapter)
entity_path="checkout-service", # filter by entity
entity_paths=["svc-a", "svc-b"], # multi-scope search
min_confidence=0.5, # minimum confidence
max_confidence=1.0, # maximum confidence
agent_id="review-agent", # filter by author
since=datetime(2026, 3, 1), # filter by time
pattern_ref="retry-pattern", # filter by pattern reference
sort_by="confidence", # "confidence" | "recency" | "version"
limit=100, # max results
depth=3, # tier depth (1=HOT, 2=HOT+WARM, 3=all)
)
Progressive retrieval with depth — search only high-priority tiers for fast, high-signal results:
hot_only = mem.search(query="retry strategy", depth=1) # Hot tier
hot_warm = mem.search(query="retry strategy", depth=2) # Hot + Warm
all_tiers = mem.search(query="retry strategy") # All (default)
Composite recall scoring (blends semantic similarity, recency, and confidence):
from amfs import RecallConfig
scored = mem.search(
query="how do we handle retries?",
recall_config=RecallConfig(semantic_weight=0.5, recency_weight=0.3, confidence_weight=0.2),
)
for item in scored:
print(f"{item.entry.key} — score={item.score:.3f}")
Semantic scoring requires an embedder. Without one, the semantic component is 0.0.
Stats
stats = mem.stats()
print(f"Total entries: {stats.total_entries}")
print(f"Total entities: {stats.total_entities}")
print(f"Total agents: {stats.total_agents}")
print(f"Confidence avg: {stats.confidence_avg}")
print(f"Outcome-linked: {stats.outcome_linked_count}")
Outcomes
Recording Outcomes
from amfs import OutcomeType
# With explicit causal keys
updated = mem.commit_outcome(
outcome_ref="INC-1042",
outcome_type=OutcomeType.CRITICAL_FAILURE,
causal_entry_keys=["checkout-service/retry-pattern"],
)
# With auto-causal linking (uses everything read in this session)
updated = mem.commit_outcome(
outcome_ref="DEP-300",
outcome_type=OutcomeType.SUCCESS,
)
# With additional options
updated = mem.commit_outcome(
outcome_ref="DEP-500",
outcome_type=OutcomeType.SUCCESS,
causal_confidence=0.9,
decision_summary="Deployed after CI passed and PagerDuty clear",
)
Outcome Types
OutcomeType.CRITICAL_FAILURE # × 1.15
OutcomeType.FAILURE # × 1.10
OutcomeType.MINOR_FAILURE # × 1.08
OutcomeType.SUCCESS # × 0.97
Memory Types
Classify entries to control decay behavior:
from amfs import MemoryType
# Facts (default) — objective knowledge, standard decay
mem.write("svc", "config", {"pool_size": 10}, memory_type=MemoryType.FACT)
# Beliefs — subjective inferences, decay 2× faster
mem.write("svc", "hypothesis", "Likely an N+1 query issue", memory_type=MemoryType.BELIEF)
# Experiences — action logs, decay 1.5× slower
mem.write("svc", "action-log", "Added index on user_id", memory_type=MemoryType.EXPERIENCE)
Agent-Scoped Memory
The SDK distinguishes between shared memory and agent-scoped memory. Use recall, my_entries, and read_from for explicit agent-level operations.
Recall
Read only entries written by this agent — what this brain knows from direct experience:
mine = mem.recall("checkout-service", "retry-pattern")
Unlike read(), which returns the latest version by any agent, recall() falls back through history to find the most recent version authored by this agent.
My Entries
List everything this agent has written:
all_mine = mem.my_entries("checkout-service")
Read From Another Agent
Explicitly read from another agent’s memory with tracked knowledge transfer:
theirs = mem.read_from("deploy-agent", "checkout-service", "deploy-config")
This creates a graph edge (learned_from) and logs cross-agent read events on both sides for full traceability.
Cross-Agent Reads
See which other agents’ memory this agent has consumed:
reads = mem.cross_agent_reads()
# {'deploy-agent': [{'entity_path': 'checkout-service', 'key': 'retry-pattern', 'read_count': 3}]}
# Convenience: just the agent IDs
agents = mem.agents_i_read_from()
# ['deploy-agent', 'review-agent']
Scopes
A MemoryScope provides a focused view bound to a single entity path, reducing boilerplate:
from amfs import MemoryScope
checkout = mem.scope("checkout-service")
# All operations are scoped
checkout.write("retry-pattern", {"max_retries": 5}, confidence=0.9)
entry = checkout.read("retry-pattern")
all_entries = checkout.list()
history = checkout.history("retry-pattern")
scope_info = checkout.info()
# Readonly scopes prevent accidental writes
readonly = mem.scope("checkout-service", readonly=True)
readonly.write("key", "value") # raises PermissionError
Scope Utilities
# List all entity paths that contain entries
scopes = mem.list_scopes()
# Get summary info about a scope
info = mem.info("checkout-service")
# ScopeInfo(path, entry_count, avg_confidence, keys, oldest, newest)
# Render entity paths as an indented tree
print(mem.tree())
# myapp (5)
# auth (2)
# checkout-service (3)
History (Temporal Queries)
Retrieve the full version history of an entry with optional time filtering:
from datetime import datetime, timedelta, timezone
# All versions
versions = mem.history("checkout-service", "retry-pattern")
for v in versions:
print(f"v{v.version} — confidence: {v.confidence} — {v.provenance.written_at}")
# Versions from the last 7 days
since = datetime.now(timezone.utc) - timedelta(days=7)
recent = mem.history("checkout-service", "retry-pattern", since=since)
Explainability
Inspect the causal chain — which entries were read during the current session and how they connect to outcomes:
chain = mem.explain()
print(chain["session_id"])
print(chain["causal_entries"]) # full entry details for each causal key
print(chain["external_contexts"]) # tool/API inputs that informed the decision
Filter by outcome reference:
chain = mem.explain(outcome_ref="INC-1042")
Decision Traces
When you call commit_outcome(), AMFS snapshots the full decision trace — every entry that was read, every context that was recorded, and every query that was made. The resulting trace is persisted and can be retrieved later.
Getting the Trace from an Outcome
mem.record_context("ci-check", "All tests green", source="GitHub Actions")
entry = mem.read("checkout-service", "retry-pattern")
updated = mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)
# The trace is available after commit
trace = mem._last_trace
print(f"Trace ID: {trace.id}")
print(f"Causal entries: {len(trace.causal_entries)}")
print(f"External contexts: {len(trace.external_contexts)}")
print(f"Session duration: {trace.session_duration_ms}ms")
Browsing Past Traces
# List recent traces (via adapter)
traces = mem.adapter.list_traces(limit=10)
for t in traces:
print(f"{t.id} — {t.agent_id} — {t.outcome_ref} ({t.outcome_type})")
# Get a specific trace
trace = mem.adapter.get_trace("ddbcefff-901a-4fa6-...")
print(trace.decision_summary)
for entry in trace.causal_entries:
print(f" Read: {entry.entity_path}/{entry.key} (v{entry.version})")
Filtering Traces
traces = mem.adapter.list_traces(
entity_path="checkout-service",
agent_id="deploy-agent",
outcome_type="success",
limit=5,
)
Tool Context
When agents call external tools or APIs, there are two ways to capture that context in AMFS depending on your needs.
Record in the Causal Chain (Lightweight)
Use record_context() to add external inputs to the causal chain without writing to storage. This makes explain() return a complete decision trace:
entry = mem.read("checkout-service", "retry-pattern")
mem.record_context(
"pagerduty-incidents",
"3 SEV-1 incidents in the last 24h for checkout-service",
source="PagerDuty API",
)
mem.record_context(
"git-log",
"15 commits since last deploy, 3 touching retry logic",
source="git",
)
mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)
chain = mem.explain()
print(chain["causal_entries"]) # AMFS entries that were read
print(chain["external_contexts"]) # tool/API inputs that informed the decision
Persist for Other Agents (Durable)
Use MemoryType.EXPERIENCE with a TTL to store tool results so downstream agents can retrieve them:
from datetime import datetime, timedelta, timezone
mem.write(
"checkout-service",
"tool-result-pagerduty",
{"incidents": 3, "sev1": True, "last_24h": True},
memory_type=MemoryType.EXPERIENCE,
ttl_at=datetime.now(timezone.utc) + timedelta(hours=1),
)
The next agent reads it with mem.read("checkout-service", "tool-result-pagerduty") instead of re-calling the API.
Timeline
View the event log for this agent — every read, write, outcome, and cross-agent interaction:
events = mem.timeline(limit=50)
for event in events:
print(f"[{event.event_type}] {event.summary}")
# Filter by event type
writes = mem.timeline(event_type="WRITE", limit=20)
# Filter by time
from datetime import datetime, timedelta, timezone
recent = mem.timeline(since=datetime.now(timezone.utc) - timedelta(hours=24))
Watch
Get real-time notifications when entries change:
def on_change(entry):
print(f"{entry.key} updated to v{entry.version}")
handle = mem.watch("checkout-service", on_change)
# Stop watching
handle.cancel()
Watch behavior depends on the adapter. The filesystem adapter supports real-time watches. The HTTP adapter currently logs a warning and returns a no-op handle — use the MCP server’s streaming capabilities for remote real-time updates.
Transactions
Group multiple writes into an atomic commit:
with mem.transaction("update retry configs") as tx:
tx.write("checkout-service", "retry-pattern", {"max_retries": 5})
tx.write("checkout-service", "timeout-config", {"timeout_ms": 3000})
tx.write("payment-service", "retry-pattern", {"max_retries": 2})
print(tx.pending_count) # 3
# All writes committed atomically on exit
# Access commit info after
print(tx.commit.id)
print(len(tx.entries))
Integrity Verification
Verify content hashes and integrity chains for stored entries:
result = mem.verify("checkout-service")
# Or verify all: mem.verify()
print(result["total_checked"])
print(result["valid"])
print(result["corrupted"]) # list of corrupted entry info
print(result["chain_breaks"]) # list of chain break info
Commit Log & DAG
Inspect the commit history and navigate the commit DAG:
# Get recent commits on the current branch
commits = mem.commit_log(limit=10)
for c in commits:
print(f"{c.id} — {c.message} ({len(c.entries)} entries)")
# Get a single commit
commit = mem.get_commit("abc123...")
# Find common ancestor of two commits
ancestor = mem.common_ancestor(commit_a_id, commit_b_id)
Diff & Patch
Compute structural diffs between entry versions:
# Diff between the two most recent versions
diff = mem.diff("checkout-service", "retry-pattern")
print(diff)
# Diff against a specific old version
diff = mem.diff("checkout-service", "retry-pattern", old_version=1)
# Create a serializable patch
patch = mem.create_patch("checkout-service", "retry-pattern")
# Or from a specific source version
patch = mem.create_patch("checkout-service", "retry-pattern", source_version=2)
Agent Binding
Profile
Set a profile for this agent (description, defaults, tags):
mem.set_profile(
description="Handles deployment decisions for checkout-service",
default_branch="main",
auto_context_paths=["checkout-service", "payment-service"],
tags=["deploy", "infra"],
)
Capabilities
Declare capabilities so other agents can discover you:
mem.declare_capability(
"deployment-analysis",
description="Analyzes deployment readiness based on CI, PagerDuty, and recent changes",
entity_paths=["checkout-service", "payment-service"],
)
Contracts
Set memory contracts that define expectations for entries:
mem.set_contracts([
{
"entity_path": "checkout-service",
"key_pattern": "retry-*",
"min_confidence": 0.5,
"max_confidence": 1.0,
"required_fields": ["max_retries"],
"ttl_required": False,
"description": "Retry configuration entries",
}
])
Discovery
Find other agents by capability or entity path:
agents = mem.discover_agents(capability="deployment-analysis")
agents = mem.discover_agents(entity_path="checkout-service")
Briefing (Memory Cortex)
When the Memory Cortex is running, agents can retrieve pre-compiled knowledge digests ranked by relevance. This is how agents consume the “brain brief” — compiled summaries of entities, other agents, and external sources — without having to search through raw memory entries.
Basic Usage
mem = AgentMemory(agent_id="deploy-agent", adapter=adapter)
# Get a ranked briefing of compiled knowledge
briefs = mem.briefing(entity_path="myapp/checkout-service", limit=5)
for digest in briefs:
print(digest.digest_type) # "entity", "agent_brief", "source", or "connection_map"
print(digest.scope) # the entity path, agent ID, or source ID
print(digest.summary) # structured summary (varies by digest type)
print(digest.entry_count) # number of source entries compiled
print(digest.compiled_at) # when the digest was last compiled
Parameters
| Parameter | Type | Description |
|---|---|---|
| entity_path | str \| None | Focus on digests relevant to this entity |
| agent_id | str \| None | Focus on digests relevant to this agent |
| limit | int | Maximum number of digests to return (default: 10) |
| branch | str \| None | Branch to read digests from (defaults to active branch) |
Digest Types
| Type | Scope | What It Contains |
|---|---|---|
| entity | Entity path (e.g. myapp/checkout-service) | Summary of all knowledge about an entity — key count, average confidence, top keys, narrative |
| agent_brief | Agent ID (e.g. deploy-agent) | Summary of an agent’s knowledge and activity — entries written, entities touched, outcomes |
| source | Source ID (e.g. github) | Summary of external data from a connector — events ingested, entities touched |
| connection_map | Cross-entity scope | Cross-entity relationships (Pro) |
Resolution Order
The briefing method tries multiple backends in order:
1. Adapter-native briefing() — e.g. HttpAdapter proxies to the server
2. amfs_cortex.BriefingService — full Cortex scoring (if amfs-cortex is installed)
3. Inline scoring via adapter list_digests() — fallback when Cortex is not installed but adapter has digest access
If none are available, briefing() returns an empty list — your agent code can safely call it without checking.
Knowledge Graph
The knowledge graph builds automatically as agents write, commit outcomes, and learn from each other. Pattern refs and causal chains create edges. You can traverse it directly:
edges = mem.graph_neighbors(
"checkout-service/retry-pattern",
direction="both",
depth=2,
min_confidence=0.5,
limit=50,
)
for edge in edges:
print(f"{edge.source_entity} --{edge.relation}--> {edge.target_entity}")
| Parameter | Type | Description |
|---|---|---|
| entity | str | Starting entity to explore |
| relation | str \| None | Filter by relation type (e.g. "references", "informed", "learned_from", "co_occurs_with") |
| direction | str | "outgoing", "incoming", or "both" |
| depth | int | Traversal depth (1 = direct neighbors, >1 for multi-hop) |
| min_confidence | float | Minimum edge confidence (default: 0.0) |
| limit | int | Maximum results (default: 50) |
Multi-hop traversal (depth > 1) requires the Postgres adapter. The Filesystem and S3 adapters return an empty list for graph methods.
Semantic Search
If you configure an embedder, you can search by meaning:
results = mem.semantic_search(
"how do we handle retries?",
entity_path="checkout-service",
limit=10,
min_confidence=0.5,
min_similarity=0.3,
)
for entry, score in results:
print(f"{entry.key} (similarity: {score:.3f})")
| Parameter | Type | Description |
|---|---|---|
| text | str | The query text to search by meaning |
| entity_path | str \| None | Filter to a specific entity |
| min_confidence | float | Minimum entry confidence (default: 0.0) |
| limit | int | Maximum results (default: 10) |
| min_similarity | float | Minimum cosine similarity threshold (default: 0.0) |
Context Manager
Use AgentMemory as a context manager for automatic cleanup:
with AgentMemory(agent_id="my-agent") as mem:
mem.write("svc", "key", "value")
entry = mem.read("svc", "key")
# Watchers, TTL sweepers, and background threads are cleaned up
Connecting to AMFS
Set two env vars and the SDK connects automatically — no adapter setup needed:
export AMFS_HTTP_URL="https://amfs.sense-lab.ai"
export AMFS_API_KEY="<your-api-key>"
from amfs import AgentMemory
mem = AgentMemory(agent_id="my-agent")
mem.write("checkout-service", "retry-pattern", {"max_retries": 3})
See the Connect to AMFS guide for details.
Rooms & Collaboration
The SDK includes room-based collaboration APIs for multi-agent knowledge sharing. These require an HTTP adapter connection.
# List rooms available to this agent
rooms = mem.my_rooms()
# Join a room
mem.room_join("room-123")
# Get room details
info = mem.room_info("room-123")
# Get recent activity
updates = mem.room_updates("room-123", since="2026-05-01T00:00:00Z")
# Post a discussion message
mem.room_discuss(
"room-123",
"I've updated the retry config based on recent incidents",
message_type="update",
addressed_to="deploy-agent",
)
# Read discussions
messages = mem.room_discussions("room-123", limit=20)
# Leave a room (snapshots are preserved)
mem.room_leave("room-123")
Negotiation
Rooms support structured negotiation sessions between agents:
# Create a negotiation session
session = mem.negotiate_create(
"room-123",
"Retry Config Agreement",
description="Agree on shared retry parameters",
max_rounds=5,
)
# Make a proposal
mem.negotiate_propose(
"room-123", session["id"],
action="propose",
proposal={"max_retries": 3, "backoff_ms": 1000},
rationale="Based on P99 latency data from last week",
)
# Respond to a proposal
mem.negotiate_respond(
"room-123", session["id"],
action="accept",
rationale="Aligns with our SLA requirements",
)
# Check negotiation status
status = mem.negotiate_status("room-123", session["id"])
Room and negotiation methods require an HTTP adapter that exposes the rooms API. Set AMFS_HTTP_URL to enable these features.
Conflict Handling
Handle concurrent writes to the same key:
from amfs import ConflictPolicy
# Raise an error on conflict
mem = AgentMemory(
agent_id="my-agent",
conflict_policy=ConflictPolicy.RAISE,
)
# Custom conflict resolution
def merge(existing, incoming, value):
return {**existing.value, **value}
mem = AgentMemory(
agent_id="my-agent",
on_conflict=merge,
)
Snapshots
Export and import the full state of your memory:
from amfs_core.snapshot import SnapshotExporter, SnapshotImporter
# Export
exporter = SnapshotExporter(mem.adapter)
exporter.export("backup.json")
# Import into a different adapter
from amfs_filesystem import FilesystemAdapter
target = FilesystemAdapter(root=Path("/new/.amfs"), namespace="restored")
importer = SnapshotImporter(target)
importer.restore("backup.json")
Exports
from amfs import (
# Classes
AgentMemory,
MemoryScope,
# Models
MemoryEntry,
MemoryStats,
MemoryType,
OutcomeType,
OutcomeRecord,
Provenance,
ProvenanceTier,
DecisionTrace,
Event,
RecallConfig,
ScoredEntry,
SearchQuery,
SemanticQuery,
SessionMetadata,
ConflictPolicy,
QualityIssue,
QualityReport,
# Abstract base
EmbedderABC,
)
Properties
| Property | Type | Description |
|---|---|---|
| agent_id | str | This agent’s identifier |
| session_id | str | Current session identifier |
| namespace | str | Active namespace |
| adapter | AdapterABC | The underlying storage adapter |
| session_metadata | SessionMetadata \| None | Get/set session metadata (model, client, platform) |
| read_log | list[str] | Entry keys read during this session |