Overview
AgentMesh provides a comprehensive middleware system for extending and customizing execution behavior across all layers: graph, model, and tool execution.
The middleware system enables you to:
- Observe execution with logging and events
- Modify behavior with caching, retries, and rate limiting
- Protect services with circuit breakers and timeouts
- Monitor usage with token counting and audit logs
- Visualize execution in real-time
Architecture
┌─────────────────────────────────────────────────────┐
│                        Agent                        │
│  ┌───────────────────────────────────────────────┐  │
│  │ Graph Middleware Stack                        │  │
│  │ • Logging                                     │  │
│  │ • Events                                      │  │
│  │ • Visualization                               │  │
│  └─────────────────┬─────────────────────────────┘  │
│                    │                                │
│         ┌──────────┴──────────┐                     │
│         │                     │                     │
│  ┌──────▼───────────┐  ┌──────▼───────────┐         │
│  │ Model Middleware │  │ Tool Middleware  │         │
│  │ • Cache          │  │ • Cache          │         │
│  │ • Retry          │  │ • Timeout        │         │
│  │ • Rate Limit     │  │ • Circuit Breaker│         │
│  │ • Token Count    │  │ • Audit          │         │
│  └──────────────────┘  └──────────────────┘         │
└─────────────────────────────────────────────────────┘
Middleware Types
Graph Middleware
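Graph middleware wraps node execution, providing observability and lifecycle hooks. The built-in graph middleware lives in the pkg/graph/middleware package (imported as graphmw below), and ChainNodeMiddleware lives in the graph package.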
Available Middleware:
- LoggingMiddleware - Structured logging of node execution
- TimingMiddleware - Tracks execution time with callbacks
- RecoveryMiddleware - Recovers from panics and converts them to errors
- ConditionalMiddleware - Applies middleware only when conditions are met
- NodeNameMiddleware - Applies middleware only to specific nodes by name
- VizMiddleware - Integrates with the visualization server (in pkg/viz/middleware)
Usage:
import (
"log"
"log/slog"
"time"
"github.com/hupe1980/agentmesh/pkg/graph"
graphmw "github.com/hupe1980/agentmesh/pkg/graph/middleware"
)
// b is your graph builder; metrics is a placeholder for your own metrics package.
// Apply logging middleware
b.WithNodeMiddleware(graphmw.LoggingMiddleware(slog.Default()))
// Track timing
timingCallback := func(node string, d time.Duration) {
metrics.RecordLatency(node, d)
}
b.WithNodeMiddleware(graphmw.TimingMiddleware(timingCallback))
// Recover from panics
panicHandler := func(node string, r any) {
log.Printf("panic in %s: %v", node, r)
}
b.WithNodeMiddleware(graphmw.RecoveryMiddleware(panicHandler))
// Apply middleware to specific nodes only
b.WithNodeMiddleware(graphmw.NodeNameMiddleware(
[]string{"slow_node", "external_api"},
graphmw.LoggingMiddleware(slog.Default()),
))
// Chain multiple middleware into one (the first listed is the outermost)
b.WithNodeMiddleware(graph.ChainNodeMiddleware(
graphmw.LoggingMiddleware(slog.Default()),
graphmw.TimingMiddleware(timingCallback),
graphmw.RecoveryMiddleware(panicHandler),
))
Model Middleware
Model middleware wraps LLM calls, enabling caching, retries, and usage tracking.
Available Middleware:
- CacheMiddleware - Caches responses to reduce API calls
- RetryMiddleware - Retries failed calls with exponential backoff
- RateLimitMiddleware - Prevents quota exhaustion
- TokenCounterMiddleware - Tracks token usage
Usage:
import (
"fmt"
"time"
modelmw "github.com/hupe1980/agentmesh/pkg/model/middleware"
)
// Create middleware instances
cache := modelmw.NewCacheMiddleware()
retry := modelmw.NewRetryMiddleware(
modelmw.WithMaxRetries(3),
modelmw.WithInitialBackoff(100*time.Millisecond),
)
rateLimit := modelmw.NewRateLimitMiddleware(10, 100*time.Millisecond)
tokenCounter := modelmw.NewTokenCounterMiddleware()
// Apply to agent
agent.NewReAct(model,
agent.WithModelMiddleware(cache, retry, rateLimit, tokenCounter),
)
// Access statistics
stats := tokenCounter.Stats()
fmt.Printf("Total tokens: %d\n", stats.TotalTokens)
fmt.Printf("API calls: %d\n", stats.CallCount)
Tool Middleware
Tool middleware wraps tool executions, providing timeouts, circuit breakers, and audit logs.
Available Middleware:
- CacheMiddleware - Caches deterministic tool results
- TimeoutMiddleware - Enforces execution timeouts
- CircuitBreakerMiddleware - Implements circuit breaker pattern
- AuditMiddleware - Logs all tool executions
Usage:
import (
"fmt"
"time"
toolmw "github.com/hupe1980/agentmesh/pkg/tool/middleware"
)
cache := toolmw.NewCacheMiddleware()
timeout := toolmw.NewTimeoutMiddleware(5*time.Second)
cb := toolmw.NewCircuitBreakerMiddleware(3, 30*time.Second)
// logger is the audit logger you supply
audit := toolmw.NewAuditMiddleware(logger)
agent.NewReAct(model,
agent.WithTools(tools...),
agent.WithToolMiddleware(cache, timeout, cb, audit),
)
// Check circuit breaker state
fmt.Printf("Circuit state: %s\n", cb.State())
Event Bus Integration
The middleware system integrates with an event bus, so components that emit events and components that consume them stay loosely coupled.
Subscribing to Events
import (
"context"
"log"
"github.com/hupe1980/agentmesh/pkg/event"
)
// Create event bus
eventBus := event.NewBus()
ctx = event.WithBus(ctx, eventBus)
// Subscribe to all events
eventBus.Subscribe(event.HandlerFunc(func(ctx context.Context, evt event.Event) error {
log.Printf("[%s] %s at node %s", evt.Type, evt.Timestamp, evt.Node)
return nil
}))
// Subscribe to specific event types only (handler is any event handler,
// e.g. one built with event.HandlerFunc as above)
eventBus.Subscribe(handler,
event.EventNodeStart,
event.EventNodeComplete,
event.EventNodeError,
)
Throughput tip: The bus briefly holds a mutex while it snapshots the handler list, then invokes the handlers after releasing it. Handlers that perform blocking I/O should hand the work off to their own goroutine or buffer so publishers stay fast.
Available Event Types
Graph Events:
- EventGraphStart - Graph execution started
- EventGraphComplete - Graph execution completed
- EventGraphError - Graph execution failed
Node Events:
- EventNodeQueued - Node queued for execution
- EventNodeStart - Node started executing
- EventNodeComplete - Node completed successfully
- EventNodeError - Node execution failed
- EventNodeStream - Partial output streamed (e.g., LLM chunks during streaming)
Model Events:
- EventModelStart - Model call started
- EventModelComplete - Model call completed
- EventModelError - Model call failed
Tool Events:
- EventToolStart - Tool execution started
- EventToolComplete - Tool execution completed
- EventToolError - Tool execution failed
State & Execution Events:
- EventStateUpdate - State updated after BSP barrier commit
- EventSuperstepStart - Superstep started
- EventSuperstepComplete - Superstep completed
Checkpoint Events:
- EventCheckpointSave - Checkpoint saved
- EventCheckpointLoad - Checkpoint loaded
- EventCheckpointError - Checkpoint operation failed
Interrupt Events:
- EventInterrupt - Execution interrupted (human-in-the-loop)
- EventResume - Execution resumed after interrupt
Custom Middleware
You can create custom middleware to extend execution behavior.
Custom Graph Middleware
Graph middleware is a function of type func(next graph.NodeFunc) graph.NodeFunc, returned as a graph.NodeMiddleware in the examples below:
import (
"context"
"log/slog"
"time"
"github.com/hupe1980/agentmesh/pkg/graph"
)
// Custom middleware function
func MyCustomMiddleware(logger *slog.Logger) graph.NodeMiddleware {
return func(next graph.NodeFunc) graph.NodeFunc {
return func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
nodeName := scope.NodeName()
// Pre-execution logic
logger.Info("node starting", "node", nodeName)
start := time.Now()
// Execute the wrapped node
cmd, err := next(ctx, scope)
// Post-execution logic
duration := time.Since(start)
if err != nil {
logger.Error("node failed", "node", nodeName, "error", err, "duration", duration)
} else {
logger.Info("node completed", "node", nodeName, "duration", duration)
}
return cmd, err
}
}
}
// Usage
b.WithNodeMiddleware(MyCustomMiddleware(slog.Default()))
Progress Middleware Example
For multi-agent systems like supervisors, you can create middleware that shows which agent is being invoked:
// progressMiddleware shows which tools/agents are being called
func progressMiddleware() graph.NodeMiddleware {
return func(next graph.NodeFunc) graph.NodeFunc {
return func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
nodeName := scope.NodeName()
start := time.Now()
messages := message.GetMessages(scope)
switch nodeName {
case "model":
fmt.Printf("🤖 Thinking...\n")
case "tool":
// Find the last AI message to see which tools are being called
for i := len(messages) - 1; i >= 0; i-- {
if aiMsg, ok := messages[i].(*message.AIMessage); ok && len(aiMsg.ToolCalls) > 0 {
for _, tc := range aiMsg.ToolCalls {
fmt.Printf("🔧 Calling: %s\n", tc.Name)
}
break
}
}
}
result, err := next(ctx, scope)
if nodeName == "tool" {
duration := time.Since(start)
if err != nil {
fmt.Printf(" ❌ Failed after %s\n", duration.Round(time.Millisecond))
} else {
fmt.Printf(" ✅ Done (%s)\n", duration.Round(time.Millisecond))
}
}
return result, err
}
}
}
// Apply to a supervisor
supervisor, _ := agent.NewSupervisor(
model,
agent.WithWorker("researcher", "Research expert", researchAgent),
agent.WithWorker("writer", "Content writer", writerAgent),
agent.WithRunMiddleware(progressMiddleware()),
)
Output:
🤖 Thinking...
🔧 Calling: handoff_to_researcher
✅ Done (5.734s)
🤖 Thinking...
🔧 Calling: handoff_to_writer
✅ Done (8.123s)
See the examples/blogwriter/ example for a complete implementation.
Custom Model Middleware
import (
"context"
"iter"
"log"
"github.com/hupe1980/agentmesh/pkg/model"
)
type CustomModelMiddleware struct {
// Your fields
}
func (m *CustomModelMiddleware) Wrap(next model.Executor) model.Executor {
return model.WrapFunc(func(ctx context.Context, req *model.Request) iter.Seq2[*model.Response, error] {
return func(yield func(*model.Response, error) bool) {
// Pre-execution
log.Println("Model call starting")
// Execute and wrap responses
for resp, err := range next.Generate(ctx, req) {
// Process/modify response
if resp != nil {
log.Printf("Received %d tokens", resp.Usage.TotalTokens)
}
if !yield(resp, err) {
return
}
}
}
})
}
Custom Tool Middleware
import (
"context"
"log"
"github.com/hupe1980/agentmesh/pkg/tool"
)
type CustomToolMiddleware struct {
// Your fields
}
func (m *CustomToolMiddleware) Wrap(next tool.Executor) tool.Executor {
return tool.WrapFunc(func(ctx context.Context, calls []tool.Call) ([]tool.ExecutionResult, error) {
// Pre-execution
log.Printf("Executing %d tools", len(calls))
// Execute
results, err := next.Execute(ctx, calls)
// Post-execution
if err == nil {
for _, result := range results {
if result.Error != nil {
log.Printf("Tool %s failed: %v", result.ToolName, result.Error)
}
}
}
return results, err
})
}
Middleware Composition
Middleware is composed with the ChainNodeMiddleware function, which wraps the executor in reverse order so that the first middleware listed becomes the outermost layer:
executor = graph.ChainNodeMiddleware(
middleware1, // outermost: runs first before the node and last after it
middleware2,
middleware3, // innermost: runs immediately around the node
)(executor)
// Execution flow:
// middleware1 → middleware2 → middleware3 → executor → middleware3 → middleware2 → middleware1
When using agent options, middleware is automatically chained:
agent.WithModelMiddleware(cache, retry, rateLimit)
// Results in: cache(retry(rateLimit(executor)))
Best Practices
- Order Matters: Place logging/observability middleware first, then caching, then retry logic
- Resource Cleanup: Use defer for cleanup (e.g., defer rateLimit.Close())
- Error Handling: Middleware should be resilient; don't fail execution because of observability errors
- State Management: Use the context for request-scoped state and middleware struct fields for state shared across calls
- Performance: Caching avoids repeated API calls and can significantly reduce costs; rate limiting keeps usage within provider quotas
- Testing: Test middleware independently before composing them
Examples
See the following examples for complete implementations:
- Basic Usage: examples/middleware/ - Demonstrates all middleware types
- Observability: examples/observability/ - Event bus and monitoring
- Visualization: examples/viz_ui_demo/ - Real-time visualization integration
- Progress Middleware: examples/blogwriter/ - Shows progress during multi-agent workflows