Quick Start
Enable observability by attaching providers to context:
import (
"github.com/hupe1980/agentmesh/pkg/graph"
"github.com/hupe1980/agentmesh/pkg/logging"
"github.com/hupe1980/agentmesh/pkg/metrics"
"github.com/hupe1980/agentmesh/pkg/trace"
)
// Configure providers (noop for development)
logger := logging.NoopLogger{}
metricsProvider := metrics.Noop()
traceProvider := trace.Noop()
// Attach providers to context
ctx = logging.WithLogger(ctx, logger)
ctx = trace.WithProvider(ctx, traceProvider)
ctx = metrics.WithProvider(ctx, metricsProvider)
// Execute with automatic instrumentation
for msg, err := range compiled.Run(ctx, messages) {
if err != nil {
log.Fatal(err)
}
// Process message
fmt.Println(msg.Content())
}
Configuration
Development (Noop Providers)
For testing with zero overhead:
// Attach noop providers to context
ctx = logging.WithLogger(ctx, logging.NoopLogger{})
ctx = trace.WithProvider(ctx, trace.Noop())
ctx = metrics.WithProvider(ctx, metrics.Noop())
for msg, err := range compiled.Run(ctx, messages) {
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Content())
}
Production (OpenTelemetry)
For production with OpenTelemetry:
import (
"log/slog"
"os"
"github.com/hupe1980/agentmesh/pkg/logging"
"github.com/hupe1980/agentmesh/pkg/metrics"
otelmetrics "github.com/hupe1980/agentmesh/pkg/metrics/opentelemetry"
"github.com/hupe1980/agentmesh/pkg/trace"
oteltrace "github.com/hupe1980/agentmesh/pkg/trace/opentelemetry"
)
// Both OpenTelemetry packages are named "opentelemetry", so they are
// imported under distinct aliases to avoid a name collision.
// Configure structured logging (built-in slog adapter)
logger := logging.NewSlogLogger(
logging.LogLevelInfo, // Debug, Info, Warn, Error
logging.LogFormatJSON, // JSON or Text format
)
// Configure OpenTelemetry tracing
traceProvider := oteltrace.NewProvider(
oteltrace.WithEndpoint("http://jaeger:4318"),
oteltrace.WithServiceName("my-agent-service"),
)
// Configure OpenTelemetry metrics
metricsProvider := otelmetrics.NewMetricsProvider(
otelmetrics.WithEndpoint("http://prometheus:9090"),
)
// Attach providers to context
ctx = logging.WithLogger(ctx, logger)
ctx = trace.WithProvider(ctx, traceProvider)
ctx = metrics.WithProvider(ctx, metricsProvider)
// Execute with full observability
for msg, err := range compiled.Run(ctx, messages) {
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Content())
}
What Gets Instrumented
When you configure providers, AgentMesh automatically:
1. Emits Structured Logs
Throughout execution, the runtime emits structured logs using logging.FromContext():
Graph Runtime:
- Graph execution start/completion (Info level)
- Checkpoint save/restore operations (Info/Debug)
- Checkpoint failures (Error level)
- Graph execution failures (Error level)
Node Execution:
- Node start/completion (Debug level)
- Node failures (Error level)
- Human pause events (Info level)
Pregel Runtime:
- Superstep start/completion (Debug level)
- Frontier consumption (Debug level)
- Runtime failures (Error level)
All logs include structured attributes like run_id, superstep, node, duration_ms, etc.
2. Creates Trace Spans
- Graph execution - Overall Run() duration
- Node execution - Every node that runs, including timing
- Checkpoint operations - Save and restore operations
Example trace hierarchy:
graph.execute (1.5s)
├── node.execute[step1] (500ms)
├── node.execute[step2] (700ms)
└── checkpoint.save (50ms)
3. Records Metrics
All metrics include relevant labels (node.name, superstep, etc.):
Node Metrics:
- agentgraph.node.executions (counter) - Number of node executions
- agentgraph.node.latency_ms (histogram) - Node execution duration
- agentgraph.node.errors (counter) - Node execution errors
Graph Metrics:
- agentgraph.graph.executions (counter) - Number of graph executions
- agentgraph.superstep.latency_ms (histogram) - Superstep duration
4. Propagates Context
Providers attached to the context are propagated automatically and are available inside node functions:
func(ctx context.Context, view graph.View) (*graph.Command, error) {
log := logging.FromContext(ctx)
tp := trace.FromContext(ctx)
mp := metrics.FromContext(ctx)
// Use for custom instrumentation
log.Info("Processing data", "count", len(data))
// ... more below
return graph.To("next"), nil
}
Custom Instrumentation
Access providers in your node RunFuncs for custom instrumentation:
Logging
func(ctx context.Context, view graph.View) (*graph.Command, error) {
log := logging.FromContext(ctx)
log.Info("Starting processing", "node", "data_processor")
log.Debug("Details", "records", len(data))
log.Warn("Slow operation detected", "duration_ms", elapsed)
return graph.To("next"), nil
}
Custom Spans
func(ctx context.Context, view graph.View) (*graph.Command, error) {
tp := trace.FromContext(ctx)
tracer := tp.Tracer("my-service")
// Create custom span for sub-operation
ctx, span := tracer.Start(ctx, "database-query",
trace.Attr{Key: "query", Value: sql},
)
defer span.End(nil)
// Execute operation
results := queryDatabase(ctx, sql)
return graph.To("next"), nil
}
Custom Metrics
func(ctx context.Context, view graph.View) (*graph.Command, error) {
mp := metrics.FromContext(ctx)
// Record counter
processedCounter := mp.Counter("records.processed")
processedCounter.Add(ctx, int64(len(data)),
metrics.Attr{Key: "type", Value: "user_data"},
)
// Record histogram
duration := mp.Histogram("operation.duration_ms")
duration.Record(ctx, float64(elapsed.Milliseconds()))
return graph.To("next"), nil
}
Metrics Reference
Automatically Collected
| Metric | Type | Description | Labels |
|---|---|---|---|
| agentgraph.node.executions | Counter | Node execution count | node.name |
| agentgraph.node.latency_ms | Histogram | Node execution time | node.name |
| agentgraph.node.errors | Counter | Node execution errors | node.name, error.type |
| agentgraph.graph.executions | Counter | Graph execution count | graph.name, success |
| agentgraph.superstep.latency_ms | Histogram | Superstep duration | superstep, node_count |
Zero Overhead
If you don’t configure providers, AgentMesh uses noop implementations with zero performance overhead:
// No providers = zero overhead
for msg, err := range compiled.Run(ctx, messages) {
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Content())
}
Benefits
✅ Automatic instrumentation - No code changes needed
✅ Explicit configuration - Clear, type-safe options
✅ Production ready - OpenTelemetry compatible
✅ Zero overhead - Noop providers when not configured
✅ Custom instrumentation - Full provider access in nodes
✅ Context propagation - Providers automatically available everywhere
Event Stream Backpressure
AgentMesh streams execution events through a dedicated safeEventChan so UIs and CLIs can render progress in real time. The channel uses a non-blocking fast path (no timers are allocated when the buffer has space) and falls back to a cancellable 100 ms timeout when consumers fall behind. This keeps observability overhead near zero during steady-state runs while still providing bounded backpressure if you pause or slow a listener.
- Fast path – Immediate sends avoid creating time.After timers entirely.
- Bounded waits – When the buffer is full, the runtime waits up to 100 ms before dropping the event.
- No leaks – Cancellable timers ensure there are no stray goroutines or wakeups once the send succeeds or times out.
Tune pregel.DefaultEventChanBufferSize if you need deeper buffers, or implement client-side sampling to reduce the load on slow consumers.
Examples
- observability - Automatic instrumentation setup
- custom_observability - Custom instrumentation in nodes
Related
- Middleware System - Extend execution with caching, retries, rate limiting, circuit breakers, and more
- Streaming - Real-time event streaming for responsive UIs