
# Streaming

AgentMesh provides built-in support for streaming graph execution results in real time. This enables responsive UIs, progress monitoring, and handling of long-running workflows.


## Overview

Streaming in AgentMesh works through:

  1. Iterators: The Run() method returns an iterator (iter.Seq2) that yields execution results
  2. Intermediate Updates: Nodes emit intermediate results via the StreamWriter
  3. Event Processing: Consumers iterate over results as they become available
A minimal end-to-end example:

```go
import (
    "context"
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/graph"
)

// Define state keys
var StatusKey = graph.NewKey("status", "")

// Create graph
g := graph.New[string, string](StatusKey)

g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    // Get the stream writer from context
    streamWriter := graph.GetStreamWriter(ctx)
    
    // Emit intermediate updates
    if streamWriter != nil {
        streamWriter(graph.Updates{"progress": 0.5})
    }
    
    return graph.Set(StatusKey, "complete").To(graph.END), nil
}, graph.END)

g.Start("process")

compiled, _ := g.Build()

// Stream results
ctx := context.Background()
for result, err := range compiled.Run(ctx, "start") {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Node: %s, Updates: %v\n", result.Node, result.Updates)
}
```

## Core Concepts

### ExecutionResult

Each yielded result contains:

```go
type ExecutionResult[O any] struct {
    Node    string         // Name of the node that produced this result
    Updates map[string]any // State updates from this node
    Output  O              // Final output (on final iteration only)
}
```
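
Since `Output` is only populated on the final iteration, a simple consumer pattern is to track the last yielded result. A minimal sketch, assuming `Run` yields `ExecutionResult` values for the string graph built above:

```go
var last graph.ExecutionResult[string]
for result, err := range compiled.Run(ctx, "start") {
    if err != nil {
        log.Fatal(err)
    }
    // Intermediate results carry state updates from each node
    if p, ok := result.Updates["progress"]; ok {
        fmt.Printf("[%s] progress: %v\n", result.Node, p)
    }
    last = result // the last result carries the final Output
}
fmt.Printf("Final output: %v\n", last.Output)
```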

### StreamWriter

The StreamWriter allows nodes to emit intermediate results during execution:

```go
type StreamWriter func(Updates)

type Updates map[string]any
```
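
The examples below repeat a nil check before every call. A hypothetical nil-safe helper (`emit` is not part of the AgentMesh API) can keep node bodies tidy:

```go
// emit fetches the StreamWriter from the context and silently
// drops updates when streaming is disabled (writer is nil).
func emit(ctx context.Context, u graph.Updates) {
    if w := graph.GetStreamWriter(ctx); w != nil {
        w(u)
    }
}
```

With this helper, nodes can call `emit(ctx, graph.Updates{...})` unconditionally.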

---

## Basic Streaming

### Processing Results

```go
var CounterKey = graph.NewKey("counter", 0)

g := graph.New[int, int](CounterKey)

g.Node("counter", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)
    
    for i := 1; i <= 10; i++ {
        time.Sleep(100 * time.Millisecond)
        
        // Emit progress updates
        if streamWriter != nil {
            streamWriter(graph.Updates{
                "progress": i * 10,
                "step":     i,
            })
        }
    }
    
    return graph.Set(CounterKey, 10).To(graph.END), nil
}, graph.END)

g.Start("counter")

compiled, _ := g.Build()

// Process each result as it arrives
for result, err := range compiled.Run(ctx, 0) {
    if err != nil {
        log.Fatal(err)
    }
    
    if result.Updates != nil {
        fmt.Printf("Progress: %v%%\n", result.Updates["progress"])
    }
}
```

### Collecting All Results

```go
// Collect every result via the iterator pattern
var results []graph.ExecutionResult[string]
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        log.Fatal(err)
    }
    results = append(results, result)
}

for _, result := range results {
    // Each result records the node that ran and its state updates
    fmt.Printf("Node: %s, Updates: %v\n", result.Node, result.Updates)
}
```

### Getting Final Result Only

```go
// Get only the last (final) result
result, err := graph.Last(compiled.Run(ctx, input))
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Final output: %v\n", result.Output)
```

## Message Graph Streaming

For conversational AI workflows using MessageGraph:

```go
g := message.NewGraphBuilder()

g.Node("assistant", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)
    messages := message.GetMessages(view)
    
    // Stream the LLM response token by token
    // (model is your chat model client, defined elsewhere)
    var fullResponse strings.Builder

    llmStream := model.StreamChat(ctx, messages)
    for chunk := range llmStream {
        fullResponse.WriteString(chunk.Content)
        
        // Stream each token to the client
        if streamWriter != nil {
            streamWriter(graph.Updates{
                "type":    "token",
                "content": chunk.Content,
            })
        }
    }
    
    response := message.NewAIMessageFromText(fullResponse.String())
    return graph.Append(message.MessagesKey, response).To(graph.END), nil
}, graph.END)

g.Start("assistant")

compiled, _ := g.Build()

// Stream chat tokens
for result, err := range compiled.Run(ctx, []message.Message{userMsg}) {
    if err != nil {
        log.Fatal(err)
    }
    
    if result.Updates["type"] == "token" {
        fmt.Print(result.Updates["content"]) // Print tokens as they arrive
    }
}
```

## Multi-Node Streaming

Stream results across multiple nodes:

```go
var (
    DataKey   = graph.NewListKey[string]("data")
    StatusKey = graph.NewKey("status", "")
)

g := graph.New[string, string](DataKey, StatusKey)

// First node: fetch data
g.Node("fetch", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)
    
    if streamWriter != nil {
        streamWriter(graph.Updates{"stage": "fetching"})
    }
    
    time.Sleep(1 * time.Second) // Simulate fetch
    
    return graph.Set(StatusKey, "fetched").Append(DataKey, "item1", "item2").To("process"), nil
}, "process")

// Second node: process data
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)
    data := graph.GetList(view, DataKey)
    
    for i, item := range data {
        if streamWriter != nil {
            streamWriter(graph.Updates{
                "stage":    "processing",
                "item":     item,
                "progress": float64(i+1) / float64(len(data)),
            })
        }
        time.Sleep(500 * time.Millisecond)
    }
    
    return graph.Set(StatusKey, "complete").To(graph.END), nil
}, graph.END)

g.Start("fetch")

compiled, _ := g.Build()

// Track progress across all nodes
for result, err := range compiled.Run(ctx, "start") {
    if err != nil {
        log.Fatal(err)
    }
    
    if stage, ok := result.Updates["stage"]; ok {
        fmt.Printf("[%s] Stage: %s\n", result.Node, stage)
    }
}
```

## Context Cancellation

Streaming respects context cancellation:

```go
// Create cancellable context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Iteration stops when context is cancelled
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        if errors.Is(err, context.DeadlineExceeded) {
            fmt.Println("Execution timed out")
        }
        break
    }
    processResult(result)
}
```

## Best Practices

### 1. Always Check StreamWriter

```go
func processNode(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)
    
    // Always check if streaming is enabled
    if streamWriter != nil {
        streamWriter(graph.Updates{"status": "starting"})
    }
    
    return graph.Set(StatusKey, "done").To(graph.END), nil
}
```

### 2. Throttle High-Frequency Updates

```go
// ❌ Bad - too many events
for i := 0; i < 1000000; i++ {
    if streamWriter != nil {
        streamWriter(graph.Updates{...}) // Don't do this!
    }
}

// ✅ Good - throttle updates
for i := 0; i < 1000000; i++ {
    if i%1000 == 0 && streamWriter != nil {
        streamWriter(graph.Updates{"progress": i})
    }
}
```
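
Count-based throttling works when iterations take roughly uniform time; when they do not, a time-based variant is more predictable. A sketch (the 250 ms interval is an arbitrary choice):

```go
// Time-based throttling: emit at most one update per interval,
// regardless of how fast the loop spins.
lastEmit := time.Now()
for i := 0; i < 1000000; i++ {
    if streamWriter != nil && time.Since(lastEmit) >= 250*time.Millisecond {
        streamWriter(graph.Updates{"progress": i})
        lastEmit = time.Now()
    }
}
```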

### 3. Use Structured Updates

```go
// ✅ Good - structured, predictable updates
streamWriter(graph.Updates{
    "stage":    "processing",
    "progress": 0.5,
    "message":  "Processing batch 2/4",
})

// ❌ Bad - inconsistent structure
streamWriter(graph.Updates{"status": "working"})
streamWriter(graph.Updates{"pct": 50})
```

### 4. Emit Meaningful Events

```go
// ✅ Good - provides useful information
streamWriter(graph.Updates{
    "operation":    "database_query",
    "rows_fetched": 1500,
    "duration_ms":  234,
})

// ❌ Bad - not useful
streamWriter(graph.Updates{"x": 1})
```

## Integration with UI Frameworks

### React/Web Frontend

```typescript
// Frontend code to consume the SSE stream
async function executeGraph(messages: Message[]) {
    const response = await fetch('/api/graph/stream', {
        method: 'POST',
        body: JSON.stringify({ messages }),
    });

    const reader = response.body!.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });

        // SSE frames are separated by blank lines: "data: {...}\n\n"
        const frames = buffer.split('\n\n');
        buffer = frames.pop() ?? ''; // keep any incomplete frame

        for (const frame of frames) {
            if (!frame.startsWith('data: ')) continue;
            const event = JSON.parse(frame.slice('data: '.length));

            // Update UI based on event
            if (event.Updates) {
                updateProgress(event.Updates);
            }
        }
    }
}
```

### HTTP Server (SSE)

```go
func streamHandler(w http.ResponseWriter, r *http.Request) {
    // Set SSE headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming not supported", http.StatusInternalServerError)
        return
    }

    // Execute the graph and stream events; `compiled` and the
    // input `messages` come from the surrounding application.
    seq := compiled.Run(r.Context(), messages)

    // Forward events to SSE
    for event, err := range seq {
        if err != nil {
            // Handle error, maybe send an SSE error event
            break
        }
        
        data, _ := json.Marshal(event)
        fmt.Fprintf(w, "data: %s\n\n", data)
        flusher.Flush()
    }
}
```

## Performance Considerations

### Memory Usage

  • Each execution result allocates memory for the ExecutionResult struct
  • Results are yielded via iterator - minimal memory overhead
  • For high-frequency updates, consider throttling (covered under Best Practices) or batching, as in the sketch after this list
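
A batching sketch inside a node body (`items` stands in for whatever the node iterates over; the batch size is arbitrary):

```go
// Accumulate per-item updates and flush them as one event per batch.
const batchSize = 100
batch := make([]string, 0, batchSize)
for _, item := range items {
    batch = append(batch, item)
    if len(batch) == batchSize && streamWriter != nil {
        streamWriter(graph.Updates{"processed_batch": batch})
        batch = make([]string, 0, batchSize) // fresh slice; consumer keeps the old one
    }
}
// Flush the remainder.
if len(batch) > 0 && streamWriter != nil {
    streamWriter(graph.Updates{"processed_batch": batch})
}
```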

### Iterator Benefits

  • Lazy Evaluation: Results processed on-demand
  • Low Memory: Only current result in memory
  • Early Termination: Break from the loop to stop execution (see the sketch after this list)
  • Context Cancellation: Respects context cancellation immediately
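
Early termination in practice: breaking out of the loop stops consuming the iterator, which ends the run. A sketch with an illustrative stop condition:

```go
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        log.Fatal(err)
    }
    if result.Updates["status"] == "good_enough" { // illustrative condition
        break // stop iterating; execution ends early
    }
}
```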

### Concurrency

  • The iterator pattern is inherently sequential for the consumer
  • Internal execution (e.g., Pregel workers) runs in parallel
  • StreamWriter can be called from multiple goroutines safely (see the sketch after this list)
  • Results are serialized through the iterator
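
Relying on the thread-safety guarantee above, a node can fan work out to goroutines that each emit their own progress. A minimal sketch using `sync.WaitGroup`:

```go
g.Node("parallel", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(worker int) {
            defer wg.Done()
            // Safe: StreamWriter may be called from multiple goroutines
            if streamWriter != nil {
                streamWriter(graph.Updates{"worker": worker, "status": "done"})
            }
        }(i)
    }
    wg.Wait()

    return graph.Set(StatusKey, "done").To(graph.END), nil
}, graph.END)
```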

## Execution Patterns

All graph execution uses the same Run() method that returns an iterator:

```go
// Process all results
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        return err
    }
    handleResult(result)
}

// Collect all results manually
var results []graph.ExecutionResult[string]
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        return err
    }
    results = append(results, result)
}

// Get only the last result (what graph.Last does for you)
var last graph.ExecutionResult[string]
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        return err
    }
    last = result
}
```

## Advanced Topics

### Custom Event Types

You can include custom metadata in intermediate updates:

```go
streamWriter(graph.Updates{
    "event_type":  "custom_metric",
    "metric_name": "tokens_per_second",
    "value":       125.3,
    "timestamp":   time.Now(),
})
```
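
On the consumer side, you can dispatch on the custom field. A sketch (`recordMetric` is a hypothetical hook):

```go
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        log.Fatal(err)
    }
    switch result.Updates["event_type"] {
    case "custom_metric":
        recordMetric(result.Updates) // hypothetical metrics hook
    default:
        // ignore unknown event types
    }
}
```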

### Conditional Streaming

```go
var VerboseKey = graph.NewKey("verbose", false)

g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)
    verbose := graph.Get(view, VerboseKey)
    
    for _, item := range items { // items: the node's work list, defined elsewhere
        processItem(item)
        
        // Only stream if verbose mode enabled
        if verbose && streamWriter != nil {
            streamWriter(graph.Updates{"processed": item})
        }
    }
    
    return graph.Set(StatusKey, "done").To(graph.END), nil
}, graph.END)
```

### Error Handling

g.Node("processor", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    streamWriter := graph.GetStreamWriter(ctx)
    
    // Errors in intermediate updates don't stop execution
    if streamWriter != nil {
        streamWriter(graph.Updates{
            "warning": "Rate limit approaching",
        })
    }
    
    // Returning error stops execution
    if criticalError != nil {
        return nil, fmt.Errorf("critical: %w", criticalError)
    }
    
    return graph.Set(StatusKey, "done").To(graph.END), nil
}, graph.END)
```
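
On the consumer side, warnings emitted as updates arrive like any other result, while a returned node error ends the run. A sketch:

```go
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        log.Printf("run failed: %v", err) // terminal: the iterator stops
        break
    }
    if warning, ok := result.Updates["warning"]; ok {
        log.Printf("warning: %v", warning)
    }
}
```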
