
Middleware Architecture

Build resilient, observable workflows with composable middleware for caching, retries, circuit breakers, and more.

Overview

AgentMesh provides a comprehensive middleware system for extending and customizing execution behavior across all layers: graph, model, and tool execution.

The middleware system enables you to:

  • Observe execution with logging and events
  • Modify behavior with caching, retries, and rate limiting
  • Protect services with circuit breakers and timeouts
  • Monitor usage with token counting and audit logs
  • Visualize execution in real-time

Architecture

┌─────────────────────────────────────────────────────┐
│                     Agent                            │
│  ┌───────────────────────────────────────────────┐  │
│  │         Graph Middleware Stack                 │  │
│  │  • Logging                                     │  │
│  │  • Events                                      │  │
│  │  • Visualization                               │  │
│  └─────────────────┬─────────────────────────────┘  │
│                    │                                 │
│         ┌──────────┴──────────┐                     │
│         │                     │                     │
│  ┌──────▼─────────┐   ┌──────▼─────────┐           │
│  │ Model Middleware│   │ Tool Middleware │           │
│  │  • Cache        │   │  • Cache       │           │
│  │  • Retry        │   │  • Timeout     │           │
│  │  • Rate Limit   │   │  • Circuit     │           │
│  │  • Token Count  │   │  • Audit       │           │
│  └────────────────┘   └────────────────┘           │
└─────────────────────────────────────────────────────┘

Middleware Types

Graph Middleware

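Graph middleware wraps node execution, providing observability and lifecycle hooks. The built-in middleware functions live in the pkg/graph/middleware package (imported below as graphmw); composition helpers such as ChainNodeMiddleware are in the graph package.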

Available Middleware:

  • LoggingMiddleware - Structured logging of node execution
  • TimingMiddleware - Tracks execution time with callbacks
  • RecoveryMiddleware - Recovers from panics and converts them to errors
  • ConditionalMiddleware - Applies middleware only when conditions are met
  • NodeNameMiddleware - Applies middleware only to specific nodes by name
  • VizMiddleware - Integrates with visualization server (in pkg/viz/middleware)

Usage:

import (
    "log"
    "log/slog"
    "time"

    "github.com/hupe1980/agentmesh/pkg/graph"
    graphmw "github.com/hupe1980/agentmesh/pkg/graph/middleware"
)

// Apply logging middleware
b.WithNodeMiddleware(graphmw.LoggingMiddleware(slog.Default()))

// Track timing
b.WithNodeMiddleware(graphmw.TimingMiddleware(func(node string, d time.Duration) {
    metrics.RecordLatency(node, d)
}))

// Recover from panics
b.WithNodeMiddleware(graphmw.RecoveryMiddleware(func(node string, r any) {
    log.Printf("panic in %s: %v", node, r)
}))

// Apply middleware to specific nodes only
b.WithNodeMiddleware(graphmw.NodeNameMiddleware(
    []string{"slow_node", "external_api"},
    graphmw.LoggingMiddleware(slog.Default()),
))

// Chain multiple middleware
b.WithNodeMiddleware(graph.ChainNodeMiddleware(
    graphmw.LoggingMiddleware(slog.Default()),
    graphmw.TimingMiddleware(timingCallback),
    graphmw.RecoveryMiddleware(panicHandler),
))

Model Middleware

Model middleware wraps LLM calls, enabling caching, retries, and usage tracking.

Available Middleware:

  • CacheMiddleware - Caches responses to reduce API calls
  • RetryMiddleware - Retries failed calls with exponential backoff
  • RateLimitMiddleware - Prevents quota exhaustion
  • TokenCounterMiddleware - Tracks token usage

Usage:

import (
    "fmt"
    "time"

    modelmw "github.com/hupe1980/agentmesh/pkg/model/middleware"
)

// Create middleware instances
cache := modelmw.NewCacheMiddleware()
retry := modelmw.NewRetryMiddleware(
    modelmw.WithMaxRetries(3),
    modelmw.WithInitialBackoff(100*time.Millisecond),
)
rateLimit := modelmw.NewRateLimitMiddleware(10, 100*time.Millisecond)
tokenCounter := modelmw.NewTokenCounterMiddleware()

// Apply to agent
agent.NewReAct(model,
    agent.WithModelMiddleware(cache, retry, rateLimit, tokenCounter),
)

// Access statistics
stats := tokenCounter.Stats()
fmt.Printf("Total tokens: %d\n", stats.TotalTokens)
fmt.Printf("API calls: %d\n", stats.CallCount)
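
To make "retries with exponential backoff" concrete, here is a minimal, self-contained sketch of the general technique. It is not the RetryMiddleware implementation: the names Retry and attemptsFor are hypothetical, and the reading of the two options (a maximum retry count and an initial wait that doubles after each failure) is an assumption about what WithMaxRetries and WithInitialBackoff configure.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Retry runs fn once and then up to maxRetries more times, doubling the
// wait after every failed attempt. It returns the number of attempts made.
// Hypothetical helper for illustration; not part of AgentMesh.
func Retry(ctx context.Context, maxRetries int, initial time.Duration, fn func() error) (int, error) {
	backoff := initial
	for attempt := 1; ; attempt++ {
		err := fn()
		if err == nil {
			return attempt, nil
		}
		if attempt > maxRetries {
			// Retry budget exhausted: surface the last error.
			return attempt, err
		}
		select {
		case <-time.After(backoff):
			backoff *= 2 // exponential growth of the wait
		case <-ctx.Done():
			// Stop waiting as soon as the caller cancels.
			return attempt, ctx.Err()
		}
	}
}

// attemptsFor simulates a call that fails `failures` times before it
// succeeds and reports how many attempts Retry made and whether it succeeded.
func attemptsFor(failures, maxRetries int) (int, bool) {
	calls := 0
	fn := func() error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	}
	n, err := Retry(context.Background(), maxRetries, time.Millisecond, fn)
	return n, err == nil
}

func main() {
	fmt.Println(attemptsFor(2, 3)) // succeeds on the third attempt
	fmt.Println(attemptsFor(5, 3)) // gives up after the initial call plus 3 retries
}
```

Selecting on ctx.Done() during the wait keeps a cancelled request from sleeping through the remaining backoff.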

Tool Middleware

Tool middleware wraps tool executions, providing timeouts, circuit breakers, and audit logs.

Available Middleware:

  • CacheMiddleware - Caches deterministic tool results
  • TimeoutMiddleware - Enforces execution timeouts
  • CircuitBreakerMiddleware - Implements circuit breaker pattern
  • AuditMiddleware - Logs all tool executions

Usage:

import (
    "fmt"
    "time"

    toolmw "github.com/hupe1980/agentmesh/pkg/tool/middleware"
)

cache := toolmw.NewCacheMiddleware()
timeout := toolmw.NewTimeoutMiddleware(5*time.Second)
cb := toolmw.NewCircuitBreakerMiddleware(3, 30*time.Second)
audit := toolmw.NewAuditMiddleware(logger)

agent.NewReAct(model,
    agent.WithTools(tools...),
    agent.WithToolMiddleware(cache, timeout, cb, audit),
)

// Check circuit breaker state
fmt.Printf("Circuit state: %s\n", cb.State())
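
For readers unfamiliar with the circuit breaker pattern, the sketch below shows one common form of it. It is an illustration under assumptions, not the CircuitBreakerMiddleware source: Breaker, ErrOpen, and demo are hypothetical names, and treating the two constructor arguments above as a failure threshold and a cooldown period is a guess based on their types.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrOpen is returned while the breaker rejects calls.
var ErrOpen = errors.New("circuit open")

// Breaker is a hypothetical, simplified circuit breaker.
type Breaker struct {
	mu        sync.Mutex
	threshold int           // consecutive failures before opening
	cooldown  time.Duration // how long to reject calls once open
	failures  int
	open      bool
	openedAt  time.Time
	now       func() time.Time // injectable clock for testing
}

func NewBreaker(threshold int, cooldown time.Duration) *Breaker {
	return &Breaker{threshold: threshold, cooldown: cooldown, now: time.Now}
}

// State reports "closed", "open", or "half-open".
func (b *Breaker) State() string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.stateLocked()
}

func (b *Breaker) stateLocked() string {
	switch {
	case !b.open:
		return "closed"
	case b.now().Sub(b.openedAt) >= b.cooldown:
		return "half-open" // cooldown elapsed: allow a trial call
	default:
		return "open"
	}
}

// Do runs fn unless the breaker is open. A success closes the breaker;
// a failure in the half-open state reopens it immediately.
func (b *Breaker) Do(fn func() error) error {
	b.mu.Lock()
	if b.stateLocked() == "open" {
		b.mu.Unlock()
		return ErrOpen
	}
	b.mu.Unlock()

	err := fn() // run without holding the lock

	b.mu.Lock()
	defer b.mu.Unlock()
	if err == nil {
		b.failures, b.open = 0, false
		return nil
	}
	b.failures++
	if b.open || b.failures >= b.threshold {
		b.open, b.openedAt = true, b.now()
	}
	return err
}

// demo drives the breaker through open, half-open, and closed
// using a fake clock so that no real waiting is needed.
func demo() []string {
	clock := time.Unix(0, 0)
	b := NewBreaker(3, 30*time.Second)
	b.now = func() time.Time { return clock }

	for i := 0; i < 3; i++ {
		_ = b.Do(func() error { return errors.New("boom") })
	}
	states := []string{b.State()}

	clock = clock.Add(31 * time.Second)
	states = append(states, b.State())

	_ = b.Do(func() error { return nil })
	return append(states, b.State())
}

func main() {
	fmt.Println(demo())
}
```

This sketch lets any number of concurrent trial calls through while half-open; a production breaker would usually admit only one.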

Event Bus Integration

The middleware system integrates with an event bus, which decouples the code that emits execution events from the handlers that observe them.

Subscribing to Events

import "github.com/hupe1980/agentmesh/pkg/event"

// Create event bus
eventBus := event.NewBus()
ctx = event.WithBus(ctx, eventBus)

// Subscribe to all events
eventBus.Subscribe(event.HandlerFunc(func(ctx context.Context, evt event.Event) error {
    log.Printf("[%s] %s at node %s", evt.Type, evt.Timestamp, evt.Node)
    return nil
}))

// Subscribe to specific event types
eventBus.Subscribe(handler,
    event.EventNodeStart,
    event.EventNodeComplete,
    event.EventNodeError,
)

Throughput tip: The bus snapshots its handler list under a short mutex and invokes the handlers after releasing it. A handler that performs blocking I/O should hand the work off to its own goroutine or buffer so that publishers stay fast.
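
One way to follow this tip is to have the handler enqueue events on a buffered channel and do the slow work on a dedicated goroutine. The sketch below is self-contained and uses stand-in types: Event, AsyncHandler, and drain are hypothetical and do not come from the event package. Adapting it to event.HandlerFunc would mean calling Handle from inside the subscribed function.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a simplified stand-in for the bus event type.
type Event struct {
	Type string
	Node string
}

// AsyncHandler queues events on a buffered channel and processes them
// on its own goroutine, so Handle returns without waiting for I/O.
type AsyncHandler struct {
	queue   chan Event
	wg      sync.WaitGroup
	process func(Event)
}

func NewAsyncHandler(buffer int, process func(Event)) *AsyncHandler {
	h := &AsyncHandler{queue: make(chan Event, buffer), process: process}
	h.wg.Add(1)
	go func() {
		defer h.wg.Done()
		for evt := range h.queue {
			h.process(evt) // slow work happens here, off the publisher's path
		}
	}()
	return h
}

// Handle enqueues the event without blocking. It reports false when the
// buffer is full and the event was dropped.
func (h *AsyncHandler) Handle(evt Event) bool {
	select {
	case h.queue <- evt:
		return true
	default:
		return false
	}
}

// Close stops accepting events and waits for queued events to be processed.
func (h *AsyncHandler) Close() {
	close(h.queue)
	h.wg.Wait()
}

// drain feeds events through an AsyncHandler and returns what the
// worker goroutine saw, in order.
func drain(events ...Event) []string {
	var seen []string
	h := NewAsyncHandler(16, func(evt Event) {
		seen = append(seen, evt.Type+"@"+evt.Node)
	})
	for _, evt := range events {
		h.Handle(evt)
	}
	h.Close() // after Close returns, seen is safe to read
	return seen
}

func main() {
	fmt.Println(drain(
		Event{Type: "node_start", Node: "model"},
		Event{Type: "node_complete", Node: "model"},
	))
}
```

Dropping events when the buffer is full is a deliberate trade-off in this sketch: it protects publisher latency at the cost of completeness. A handler that must not lose events could block in Handle instead.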

Available Event Types

Graph Events:

  • EventGraphStart - Graph execution started
  • EventGraphComplete - Graph execution completed
  • EventGraphError - Graph execution failed

Node Events:

  • EventNodeQueued - Node queued for execution
  • EventNodeStart - Node started executing
  • EventNodeComplete - Node completed successfully
  • EventNodeError - Node execution failed
  • EventNodeStream - Partial output streamed (e.g., LLM chunks during streaming)

Model Events:

  • EventModelStart - Model call started
  • EventModelComplete - Model call completed
  • EventModelError - Model call failed

Tool Events:

  • EventToolStart - Tool execution started
  • EventToolComplete - Tool execution completed
  • EventToolError - Tool execution failed

State & Execution Events:

  • EventStateUpdate - State updated after BSP barrier commit
  • EventSuperstepStart - Superstep started
  • EventSuperstepComplete - Superstep completed

Checkpoint Events:

  • EventCheckpointSave - Checkpoint saved
  • EventCheckpointLoad - Checkpoint loaded
  • EventCheckpointError - Checkpoint operation failed

Interrupt Events:

  • EventInterrupt - Execution interrupted (human-in-the-loop)
  • EventResume - Execution resumed after interrupt

Custom Middleware

You can create custom middleware to extend execution behavior.

Custom Graph Middleware

Graph middleware is a function of type func(next NodeFunc) NodeFunc:

// Custom middleware function
func MyCustomMiddleware(logger *slog.Logger) graph.NodeMiddleware {
    return func(next graph.NodeFunc) graph.NodeFunc {
        return func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
            nodeName := scope.NodeName()
            
            // Pre-execution logic
            logger.Info("node starting", "node", nodeName)
            start := time.Now()
            
            // Execute the wrapped node
            cmd, err := next(ctx, scope)
            
            // Post-execution logic
            duration := time.Since(start)
            if err != nil {
                logger.Error("node failed", "node", nodeName, "error", err, "duration", duration)
            } else {
                logger.Info("node completed", "node", nodeName, "duration", duration)
            }
            
            return cmd, err
        }
    }
}

// Usage
b.WithNodeMiddleware(MyCustomMiddleware(slog.Default()))

Progress Middleware Example

For multi-agent systems like supervisors, you can create middleware that shows which agent is being invoked:

// progressMiddleware shows which tools/agents are being called
func progressMiddleware() graph.NodeMiddleware {
    return func(next graph.NodeFunc) graph.NodeFunc {
        return func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
            nodeName := scope.NodeName()
            start := time.Now()
            messages := message.GetMessages(scope)

            switch nodeName {
            case "model":
                fmt.Printf("🤖 Thinking...\n")
            case "tool":
                // Find the last AI message to see which tools are being called
                for i := len(messages) - 1; i >= 0; i-- {
                    if aiMsg, ok := messages[i].(*message.AIMessage); ok && len(aiMsg.ToolCalls) > 0 {
                        for _, tc := range aiMsg.ToolCalls {
                            fmt.Printf("🔧 Calling: %s\n", tc.Name)
                        }
                        break
                    }
                }
            }

            result, err := next(ctx, scope)

            if nodeName == "tool" {
                duration := time.Since(start)
                if err != nil {
                    fmt.Printf("   ❌ Failed after %s\n", duration.Round(time.Millisecond))
                } else {
                    fmt.Printf("   ✅ Done (%s)\n", duration.Round(time.Millisecond))
                }
            }

            return result, err
        }
    }
}

// Apply to a supervisor
supervisor, err := agent.NewSupervisor(
    model,
    agent.WithWorker("researcher", "Research expert", researchAgent),
    agent.WithWorker("writer", "Content writer", writerAgent),
    agent.WithRunMiddleware(progressMiddleware()),
)
if err != nil {
    log.Fatalf("create supervisor: %v", err)
}

Output:

🤖 Thinking...
🔧 Calling: handoff_to_researcher
   ✅ Done (5.734s)
🤖 Thinking...
🔧 Calling: handoff_to_writer
   ✅ Done (8.123s)

See the examples/blogwriter/ example for a complete implementation.

Custom Model Middleware

type CustomModelMiddleware struct {
    // Your fields
}

func (m *CustomModelMiddleware) Wrap(next model.Executor) model.Executor {
    return model.WrapFunc(func(ctx context.Context, req *model.Request) iter.Seq2[*model.Response, error] {
        return func(yield func(*model.Response, error) bool) {
            // Pre-execution
            log.Println("Model call starting")
            
            // Execute and wrap responses
            for resp, err := range next.Generate(ctx, req) {
                // Process/modify response
                if resp != nil {
                    log.Printf("Received %d tokens", resp.Usage.TotalTokens)
                }
                if !yield(resp, err) {
                    return
                }
            }
        }
    })
}

Custom Tool Middleware

type CustomToolMiddleware struct {
    // Your fields
}

func (m *CustomToolMiddleware) Wrap(next tool.Executor) tool.Executor {
    return tool.WrapFunc(func(ctx context.Context, calls []tool.Call) ([]tool.ExecutionResult, error) {
        // Pre-execution
        log.Printf("Executing %d tools", len(calls))
        
        // Execute
        results, err := next.Execute(ctx, calls)
        
        // Post-execution
        if err == nil {
            for _, result := range results {
                if result.Error != nil {
                    log.Printf("Tool %s failed: %v", result.ToolName, result.Error)
                }
            }
        }
        
        return results, err
    })
}

Middleware Composition

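Middleware is composed using the ChainNodeMiddleware function. It wraps the executor in reverse order, starting with the last middleware, so the first middleware in the list becomes the outermost layer and runs first: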

executor = graph.ChainNodeMiddleware(
    middleware1, // Applied first (outermost)
    middleware2,
    middleware3, // Applied last (innermost)
)(executor)

// Execution flow:
// middleware1 → middleware2 → middleware3 → executor

When using agent options, middleware is automatically chained:

agent.WithModelMiddleware(cache, retry, rateLimit)
// Results in: cache(retry(rateLimit(executor)))
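
The ordering can be checked with a small, self-contained sketch. The Handler, Middleware, Chain, and tag names below are hypothetical stand-ins, not AgentMesh types; the sketch only illustrates the wrap-in-reverse technique described above and makes no claim about how ChainNodeMiddleware is written internally.

```go
package main

import (
	"fmt"
	"strings"
)

// Handler is a stand-in for an executor; it is not an AgentMesh type.
type Handler func(input string) string

// Middleware wraps a Handler with extra behavior.
type Middleware func(next Handler) Handler

// Chain composes middleware so that the first argument ends up outermost.
// It wraps in reverse order: the last middleware wraps the handler first.
func Chain(mws ...Middleware) Middleware {
	return func(h Handler) Handler {
		for i := len(mws) - 1; i >= 0; i-- {
			h = mws[i](h)
		}
		return h
	}
}

// tag returns a middleware that records when it is entered and exited.
func tag(name string, trace *[]string) Middleware {
	return func(next Handler) Handler {
		return func(in string) string {
			*trace = append(*trace, name+":enter")
			out := next(in)
			*trace = append(*trace, name+":exit")
			return out
		}
	}
}

// runTrace executes a three-layer chain and returns the recorded call order.
func runTrace() string {
	var trace []string
	core := func(in string) string {
		trace = append(trace, "executor")
		return in
	}
	h := Chain(tag("m1", &trace), tag("m2", &trace), tag("m3", &trace))(core)
	h("request")
	return strings.Join(trace, " ")
}

func main() {
	fmt.Println(runTrace())
	// prints: m1:enter m2:enter m3:enter executor m3:exit m2:exit m1:exit
}
```

The trace shows why ordering matters: post-execution logic runs in the opposite order to pre-execution logic, so the outermost middleware sees the final result, including any errors or changes introduced by the inner layers.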

Best Practices

  1. Order Matters: Place logging/observability middleware first, then caching, then retry logic
  2. Resource Cleanup: Use defer for cleanup (e.g., defer rateLimit.Close())
  3. Error Handling: Middleware should be resilient - don’t fail execution on observability errors
  4. State Management: Use context for request-scoped state, use middleware fields for global state
  5. Performance: Cache and rate limit middleware can significantly reduce costs
  6. Testing: Test middleware independently before composing them

Examples

See the following examples for complete implementations:

  • Basic Usage: examples/middleware/ - Demonstrates all middleware types
  • Observability: examples/observability/ - Event bus and monitoring
  • Visualization: examples/viz_ui_demo/ - Real-time visualization integration
  • Progress Middleware: examples/blogwriter/ - Shows progress during multi-agent workflows