Overview
AgentMesh provides a comprehensive middleware system for extending and customizing execution behavior across all layers: graph, model, and tool execution.
The middleware system enables you to:
- Observe execution with logging and events
- Modify behavior with caching, retries, and rate limiting
- Protect services with circuit breakers and timeouts
- Monitor usage with token counting and audit logs
- Visualize execution in real time
Architecture
┌─────────────────────────────────────────────────────┐
│                        Agent                        │
│  ┌───────────────────────────────────────────────┐  │
│  │            Graph Middleware Stack             │  │
│  │  • Logging                                    │  │
│  │  • Events                                     │  │
│  │  • Visualization                              │  │
│  └───────────────────────┬───────────────────────┘  │
│                          │                          │
│              ┌───────────┴───────────┐              │
│              │                       │              │
│    ┌─────────▼─────────┐   ┌─────────▼─────────┐    │
│    │ Model Middleware  │   │ Tool Middleware   │    │
│    │ • Cache           │   │ • Cache           │    │
│    │ • Retry           │   │ • Timeout         │    │
│    │ • Rate Limit      │   │ • Circuit Breaker │    │
│    │ • Token Count     │   │ • Audit           │    │
│    └───────────────────┘   └───────────────────┘    │
└─────────────────────────────────────────────────────┘
Middleware Types
Graph Middleware
Graph middleware wraps node execution, providing observability and lifecycle hooks. These functions are in the graph package.
Available Middleware:
- LoggingMiddleware - Structured logging of node execution
- TimingMiddleware - Tracks execution time with callbacks
- RecoveryMiddleware - Recovers from panics and converts them to errors
- ConditionalMiddleware - Applies middleware only when conditions are met
- NodeMiddleware - Applies middleware only to specific nodes
- VizMiddleware - Integrates with the visualization server (in pkg/viz/middleware)
Usage:
import (
	"log"
	"log/slog"
	"time"
	"github.com/hupe1980/agentmesh/pkg/graph"
)
// b is the graph builder (not shown); metrics is a placeholder for your own metrics client.
// Apply logging middleware
b.WithMiddleware(graph.LoggingMiddleware(slog.Default()))
// Track timing
b.WithMiddleware(graph.TimingMiddleware(func(node string, d time.Duration) {
	metrics.RecordLatency(node, d)
}))
// Recover from panics
b.WithMiddleware(graph.RecoveryMiddleware(func(node string, r any) {
	log.Printf("panic in %s: %v", node, r)
}))
// Apply middleware to specific nodes only
b.WithMiddleware(graph.NodeMiddleware(
	[]string{"slow_node", "external_api"},
	graph.LoggingMiddleware(slog.Default()),
))
Model Middleware
Model middleware wraps LLM calls, enabling caching, retries, and usage tracking.
Available Middleware:
- CacheMiddleware - Caches responses to reduce API calls
- RetryMiddleware - Retries failed calls with exponential backoff
- RateLimitMiddleware - Prevents quota exhaustion
- TokenCounterMiddleware - Tracks token usage
Usage:
import (
	"fmt"
	"time"
	modelmw "github.com/hupe1980/agentmesh/pkg/model/middleware"
)
// Create middleware instances
cache := modelmw.NewCacheMiddleware()
retry := modelmw.NewRetryMiddleware(
	modelmw.WithMaxRetries(3),
	modelmw.WithInitialBackoff(100*time.Millisecond),
)
rateLimit := modelmw.NewRateLimitMiddleware(10, 100*time.Millisecond)
tokenCounter := modelmw.NewTokenCounterMiddleware()
// Apply to agent (model is your model client, defined elsewhere)
agent.NewReAct(model,
	agent.WithModelMiddleware(cache, retry, rateLimit, tokenCounter),
)
// Access statistics
stats := tokenCounter.Stats()
fmt.Printf("Total tokens: %d\n", stats.TotalTokens)
fmt.Printf("API calls: %d\n", stats.CallCount)
Tool Middleware
Tool middleware wraps tool executions, providing timeouts, circuit breakers, and audit logs.
Available Middleware:
- CacheMiddleware - Caches deterministic tool results
- TimeoutMiddleware - Enforces execution timeouts
- CircuitBreakerMiddleware - Implements circuit breaker pattern
- AuditMiddleware - Logs all tool executions
Usage:
import (
	"fmt"
	"time"
	toolmw "github.com/hupe1980/agentmesh/pkg/tool/middleware"
)
// model, tools, and logger are assumed to be defined elsewhere
cache := toolmw.NewCacheMiddleware()
timeout := toolmw.NewTimeoutMiddleware(5*time.Second)
cb := toolmw.NewCircuitBreakerMiddleware(3, 30*time.Second)
audit := toolmw.NewAuditMiddleware(logger)
agent.NewReAct(model,
	agent.WithTools(tools...),
	agent.WithToolMiddleware(cache, timeout, cb, audit),
)
// Check circuit breaker state
fmt.Printf("Circuit state: %s\n", cb.State())
Event Bus Integration
The middleware system integrates with an event bus, which decouples the components that emit execution events from the components that observe them.
Subscribing to Events
import (
	"context"
	"log"
	"github.com/hupe1980/agentmesh/pkg/event"
)
// Create an event bus and attach it to the context
eventBus := event.NewBus()
ctx = event.WithBus(ctx, eventBus)
// Subscribe to all events
eventBus.Subscribe(event.HandlerFunc(func(ctx context.Context, evt event.Event) error {
	log.Printf("[%s] %s at node %s", evt.Type, evt.Timestamp, evt.Node)
	return nil
}))
// Subscribe to specific event types only (handler is an event handler such as the HandlerFunc above)
eventBus.Subscribe(handler,
	event.EventNodeStart,
	event.EventNodeComplete,
	event.EventNodeError,
)
Throughput tip: The bus snapshots the handler list while holding a mutex only briefly, then invokes the handlers after releasing it, so a slow handler delays the publisher that triggered it. Handlers that perform blocking I/O should hand the work off to their own goroutine or buffer so that publishers remain fast.
Available Event Types
Graph Events:
- EventGraphStart - Graph execution started
- EventGraphComplete - Graph execution completed
- EventGraphError - Graph execution failed
Node Events:
- EventNodeStart - Node started executing
- EventNodeComplete - Node completed successfully
- EventNodeError - Node execution failed
Model Events:
- EventModelStart - Model call started
- EventModelComplete - Model call completed
- EventModelError - Model call failed
Tool Events:
- EventToolStart - Tool execution started
- EventToolComplete - Tool execution completed
- EventToolError - Tool execution failed
Checkpoint Events:
- EventCheckpointSave - Checkpoint saved
- EventCheckpointLoad - Checkpoint loaded
- EventCheckpointError - Checkpoint operation failed
Interrupt Events:
- EventInterrupt - Execution interrupted
- EventResume - Execution resumed
Custom Middleware
You can create custom middleware to extend execution behavior.
Custom Graph Middleware
Graph middleware is a function of type func(next NodeFunc) NodeFunc:
import (
	"context"
	"log/slog"
	"time"
	"github.com/hupe1980/agentmesh/pkg/graph"
)
// MyCustomMiddleware logs the start, duration, and outcome of every node.
func MyCustomMiddleware(logger *slog.Logger) graph.Middleware {
	return func(next graph.NodeFunc) graph.NodeFunc {
		return func(ctx context.Context, view graph.View) (*graph.Command, error) {
			nodeName := graph.NodeNameFromContext(ctx)
			// Pre-execution logic
			logger.Info("node starting", "node", nodeName)
			start := time.Now()
			// Execute the wrapped node
			cmd, err := next(ctx, view)
			// Post-execution logic
			duration := time.Since(start)
			if err != nil {
				logger.Error("node failed", "node", nodeName, "error", err, "duration", duration)
			} else {
				logger.Info("node completed", "node", nodeName, "duration", duration)
			}
			return cmd, err
		}
	}
}
// Usage
b.WithMiddleware(MyCustomMiddleware(slog.Default()))
Custom Model Middleware
import (
	"context"
	"iter"
	"log"
	"github.com/hupe1980/agentmesh/pkg/model"
)
type CustomModelMiddleware struct {
	// Your fields
}
func (m *CustomModelMiddleware) Wrap(next model.Executor) model.Executor {
	return model.WrapFunc(func(ctx context.Context, req *model.Request) iter.Seq2[*model.Response, error] {
		return func(yield func(*model.Response, error) bool) {
			// Pre-execution
			log.Println("Model call starting")
			// Forward every response from the wrapped executor (range over iter.Seq2 requires Go 1.23+)
			for resp, err := range next.Generate(ctx, req) {
				// Inspect or modify the response before passing it on
				if resp != nil {
					log.Printf("Received %d tokens", resp.Usage.TotalTokens)
				}
				// Stop as soon as the consumer stops iterating
				if !yield(resp, err) {
					return
				}
			}
		}
	})
}
Custom Tool Middleware
import (
	"context"
	"log"
	"github.com/hupe1980/agentmesh/pkg/tool"
)
type CustomToolMiddleware struct {
	// Your fields
}
func (m *CustomToolMiddleware) Wrap(next tool.Executor) tool.Executor {
	return tool.WrapFunc(func(ctx context.Context, calls []tool.Call) ([]tool.ExecutionResult, error) {
		// Pre-execution
		log.Printf("Executing %d tools", len(calls))
		// Execute the wrapped executor
		results, err := next.Execute(ctx, calls)
		// Post-execution: report individual tool failures
		if err == nil {
			for _, result := range results {
				if result.Error != nil {
					log.Printf("Tool %s failed: %v", result.ToolName, result.Error)
				}
			}
		}
		return results, err
	})
}
Middleware Composition
Middleware is composed using the Chain function, which wraps the executor in reverse order so that the first middleware listed becomes the outermost layer and runs first:
executor = graph.Chain(executor,
	middleware1, // outermost: runs first
	middleware2,
	middleware3, // innermost: runs last, just before the executor
)
// Execution flow:
// middleware1 → middleware2 → middleware3 → executor
When using agent options, middleware is automatically chained:
agent.WithModelMiddleware(cache, retry, rateLimit)
// Results in: cache(retry(rateLimit(executor)))
Best Practices
- Order Matters: Place logging/observability middleware first (outermost), then caching, then retry logic, so that cache hits return before the retry layer is reached
- Resource Cleanup: Use defer for cleanup (e.g., defer rateLimit.Close())
- Error Handling: Middleware should be resilient; don't fail execution because of observability errors
- State Management: Use the context for request-scoped state and middleware struct fields for state shared across calls
- Performance: Caching can significantly reduce costs by avoiding repeated API calls, and rate limiting keeps usage within provider quotas
- Testing: Test middleware independently before composing them
Examples
See the following examples for complete implementations:
- Basic Usage: examples/middleware/ - Demonstrates all middleware types
- Observability: examples/observability/ - Event bus and monitoring
- Visualization: examples/viz_ui_demo/ - Real-time visualization integration
- Custom Middleware: examples/guardrails/ - Building custom middleware for content filtering