Type-safe updates
AgentMesh uses a fluent command-based API for type-safe state updates. Nodes return commands that combine state updates with routing in a single expression.
Basic pattern
All nodes use the NodeFunc signature with typed state keys for compile-time type safety:
import (
"context"
"github.com/hupe1980/agentmesh/pkg/graph"
)
// Define typed keys
var (
CounterKey = graph.NewKey[int]("counter", 0)
StatusKey = graph.NewKey[string]("status", "")
)
// Create graph with keys
g := graph.New[string, string](CounterKey, StatusKey)
// Node function using commands
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
// Read current state
counter := graph.Get(view, CounterKey)
// Return update + routing in one expression
return graph.Set(CounterKey, counter+1).
Set(StatusKey, "processing").
To("next"), nil
}, "next")
g.Start("process")
compiled, _ := g.Build()
Command patterns
The command API provides fluent, type-safe state updates:
// Set single value and route
return graph.Set(CounterKey, 42).To("next"), nil
// Set multiple values
return graph.Set(CounterKey, 42).
Set(StatusKey, "ready").
To("next"), nil
// Append to list
return graph.Append(TagsKey, "new-tag").To("next"), nil
// Just route (no state changes)
return graph.To("next"), nil
// Route to END
return graph.To(graph.END), nil
// Signal failure
return graph.Fail(err)
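To make the chaining above concrete, here is a minimal, self-contained sketch of how a fluent command builder could be structured. It is not AgentMesh's implementation: the `Command` type, its fields, and the string-keyed `Set` are simplified stand-ins invented for illustration, whereas the real API works with typed keys.

```go
package main

import "fmt"

// Command is a hypothetical stand-in for graph.Command: it accumulates
// state updates and routing targets in a single value.
type Command struct {
	Updates map[string]any
	Targets []string
}

// Set starts a new command with one update.
func Set(key string, value any) *Command {
	return (&Command{Updates: map[string]any{}}).Set(key, value)
}

// Set records another update and returns the command so calls can be chained.
func (c *Command) Set(key string, value any) *Command {
	c.Updates[key] = value
	return c
}

// To records the routing targets and returns the finished command.
func (c *Command) To(targets ...string) *Command {
	c.Targets = append(c.Targets, targets...)
	return c
}

func main() {
	cmd := Set("counter", 42).Set("status", "ready").To("next")
	fmt.Println(len(cmd.Updates), cmd.Targets)
}
```

Because every method returns the same `*Command`, updates and routing travel together, which is what lets a node return both in one expression.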
Node patterns
Pattern 1: Single target with updates
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(ResultKey, "processed").To("next"), nil
}, "next")
Pattern 2: Multiple targets (parallel execution)
g.Node("split", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(StatusKey, "splitting").
To("worker1", "worker2", "worker3"), nil
}, "worker1", "worker2", "worker3")
Pattern 3: Conditional routing
g.Node("decide", func(ctx context.Context, view graph.View) (*graph.Command, error) {
score := graph.Get(view, ScoreKey)
cmd := graph.Set(ScoreKey, score+10)
if score > 50 {
return cmd.To("high_priority"), nil
}
return cmd.To("normal_priority"), nil
}, "high_priority", "normal_priority")
Pattern 4: End node
g.Node("final", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(StatusKey, "complete").To(graph.END), nil
}, graph.END)
Pattern 5: Read-only node
g.Node("log", func(ctx context.Context, view graph.View) (*graph.Command, error) {
data := graph.Get(view, DataKey)
fmt.Printf("Data: %v\n", data)
return graph.To("next"), nil
}, "next")
Type safety features
Compile-time guarantees:
- Type mismatches caught during compilation
- Typed key definitions with graph.NewKey[T]()
- Type-safe reads with graph.Get(view, TypedKey)
- Zero runtime overhead for type checking
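As a rough illustration of where the compile-time guarantee comes from, the sketch below models a typed key with Go generics. The `Key`, `NewKey`, and `Get` definitions here are hypothetical simplifications written for this example (state is a plain map), not the library's actual types.

```go
package main

import "fmt"

// Key is a hypothetical typed key: the type parameter T fixes the value
// type at compile time, and Default is returned when the key is unset.
type Key[T any] struct {
	Name    string
	Default T
}

// NewKey mirrors the shape of graph.NewKey[T](name, default).
func NewKey[T any](name string, def T) Key[T] {
	return Key[T]{Name: name, Default: def}
}

// Get reads a value from an untyped state map and returns it as T.
// It falls back to the default if the key is unset or holds another type.
func Get[T any](state map[string]any, k Key[T]) T {
	if v, ok := state[k.Name].(T); ok {
		return v
	}
	return k.Default
}

func main() {
	counterKey := NewKey[int]("counter", 0)
	statusKey := NewKey[string]("status", "idle")

	state := map[string]any{"counter": 7}

	n := Get(state, counterKey) // n is an int, no type assertion at the call site
	s := Get(state, statusKey)  // unset, so the default is returned
	fmt.Println(n+1, s)

	// var bad string = Get(state, counterKey) // would not compile: int is not string
}
```

The key carries its value type, so the compiler rejects a mismatched read or write before the program runs.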
Using typed keys:
// Define typed keys upfront
var (
CounterKey = graph.NewKey[int]("counter", 0)
StatusKey = graph.NewKey[string]("status", "")
ValidKey = graph.NewKey[bool]("valid", false)
TagsKey = graph.NewListKey[string]("tags")
MessagesKey = message.MessagesKey // Built-in message list key
)
// Use in node function
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
// ✅ Type-safe reads
counter := graph.Get(view, CounterKey) // int
status := graph.Get(view, StatusKey) // string
valid := graph.Get(view, ValidKey) // bool
tags := graph.GetList(view, TagsKey) // []string
// ✅ Type-safe updates
return graph.Set(CounterKey, counter+1).
Set(StatusKey, "active").
Set(ValidKey, true).
Append(TagsKey, "new").
To("next"), nil
}, "next")
See examples/typed_updates for a complete working example.
Namespaces
Namespaces provide state isolation for multi-agent systems, subgraphs, and tools. They allow different components to use the same key names without conflicts.
Philosophy: Global First
AgentMesh follows a global-first approach:
- Default: Use simple global keys (no namespace prefix)
- Opt-in: Add namespaces only when you need isolation
- Zero overhead: Namespaces are just string prefixes (e.g., "agent1.status")
When to use namespaces
Use namespaces when:
- Running multiple instances of the same agent/component
- Building multi-agent systems with separate state
- Isolating subgraph state from parent graph
- Preventing key collisions between tools
Don’t use namespaces when:
- You have a single agent
- Keys are naturally unique
- Simplicity is more important than organization
Basic usage
import "github.com/hupe1980/agentmesh/pkg/graph"
// 1. Global keys (default) - simple, no prefix
var GlobalConfig = graph.NewKey[string]("config", "")
var GlobalCounter = graph.NewKey[int]("counter", 0)
// 2. Namespaced keys - use dot notation for logical grouping
var Agent1Status = graph.NewKey[string]("agent1.status", "idle")
var Agent2Status = graph.NewKey[string]("agent2.status", "idle")
// Create graph with all keys
g := graph.New[string, string](
GlobalConfig, GlobalCounter,
Agent1Status, Agent2Status,
)
// Each agent updates its own namespaced key
g.Node("agent1", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(Agent1Status, "processing").To("next"), nil
}, "next")
g.Node("agent2", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(Agent2Status, "waiting").To("next"), nil
}, "next")
Creating namespaces
// Create namespace for logical grouping
ns := graph.NewNamespace("agent1")
// Use namespace to prefix keys
prefixedKey := ns.Prefix("status") // Returns "agent1.status"
// Create a namespaced key directly
var AgentStatus = graph.NewKey[string](ns.Prefix("status"), "idle")
Validation rules:
- Must start with letter or underscore
- Can contain letters, numbers, underscores
- Cannot contain dots (reserved for key separation)
- Empty string = global namespace
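The validation rules above can be sketched as a small standalone check. `validateNamespace` and `prefix` are hypothetical helpers written for this example, and the sketch assumes "letters" means ASCII letters; the library's own validation may differ in detail.

```go
package main

import (
	"fmt"
	"regexp"
)

// validName encodes the listed rules: the first character is a letter or
// underscore, the rest are letters, digits, or underscores (so no dots).
var validName = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateNamespace is a hypothetical helper, not part of AgentMesh.
// The empty string is accepted because it denotes the global namespace.
func validateNamespace(name string) error {
	if name == "" {
		return nil
	}
	if !validName.MatchString(name) {
		return fmt.Errorf("invalid namespace %q", name)
	}
	return nil
}

// prefix joins a namespace and a local key name with a dot.
// Global keys (empty namespace) get no prefix.
func prefix(ns, local string) string {
	if ns == "" {
		return local
	}
	return ns + "." + local
}

func main() {
	for _, name := range []string{"agent1", "_tmp", "", "1agent", "a.b"} {
		fmt.Printf("%q valid=%v\n", name, validateNamespace(name) == nil)
	}
	fmt.Println(prefix("agent1", "status"))
}
```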
Creating namespaced keys
// Regular keys
modelNS := state.MustNamespace("model")
counterKey := state.TypedKey[int](modelNS, "counter", 0) // "model.counter"
statusKey := state.TypedKey[string](modelNS, "status", "idle") // "model.status"
// List keys
toolNS := state.MustNamespace("tool")
resultsKey := state.TypedListKey[string](toolNS, "results", 100, nil) // "tool.results"
// Global keys (no prefix)
configKey := state.TypedKey[string](state.Global, "config", "") // "config"
Namespace operations
Get namespace view - Filter state by namespace:
g.Node("reader", func(ctx context.Context, view graph.View) (*graph.Command, error) {
// Get all keys in a namespace
agent1NS := state.MustNamespace("agent1")
agent1View := state.GetNamespaceView(view, agent1NS)
// Returns: map[string]any{"status": "processing", "progress": 50}
// Note: Keys are returned WITHOUT namespace prefix
// Get global keys
globalView := state.GetNamespaceView(view, state.Global)
// Returns: map[string]any{"config": "production", "counter": 100}
return graph.To("next"), nil
}, "next")
List namespaces - Discover active namespaces:
g.Node("discover", func(ctx context.Context, view graph.View) (*graph.Command, error) {
namespaces := state.ListNamespaces(view)
for _, ns := range namespaces {
if ns.IsGlobal() {
fmt.Println("(global)")
} else {
fmt.Printf("%s\n", ns.Name())
}
}
// Output:
// agent1
// agent2
// tool
return graph.To("next"), nil
}, "next")
Key introspection
// Check if key is namespaced
isNS := state.IsNamespaced("agent1.status") // true
isNS = state.IsNamespaced("config") // false
// Parse namespaced key
ns, local := state.ParseNamespacedKey("agent1.status")
// ns = "agent1", local = "status"
ns, local = state.ParseNamespacedKey("config")
// ns = "", local = "config" (global)
// Extract namespace object
nsObj := state.ExtractNamespace("agent1.status") // new name: ns is already declared above as a string
// Returns: Namespace{name: "agent1"}
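For readers curious how such parsing can work, here is a minimal sketch that splits a key on its first dot. `parseNamespacedKey` is a hypothetical re-implementation for illustration only; it assumes the single-level "namespace.key" layout described in this section.

```go
package main

import (
	"fmt"
	"strings"
)

// parseNamespacedKey is a hypothetical helper, not the library function.
// It splits on the first dot; keys without a dot belong to the global namespace.
func parseNamespacedKey(key string) (ns, local string) {
	if before, after, found := strings.Cut(key, "."); found {
		return before, after
	}
	return "", key
}

func main() {
	for _, key := range []string{"agent1.status", "config"} {
		ns, local := parseNamespacedKey(key)
		fmt.Printf("key=%q ns=%q local=%q\n", key, ns, local)
	}
}
```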
Multi-agent example
// Define namespaces for each agent
researcherNS := state.MustNamespace("researcher")
writerNS := state.MustNamespace("writer")
editorNS := state.MustNamespace("editor")
// Each agent has its own "status" key
researcherStatus := state.TypedKey[string](researcherNS, "status", "")
writerStatus := state.TypedKey[string](writerNS, "status", "")
editorStatus := state.TypedKey[string](editorNS, "status", "")
// Create graph with all keys
g := graph.New[string, string](
researcherStatus,
writerStatus,
editorStatus,
)
// Each agent updates its own state independently
g.Node("researcher", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(researcherStatus, "researching").To("writer"), nil
}, "writer")
g.Node("writer", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(writerStatus, "writing").To("editor"), nil
}, "editor")
g.Node("editor", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(editorStatus, "editing").To(graph.END), nil
}, graph.END)
g.Start("researcher")
compiled, _ := g.Build()
Best practices
1. Package-level namespace constants:
// pkg/agent/researcher/keys.go
package researcher
var (
NS = state.MustNamespace("researcher")
StatusKey = state.TypedKey[string](NS, "status", "idle")
ResultsKey = state.TypedListKey[string](NS, "results", 100, nil)
)
2. Namespace naming conventions:
- Use lowercase with underscores: "agent_name", "tool_1"
- Keep names short and descriptive
- Avoid abbreviations unless well-known
3. Documentation:
// Keys for the model execution subsystem
// Namespace: "model"
// Keys:
// - counter: int - Number of API calls
// - status: string - Current execution status
var (
ModelNS = state.MustNamespace("model")
CounterKey = state.TypedKey[int](ModelNS, "counter", 0)
StatusKey = state.TypedKey[string](ModelNS, "status", "idle")
)
4. Avoid deeply nested namespaces:
// ❌ Too complex
ns := state.MustNamespace("agent.researcher.team1")
// ✅ Keep it simple
researcherNS := state.MustNamespace("researcher_team1")
Limitations
- No key deletion: DeleteNamespace() is not implemented (channels cannot be deleted)
- Copy requires registration: Target keys must be registered before CopyNamespace()
- No nested namespaces: Only one level of hierarchy (single dot)
See examples/namespaces for a complete working example.
Node-level namespace scoping
For guaranteed state isolation, nodes can be scoped to operate within a specific namespace. This is ideal for multi-agent systems and pipeline stages where you want to enforce strict boundaries.
Creating namespaced nodes
Use g.NamespacedNode() for namespace-scoped nodes:
import "github.com/hupe1980/agentmesh/pkg/graph"
// Define namespaces
validationNS := state.MustNamespace("validation")
enrichmentNS := state.MustNamespace("enrichment")
// Define keys per namespace
validKey := state.TypedKey[bool](validationNS, "is_valid", false)
enrichedKey := state.TypedKey[map[string]any](enrichmentNS, "data", nil)
// Create graph with all keys
g := graph.New[string, string](validKey, enrichedKey)
// Create namespaced nodes using fluent API
g.NamespacedNode("validation", validationNS,
func(ctx context.Context, view graph.View) (*graph.Command, error) {
// This node only works with "validation.*" keys
return graph.Set(validKey, true).To("enrichment"), nil
},
"enrichment",
)
g.NamespacedNode("enrichment", enrichmentNS,
func(ctx context.Context, view graph.View) (*graph.Command, error) {
// This node only works with "enrichment.*" keys
enrichedData := map[string]any{"status": "enriched"}
return graph.Set(enrichedKey, enrichedData).To(graph.END), nil
},
graph.END,
)
g.Start("validation")
compiled, _ := g.Build()
With retry policies
Namespace-scoped nodes also support retry policies:
retryPolicy := graph.RetryPolicy{
MaxAttempts: 3,
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: time.Second,
BackoffFactor: 2.0,
}
g.NamespacedNodeWithRetry("processor", processorNS,
func(ctx context.Context, view graph.View) (*graph.Command, error) {
// Processing logic
return graph.Set(resultKey, "processed").To(graph.END), nil
},
retryPolicy,
graph.END,
)
When to use namespaced nodes
Use NamespacedNode when:
- Building multi-agent systems with strict state isolation
- Creating reusable pipeline stages with clear boundaries
- You want runtime-enforced guarantees that nodes can’t access each other’s state
- Documentation should clearly show which namespace each node uses
Use regular nodes when:
- Single agent with naturally unique keys
- Nodes need to share state freely
- Simplicity is more important than isolation
How enforcement works
State isolation is enforced through runtime view filtering and update validation:
- When a NamespacedNode executes, it receives a filtered view
- The filtered view only exposes keys from the node’s namespace
- Calling view.Keys() returns only keys from that namespace
- Returned updates are validated; attempting to update keys outside the namespace causes an error
// Keys are created with namespace prefixes
agent1Status := state.TypedKey[string](agent1NS, "status", "") // "agent1.status"
agent2Status := state.TypedKey[string](agent2NS, "status", "") // "agent2.status"
// When agent1 node executes:
// - view.Keys() returns ["status"] (only agent1's keys, without prefix)
// - Cannot access agent2's state at all
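The two enforcement steps, filtering the view and validating updates, can be sketched with plain maps. `filterView` and `validateUpdates` are hypothetical names for this example, and state is modelled as a `map[string]any` rather than the library's view type.

```go
package main

import (
	"fmt"
	"strings"
)

// filterView returns only the keys under ns, with the "ns." prefix stripped.
func filterView(state map[string]any, ns string) map[string]any {
	out := make(map[string]any)
	p := ns + "."
	for k, v := range state {
		if strings.HasPrefix(k, p) {
			out[strings.TrimPrefix(k, p)] = v
		}
	}
	return out
}

// validateUpdates rejects any update whose key lies outside ns.
func validateUpdates(node, ns string, updates map[string]any) error {
	for k := range updates {
		if !strings.HasPrefix(k, ns+".") {
			return fmt.Errorf("node %q in namespace %q attempted to update key %q", node, ns, k)
		}
	}
	return nil
}

func main() {
	state := map[string]any{"agent1.status": "ok", "agent2.status": "idle"}
	fmt.Println(filterView(state, "agent1"))

	err := validateUpdates("validator", "agent1", map[string]any{"agent2.status": "failed"})
	fmt.Println(err)
}
```

Both checks run at execution time, so a violation surfaces as an error from the run rather than as a compiler diagnostic.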
Update validation
NamespacedNode validates all returned updates:
g.NamespacedNode("validator", agent1NS,
func(ctx context.Context, view graph.View) (*graph.Command, error) {
// ❌ This will cause a validation error:
return graph.Set(agent1StatusKey, "ok"). // ✅ Allowed (own namespace)
Set(agent2StatusKey, "failed"). // ❌ ERROR: wrong namespace
To(graph.END), nil
},
graph.END,
)
// Execution will fail with:
// "node 'validator' in namespace 'agent1' attempted to update key
// 'agent2.status' which belongs to a different namespace"
Best practices
1. One namespace per agent/stage:
// ✅ Clear separation
researcherNS := state.MustNamespace("researcher")
writerNS := state.MustNamespace("writer")
g.NamespacedNode("researcher", researcherNS, researcherFunc, "writer")
g.NamespacedNode("writer", writerNS, writerFunc, graph.END)
2. Use package-level namespace and keys:
// pkg/pipeline/validation/node.go
package validation
var (
NS = state.MustNamespace("validation")
IsValidKey = state.TypedKey[bool](NS, "is_valid", false)
ScoreKey = state.TypedKey[int](NS, "score", 0)
)
3. Document namespace usage:
// ValidationNode checks input data quality
// Namespace: "validation"
// Keys: is_valid (bool), score (int)
g.NamespacedNode("validation", validation.NS, validateFunc, targets...)
See examples/subgraph for a complete working example with namespaced pipeline stages.
Checkpointing
Checkpointing enables automatic state persistence during graph execution. Every superstep can be saved, allowing you to:
- 🔄 Resume interrupted workflows from the last checkpoint
- 🐛 Debug production issues by replaying exact execution states
- ⏪ Time-travel to any previous superstep
- 📊 Audit agent decisions with complete execution history
Basic usage
import (
"github.com/hupe1980/agentmesh/pkg/graph"
"github.com/hupe1980/agentmesh/pkg/checkpoint"
)
// Define keys
var StatusKey = graph.NewKey[string]("status", "")
// Create graph
g := graph.New[string, string](StatusKey)
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(StatusKey, "done").To(graph.END), nil
}, graph.END)
g.Start("process")
// Build with checkpointer
checkpointer := checkpoint.NewInMemory()
compiled, _ := g.Build(graph.WithCheckpointer(checkpointer))
// Execute with run ID for persistence
seq := compiled.Run(ctx, "input",
graph.WithRunID("workflow-123"),
graph.WithCheckpointInterval(1),
graph.WithAutoRestore(true),
)
for result := range seq {
// Process results
}
// Resume from checkpoint after failure
seq = compiled.Run(ctx, "input",
graph.WithRunID("workflow-123"),
graph.WithAutoRestore(true),
)
> **Performance note:** Restores now reuse the checkpoint map directly and wrap it in a copy-on-write layer. Large checkpoints (10k+ keys) no longer trigger duplicate map allocations during resume—only mutated keys incur copies. See `BenchmarkRestoreCheckpoint10KKeys` in `pkg/graph` for reference numbers.
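The copy-on-write idea in the note can be illustrated with a small overlay structure. This is a generic sketch of the technique, not the code in `pkg/graph`; `cowState` and its methods are names invented for the example.

```go
package main

import "fmt"

// cowState layers a private overlay on top of a shared base map.
// Reads fall through to the base; writes only touch the overlay.
type cowState struct {
	base    map[string]any // restored checkpoint map, never mutated
	overlay map[string]any // keys written since the restore
}

func newCOW(base map[string]any) *cowState {
	return &cowState{base: base, overlay: make(map[string]any)}
}

// Get prefers the overlay and falls back to the base map.
func (s *cowState) Get(key string) (any, bool) {
	if v, ok := s.overlay[key]; ok {
		return v, true
	}
	v, ok := s.base[key]
	return v, ok
}

// Set stores the value in the overlay, leaving the base untouched.
func (s *cowState) Set(key string, value any) {
	s.overlay[key] = value
}

func main() {
	checkpoint := map[string]any{"status": "paused", "counter": 10}
	state := newCOW(checkpoint)
	state.Set("status", "running")

	v, _ := state.Get("status")
	fmt.Println(v, checkpoint["status"], len(state.overlay))
}
```

Only the keys that are actually written occupy space in the overlay, so the cost of a resume scales with the number of mutations rather than the size of the checkpoint.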
Checkpoint contents
Each checkpoint captures:
type Checkpoint struct {
RunID string // Unique execution ID
Superstep int64 // Iteration number
State map[string]any // Graph state snapshot
CompletedNodes []string // Nodes that completed execution
PausedNodes []string // Nodes paused for human-in-the-loop
ApprovalMetadata *ApprovalMetadata // Pending approvals and history
Metadata map[string]any // Custom metadata
}
Checkpoint intervals
Control how often checkpoints are saved:
// Save every superstep (most granular)
graph.WithCheckpointInterval(1)
// Save every 5 supersteps (balance performance/recoverability)
graph.WithCheckpointInterval(5)
// Save only at specific points (call the checkpointer's Save() manually)
graph.WithCheckpointInterval(0)
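One plausible reading of the interval setting is sketched below. `shouldCheckpoint` is a hypothetical helper; the assumptions that supersteps are numbered from 1 and that saves land on exact multiples of the interval are mine, not confirmed behaviour of the library.

```go
package main

import "fmt"

// shouldCheckpoint shows one plausible interpretation of the interval:
// 0 disables automatic saves, and a positive n saves on every n-th superstep.
func shouldCheckpoint(superstep, interval int64) bool {
	if interval <= 0 {
		return false
	}
	return superstep%interval == 0
}

func main() {
	for step := int64(1); step <= 10; step++ {
		fmt.Printf("step %2d: interval=1 %-5v interval=5 %v\n",
			step, shouldCheckpoint(step, 1), shouldCheckpoint(step, 5))
	}
}
```

Under this reading, a crash between saves loses at most `interval - 1` supersteps of progress, which is the trade-off against write volume.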
Storage backends
AgentMesh supports multiple checkpoint storage backends.
Memory (development/testing)
In-memory storage - fast but not persistent across restarts:
checkpointer := checkpoint.NewInMemory()
Use when:
- Local development and testing
- Short-lived workflows
- No persistence required
SQL (production-ready)
SQL-based storage for production use:
import (
"database/sql"
"github.com/hupe1980/agentmesh/pkg/checkpoint"
_ "github.com/lib/pq" // PostgreSQL driver
)
db, err := sql.Open("postgres", connectionString)
checkpointer, err := checkpoint.NewSQL(db, checkpoint.SQLOptions{
TableName: "agentmesh_checkpoints",
})
Supported databases:
- PostgreSQL
- MySQL
- SQLite
Use when:
- Production workflows
- Long-running processes
- Shared state across instances
DynamoDB (AWS)
AWS DynamoDB for serverless architectures:
import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/hupe1980/agentmesh/pkg/checkpoint"
)
sess := session.Must(session.NewSession())
checkpointer, err := checkpoint.NewDynamoDB(sess, checkpoint.DynamoDBOptions{
TableName: "agentmesh-checkpoints",
})
Use when:
- AWS-based infrastructure
- Serverless deployments
- Global distribution needed
Custom storage
Implement the Checkpointer interface for custom backends:
type Checkpointer interface {
Save(ctx context.Context, cp *Checkpoint) error
Load(ctx context.Context, runID string) (*Checkpoint, error)
List(ctx context.Context, runID string) ([]*Checkpoint, error)
Delete(ctx context.Context, runID string) error
}
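As a starting point for a custom backend, the sketch below implements the four methods over an in-process map. It redeclares a trimmed `Checkpoint` and the `Checkpointer` interface locally so it compiles on its own; `mapStore`, `errNotFound`, and the choice that `Load` returns the most recent checkpoint are assumptions made for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Checkpoint is a trimmed local copy of the struct shown earlier.
type Checkpoint struct {
	RunID     string
	Superstep int64
	State     map[string]any
}

// Checkpointer repeats the interface from the text so the sketch compiles alone.
type Checkpointer interface {
	Save(ctx context.Context, cp *Checkpoint) error
	Load(ctx context.Context, runID string) (*Checkpoint, error)
	List(ctx context.Context, runID string) ([]*Checkpoint, error)
	Delete(ctx context.Context, runID string) error
}

var errNotFound = errors.New("checkpoint not found")

// mapStore keeps every checkpoint of a run in insertion order.
type mapStore struct {
	mu   sync.RWMutex
	runs map[string][]*Checkpoint
}

func newMapStore() *mapStore {
	return &mapStore{runs: make(map[string][]*Checkpoint)}
}

func (s *mapStore) Save(_ context.Context, cp *Checkpoint) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.runs[cp.RunID] = append(s.runs[cp.RunID], cp)
	return nil
}

// Load returns the most recently saved checkpoint (an assumption about
// what loading should mean for a run with several checkpoints).
func (s *mapStore) Load(_ context.Context, runID string) (*Checkpoint, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cps := s.runs[runID]
	if len(cps) == 0 {
		return nil, errNotFound
	}
	return cps[len(cps)-1], nil
}

// List returns a copy of the slice so callers cannot reorder the store.
func (s *mapStore) List(_ context.Context, runID string) ([]*Checkpoint, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]*Checkpoint(nil), s.runs[runID]...), nil
}

func (s *mapStore) Delete(_ context.Context, runID string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.runs, runID)
	return nil
}

var _ Checkpointer = (*mapStore)(nil) // compile-time interface check

func main() {
	ctx := context.Background()
	store := newMapStore()
	_ = store.Save(ctx, &Checkpoint{RunID: "workflow-123", Superstep: 1})
	_ = store.Save(ctx, &Checkpoint{RunID: "workflow-123", Superstep: 2})
	latest, _ := store.Load(ctx, "workflow-123")
	fmt.Println(latest.Superstep)
}
```

A real backend would replace the map with durable storage and serialize `State`, but the locking and the per-run ordering shown here carry over.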
Time travel debugging
Debug workflows by replaying from any superstep.
List checkpoints
checkpoints, err := checkpointer.List(ctx, "workflow-123")
for _, cp := range checkpoints {
fmt.Printf("Superstep %d at %v\n", cp.Superstep, cp.Timestamp)
fmt.Printf(" Completed nodes: %v\n", cp.CompletedNodes)
}
Resume from specific superstep
// Resume from superstep 5
for result, err := range compiled.Run(ctx, newInput,
graph.WithRunID("workflow-123"),
graph.WithResumeFromSuperstep(5),
) {
if err != nil {
log.Fatal(err)
}
fmt.Println(graph.Get(result, StatusKey))
}
Resume with modified state
// Resume with modified state
for result, err := range compiled.Run(ctx, input,
graph.WithRunID("workflow-123"),
graph.WithResumeFromSuperstep(3),
) {
if err != nil {
log.Fatal(err)
}
// Compare with original execution
}
Debugging workflow
- Identify problematic superstep from logs or errors
- List checkpoints to find the superstep before the issue
- Resume execution from that checkpoint with modifications
- Compare results to understand what changed
Example:
// Original execution failed at superstep 10
// Resume from superstep 8 with debug logging enabled
type debugKey struct{} // unexported key type avoids collisions with other context values
ctx = context.WithValue(ctx, debugKey{}, true)
for result, err := range compiled.Run(ctx, input,
graph.WithRunID(runID),
graph.WithResumeFromSuperstep(8),
) {
if err != nil {
log.Fatal(err)
}
// Debug output
fmt.Printf("Superstep completed: %v\n", result)
}
See examples/time_travel for a complete demonstration.
Message retention
Control conversation history to prevent context overflow and manage costs.
Set message limits
// Create the message list key (the 50-message limit is applied at build time)
var LimitedMessagesKey = graph.NewListKey[message.Message]("messages")
// When using MessageGraph, limit is configured at build time
g := message.NewGraphBuilder()
// Add message retention configuration
compiled, _ := g.Build(graph.WithMessageRetention(50))
Pruning strategies
When the limit is reached, the oldest messages are removed first:
// Current messages: [msg1, msg2, msg3, ..., msg100]
// After adding msg101: [msg2, msg3, ..., msg100, msg101]
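The drop-oldest behaviour can be sketched as a sliding window over a slice. `pruneOldest` is a hypothetical helper for illustration; it treats a limit of 0 as unlimited, matching the convention this page uses for retention limits.

```go
package main

import "fmt"

// pruneOldest keeps at most limit items, dropping from the front.
// A limit of 0 (or less) means unlimited.
func pruneOldest[T any](items []T, limit int) []T {
	if limit <= 0 || len(items) <= limit {
		return items
	}
	return items[len(items)-limit:]
}

func main() {
	msgs := []string{"msg1", "msg2", "msg3"}
	msgs = append(msgs, "msg4")
	fmt.Println(pruneOldest(msgs, 3))
}
```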
Unlimited messages
For workflows that need full history, use 0 as the limit:
// Unlimited message history (default)
compiled, _ := g.Build(graph.WithMessageRetention(0))
When to use unlimited:
- Short conversations (< 50 messages)
- Analysis that needs full context
- When using external message storage
When to limit:
- Long-running conversations
- Cost-sensitive applications (token usage)
- Fixed context window models
See examples/message_retention for pruning strategies.
Human-in-the-loop
Pause execution for human approval or input.
Interrupt execution
g.Node("request_approval", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(StatusKey, "awaiting_approval").
Set(DataKey, sensitiveData).
Interrupt(), nil // Pause here
}, "next")
Resume with input
// Initial execution pauses at approval node
seq := compiled.Run(ctx, input,
graph.WithRunID("approval-flow"),
)
// Process events until interrupt
for result := range seq {
if result.Interrupted {
break
}
}
// Human reviews and provides input
// ...
// Resume execution with updated state
seq = compiled.Run(ctx, input,
graph.WithRunID("approval-flow"),
graph.WithAutoRestore(true),
graph.WithStateUpdates(map[string]any{
"approved": true,
"reviewer": "alice@example.com",
}),
)
Use cases
- Approval workflows - Manager approval before taking action
- Data validation - Human verification of extracted data
- Content review - Review AI-generated content before publishing
- Interactive debugging - Pause and inspect state during development
See examples/human_pause for a complete workflow.
Approval Workflows
Advanced human-in-the-loop pattern with conditional guards, structured responses, state edits, and audit trails. Ideal for production workflows requiring human oversight.
Key Features
- 🛡️ Conditional Guards - Approval only when needed (e.g., sensitive keywords detected)
- ✍️ State Edits - Modify state during approval (e.g., redact sensitive data)
- ❌ Rejection Handling - Gracefully handle rejected operations
- 📊 Audit Trail - Complete approval history with timestamps and users
- ⏱️ Timeouts - Configurable approval timeouts
- 📝 Feedback Annotations - Optionally add approval decision to message history
Basic Approval Workflow
import (
"github.com/hupe1980/agentmesh/pkg/graph"
"github.com/hupe1980/agentmesh/pkg/checkpoint"
)
// Define keys
var ContentKey = graph.NewKey[string]("content", "")
var SentKey = graph.NewKey[bool]("sent", false)
// Create graph
g := graph.New[string, bool](ContentKey, SentKey)
// Define approval guard function
approvalGuard := func(ctx context.Context, view graph.View) (bool, string, error) {
content := graph.Get(view, ContentKey)
if containsSensitiveData(content) {
return true, "Contains sensitive information", nil
}
return false, "", nil // No approval needed
}
// Add node with approval guard
g.Node("send_email", func(ctx context.Context, view graph.View) (*graph.Command, error) {
content := graph.Get(view, ContentKey)
sendEmail(content)
return graph.Set(SentKey, true).To(graph.END), nil
}, graph.END)
// Configure interrupt before node with guard
g.InterruptBefore("send_email",
graph.WithApprovalGuard(approvalGuard),
graph.WithFeedbackAnnotation(true),
graph.WithApprovalTimeout(10 * time.Minute),
)
g.Start("send_email")
// Build with checkpointer
checkpointer := checkpoint.NewInMemory()
compiled, _ := g.Build(graph.WithCheckpointer(checkpointer))
// Step 1: Run until approval guard triggers
runID := "email-workflow-001"
for result := range compiled.Run(ctx, "Hello world",
graph.WithRunID(runID),
graph.WithCheckpointInterval(1),
) {
// Execution pauses when guard returns true
}
// Step 2: Load checkpoint and review pending approval
cp, _ := checkpointer.Load(ctx, runID)
if cp.ApprovalMetadata != nil {
for nodeName, pending := range cp.ApprovalMetadata.PendingApprovals {
fmt.Printf("Approval needed for: %s\n", nodeName)
fmt.Printf("Reason: %s\n", pending.Reason)
}
}
// Step 3: Provide approval response
approval := &graph.ApprovalResponse{
Decision: graph.ApprovalApproved,
Reason: "Reviewed and approved",
User: "alice@example.com",
Timestamp: time.Now(),
Edits: map[string]any{
ContentKey.Name(): "Redacted sensitive content",
},
}
// Step 4: Resume with approval
for result := range compiled.Run(ctx, "",
graph.WithCheckpoint(cp),
graph.WithApproval("send_email", approval),
) {
// Execution continues with approval applied
}
// Step 5: Query approval history
history, _ := checkpointer.GetApprovalHistory(ctx, runID)
for _, record := range history {
fmt.Printf("%s: %s by %s\n",
record.NodeName, record.Decision, record.User)
}
Approval Decisions
Four types of approval decisions:
// Approve and continue
approval := &graph.ApprovalResponse{
Decision: graph.ApprovalApproved,
Reason: "Looks good",
User: "alice@example.com",
}
// Reject and stop
rejection := &graph.ApprovalResponse{
Decision: graph.ApprovalRejected,
Reason: "Policy violation",
User: "security@example.com",
}
// Approve with state edits
editApproval := &graph.ApprovalResponse{
Decision: graph.ApprovalEdit,
Reason: "Approved with modifications",
User: "editor@example.com",
Edits: map[string]any{
ContentKey.Name(): "Modified content",
},
}
// Skip approval (auto-approve)
skip := &graph.ApprovalResponse{
Decision: graph.ApprovalSkip,
Reason: "Automated approval",
}
Conditional Guards
Guards control when approval is needed:
// Example: Sensitive keyword detection
sensitiveGuard := func(ctx context.Context, view graph.View) (bool, string, error) {
content := graph.Get(view, ContentKey)
keywords := []string{"confidential", "secret", "classified"}
for _, kw := range keywords {
if strings.Contains(strings.ToLower(content), kw) {
return true, fmt.Sprintf("Contains sensitive keyword: %s", kw), nil
}
}
return false, "", nil // Auto-continue
}
// Example: Amount threshold
amountGuard := func(ctx context.Context, view graph.View) (bool, string, error) {
amount := graph.Get(view, AmountKey)
if amount > 10000 {
return true, fmt.Sprintf("Amount exceeds $10k: $%.2f", amount), nil
}
return false, "", nil
}
// Example: Always require approval
alwaysGuard := func(ctx context.Context, view graph.View) (bool, string, error) {
return true, "Manual approval required", nil
}
State Edits During Approval
Modify state as part of the approval process:
approval := &graph.ApprovalResponse{
Decision: graph.ApprovalApproved,
User: "reviewer@example.com",
Edits: map[string]any{
// Redact sensitive data
ContentKey.Name(): redactSensitiveInfo(originalContent),
// Add approval metadata
"approved_by": "reviewer@example.com",
"approved_at": time.Now(),
// Modify execution parameters
"priority": "high",
},
}
State edits are applied BEFORE the node executes, allowing the node to see the modified state.
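The ordering described here, edits first and node second, can be sketched with plain maps. `applyEdits` and `runWithApproval` are hypothetical helpers written for this example; the real executor works with typed state rather than `map[string]any`.

```go
package main

import "fmt"

// applyEdits copies state and overlays the reviewer's edits on the copy,
// leaving the original map untouched.
func applyEdits(state, edits map[string]any) map[string]any {
	out := make(map[string]any, len(state)+len(edits))
	for k, v := range state {
		out[k] = v
	}
	for k, v := range edits {
		out[k] = v
	}
	return out
}

// runWithApproval applies the edits first, then calls the node with the
// edited state, so the node never sees the unreviewed values.
func runWithApproval(state, edits map[string]any, node func(map[string]any) string) string {
	return node(applyEdits(state, edits))
}

func main() {
	state := map[string]any{"content": "card number 1234"}
	edits := map[string]any{"content": "[redacted]"}

	sent := runWithApproval(state, edits, func(s map[string]any) string {
		return fmt.Sprint(s["content"])
	})
	fmt.Println(sent)
}
```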
Approval Configuration Options
g.InterruptBefore("critical_action",
// Required: Guard function
graph.WithApprovalGuard(guard),
// Optional: Add approval decision to message history
graph.WithFeedbackAnnotation(true),
// Optional: Timeout after which approval auto-rejects
graph.WithApprovalTimeout(30 * time.Minute),
// Optional: Snapshot specific state keys for approval review
graph.WithStateSnapshot("content", "metadata", "config"),
)
Multiple Approvals
Handle multiple approval points in a single workflow:
// Add approvals at different stages
g.InterruptBefore("draft", graph.WithApprovalGuard(draftGuard))
g.InterruptBefore("publish", graph.WithApprovalGuard(publishGuard))
// Provide approvals for each stage
for result := range compiled.Run(ctx, input,
graph.WithCheckpoint(cp),
graph.WithApproval("draft", draftApproval),
graph.WithApproval("publish", publishApproval),
) {
// Process
}
Error Handling
// Check if approval is required but not provided
if err := graph.CheckApproval(ctx, "send_email", true); err != nil {
log.Printf("Approval required: %v", err)
}
// Create approval required error
if needsApproval {
info := &graph.ApprovalInfo{
NodeName: "send_email",
Reason: "Sensitive content detected",
RequestedAt: time.Now(),
}
return graph.NewApprovalRequiredError(info)
}
// Check error type
if graph.IsApprovalRequired(err) {
info := graph.ApprovalInfoFromError(err)
fmt.Printf("Approval needed: %s\n", info.Reason)
}
Production Best Practices
1. Use conditional guards to avoid unnecessary approvals:
guard := func(ctx context.Context, view graph.View) (bool, string, error) {
if !needsReview(view) {
return false, "", nil // Auto-continue
}
return true, "Manual review required", nil
}
2. Set appropriate timeouts:
// Short timeout for routine approvals
graph.WithApprovalTimeout(5 * time.Minute)
// Long timeout for complex reviews
graph.WithApprovalTimeout(24 * time.Hour)
// No timeout (wait indefinitely)
graph.WithApprovalTimeout(0)
3. Use annotations for rich audit data:
approval := &graph.ApprovalResponse{
Decision: graph.ApprovalApproved,
User: "alice@example.com",
Annotations: map[string]any{
"department": "security",
"risk_level": "medium",
"reviewed_by": "Alice Smith",
"policy_version": "2.1",
},
}
See examples/human_approval for complete working examples with all approval scenarios.
Managed values
Managed values are ephemeral runtime state that is NOT included in checkpoints. They’re ideal for:
- API keys and authentication tokens
- Session state (user context, preferences)
- Runtime metrics collectors
- Cached computed values
- Resource handles (connections, caches)
Why use managed values?
Regular state (via graph.Get/graph.Set) is persisted to checkpoints. This is problematic for:
- Sensitive data - API keys shouldn’t be stored in checkpoints
- Runtime-only state - Metrics, counters, and handles that don’t survive restarts
- Computed values - State that should be recomputed on access
Types of managed values
Static managed value
Thread-safe storage for runtime configuration:
// Create with initial value
var configMV = graph.NewManagedValue("config", &Config{
APIKey: os.Getenv("API_KEY"),
Timeout: 30 * time.Second,
})
// Access in node
func myNode(ctx context.Context, view graph.View) (*graph.Command, error) {
config := graph.GetManaged(ctx, view, configMV)
// Use config.APIKey, config.Timeout, etc.
return graph.Set(resultKey, result).To(graph.END), nil
}
Provider (always fresh)
Recomputed on every access:
var counterMV = graph.NewManagedValueProvider("counter", func(ctx context.Context) (int64, error) {
return atomic.AddInt64(&count, 1), nil
})
Provider with caching
Add WithCacheTTL to cache the computed value:
// Cached: reuses value for 5 seconds, then recomputes
var cachedTimeMV = graph.NewManagedValueProvider("cached_time", func(ctx context.Context) (time.Time, error) {
return time.Now(), nil
}, graph.WithCacheTTL(5*time.Second))
// Invalidate cache when needed
cachedTimeMV.Invalidate()
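To show what TTL caching with explicit invalidation involves, here is a generic sketch of the technique. `cachedProvider` and `newCachedProvider` are names invented for this example and do not reflect how `graph.WithCacheTTL` is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedProvider wraps a compute function and reuses its result until ttl elapses.
type cachedProvider[T any] struct {
	mu      sync.Mutex
	compute func() (T, error)
	ttl     time.Duration
	value   T
	expires time.Time
	valid   bool
}

func newCachedProvider[T any](ttl time.Duration, compute func() (T, error)) *cachedProvider[T] {
	return &cachedProvider[T]{compute: compute, ttl: ttl}
}

// Get returns the cached value if it is still fresh, otherwise recomputes it.
func (p *cachedProvider[T]) Get() (T, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.valid && time.Now().Before(p.expires) {
		return p.value, nil
	}
	v, err := p.compute()
	if err != nil {
		var zero T
		return zero, err
	}
	p.value, p.expires, p.valid = v, time.Now().Add(p.ttl), true
	return v, nil
}

// Invalidate forces the next Get to recompute.
func (p *cachedProvider[T]) Invalidate() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.valid = false
}

func main() {
	calls := 0
	p := newCachedProvider(5*time.Second, func() (int, error) {
		calls++
		return calls, nil
	})
	a, _ := p.Get()
	b, _ := p.Get() // served from cache
	p.Invalidate()
	c, _ := p.Get() // recomputed after invalidation
	fmt.Println(a, b, c)
}
```

Holding the mutex across the compute call keeps concurrent callers from recomputing the same value twice, at the cost of serializing them while the provider runs.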
Using managed values
Pass managed values when running the graph:
// Define managed values
var apiKeyMV = graph.NewManagedValue("api_key", os.Getenv("API_KEY"))
var metricsMV = graph.NewManagedValueProvider("metrics", computeMetrics)
// Pass to Run
for output, err := range compiled.Run(ctx, input,
graph.WithManagedValues(apiKeyMV, metricsMV)) {
// ...
}
Checkpoint safety
Managed values never ride along in checkpoints, but their metadata does. Each checkpoint now stores a list of managed value descriptors (name and required flag) so the executor can validate restores before user code runs.
var runtimeConfigMV = graph.NewManagedValue(
"runtime_config",
&RuntimeConfig{APIKey: os.Getenv("API_KEY"), Timeout: 15 * time.Second},
graph.WithManagedValueRequired(), // resume fails if missing
graph.WithManagedValueRehydrator(func(ctx context.Context) error {
cfg, err := runtimeConfigMV.Get(ctx)
if err != nil {
return err
}
cfg.APIKey = os.Getenv("API_KEY") // refresh secrets after restore
return nil
}),
)
compiled.Run(ctx, input,
graph.WithManagedValues(runtimeConfigMV), // must be provided on resume
)
- WithManagedValueRequired: Checkpoint restore aborts early if the managed value is missing, which protects nodes from nil pointers or stale config.
- WithManagedValueRehydrator: Runs after checkpoint restore and after cached providers refresh, which is ideal for rotating API keys, reopening DB connections, or syncing handles with the environment.
If you rely on graph.WithCheckpoints, make sure the same managed value registry is supplied when calling Resume. Missing required values will surface as descriptive errors before any graph nodes execute.
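The restore-time check can be pictured as a comparison between the descriptors saved in the checkpoint and the managed values supplied on resume. The `descriptor` type and `validateRestore` function below are hypothetical and only sketch the idea; the executor's actual data structures are not shown in this guide.

```go
package main

import "fmt"

// descriptor is a hypothetical record of what a checkpoint remembers
// about a managed value: its name and whether it is required.
type descriptor struct {
	Name     string
	Required bool
}

// validateRestore reports the first required managed value that is
// missing from the set supplied at resume time.
func validateRestore(saved []descriptor, provided map[string]bool) error {
	for _, d := range saved {
		if d.Required && !provided[d.Name] {
			return fmt.Errorf("required managed value %q not provided on resume", d.Name)
		}
	}
	return nil
}

func main() {
	saved := []descriptor{
		{Name: "runtime_config", Required: true},
		{Name: "metrics", Required: false},
	}
	fmt.Println(validateRestore(saved, map[string]bool{"runtime_config": true}))
	fmt.Println(validateRestore(saved, map[string]bool{}))
}
```

Running the check before any node executes is what turns a missing secret into a clear error instead of a nil dereference deep inside a node.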
Comparison with regular state
| Feature | Regular State | Managed Values |
|---|---|---|
| Access | graph.Get(view, key) | graph.GetManaged(ctx, view, mv) |
| Checkpointed | ✅ Yes | ❌ No |
| Survives restart | ✅ Yes | ❌ No |
| Type-safe | ✅ Yes | ✅ Yes |
| Thread-safe | ✅ Yes | ✅ Yes |
| Sensitive data | ❌ No | ✅ Yes |
| Computed values | ❌ No | ✅ Yes |
See examples/managed_values for a complete working example.
Best practices
Checkpoint management
Do:
- Set appropriate checkpoint intervals (balance performance vs recoverability)
- Use meaningful run IDs (workflow-{id}, user-{id}-session-{id})
- Clean up old checkpoints periodically
- Test recovery paths regularly
Don’t:
- Checkpoint every superstep in high-frequency workflows (performance impact)
- Store sensitive data in checkpoints without encryption
- Keep checkpoints indefinitely (storage costs)
Message retention
Guidelines:
- Start with 100 messages and adjust based on needs
- Monitor token usage and adjust limits
- Consider summarization for long conversations
- Use unlimited only when necessary
Time travel debugging
Tips:
- Add metadata to checkpoints for easier identification
- Use structured logging to correlate logs with supersteps
- Test time travel in development before relying on it
- Document expected superstep behavior for complex workflows
Next steps
- Checkpointing Guide - Deep dive into checkpoint lifecycle
- Streaming - Real-time execution updates
- Examples - Checkpointing, time travel, and human pause examples