
Monitor graph execution

Track agent workflows with built-in OpenTelemetry metrics and distributed tracing support.

Quick Start

Enable observability by attaching providers to context:

import (
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/graph"
    "github.com/hupe1980/agentmesh/pkg/logging"
    "github.com/hupe1980/agentmesh/pkg/metrics"
    "github.com/hupe1980/agentmesh/pkg/trace"
)

// Configure providers (noop for development)
logger := logging.NoopLogger{}
metricsProvider := metrics.Noop()
traceProvider := trace.Noop()

// Attach providers to context
ctx = logging.WithLogger(ctx, logger)
ctx = trace.WithProvider(ctx, traceProvider)
ctx = metrics.WithProvider(ctx, metricsProvider)

// Execute with automatic instrumentation
for msg, err := range compiled.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    // Process message
    fmt.Println(msg.Content())
}

Configuration

Development (Noop Providers)

For testing with zero overhead:

// Attach noop providers to context
ctx = logging.WithLogger(ctx, logging.NoopLogger{})
ctx = trace.WithProvider(ctx, trace.Noop())
ctx = metrics.WithProvider(ctx, metrics.Noop())

for msg, err := range compiled.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}

Production (OpenTelemetry)

For production with OpenTelemetry:

import (
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/logging"
    "github.com/hupe1980/agentmesh/pkg/metrics"
    otelmetrics "github.com/hupe1980/agentmesh/pkg/metrics/opentelemetry"
    "github.com/hupe1980/agentmesh/pkg/trace"
    oteltrace "github.com/hupe1980/agentmesh/pkg/trace/opentelemetry"
)

// Configure structured logging (built-in slog adapter)
logger := logging.NewSlogLogger(
    logging.LogLevelInfo,  // Debug, Info, Warn, Error
    logging.LogFormatJSON, // JSON or Text format
)

// Configure OpenTelemetry tracing (the two opentelemetry packages share a
// package name, so they are aliased to avoid an import clash)
traceProvider := oteltrace.NewProvider(
    oteltrace.WithEndpoint("http://jaeger:4318"),
    oteltrace.WithServiceName("my-agent-service"),
)

// Configure OpenTelemetry metrics
metricsProvider := otelmetrics.NewMetricsProvider(
    otelmetrics.WithEndpoint("http://prometheus:9090"),
)

// Attach providers to context
ctx = logging.WithLogger(ctx, logger)
ctx = trace.WithProvider(ctx, traceProvider)
ctx = metrics.WithProvider(ctx, metricsProvider)

// Execute with full observability
for msg, err := range compiled.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}

What Gets Instrumented

When you configure providers, AgentMesh automatically:

1. Emits Structured Logs

Throughout execution, the runtime emits structured logs using logging.FromContext():

Graph Runtime:

  • Graph execution start/completion (Info level)
  • Checkpoint save/restore operations (Info/Debug)
  • Checkpoint failures (Error level)
  • Graph execution failures (Error level)

Node Execution:

  • Node start/completion (Debug level)
  • Node failures (Error level)
  • Human pause events (Info level)

Pregel Runtime:

  • Superstep start/completion (Debug level)
  • Frontier consumption (Debug level)
  • Runtime failures (Error level)

All logs include structured attributes like run_id, superstep, node, duration_ms, etc.
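For example, a node-completion log in JSON format might look like the following (an illustrative line; the exact message text and field set follow the library's conventions):

{"time":"2025-01-01T12:00:00Z","level":"DEBUG","msg":"node completed","run_id":"run-123","superstep":2,"node":"step1","duration_ms":500}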

2. Creates Trace Spans

  • Graph execution - Overall Run() duration
  • Node execution - Every node that runs, including timing
  • Checkpoint operations - Save and restore operations

Example trace hierarchy:

graph.execute (1.5s)
├── node.execute[step1] (500ms)
├── node.execute[step2] (700ms)
└── checkpoint.save (50ms)

3. Records Metrics

All metrics include relevant labels (node.name, superstep, etc.):

Node Metrics:

  • agentgraph.node.executions (counter) - Number of node executions
  • agentgraph.node.latency_ms (histogram) - Node execution duration
  • agentgraph.node.errors (counter) - Node execution errors

Graph Metrics:

  • agentgraph.graph.executions (counter) - Number of graph executions
  • agentgraph.superstep.latency_ms (histogram) - Superstep duration

4. Propagates Context

Providers attached to the context are propagated automatically and can be retrieved inside node functions:

func(ctx context.Context, view graph.View) (*graph.Command, error) {
    log := logging.FromContext(ctx)
    tp := trace.FromContext(ctx)
    mp := metrics.FromContext(ctx)

    // Use for custom instrumentation
    log.Info("Processing data", "count", len(data))
    _, _ = tp, mp // tp and mp are exercised in the examples below

    return graph.To("next"), nil
}

Custom Instrumentation

Access providers in your node RunFuncs for custom instrumentation:

Logging

func(ctx context.Context, view graph.View) (*graph.Command, error) {
    log := logging.FromContext(ctx)
    
    log.Info("Starting processing", "node", "data_processor")
    log.Debug("Details", "records", len(data))
    log.Warn("Slow operation detected", "duration_ms", elapsed)
    
    return graph.To("next"), nil
}

Custom Spans

func(ctx context.Context, view graph.View) (*graph.Command, error) {
    tp := trace.FromContext(ctx)
    tracer := tp.Tracer("my-service")
    
    // Create custom span for sub-operation
    ctx, span := tracer.Start(ctx, "database-query",
        trace.Attr{Key: "query", Value: sql},
    )
    defer span.End(nil)
    
    // Execute the operation inside the span
    results := queryDatabase(ctx, sql)
    _ = results // process results as needed
    
    return graph.To("next"), nil
}

Custom Metrics

func(ctx context.Context, view graph.View) (*graph.Command, error) {
    mp := metrics.FromContext(ctx)
    
    // Record counter
    processedCounter := mp.Counter("records.processed")
    processedCounter.Add(ctx, int64(len(data)),
        metrics.Attr{Key: "type", Value: "user_data"},
    )
    
    // Record histogram
    duration := mp.Histogram("operation.duration_ms")
    duration.Record(ctx, float64(elapsed.Milliseconds()))
    
    return graph.To("next"), nil
}

Metrics Reference

Automatically Collected

Metric                           Type       Description             Labels
agentgraph.node.executions       Counter    Node execution count    node.name
agentgraph.node.latency_ms       Histogram  Node execution time     node.name
agentgraph.node.errors           Counter    Node execution errors   node.name, error.type
agentgraph.graph.executions      Counter    Graph execution count   graph.name, success
agentgraph.superstep.latency_ms  Histogram  Superstep duration      superstep, node_count

Zero Overhead

If you don’t configure providers, AgentMesh uses noop implementations with zero performance overhead:

// No providers = zero overhead
for msg, err := range compiled.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}
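This works because the FromContext accessors fall back to a noop implementation when nothing is attached, so instrumented code never has to nil-check a provider. A minimal sketch of the pattern (illustrative types and names, not the library's actual source):

import "context"

// Logger is a minimal logging interface (illustrative).
type Logger interface {
    Info(msg string, kv ...any)
}

// NoopLogger discards every call.
type NoopLogger struct{}

func (NoopLogger) Info(string, ...any) {}

type loggerKey struct{}

// WithLogger attaches a logger to the context.
func WithLogger(ctx context.Context, l Logger) context.Context {
    return context.WithValue(ctx, loggerKey{}, l)
}

// FromContext returns the attached logger, falling back to a no-op
// so call sites never branch on a missing provider.
func FromContext(ctx context.Context) Logger {
    if l, ok := ctx.Value(loggerKey{}).(Logger); ok {
        return l
    }
    return NoopLogger{}
}

trace.FromContext and metrics.FromContext can follow the same shape, which is why unconfigured runs pay essentially nothing.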

Benefits

  • Automatic instrumentation - No code changes needed
  • Explicit configuration - Clear, type-safe options
  • Production ready - OpenTelemetry compatible
  • Zero overhead - Noop providers when not configured
  • Custom instrumentation - Full provider access in nodes
  • Context propagation - Providers automatically available everywhere

Event Stream Backpressure

AgentMesh streams execution events through a dedicated safeEventChan so UIs and CLIs can render progress in real time. The channel uses a non-blocking fast path (no timers are allocated while buffer space is available) and a cancellable 100 ms timeout when consumers fall behind. This keeps observability overhead near zero during steady-state runs while still providing bounded backpressure if a listener pauses or slows down.

  • Fast path – Immediate sends avoid creating time.After timers entirely.
  • Bounded waits – When the buffer is full, the runtime waits up to 100 ms before dropping the event.
  • No leaks – Cancellable timers ensure there are no stray goroutines or wakeups once the send succeeds or times out.

Tune pregel.DefaultEventChanBufferSize or implement client-side sampling if you need deeper buffers.
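The pattern behind this behavior is roughly the following (a hypothetical trySend helper sketching the technique, not the actual safeEventChan source): attempt a non-blocking send first, and only allocate a cancellable timer once the buffer is full.

import (
    "context"
    "time"
)

// trySend sketches bounded-backpressure delivery: a non-blocking fast
// path, then a cancellable 100 ms grace period before dropping the event.
func trySend[T any](ctx context.Context, ch chan<- T, ev T) bool {
    select {
    case ch <- ev: // fast path: buffer space available, no timer allocated
        return true
    default:
    }

    timer := time.NewTimer(100 * time.Millisecond)
    defer timer.Stop() // cancel the timer so no stray wakeups remain

    select {
    case ch <- ev: // consumer caught up within the grace period
        return true
    case <-timer.C: // consumer still behind: drop the event
        return false
    case <-ctx.Done(): // run cancelled
        return false
    }
}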

Examples

  • Middleware System - Extend execution with caching, retries, rate limiting, circuit breakers, and more
  • Streaming - Real-time event streaming for responsive UIs