AgentMesh is built on a Pregel-inspired bulk-synchronous parallel (BSP) graph execution engine. This architecture enables deterministic, scalable multi-agent workflows with parallel execution and efficient state management.
Component Architecture Overview
AgentMesh follows a clean, interface-based architecture with strict separation of concerns:
Reasoning + Acting"] Supervisor["SupervisorAgent
Multi-agent coordination"] RAG["RAGAgent
Retrieval-Augmented"] end subgraph GraphLayer["Graph Layer"] Builder["Graph Builder
β’ Validates topology
β’ Fluent API"] Compiled["Compiled
β’ Immutable topology
β’ Run() β events
β’ Pure delegation"] end subgraph Interfaces["Core Interfaces"] Structure["Structure
β’ Nodes/edges
β’ Topology queries"] Executor["Executor
β’ Run()
β’ CurrentSuperstep()"] StateManager["StateManager
β’ State persistence
β’ Message handling"] end subgraph Executors["Executor Implementations"] Pregel["PregelExecutor
β’ BSP Supersteps
β’ Parallel workers
β’ Message bus"] Simple["SequentialExecutor
β’ Topological order
β’ Single-threaded"] end Application --> GraphLayer Builder -->|"Build()"| Compiled Compiled --> Structure Compiled --> Executor Compiled --> StateManager Executor --> Pregel Executor --> Simple style Application fill:#1e40af,stroke:#3b82f6,color:#fff style GraphLayer fill:#0f766e,stroke:#14b8a6,color:#fff style Interfaces fill:#7c3aed,stroke:#a78bfa,color:#fff style Executors fill:#b45309,stroke:#f59e0b,color:#fff
Key Design Principles:
- Clean Architecture: No special cases or type switching
  - Compiled.Run() simply delegates to executor.Run()
  - All executors are treated uniformly through the interface
  - No coupling between Compiled and specific executor implementations
- Interface-Based Design: Both state and execution are abstracted
  - Structure: Read-only topology access for executors
  - StateManager: Mutable state management
  - Executor: Pluggable execution strategies
- Self-Contained Executors: Each executor owns its execution logic
  - PregelExecutor manages BSP coordination, workers, and the message bus
  - SequentialExecutor manages sequential execution
  - No shared execution state between executor types
- Separation of Concerns:
  - Graph: Construction and validation
  - Compiled: Topology storage and coordination
  - Executor: Execution strategy and runtime management
  - StateManager: State persistence and message handling
- Extensibility: Easy to add new execution strategies (a sketch follows this list)
  - Implement the Executor interface; no changes to Compiled or Graph are needed
  - Full access to topology via the Structure interface
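For illustration, a minimal custom executor might look like the sketch below. The interface is a local stand-in built from the method summary above (Run and CurrentSuperstep); the exact signatures in the real package may differ.

import (
    "context"
    "iter"
    "log"

    "github.com/hupe1980/agentmesh/pkg/graph"
)

// Executor is a local stand-in for the interface described above; the real
// AgentMesh signatures are assumptions here.
type Executor interface {
    Run(ctx context.Context, input any) iter.Seq2[graph.View, error]
    CurrentSuperstep() int
}

// loggingExecutor wraps any Executor and logs each emitted result without
// changing the underlying execution strategy.
type loggingExecutor struct {
    inner Executor
}

func (l *loggingExecutor) CurrentSuperstep() int { return l.inner.CurrentSuperstep() }

func (l *loggingExecutor) Run(ctx context.Context, input any) iter.Seq2[graph.View, error] {
    return func(yield func(graph.View, error) bool) {
        for view, err := range l.inner.Run(ctx, input) {
            log.Printf("superstep %d completed", l.inner.CurrentSuperstep())
            if !yield(view, err) {
                return
            }
        }
    }
}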
Execution Abstraction Layer
AgentMesh uses an executor pattern to separate execution concerns from orchestration:
Model Execution (pkg/model/executor.go):
- model.Executor interface: Handles the model generation lifecycle
- DefaultExecutor: Standard implementation with plugins, observability, streaming
- Custom executors: Retry, caching, rate limiting, circuit breakers
- Unified Interface: iter.Seq2[*Response, error] for streaming and non-streaming (see the retry sketch below)
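As an example of a custom model executor, a retry wrapper could look like the sketch below. The Response type and Generate signature are illustrative stand-ins; only the iter.Seq2[*Response, error] return shape comes from the description above.

import (
    "context"
    "iter"
)

// Response and generator are illustrative stand-ins, not the real pkg/model types.
type Response struct {
    Text string
}

type generator interface {
    Generate(ctx context.Context, prompt string) iter.Seq2[*Response, error]
}

// retryExecutor retries a failed generation, but only if nothing has been
// streamed yet, so callers never see duplicated chunks.
type retryExecutor struct {
    inner       generator
    maxAttempts int
}

func (r *retryExecutor) Generate(ctx context.Context, prompt string) iter.Seq2[*Response, error] {
    return func(yield func(*Response, error) bool) {
        for attempt := 1; ; attempt++ {
            yielded := false
            var streamErr error
            for resp, err := range r.inner.Generate(ctx, prompt) {
                if err != nil {
                    streamErr = err
                    break
                }
                yielded = true
                if !yield(resp, nil) {
                    return
                }
            }
            if streamErr == nil {
                return // generation completed
            }
            if yielded || attempt >= r.maxAttempts {
                yield(nil, streamErr) // give up and surface the error
                return
            }
        }
    }
}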
Tool Execution (pkg/tool/executor.go):
- tool.Executor interface: Handles the tool execution lifecycle
- tool.NewExecutor: Default sequential executor, one tool at a time
- ParallelExecutor: Concurrent execution with optional concurrency limits
- Custom executors: Caching, batching, circuit breakers
- Arguments as JSON Strings (illustrated below):
  - Call.Arguments is a string (not map[string]any)
  - Eliminates wasteful marshal/unmarshal cycles
  - Arguments flow as JSON from LLM → ToolCall → Executor → Tool
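A minimal sketch of that flow, using an illustrative tool function rather than the exact pkg/tool interface: the raw JSON string from the LLM is unmarshaled exactly once, inside the tool.

import (
    "context"
    "encoding/json"
    "fmt"
)

// weatherArgs mirrors the JSON schema the LLM was asked to produce.
type weatherArgs struct {
    City string `json:"city"`
}

// getWeather receives Call.Arguments as the raw JSON string emitted by the LLM;
// no intermediate map[string]any conversion happens along the way.
func getWeather(ctx context.Context, arguments string) (string, error) {
    var args weatherArgs
    if err := json.Unmarshal([]byte(arguments), &args); err != nil {
        return "", fmt.Errorf("invalid tool arguments: %w", err)
    }
    return fmt.Sprintf("sunny in %s", args.City), nil
}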
Benefits:
- Reusability: Use executors in graphs, chains, or direct calls
- Testability: Test execution independently from graph/state
- Extensibility: Custom implementations without modifying core
- Performance: Arguments stay as JSON strings (no extra conversions)
- Clean Boundaries: Nodes are thin orchestration layers (~130-180 lines)
The rest of this document explores the Pregel BSP execution engine (PregelExecutor) that powers the framework.
Pregel BSP model
Inspired by Google's Pregel paper, AgentMesh executes graphs using a Bulk Synchronous Parallel (BSP) execution model. This provides a powerful foundation for complex agent workflows with loops, conditions, and parallel execution.
What is BSP?
BSP divides computation into discrete supersteps, each consisting of three phases:
- Compute Phase β All ready vertices (nodes) execute in parallel
- Message Passing β Vertices send messages to other vertices via a mailbox system
- Synchronization Barrier β Wait for all vertices to complete before the next superstep
This model provides:
- Parallel execution of independent nodes (~6μs overhead per node)
- Deterministic ordering within supersteps
- Easy reasoning about distributed state
- Automatic checkpointing at superstep boundaries
- Natural support for iterative algorithms (loops, refinement)
Superstep Execution Flow
ready nodes"] B["2. Execute nodes
in parallel"] C["3. Apply state
updates"] D["4. Message delivery
phase"] E["5. Synchronization
barrier"] end A --> B B --> C C --> D D --> E E -->|"Next superstep"| A E -->|"END reached"| F["Complete"] style A fill:#1e40af,stroke:#3b82f6,color:#fff style B fill:#059669,stroke:#10b981,color:#fff style C fill:#7c3aed,stroke:#a78bfa,color:#fff style D fill:#b45309,stroke:#f59e0b,color:#fff style E fill:#dc2626,stroke:#f87171,color:#fff style F fill:#16a34a,stroke:#22c55e,color:#fff
Phase Details:
- Scheduler identifies ready nodes (nodes with all dependencies met)
- Execute nodes in parallel using worker pool; each vertex reads its mailbox
- Apply state updates immediately to shared StateManager (in-memory)
- Message delivery evaluates conditional routes and sends to downstream mailboxes
- Synchronization barrier waits for all vertices, saves checkpoint
Why BSP for Agent Workflows?
Traditional agent frameworks use sequential DAG execution, which limits expressiveness:
Sequential DAG limitations:
- No loops (can't retry or refine)
- No cycles (can't implement feedback)
- No iterative refinement
BSP advantages:
- Natural loops via mailbox messages and multiple supersteps
- Conditional routing can create cycles
- Iterative refinement (agent tries → evaluator judges → agent retries)
- Parallel execution when dependencies allow
- Deterministic execution despite parallelism
Example: Iterative Refinement
import "github.com/hupe1980/agentmesh/pkg/graph"
var DraftKey = graph.NewKey[string]("draft", "")
var FeedbackKey = graph.NewKey[string]("feedback", "")
var DoneKey = graph.NewKey[bool]("done", false)
g := graph.New[string, string](DraftKey, FeedbackKey, DoneKey)
// Writer node generates drafts
g.Node("writer", func(ctx context.Context, view graph.View) (*graph.Command, error) {
feedback := graph.Get(view, FeedbackKey)
draft := generateDraft(feedback)
return graph.Set(DraftKey, draft).To("evaluator"), nil
}, "evaluator")
// Evaluator checks quality and creates a cycle!
g.Node("evaluator", func(ctx context.Context, view graph.View) (*graph.Command, error) {
draft := graph.Get(view, DraftKey)
if isGoodEnough(draft) {
return graph.Set(DoneKey, true).To(graph.END), nil
}
// Loop back to writer - creates a cycle!
return graph.Set(FeedbackKey, "improve clarity").To("writer"), nil
}, "writer", graph.END)
g.Start("writer")
compiled, _ := g.Build()
This creates a loop where the writer improves the draft based on evaluator feedback, executing over multiple supersteps until quality is acceptable.
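Running the compiled graph uses the streaming Run iterator described later under Execution lifecycle; a minimal invocation (with an illustrative prompt) might look like:

ctx := context.Background()
var final graph.View
for result, err := range compiled.Run(ctx, "write a product announcement") {
    if err != nil {
        log.Fatal(err)
    }
    final = result // keep the view from the latest superstep
}
// The loop ends once the evaluator routes to END; the accepted draft is in state.
fmt.Println(graph.Get(final, DraftKey))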
Graph Introspection
AgentMesh provides a comprehensive introspection API for debugging, monitoring, and visualizing compiled graphs:
Introspection Methods
// Basic inspection
nodes := compiled.GetNodes() // List all node names
info, _ := compiled.GetNodeInfo("my_node") // Node metadata
// Topology analysis
topo := compiled.GetTopology()
fmt.Printf("Entry points: %v\n", topo.EntryPoints)
fmt.Printf("Exit points: %v\n", topo.ExitPoints)
fmt.Printf("Max depth: %d\n", topo.MaxDepth)
// Graph metrics
metrics := compiled.GetMetrics()
fmt.Printf("Cyclomatic complexity: %d\n", metrics.CyclomaticComplexity)
fmt.Printf("Completed nodes: %v\n", metrics.CompletedNodes)
// Execution paths
paths := compiled.GetExecutionPath(100)
for i, path := range paths {
fmt.Printf("Path %d: %v\n", i+1, path)
}
Mermaid Flowchart Generation
Generate visual diagrams of your graph structure:
// Generate flowchart with top-down layout
flowchart := compiled.GenerateMermaidFlowchart("TD")
os.WriteFile("graph.mmd", []byte(flowchart), 0644)
// Supported directions: TD (top-down), LR (left-right), BT, RL
The generated Mermaid syntax includes:
- Stadium shapes for START/END nodes
- Diamond shapes for conditional nodes
- Rectangle shapes for standard nodes
- Solid arrows for direct edges
- Dashed arrows for conditional branches
See the graph_introspection example for complete usage.
Scheduler architecture
The scheduler is the brain of the execution engine, determining which nodes can execute in each superstep. It's composed of four specialized components that work together:
Component Overview
┌─────────────────────────────────────────────────────┐
│           vertexScheduler (Orchestrator)            │
│        Coordinates all scheduling decisions         │
└────────┬────────────────────────────────────────────┘
         │
   ┌──── Delegates to ────┐
   │                      │
┌──▼───────────────┐  ┌───▼───────────────┐
│   Topology       │  │   Conditional     │
│   Scheduler      │  │   Evaluator       │
│                  │  │                   │
│ - DAG tracking   │  │ - Route logic     │
│ - In-degrees     │  │ - Gate checks     │
│ - Dependencies   │  │ - Dynamic edges   │
└──┬───────────────┘  └───┬───────────────┘
   │                      │
┌──▼───────────────┐  ┌───▼───────────────┐
│   Execution      │  │   Pause State     │
│   Tracker        │  │                   │
│                  │  │ - HITL support    │
│ - History        │  │ - Manual gates    │
│ - Completed      │  │ - Debugging       │
└──────────────────┘  └───────────────────┘
1. TopologyScheduler: DAG Dependency Tracking
The TopologyScheduler maintains the directed acyclic graph (DAG) structure and tracks dependencies using in-degree counting:
How it works:
- Each node has an in-degree (number of incoming edges)
- A node is ready when its in-degree reaches 0 (all dependencies satisfied)
- When a node executes, it decrements the in-degree of its successors
- This naturally handles parallel execution (multiple nodes can reach in-degree 0 simultaneously)
Example:
Initial state:
START → A (in-degree: 1)
START → B (in-degree: 1)
A → C (in-degree: 1)
B → C (in-degree: 2)  ← Note: C depends on both A and B
Superstep 0:
Ready: [START] (in-degree = 0)
Execute: START
After execution: A (in-degree: 0), B (in-degree: 0)
Superstep 1:
Ready: [A, B]  ← Both ready simultaneously
Execute: A, B in parallel
After execution: C (in-degree: 0)  ← Now both dependencies satisfied
Superstep 2:
Ready: [C]
Execute: C
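A framework-independent sketch of this in-degree bookkeeping (the real TopologyScheduler additionally maintains a ready queue, described under Performance characteristics):

// topology tracks remaining in-degrees and successor lists, mirroring the trace above.
type topology struct {
    inDegree   map[string]int
    successors map[string][]string
}

// ready returns every node whose dependencies are all satisfied (in-degree 0).
func (t *topology) ready() []string {
    var r []string
    for node, deg := range t.inDegree {
        if deg == 0 {
            r = append(r, node)
        }
    }
    return r
}

// markExecuted removes the node from the frontier and unlocks its successors.
func (t *topology) markExecuted(node string) {
    delete(t.inDegree, node)
    for _, succ := range t.successors[node] {
        t.inDegree[succ]--
    }
}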
2. ConditionalEvaluator: Dynamic Routing
The ConditionalEvaluator handles conditional edges that determine routing at runtime:
How it works:
- Conditional edges are evaluated based on state
- The evaluator maintains βgate statusβ for conditional branches
- A node with conditional incoming edges only executes when its gate is open
Example:
g.Node("classifier", func(ctx context.Context, view graph.View) (*graph.Command, error) {
messages := message.GetMessages(view)
category := analyzeInput(messages)
// Return different paths based on runtime data
if category == "urgent" {
return graph.Set(CategoryKey, category).To("urgent_handler"), nil
}
return graph.Set(CategoryKey, category).To("standard_handler"), nil
}, "urgent_handler", "standard_handler")
3. ExecutionTracker: History and State
The ExecutionTracker maintains execution history for:
- Observability: Track execution order for debugging
- Cycle Detection: Detect infinite loops (node executed too many times)
- Resume Support: When resuming from checkpoint, replay history to restore state
- Metrics: Count executions per node for performance analysis
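A simplified sketch of that bookkeeping; the loop-detection threshold and field names are illustrative:

import "fmt"

// executionTracker records execution order and per-node counts; exceeding
// maxRuns is treated as a likely infinite loop.
type executionTracker struct {
    history []string
    counts  map[string]int
    maxRuns int
}

func (t *executionTracker) record(node string) error {
    t.history = append(t.history, node)
    t.counts[node]++
    if t.counts[node] > t.maxRuns {
        return fmt.Errorf("node %q executed %d times: possible infinite loop", node, t.counts[node])
    }
    return nil
}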
4. Pause State: Human-in-the-Loop Support
The pause mechanism enables human-in-the-loop workflows:
Use cases:
- Manual approval gates
- Human review of agent decisions
- Interactive debugging
- A/B testing (pause and compare branches)
Scheduler Coordination
The vertexScheduler orchestrates all components:
func (s *vertexScheduler) Ready() []string {
// 1. Get topologically ready nodes
candidates := s.topology.Ready()
ready := []string{}
for _, name := range candidates {
// 2. Filter out paused nodes
if s.paused[name] {
continue
}
// 3. Filter out nodes with closed conditional gates
if !s.evaluator.IsGateOpen(name) {
continue
}
ready = append(ready, name)
}
return ready
}
Order of evaluation:
- Topology: Are the node's DAG dependencies satisfied?
- Pause: Is the node manually paused?
- Conditional: Is the node's conditional gate open?
Only nodes passing all three checks are ready to execute.
Runtime execution engine
The Pregel Runtime is the low-level execution engine that orchestrates superstep execution, manages mailboxes, and coordinates worker threads.
Superstep Execution
Each superstep follows this precise sequence:
1. Initialize
   ├─ Get ready vertices from scheduler
   ├─ Create worker pool (size = MaxWorkers)
   └─ Prepare mailboxes for reading
2. Compute Phase (Parallel)
   ├─ Worker 1: Execute vertex A
   │  ├─ Read mailbox messages
   │  ├─ Read shared state
   │  ├─ Call node's function
   │  └─ Produce Command
   │
   ├─ Worker 2: Execute vertex B (concurrent)
   └─ Worker N: Execute vertex N (concurrent)
3. Synchronization
   ├─ Wait for all workers to complete
   └─ Collect all Commands
4. State Update (Sequential)
   ├─ Apply state updates atomically
   └─ Update aggregators
5. Message Delivery (Sequential)
   ├─ Evaluate conditional routes
   ├─ Determine next nodes for each result
   └─ Update scheduler state
6. Checkpoint (if configured)
   ├─ Save state snapshot
   └─ Store superstep number
7. Check Termination
   ├─ END node reached?
   ├─ Max iterations exceeded?
   ├─ Context cancelled?
   └─ No ready vertices remaining?
8. Emit Stream Event
   └─ Send superstep completion event
Key invariants:
- All vertices in a superstep execute with the same state snapshot
- State updates are applied atomically after all vertices complete
- Messages sent in superstep N are not visible until superstep N+1
- No vertex sees partial state from another vertex's execution
Worker Pool Pattern
The runtime uses a fixed worker pool to control parallelism and prevent unbounded goroutine creation:
Benefits:
- Parallel execution when topology allows
- Resource control: Creates exactly MaxWorkers goroutines
- Predictable performance: No unbounded goroutine spawning
- Fixed memory usage: Stack memory scales with MaxWorkers, not frontier size
Tuning guidance:
- CPU-bound nodes: MaxWorkers = runtime.NumCPU()
- I/O-bound nodes (API calls): MaxWorkers = 2-4x runtime.NumCPU()
- Mixed workload: MaxWorkers = runtime.NumCPU() + small buffer
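Conceptually, the compute phase uses the classic fixed-size worker pool pattern sketched below (a generic illustration, not the PregelExecutor source):

import (
    "context"
    "sync"
)

// runSuperstep executes one compute phase: exactly maxWorkers goroutines drain
// the frontier, however many vertices are ready.
func runSuperstep(ctx context.Context, ready []string, maxWorkers int, exec func(context.Context, string)) {
    jobs := make(chan string)
    var wg sync.WaitGroup
    for i := 0; i < maxWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for vertex := range jobs {
                exec(ctx, vertex) // compute phase for one vertex
            }
        }()
    }
    for _, vertex := range ready {
        jobs <- vertex
    }
    close(jobs)
    wg.Wait() // synchronization barrier: every vertex finished before returning
}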
Graph API
The graph package provides a fluent API for constructing agent workflows:
Basic Structure
import "github.com/hupe1980/agentmesh/pkg/graph"
// Define typed state keys
var StatusKey = graph.NewKey[string]("status", "")
var CountKey = graph.NewKey[int]("count", 0)
// Create graph with keys
g := graph.New[string, string](StatusKey, CountKey)
// Add nodes with fluent API
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
count := graph.Get(view, CountKey)
return graph.Set(StatusKey, "done").
Set(CountKey, count+1).
To(graph.END), nil
}, graph.END)
// Set entry point
g.Start("process")
// Compile into executable graph
compiled, err := g.Build()
MessageGraph for Agents
For agent workflows with message handling:
g := message.NewGraphBuilder()
g.Node("agent", func(ctx context.Context, view graph.View) (*graph.Command, error) {
messages := message.GetMessages(view)
response := processMessages(messages)
return graph.Append(message.MessagesKey, response).To(graph.END), nil
}, graph.END)
g.Start("agent")
compiled, _ := g.Build()
Conditional Routing
Routes are determined dynamically using commands:
g.Node("classifier", func(ctx context.Context, view graph.View) (*graph.Command, error) {
category := graph.Get(view, CategoryKey)
switch category {
case "urgent":
return graph.To("urgent_handler"), nil
case "research":
return graph.To("researcher"), nil
default:
return graph.To("default_handler"), nil
}
}, "urgent_handler", "researcher", "default_handler")
Parallel Execution
Independent nodes automatically execute in parallel based on topology:
// START fans out to three parallel workers
g.Start("start")
g.Node("start", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.To("analyst_a", "analyst_b", "analyst_c"), nil
}, "analyst_a", "analyst_b", "analyst_c")
// All converge to aggregator
g.Node("analyst_a", analyzeA, "aggregator")
g.Node("analyst_b", analyzeB, "aggregator")
g.Node("analyst_c", analyzeC, "aggregator")
State management
AgentMesh uses a type-safe state system with compile-time guarantees.
Type-Safe Keys
Define typed state keys for compile-time type safety:
// Single value keys
var StatusKey = graph.NewKey[string]("status", "")
var CounterKey = graph.NewKey[int]("counter", 0)
var ConfigKey = graph.NewKey[Config]("config", Config{})
// List keys
var TagsKey = graph.NewListKey[string]("tags")
var MessagesKey = message.MessagesKey // Built-in message list key
Reading State
Nodes receive immutable state views:
g.Node("reader", func(ctx context.Context, view graph.View) (*graph.Command, error) {
// Type-safe reads (no type assertions)
status := graph.Get(view, StatusKey) // string
counter := graph.Get(view, CounterKey) // int
tags := graph.GetList(view, TagsKey) // []string
return graph.To("next"), nil
}, "next")
Updating State
Nodes return commands with state updates:
g.Node("updater", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(StatusKey, "complete").
Set(CounterKey, 42).
Append(TagsKey, "new-tag").
To(graph.END), nil
}, graph.END)
Channel Types
Internally, state uses different channel types:
- TopicChannel: Accumulates messages (append-only list)
- LastValueChannel: Stores only the most recent value (overwrite semantics)
- BinaryOpChannel: Merges values using custom operators (sum, max, concat, etc.)
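The update semantics of the three channel types can be summarized with plain functions (illustrative only, not the internal channel API):

// TopicChannel: every write is appended to the accumulated list.
func topicUpdate(existing []string, incoming ...string) []string {
    return append(existing, incoming...)
}

// LastValueChannel: the newest write simply replaces the previous value.
func lastValueUpdate(_ string, incoming string) string {
    return incoming
}

// BinaryOpChannel: existing and incoming values are merged with a caller-supplied
// operator, e.g. func(a, b int) int { return a + b } for a running sum.
func binaryOpUpdate(existing, incoming int, op func(a, b int) int) int {
    return op(existing, incoming)
}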
Execution lifecycle
1. Initialization
compiled, err := g.Build()
The compiler validates the graph topology, checks for cycles, and prepares the execution scheduler.
2. Invocation
// Streaming execution - process results as they arrive
for result, err := range compiled.Run(ctx, input) {
if err != nil {
log.Fatal(err)
}
// Access state from result view
status := graph.Get(result, StatusKey)
fmt.Printf("Status: %s\n", status)
}
3. Superstep Execution
For each superstep:
- Scheduler identifies ready nodes (all dependencies satisfied)
- Nodes execute in parallel (up to worker pool size)
- State updates are applied atomically
- Conditional routes are evaluated
- Checkpoint is saved (if configured)
- Process repeats until END or max iterations
4. Result Collection
Final output is returned from the graph:
// Get last result using iterator
var lastResult graph.View
for result, err := range compiled.Run(ctx, input) {
if err != nil {
log.Fatal(err)
}
lastResult = result
}
// lastResult contains the final state view
Performance characteristics
The graph engine is optimized for low-latency, high-throughput execution:
- ~6μs overhead per node: Minimal execution overhead from the scheduler
- O(1) ready vertex lookup: Maintained ready queue for constant-time vertex retrieval
- O(1) aggregate updates: Lazy copy-on-write caching for aggregate snapshots
- Lock-free state reads: sync.Map-based ChannelRegistry for concurrent reads without contention
- Parallel node execution: Independent nodes run concurrently
- Lock splitting: Reduced contention via channel-specific locks
- Efficient checkpointing: Copy-on-write state snapshots
- Configurable workers: Tune parallelism based on workload
Scheduler Optimization
The TopologyScheduler uses a maintained ready queue for constant-time vertex lookup:
| Operation | Complexity | Notes |
|---|---|---|
| Ready() | O(1) | Returns pre-maintained queue |
| MarkExecuted() | O(d log n) | d = out-degree, maintains sorted queue |
| Memory | O(n + k) | k = ready vertices (typically small) |
Benefits for large graphs:
- Constant-time ready vertex retrieval eliminates per-superstep iteration
- Especially beneficial for iterative algorithms with many supersteps
- Memory overhead negligible (only ready vertices in queue)
- 10,000 nodes × 100 supersteps: ~1M iterations avoided
Lock-Free Channel Registry
The ChannelRegistry uses sync.Map for lock-free concurrent reads, enabling true parallel state access:
| Operation | Implementation | Characteristics |
|---|---|---|
| GetChannel() | Lock-free read | O(1), no contention |
| GetChannelValue() | Lock-free read | O(1), no contention |
| RegisterChannel() | Lightweight mutex | Write operations use simple mutex |
BSP Execution Pattern:
In bulk-synchronous parallel execution, all workers read state simultaneously at superstep boundaries:
Parallel State Reads (sync.Map):
Worker 1: [read]─────────────────────────────────
Worker 2: [read]─────────────────────────────────
Worker 3: [read]─────────────────────────────────
Worker N: [read]─────────────────────────────────
→ All workers read concurrently without locks
Performance characteristics:
- Lock-free reads: No mutex acquisition for read operations
- Linear scalability: Read throughput scales with worker count
- BSP-optimized: Designed for read-heavy workloads with burst access patterns
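The read path boils down to sync.Map lookups; a minimal sketch of the pattern with illustrative names:

import "sync"

// channels maps channel name to its current value; reads never take a mutex.
var channels sync.Map

func getChannelValue(name string) (any, bool) {
    return channels.Load(name) // lock-free read, safe from any worker goroutine
}

func registerChannel(name string, value any) {
    channels.Store(name, value) // writes are rare (registration, superstep commit)
}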
Benchmark Results
BenchmarkOptimized 100000 6147 ns/op ~6μs per node
BenchmarkChannelOnly 100000 7432 ns/op
BenchmarkBaseline 100000 12891 ns/op