Go 1.24+ GitHub

Checkpointing & State Persistence

Save execution state for resumption, time-travel debugging, and audit trails.

Overview

Checkpointing in AgentMesh enables automatic state persistence during graph execution. Every superstep (iteration) can be saved, allowing you to:

  • 🔄 Resume interrupted workflows from the last checkpoint
  • 🐛 Debug production issues by replaying exact execution states
  • Time-travel to any previous superstep for analysis
  • 📊 Audit agent decisions with complete execution history
  • 🔀 Branch from checkpoints to test alternative paths

A RunID is required for checkpointing: pass a stable identifier via WithRunID(). Checkpoints are saved and looked up under this ID, so an auto-generated UUID would start a new checkpoint stream on every run instead of resuming the existing one.


Checkpoint Lifecycle

1. Automatic Checkpointing

When enabled, AgentMesh automatically saves state after each superstep:

import (
    "context"
    "log"

    "github.com/hupe1980/agentmesh/pkg/checkpoint"
    "github.com/hupe1980/agentmesh/pkg/graph"
    "github.com/hupe1980/agentmesh/pkg/message"
)

// Define state keys
var StatusKey = graph.NewKey[string]("status", "")

// Create graph
g := message.NewGraphBuilder(StatusKey)

g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    return graph.Set(StatusKey, "done").To(graph.END), nil
}, graph.END)

g.Start("process")

// Create checkpointer
checkpointer := checkpoint.NewInMemory()

// Build with checkpointer
compiled, err := g.Build(graph.WithCheckpointer(checkpointer))
if err != nil {
    log.Fatal(err)
}

// Execute with automatic checkpointing
runID := "workflow-123"
messages := []message.Message{
    message.NewHumanMessageFromText("Start workflow"),
}

for result := range compiled.Run(ctx, messages,
    graph.WithRunID(runID),
    graph.WithCheckpointInterval(1),  // Save every superstep
    graph.WithAutoRestore(true),
) {
    if result.Error != nil {
        log.Fatal(result.Error)
    }
}

2. Checkpoint Contents

Each checkpoint captures complete execution state:

type Checkpoint struct {
    RunID          string                 // Unique execution identifier
    Superstep      int64                  // Iteration number (0, 1, 2, ...)
    Timestamp      time.Time              // When checkpoint was created
    State          map[string]any         // Graph state snapshot
    PendingWrites  []PendingWrite         // Uncommitted state updates
    Committed      bool                   // Whether PendingWrites have already been applied
    CompletedNodes []string               // Nodes that finished execution
    PausedNodes    []string               // Nodes paused for human input
    ApprovalMetadata *ApprovalMetadata    // Pending approvals and history
    Metadata       map[string]any         // Custom execution metadata
}

type PendingWrite struct {
    NodeName  string    // Vertex that produced the write
    Channel   string    // State channel being updated
    Value     any       // Buffered value
    Timestamp time.Time // When the write was produced
}

Each pending write therefore captures **who** made the change, **what** channel it targets, and **when** it was staged. Approval dashboards surface that provenance so reviewers can make per-node decisions before the update is committed.
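
As a rough illustration of how that provenance could be presented to a reviewer, the sketch below groups staged writes by the node that produced them. The PendingWrite type is redeclared locally so the example compiles on its own, and groupByNode is a hypothetical helper, not part of the AgentMesh API.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// PendingWrite mirrors the struct shown above; it is redeclared here only
// so the sketch is self-contained.
type PendingWrite struct {
	NodeName  string
	Channel   string
	Value     any
	Timestamp time.Time
}

// groupByNode (hypothetical helper) buckets staged writes by producing node,
// preserving the original order within each bucket.
func groupByNode(writes []PendingWrite) map[string][]PendingWrite {
	grouped := make(map[string][]PendingWrite)
	for _, w := range writes {
		grouped[w.NodeName] = append(grouped[w.NodeName], w)
	}
	return grouped
}

func main() {
	now := time.Now()
	writes := []PendingWrite{
		{NodeName: "planner", Channel: "status", Value: "planned", Timestamp: now},
		{NodeName: "executor", Channel: "result", Value: 42, Timestamp: now},
		{NodeName: "planner", Channel: "notes", Value: "retry once", Timestamp: now},
	}

	grouped := groupByNode(writes)

	// Sort node names so the listing is deterministic.
	nodes := make([]string, 0, len(grouped))
	for name := range grouped {
		nodes = append(nodes, name)
	}
	sort.Strings(nodes)

	for _, name := range nodes {
		fmt.Printf("%s staged %d write(s)\n", name, len(grouped[name]))
		for _, w := range grouped[name] {
			fmt.Printf("  %s = %v\n", w.Channel, w.Value)
		}
	}
}
```

A reviewer-facing tool could then approve or reject each node's bucket independently before the writes are committed.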

3. Superstep Progression

Understanding how supersteps map to execution:

Superstep 0: Initial state (before any node execution)
Superstep 1: After first batch of nodes completes
Superstep 2: After second batch completes
Superstep 3: After third batch completes
...
Superstep N: Final state (all nodes complete)

Example Workflow:

var StatusKey = graph.NewKey[string]("status", "")

g := graph.New[string, string](StatusKey)

g.Node("start", startFunc, "process")
g.Node("process", processFunc, "finish")
g.Node("finish", finishFunc, graph.END)

g.Start("start")

// Execution produces 4 checkpoints:
// - Superstep 0: Initial state
// - Superstep 1: After "start" completes
// - Superstep 2: After "process" completes  
// - Superstep 3: After "finish" completes (final)
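
The counting rule can be checked with a small standalone simulation. This is a sketch under the assumption that a linear graph runs exactly one node per superstep; snapshot and simulateLinear are hypothetical names and do not use the AgentMesh runtime.

```go
package main

import "fmt"

// snapshot is a hypothetical stand-in for a checkpoint record.
type snapshot struct {
	Superstep int64
	Completed []string
}

// simulateLinear walks a linear chain of node names and records one
// snapshot before execution (superstep 0) plus one after each node.
func simulateLinear(nodes []string) []snapshot {
	snaps := []snapshot{{Superstep: 0}}
	var done []string
	for i, n := range nodes {
		done = append(done, n)
		// Copy so later appends cannot alias earlier snapshots.
		completed := append([]string(nil), done...)
		snaps = append(snaps, snapshot{Superstep: int64(i + 1), Completed: completed})
	}
	return snaps
}

func main() {
	for _, s := range simulateLinear([]string{"start", "process", "finish"}) {
		fmt.Printf("superstep %d: completed=%v\n", s.Superstep, s.Completed)
	}
}
```

For a chain of N nodes this yields N+1 snapshots, which matches the four checkpoints listed for the three-node workflow.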

4. Resume from Checkpoint

Continue execution from the last saved state:

checkpointer := checkpoint.NewInMemory()

// Build graph with checkpointer
compiled, _ := g.Build(graph.WithCheckpointer(checkpointer))

// First execution (gets interrupted)
for result := range compiled.Run(ctx, messages,
    graph.WithRunID("workflow-123"),
    graph.WithCheckpointInterval(1),
) {
    if result.Error != nil {
        log.Printf("run interrupted: %v", result.Error)
        break
    }
}

// Later: resume from the last checkpoint using the same RunID
for result := range compiled.Run(ctx, messages,
    graph.WithRunID("workflow-123"),
    graph.WithAutoRestore(true),
) {
    // Continues from where it left off
    if result.Error != nil {
        log.Fatal(result.Error)
    }
}

Transactional Semantics

AgentMesh implements a two-phase commit protocol to ensure transactional consistency between checkpoints and state updates:

How It Works

Phase 1: Collect updates from nodes (buffered, not applied)
Phase 2: Save checkpoint with PendingWrites → Apply updates to state

This guarantees that:

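  • Atomicity: State updates are applied only after the checkpoint has been safely persisted
  • Consistency: A saved checkpoint always carries the updates that have not yet been committed (its PendingWrites), so checkpoint and state cannot drift apart
  • Crash Recovery: If a crash occurs between save and apply, the PendingWrites are replayed on resume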

Crash Recovery Scenarios

Scenario 1: Crash BEFORE checkpoint save

❌ Updates never saved → Resume from previous superstep → Re-execute node
✅ No data loss, no inconsistency

Scenario 2: Crash AFTER checkpoint save, BEFORE updates applied

✅ Checkpoint contains PendingWrites
✅ Resume applies PendingWrites to state
✅ Continues from next superstep

Scenario 3: Crash AFTER updates applied

✅ Normal checkpoint resume
✅ Continue from next superstep
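
The save-then-apply ordering and Scenario 2 can be sketched with a tiny in-memory model. This is an illustrative approximation, not AgentMesh's implementation: the write and checkpoint types, saveThenApply, and restore are hypothetical, and the crash is simulated by a flag.

```go
package main

import "fmt"

// write is one buffered state update (simplified, hypothetical).
type write struct {
	Channel string
	Value   any
}

// checkpoint holds a state snapshot plus the writes staged for it.
type checkpoint struct {
	Superstep int64
	State     map[string]any
	Pending   []write
	Committed bool
}

// copyState returns a shallow copy so snapshots do not alias live state.
func copyState(s map[string]any) map[string]any {
	out := make(map[string]any, len(s))
	for k, v := range s {
		out[k] = v
	}
	return out
}

// apply commits buffered writes to the state in order.
func apply(state map[string]any, pending []write) {
	for _, w := range pending {
		state[w.Channel] = w.Value
	}
}

// saveThenApply persists the checkpoint first, then mutates live state.
// crashAfterSave simulates the process dying between the two steps.
func saveThenApply(store *[]checkpoint, step int64, state map[string]any, pending []write, crashAfterSave bool) {
	*store = append(*store, checkpoint{Superstep: step, State: copyState(state), Pending: pending})
	if crashAfterSave {
		return
	}
	apply(state, pending)
	last := &(*store)[len(*store)-1]
	last.State = copyState(state)
	last.Committed = true
}

// restore rebuilds state from the latest checkpoint, replaying pending
// writes only if they were never committed. It returns the next superstep.
func restore(store []checkpoint) (map[string]any, int64, bool) {
	if len(store) == 0 {
		return nil, 0, false
	}
	last := store[len(store)-1]
	state := copyState(last.State)
	if !last.Committed {
		apply(state, last.Pending)
	}
	return state, last.Superstep + 1, true
}

func main() {
	var store []checkpoint
	state := map[string]any{"status": ""}

	saveThenApply(&store, 1, state, []write{{Channel: "status", Value: "running"}}, false)
	saveThenApply(&store, 2, state, []write{{Channel: "status", Value: "done"}}, true) // crash after save

	recovered, next, _ := restore(store)
	fmt.Printf("recovered status=%v, resume at superstep %d\n", recovered["status"], next)
}
```

Because the staged writes are persisted before they are applied, the restored state includes the update from superstep 2 even though the live state never saw it.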

Storage Backends

AgentMesh supports multiple checkpoint storage backends:

Memory (Development)

Best for: Testing, development, short-lived workflows

import "github.com/hupe1980/agentmesh/pkg/checkpoint"

checkpointer := checkpoint.NewInMemory()

compiled, _ := g.Build(graph.WithCheckpointer(checkpointer))

for result := range compiled.Run(ctx, input,
    graph.WithRunID("test-run"),
    graph.WithCheckpointInterval(1),
) {
    // Process results
    if result.Error != nil {
        log.Fatal(result.Error)
    }
}

Characteristics:

  • ✅ Zero setup - works out of the box
  • ✅ Fastest performance
  • ✅ Perfect for unit tests
  • ⚠️ Data lost when process exits
  • ⚠️ No persistence across restarts

DynamoDB (Cloud Production)

Best for: AWS deployments, serverless, distributed systems

import (
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    checkpointdb "github.com/hupe1980/agentmesh/pkg/checkpoint/dynamodb"
)

// Load AWS config
cfg, _ := config.LoadDefaultConfig(ctx)
client := dynamodb.NewFromConfig(cfg)

// Create DynamoDB checkpointer
checkpointer := checkpointdb.NewCheckpointer(client,
    checkpointdb.WithTableName("agentmesh-checkpoints"),
)

// Auto-create table if needed
if err := checkpointer.CreateTable(ctx); err != nil {
    log.Fatal(err)
}

compiled, _ := g.Build(graph.WithCheckpointer(checkpointer))

for result := range compiled.Run(ctx, input,
    graph.WithRunID("workflow-123"),
    graph.WithCheckpointInterval(1),
) {
    // Process results
    if result.Error != nil {
        log.Fatal(result.Error)
    }
}

Characteristics:

  • ✅ Fully managed (no servers to maintain)
  • ✅ Automatic scaling
  • ✅ Multi-region replication available
  • ⚠️ Network latency overhead
  • ⚠️ AWS costs (per-request pricing)

SQL (On-Premise Production)

Best for: Self-hosted deployments, existing SQL infrastructure

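import (
    "database/sql"

    _ "github.com/go-sql-driver/mysql" // registers the "mysql" driver
    _ "github.com/lib/pq"              // registers the "postgres" driver
    _ "github.com/mattn/go-sqlite3"    // registers the "sqlite3" driver
    checkpointsql "github.com/hupe1980/agentmesh/pkg/checkpoint/sql"
)

// Pick one backend: the three variants below are alternatives, not
// statements to run together. Import only the driver you actually use.

// SQLite
db, _ := sql.Open("sqlite3", "./checkpoints.db")
checkpointer, _ := checkpointsql.NewSQLiteCheckpointer(ctx, db)

// PostgreSQL
db, _ := sql.Open("postgres", "postgres://user:pass@localhost/agentmesh")
checkpointer, _ := checkpointsql.NewPostgreSQLCheckpointer(ctx, db)

// MySQL
db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/agentmesh")
checkpointer, _ := checkpointsql.NewMySQLCheckpointer(ctx, db)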

compiled, _ := g.Build(graph.WithCheckpointer(checkpointer))

Characteristics:

  • ✅ Works with existing databases
  • ✅ Full ACID compliance
  • ✅ Familiar tooling
  • ⚠️ Requires database management

Time Travel

Debug workflows by replaying from any superstep.

List Checkpoints

checkpoints, err := checkpointer.List(ctx, "workflow-123")
if err != nil {
    log.Fatal(err)
}

for _, cp := range checkpoints {
    fmt.Printf("Superstep %d at %v\n", cp.Superstep, cp.Timestamp)
    fmt.Printf("  Completed nodes: %v\n", cp.CompletedNodes)
}

Resume from Specific Superstep

// Resume from superstep 5
for result := range compiled.Run(ctx, input,
    graph.WithRunID("workflow-123"),
    graph.WithResumeFromSuperstep(5),
) {
    // Execution resumes from superstep 5
    if result.Error != nil {
        log.Fatal(result.Error)
    }
}

Debugging Workflow

  1. Identify problematic superstep from logs or errors
  2. List checkpoints to find the superstep before the issue
  3. Resume execution from that checkpoint with modifications
  4. Compare results to understand what changed

Example:

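// Original execution failed at superstep 10
// Resume from superstep 8 with debug logging enabled

// Declared at package level. A dedicated key type avoids the collisions
// that plain string keys can cause with context.WithValue.
type debugKey struct{}

ctx = context.WithValue(ctx, debugKey{}, true)

for result := range compiled.Run(ctx, input,
    graph.WithRunID(runID),
    graph.WithResumeFromSuperstep(8),
) {
    // Debug and analyze
    if result.Error != nil {
        log.Printf("replay failed: %v", result.Error)
    }
}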
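
Step 2 of the debugging workflow, finding the checkpoint just before the failing superstep, can be sketched as a small selection function. lastBefore is a hypothetical helper that works on superstep numbers such as those read from cp.Superstep in a checkpoint listing. It assumes the list may be unordered and may have gaps when the checkpoint interval is greater than 1.

```go
package main

import "fmt"

// lastBefore returns the highest superstep that is strictly lower than
// the failed one, and false if no such checkpoint exists.
func lastBefore(supersteps []int64, failed int64) (int64, bool) {
	best, found := int64(0), false
	for _, s := range supersteps {
		if s < failed && (!found || s > best) {
			best, found = s, true
		}
	}
	return best, found
}

func main() {
	// Superstep numbers as they might appear with an interval of 2.
	saved := []int64{0, 2, 4, 6, 8, 10}

	if step, ok := lastBefore(saved, 10); ok {
		fmt.Printf("resume from superstep %d\n", step)
	} else {
		fmt.Println("no earlier checkpoint available")
	}
}
```

The returned value is what would be passed to graph.WithResumeFromSuperstep in the example above.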

Security

Checkpoint Encryption

Encrypt sensitive checkpoint data:

import "github.com/hupe1980/agentmesh/pkg/checkpoint"

// Generate or load 32-byte encryption key
key := []byte("your-32-byte-encryption-key-here")

// Create encrypted checkpointer
baseCheckpointer := checkpoint.NewInMemory()
encryptedCheckpointer := checkpoint.NewEncrypted(baseCheckpointer, key)

compiled, _ := g.Build(graph.WithCheckpointer(encryptedCheckpointer))

Characteristics:

  • Uses AES-256-GCM encryption
  • Key must be exactly 32 bytes
  • Works with any storage backend
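
For readers who want to see what AES-256-GCM involves, here is a standalone sketch using only the Go standard library. It does not show how checkpoint.NewEncrypted is implemented. The helpers sealCheckpoint and openCheckpoint are hypothetical, as is the choice to prepend the nonce to the ciphertext.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// newGCM builds an AES-256-GCM AEAD and enforces the 32-byte key length.
func newGCM(key []byte) (cipher.AEAD, error) {
	if len(key) != 32 {
		return nil, errors.New("key must be exactly 32 bytes for AES-256")
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// sealCheckpoint encrypts data and prepends the random nonce to the result.
func sealCheckpoint(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// openCheckpoint splits off the nonce, then decrypts and authenticates.
func openCheckpoint(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	// Generate a random key; in production it would come from a secret store.
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}

	sealed, err := sealCheckpoint(key, []byte(`{"status":"done"}`))
	if err != nil {
		panic(err)
	}
	plain, err := openCheckpoint(key, sealed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("round trip ok: %s\n", plain)
}
```

Generating the key with crypto/rand, or loading it from a secret manager, is generally preferable to a string literal in source code.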

Checkpoint Signing

Verify checkpoint integrity:

import "github.com/hupe1980/agentmesh/pkg/checkpoint"

// Secret for HMAC signing
secret := []byte("your-signing-secret")

// Create signed checkpointer
baseCheckpointer := checkpoint.NewInMemory()
signedCheckpointer := checkpoint.NewSigned(baseCheckpointer, secret)

compiled, _ := g.Build(graph.WithCheckpointer(signedCheckpointer))

Characteristics:

  • Uses HMAC-SHA256 for signing
  • Detects tampering or corruption
  • Works with any storage backend
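
The mechanism behind HMAC-SHA256 signing can be illustrated with the standard library alone. The sketch below is not the internals of checkpoint.NewSigned; sign and verify are hypothetical helpers operating on an already serialized checkpoint.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// sign computes an HMAC-SHA256 tag over the payload.
func sign(secret, payload []byte) []byte {
	mac := hmac.New(sha256.New, secret)
	mac.Write(payload)
	return mac.Sum(nil)
}

// verify recomputes the tag and compares it in constant time.
func verify(secret, payload, tag []byte) bool {
	return hmac.Equal(sign(secret, payload), tag)
}

func main() {
	secret := []byte("your-signing-secret")
	payload := []byte(`{"superstep":3,"status":"done"}`)

	tag := sign(secret, payload)
	fmt.Println("untouched payload valid:", verify(secret, payload, tag))

	// Flip one byte to simulate tampering or storage corruption.
	tampered := append([]byte(nil), payload...)
	tampered[0] ^= 0x01
	fmt.Println("tampered payload valid:", verify(secret, tampered, tag))
}
```

Using hmac.Equal rather than bytes.Equal avoids leaking timing information during the comparison.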

Combined Security

Use both encryption and signing:

encryptionKey := []byte("your-32-byte-encryption-key-here")
signingSecret := []byte("your-signing-secret")

baseCheckpointer := checkpoint.NewInMemory()

// Layer 1: Encryption
encrypted := checkpoint.NewEncrypted(baseCheckpointer, encryptionKey)

// Layer 2: Signing
signedAndEncrypted := checkpoint.NewSigned(encrypted, signingSecret)

compiled, _ := g.Build(graph.WithCheckpointer(signedAndEncrypted))

Best Practices

Checkpoint Interval

Choose appropriate intervals based on your workload:

// High-value workflows: Save every superstep
graph.WithCheckpointInterval(1)

// Performance-sensitive: Save every 5 supersteps
graph.WithCheckpointInterval(5)

// Manual control: Save only at specific points
graph.WithCheckpointInterval(0)  // Then use checkpoint.Save() manually
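
To get a feel for the trade-off, the sketch below counts how many automatic saves each interval would trigger. It assumes that interval n saves on every n-th superstep and that 0 disables automatic saves; shouldCheckpoint and countSaves are hypothetical, and the real scheduler may count differently.

```go
package main

import "fmt"

// shouldCheckpoint reports whether a save is due at the given superstep.
// Assumption: interval 0 means manual control, interval n means every n-th step.
func shouldCheckpoint(superstep, interval int64) bool {
	if interval <= 0 {
		return false
	}
	return superstep%interval == 0
}

// countSaves returns the number of automatic saves over supersteps 1..total.
func countSaves(total, interval int64) int {
	n := 0
	for s := int64(1); s <= total; s++ {
		if shouldCheckpoint(s, interval) {
			n++
		}
	}
	return n
}

func main() {
	for _, interval := range []int64{1, 5, 0} {
		fmt.Printf("interval=%d: %d saves over 20 supersteps\n", interval, countSaves(20, interval))
	}
}
```

Under these assumptions a larger interval reduces write volume proportionally, at the cost of re-executing up to interval-1 supersteps after a crash.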

Run ID Management

Use meaningful, stable run IDs:

// ✅ Good: Meaningful identifiers
graph.WithRunID("order-processing-" + orderID)
graph.WithRunID("user-" + userID + "-session-" + sessionID)
graph.WithRunID("workflow-" + workflowType + "-" + timestamp)

// ❌ Bad: Random UUIDs (can't resume)
graph.WithRunID(uuid.New().String())
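
One way to keep run IDs stable is to derive them from business keys in a single place, so that a restarted process computes the same ID it used before. runIDFor below is a hypothetical helper shown as a sketch; only the resulting string would be passed to graph.WithRunID.

```go
package main

import (
	"fmt"
	"strings"
)

// runIDFor joins a workflow kind and its stable business keys into one ID.
// The same inputs always produce the same ID, which is what resumption needs.
func runIDFor(kind string, keys ...string) string {
	return strings.Join(append([]string{kind}, keys...), "-")
}

func main() {
	// First process start and a later restart derive the same identifier.
	first := runIDFor("order-processing", "A17")
	afterRestart := runIDFor("order-processing", "A17")

	fmt.Println(first)
	fmt.Println("same ID after restart:", first == afterRestart)
}
```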

Cleanup

Periodically clean up old checkpoints:

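// List all checkpoints, then delete runs that have checkpoints older than 7 days.
// Assumption: an empty run ID makes List return checkpoints for every run.
checkpoints, _ := checkpointer.List(ctx, "")

seen := make(map[string]bool)
for _, cp := range checkpoints {
    if seen[cp.RunID] || time.Since(cp.Timestamp) <= 7*24*time.Hour {
        continue
    }
    seen[cp.RunID] = true
    checkpointer.Delete(ctx, cp.RunID) // Delete takes a run ID, so call it once per run
}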

Monitoring

Track checkpoint performance:

// Checkpoint timing
start := time.Now()
for result := range compiled.Run(ctx, input,
    graph.WithRunID(runID),
    graph.WithCheckpointInterval(1),
) {
    if result.Checkpoint != nil {
        metrics.RecordCheckpointLatency(time.Since(start))
        start = time.Now()
    }
}

// Checkpoint size
cp, _ := checkpointer.Load(ctx, runID)
data, _ := json.Marshal(cp)
metrics.RecordCheckpointSize(len(data))

Examples

See complete working examples: