The data store enables multi-agent workflows where agents share data through a common storage layer.
All agents running for the same user share the same data store. Data written by one agent is immediately available to other agents.
┌─────────────┐     ┌─────────────────┐     ┌─────────────┐
│   Agent A   │────▶│   Data Store    │◀────│   Agent B   │
│ (Producer)  │     │  (User Scoped)  │     │ (Consumer)  │
└─────────────┘     └─────────────────┘     └─────────────┘
       │                    ▲                      │
       │                    │                      │
       └────── writes ──────┴────── reads ─────────┘
Generates and stores data:
async def run(input_dict: dict, tools: dict) -> dict:
    """Producer agent: build an analysis report and publish it.

    Args:
        input_dict: Agent input; the value under ``"data"`` is analyzed.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        A status dict naming the key the report was stored under.
    """
    payload = input_dict.get("data")
    analysis_report = await generate_analysis(payload)

    # Publish under the "shared" namespace so other agents can read it.
    data_store.use_namespace("shared").set("latest-report", analysis_report)

    return {"status": "report generated", "key": "latest-report"}
Reads and uses the data:
async def run(input_dict: dict, tools: dict) -> dict:
    """Consumer agent: summarize the report published by the producer.

    Args:
        input_dict: Agent input (not used by this example).
        tools: Tools handed to the agent (not used by this example).

    Returns:
        ``{"summary": ...}`` when a report is available, otherwise an
        ``{"error": ...}`` dict telling the caller to run the producer.
    """
    report = data_store.use_namespace("shared").get("latest-report")

    if report:
        return {"summary": await summarize(report)}

    # Nothing usable has been published yet.
    return {"error": "No report found. Run the analysis agent first."}
Sequential processing where each agent builds on the previous:
┌───────────┐    ┌───────────┐    ┌───────────┐    ┌───────────┐
│  Fetcher  │───▶│  Parser   │───▶│ Analyzer  │───▶│ Reporter  │
└───────────┘    └───────────┘    └───────────┘    └───────────┘
      │                │                │                │
      ▼                ▼                ▼                ▼
  raw:data        parsed:data     analysis:data    report:final
Agent 1: Fetcher
async def run(input_dict: dict, tools: dict) -> dict:
    """Fetcher stage: download the raw data and store it for the parser.

    Args:
        input_dict: Agent input. ``"url"`` is the source to fetch. The
            optional ``"job_id"`` selects the pipeline namespace and
            defaults to ``"job-123"``.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        A dict reporting the finished stage and the next agent to run.
    """
    url = input_dict.get("url")
    # Scope the namespace per job so separate pipeline runs do not
    # overwrite each other's intermediate data.
    job_id = input_dict.get("job_id") or "job-123"

    raw_data = await fetch_data(url)

    pipeline = data_store.use_namespace(f"pipeline:{job_id}")
    pipeline.set("raw", raw_data)
    pipeline.set("status", {"stage": "fetched", "timestamp": now()})

    return {"stage": "fetched", "next": "parser"}
Agent 2: Parser
async def run(input_dict: dict, tools: dict) -> dict:
    """Parser stage: parse the fetched raw data and store the result.

    Args:
        input_dict: Agent input. The optional ``"job_id"`` selects the
            pipeline namespace and defaults to ``"job-123"``.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        A dict reporting the finished stage and the next agent to run, or
        an ``{"error": ...}`` dict when no raw data has been stored yet.
    """
    # Scope the namespace per job so separate pipeline runs do not
    # overwrite each other's intermediate data.
    job_id = input_dict.get("job_id") or "job-123"
    pipeline = data_store.use_namespace(f"pipeline:{job_id}")

    raw = pipeline.get("raw")
    if not raw:
        return {"error": "No raw data. Run fetcher first."}

    pipeline.set("parsed", parse_data(raw))
    pipeline.set("status", {"stage": "parsed", "timestamp": now()})

    return {"stage": "parsed", "next": "analyzer"}
Agent 3: Analyzer
async def run(input_dict: dict, tools: dict) -> dict:
    """Analyzer stage: analyze the parsed data and store the analysis.

    Args:
        input_dict: Agent input. The optional ``"job_id"`` selects the
            pipeline namespace and defaults to ``"job-123"``.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        A dict reporting the finished stage and the next agent to run, or
        an ``{"error": ...}`` dict when no parsed data has been stored yet.
    """
    # Scope the namespace per job so separate pipeline runs do not
    # overwrite each other's intermediate data.
    job_id = input_dict.get("job_id") or "job-123"
    pipeline = data_store.use_namespace(f"pipeline:{job_id}")

    parsed = pipeline.get("parsed")
    if not parsed:
        return {"error": "No parsed data. Run parser first."}

    pipeline.set("analysis", await analyze(parsed))
    pipeline.set("status", {"stage": "analyzed", "timestamp": now()})

    return {"stage": "analyzed", "next": "reporter"}
Agent 4: Reporter
async def run(input_dict: dict, tools: dict) -> dict:
    """Reporter stage: turn the stored analysis into the final report.

    Args:
        input_dict: Agent input. The optional ``"job_id"`` selects the
            pipeline namespace and defaults to ``"job-123"``.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        ``{"report": ...}`` with the generated report, or an
        ``{"error": ...}`` dict when no analysis has been stored yet.
    """
    # Scope the namespace per job so separate pipeline runs do not
    # overwrite each other's intermediate data.
    job_id = input_dict.get("job_id") or "job-123"
    pipeline = data_store.use_namespace(f"pipeline:{job_id}")

    analysis = pipeline.get("analysis")
    if not analysis:
        return {"error": "No analysis. Run analyzer first."}

    report = generate_report(analysis)
    pipeline.set("final_report", report)
    pipeline.set("status", {"stage": "complete", "timestamp": now()})

    return {"report": report}
One agent triggers multiple parallel workers:
             ┌──────────┐
             │ Splitter │
             └────┬─────┘
                  │
     ┌────────────┼────────────┐
     ▼            ▼            ▼
┌─────────┐  ┌─────────┐  ┌─────────┐
│Worker 1 │  │Worker 2 │  │Worker 3 │
└────┬────┘  └────┬────┘  └────┬────┘
     │            │            │
     └────────────┴────────────┘
                  │
                  ▼
            ┌───────────┐
            │ Collector │
            └───────────┘
Splitter Agent
async def run(input_dict: dict, tools: dict) -> dict:
    """Splitter agent: fan a list of items out into per-item task records.

    Args:
        input_dict: Agent input. ``"items"`` is the list of work items
            (defaults to an empty list); ``"job_id"`` names the job.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        A dict with the job id and the number of task records created.
    """
    job_id = input_dict.get("job_id")
    items = input_dict.get("items", [])
    work = data_store.use_namespace(f"fanout:{job_id}")

    # One pending task record per item, keyed by its position in the list.
    for index, item in enumerate(items):
        work.set(f"task:{index}", {"item": item, "status": "pending"})

    # Job-level bookkeeping that workers update and the collector reads.
    task_count = len(items)
    work.set("meta", {"total_tasks": task_count, "completed": 0})

    return {"job_id": job_id, "tasks_created": task_count}
Worker Agent (run multiple times)
async def run(input_dict: dict, tools: dict) -> dict:
    """Worker agent: process one pending task of a fan-out job.

    Args:
        input_dict: Agent input. ``"job_id"`` names the job and
            ``"task_id"`` the task within it.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        A dict with the task id and its result, or an ``{"error": ...}``
        dict when the task is missing or is not in the "pending" state.
    """
    job_id = input_dict.get("job_id")
    task_id = input_dict.get("task_id")
    work = data_store.use_namespace(f"fanout:{job_id}")
    task_key = f"task:{task_id}"

    task = work.get(task_key)
    claimable = bool(task) and task["status"] == "pending"
    if not claimable:
        return {"error": "Task not available"}

    # Record that the task is being worked on before starting.
    task["status"] = "processing"
    work.set(task_key, task)

    result = await process_item(task["item"])

    # Persist the outcome on the task record itself.
    task["status"] = "complete"
    task["result"] = result
    work.set(task_key, task)

    # NOTE(review): this is a read-modify-write on a counter shared by all
    # workers; whether parallel workers can lose increments depends on the
    # data store's guarantees - confirm before relying on it.
    meta = work.get("meta")
    meta["completed"] += 1
    work.set("meta", meta)

    return {"task_id": task_id, "result": result}
Collector Agent
async def run(input_dict: dict, tools: dict) -> dict:
    """Collector agent: gather worker results once a fan-out job is done.

    Args:
        input_dict: Agent input. ``"job_id"`` names the job to collect.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        - ``{"error": ...}`` when the job has no metadata record,
        - an ``"in_progress"`` status with counts while tasks remain,
        - a ``"complete"`` status with results in task order otherwise.
    """
    job_id = input_dict.get("job_id")
    work = data_store.use_namespace(f"fanout:{job_id}")

    meta = work.get("meta")
    if not meta:
        # Unknown job id, or the splitter has not written its metadata:
        # report it instead of failing when subscripting a missing record.
        return {"error": "No job metadata found. Run the splitter first."}

    total = meta["total_tasks"]
    completed = meta["completed"]
    if completed < total:
        return {
            "status": "in_progress",
            "completed": completed,
            "total": total,
        }

    # Collect all results in one query
    all_tasks = work.get_all()
    results = [all_tasks[f"task:{i}"]["result"] for i in range(total)]

    return {"status": "complete", "results": results}
Common pattern for processing codebases:
Ingestion Agent
async def run(input_dict: dict, tools: dict) -> dict:
    """Ingestion agent: store a repository's files and per-file summaries.

    Args:
        input_dict: Agent input. ``"repo"`` identifies the repository.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        A dict with the repository and the number of files processed.
    """
    repo = input_dict.get("repo")
    files_ns = data_store.use_namespace(f"files:{repo}")
    summary_ns = data_store.use_namespace(f"summary:{repo}")

    repo_files = await fetch_repo_files(repo)
    for file_path, text in repo_files.items():
        # Raw content plus basic facts about the file.
        record = {
            "content": text,
            "size": len(text),
            "type": detect_type(file_path),
        }
        files_ns.set(file_path, record)

        # Summary stored under the same path in a parallel namespace.
        summary_ns.set(file_path, await summarize_file(text))

    # Record when the ingestion finished and how much it covered.
    file_total = len(repo_files)
    meta_ns = data_store.use_namespace(f"meta:{repo}")
    meta_ns.set(
        "ingestion",
        {
            "file_count": file_total,
            "completed_at": datetime.now().isoformat(),
        },
    )

    return {"repo": repo, "files_processed": file_total}
Search Agent
async def run(input_dict: dict, tools: dict) -> dict:
    """Search agent: case-insensitive substring search over file summaries.

    Args:
        input_dict: Agent input. ``"query"`` is the text to look for.
        tools: Tools handed to the agent (not used by this example).

    Returns:
        ``{"query": ..., "matches": [...]}`` where each match holds the
        repo, file path and summary. A missing query yields no matches.
    """
    prefix = "summary:"
    query = input_dict.get("query")
    if query is None:
        # Without a query there is nothing to match; avoid calling
        # .lower() on None further down.
        return {"query": query, "matches": []}

    # Lower-case the query once instead of once per summary.
    needle = query.lower()

    results = []
    # Discover all indexed repos
    for ns in data_store.list_namespaces():
        if not ns.startswith(prefix):
            continue

        # Strip only the leading prefix; str.replace would also remove
        # any later "summary:" occurrence inside the repo name.
        repo = ns[len(prefix):]

        # Load all summaries in one query instead of list_keys + get per key
        all_summaries = data_store.use_namespace(ns).get_all()
        results.extend(
            {"repo": repo, "file": path, "summary": summary}
            for path, summary in all_summaries.items()
            if needle in str(summary).lower()
        )

    return {"query": query, "matches": results}
Track multi-agent workflow status:
# Writer side: any agent in the workflow can publish its progress.
status = data_store.use_namespace("workflow:status")
status.set("current_stage", "processing")
progress = {
    "agent": "analyzer",
    "timestamp": datetime.now().isoformat(),
    "message": "Processing file 45 of 100",
}
status.set("last_update", progress)

# Reader side: a monitor agent opens the same namespace and reads it back.
status = data_store.use_namespace("workflow:status")
current = status.get("current_stage")
update = status.get("last_update")
Explicit handoff between agents:
# Producer: announce that the data is ready and where to find it.
handoff = data_store.use_namespace("handoff")
ready_signal = {
    "producer": "agent-a",
    "data_key": "processed-data",
    "namespace": "results",
    "ready_at": datetime.now().isoformat(),
}
handoff.set("data-ready", ready_signal)

# Consumer: check for the signal (a single read, no polling), then follow
# the namespace and key it points to.
handoff = data_store.use_namespace("handoff")
signal = handoff.get("data-ready")
if signal:
    results = data_store.use_namespace(signal["namespace"])
    data = results.get(signal["data_key"])
Share errors across the workflow:
# Agent encounters error errors = data_store.use_namespace("workflow:errors") errors.set(f"error:{datetime.now().isoformat()}", { "agent": "parser", "error": str(e), "context": {"file": current_file} }) # Other agents can check for errors errors = data_store.use_namespace("workflow:errors") error_keys = errors.list_keys() if error_keys: return {"status": "workflow_error", "errors": error_keys}
Document and share namespace conventions:
# Shared namespace conventions. Named functions are used instead of lambdas
# bound to names so each helper has a real name in tracebacks and a docstring.
def namespace_files(repo: str) -> str:
    """Return the namespace holding the raw files of *repo*."""
    return f"files:{repo}"


def namespace_summary(repo: str) -> str:
    """Return the namespace holding the per-file summaries of *repo*."""
    return f"summary:{repo}"


def namespace_meta(repo: str) -> str:
    """Return the namespace holding the ingestion metadata of *repo*."""
    return f"meta:{repo}"


# Original constant-style names, kept so existing callers continue to work.
NAMESPACE_FILES = namespace_files
NAMESPACE_SUMMARY = namespace_summary
NAMESPACE_META = namespace_meta
Always include context about who wrote the data:
# Provenance stored next to the payload: who produced it, when, and in
# which format version.
provenance = {
    "produced_by": "analyzer-agent",
    "produced_at": datetime.now().isoformat(),
    "version": "1.0",
}
data_store.set("result", {"data": actual_data, "_meta": provenance})
data = shared.get("expected-key") if not data: return { "error": "Required data not found", "expected_key": "expected-key", "namespace": "shared", "suggestion": "Run the producer agent first" }
# At workflow end: clear every job-scoped namespace the workflow used.
cleanup_namespaces = [
    f"{stage}:{job_id}" for stage in ("pipeline", "temp", "handoff")
]
for ns in cleanup_namespaces:
    job_namespace = data_store.use_namespace(ns)
    job_namespace.clear()
# Producer writes the config together with an explicit schema version.
data_store.set(
    "config",
    {
        "_version": 2,
        "setting_a": "value",
        "setting_b": "value",
    },
)

# Consumer checks version. A missing config is handled on the same path as
# an outdated one, so the check does not fail by calling .get() on a value
# that was never stored.
config = data_store.get("config")
if not config or config.get("_version", 1) < 2:
    # Handle old format or request re-generation
    pass