Message-Based Graphs: AgentMesh graphs use
[]message.Messageas input andmessage.Messageas output. Usegraph.New(keys...)for building graphs andgraph.Build()to compile them. Usemessage.NewGraphBuilder()for conversational agents.
Runnable interface
The graph Run method is the core abstraction for executable workflows in AgentMesh. Only compiled graphs can be executed - this separation ensures graphs are validated before running.
Builder vs Graph
AgentMesh separates graph building from graph execution:
// Builder is for construction - define nodes and edges
b := graph.New(keys...)
b.Node("fetch", fetchFunc, "process")
b.Start("fetch")
// Graph is the compiled executor - run the workflow
compiled, err := b.Build() // Validates and compiles
if err != nil {
return err
}
// Only compiled Graph has Run()
for output, err := range compiled.Run(ctx, input) {
// process outputs...
}
Interface pattern
// Graph.Run returns an iterator of outputs
func (g *Graph) Run(ctx context.Context, input []message.Message, opts ...RunOption) iter.Seq2[message.Message, error]
Message types:
- Input:
[]message.Message- Conversation history - Output:
message.Message- Response messages
Common type aliases
For conversational agents, AgentMesh provides:
// GraphBuilder is a builder for message-processing workflows
type GraphBuilder = graph.Builder
// Graph is an executable message-processing workflow
type Graph = graph.Graph
Usage example
All agent constructors return *message.Graph (already compiled):
import (
"github.com/hupe1980/agentmesh/pkg/agent"
"github.com/hupe1980/agentmesh/pkg/message"
)
// Agent constructors return *message.Graph (ready to run)
reactAgent, err := agent.NewReAct(model, agent.WithTools(tools...))
if err != nil {
return err
}
// Execute with iterator pattern - no Build() needed!
for msg, err := range reactAgent.Run(ctx, messages) {
if err != nil {
return err
}
fmt.Println(msg.Content())
}
Benefits
Compile-time type safety:
// ✅ Type-safe: message.Graph accepts []message.Message
reactAgent.Run(ctx, messages)
// ❌ Compile error: won't accept wrong input type
reactAgent.Run(ctx, "invalid input")
// ❌ Compile error: can't run uncompiled builder
b := message.NewGraphBuilder()
b.Run(ctx, messages) // Error: Builder has no Run method
Easy composition:
// All agents are *message.Graph - compose freely
worker1, _ := agent.NewReAct(model)
worker2, _ := agent.NewReAct(model)
supervisor, _ := agent.NewSupervisor(model,
agent.WithWorker("researcher", "Does research", worker1),
agent.WithWorker("writer", "Writes content", worker2),
)
Graphs and nodes
AgentMesh uses a directed graph model where computation flows through connected nodes.
What is a graph?
A graph consists of:
- Nodes - Computational units that process data
- Edges - Connections that define execution order (declared as node targets)
- State - Shared context accessible via typed keys
Building a graph
import "github.com/hupe1980/agentmesh/pkg/graph"
// Define typed state keys (zero value is automatic)
var (
RawDataKey = graph.NewKey[string]("raw_data")
ProcessedDataKey = graph.NewKey[string]("processed_data")
StatusKey = graph.NewKey[string]("status")
)
// Create graph with state keys
g := graph.New(RawDataKey, ProcessedDataKey, StatusKey)
// Add nodes with fluent API - targets are declared inline
g.Node("fetch", fetchDataFunc, "process").
Node("process", processDataFunc, "save").
Node("save", saveDataFunc, graph.END).
Start("fetch")
// Compile into executable graph
compiled, err := g.Build()
if err != nil {
return err
}
// Run the graph
for output, err := range compiled.Run(ctx, "input data") {
if err != nil {
return err
}
fmt.Println(output)
}
Node functions
Nodes receive a Scope (providing state access and streaming) and return a Command with updates and next targets:
// NodeFunc signature
type NodeFunc func(ctx context.Context, scope Scope) (*Command, error)
// Example node function
func processDataFunc(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
// Read from state using typed keys
rawData := graph.Get(scope, RawDataKey)
// Process the data
processed := strings.ToUpper(rawData)
// Return updates and next target using fluent API
return graph.Set(ProcessedDataKey, processed).
Set(StatusKey, "processed").
To("save"), nil
}
Special nodes
- START - Entry point (set via
Start()) - END - Terminal node constant (
graph.END)
Conditional routing
Dynamically route to different nodes based on state:
var CategoryKey = graph.NewKey[string]("category")
func classifierFunc(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
category := graph.Get(scope, CategoryKey)
switch category {
case "urgent":
return graph.To("urgent_handler"), nil
case "normal":
return graph.To("normal_handler"), nil
default:
return graph.To("default_handler"), nil
}
}
// Node declares all possible targets
g.Node("classifier", classifierFunc, "urgent_handler", "normal_handler", "default_handler")
State management
State is shared across all nodes using typed keys for compile-time safety.
Defining state keys
import "github.com/hupe1980/agentmesh/pkg/graph"
// Key[T] - single value with ReplaceReducer (last write wins)
var (
CounterKey = graph.NewKey[int]("counter")
StatusKey = graph.NewKey[string]("status")
)
// ListKey[T] - append-only list with AppendReducer
var MessagesKey = graph.NewListKey[message.Message]("messages")
// CounterKey - sum reducer for numeric accumulation
var TotalKey = graph.NewCounterKey("total")
// MapKey - merge reducer for map values
var MetadataKey = graph.NewMapKey[string, any]("metadata")
Reading state
Nodes receive immutable state views with typed access:
func myNode(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
// Type-safe reads - returns the correct type
counter := graph.Get(scope, CounterKey) // int
status := graph.Get(scope, StatusKey) // string
messages := graph.GetList(scope, MessagesKey) // []message.Message
return graph.To("next_node"), nil
}
Updating state
Use the fluent Command builder for type-safe updates:
func myNode(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
counter := graph.Get(scope, CounterKey)
// Fluent, type-safe updates
return graph.Set(CounterKey, counter + 1).
With(graph.SetValue(StatusKey, "processing")).
To("next_node")
}
// For list keys, Set with a slice - the reducer handles appending
func addMessageNode(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
newMsg := message.NewAIMessage(message.NewTextPart("Hello!"))
return graph.Set(MessagesKey, []message.Message{newMsg}).
To("next_node")
}
Reducers
Reducers define how state values merge when updated. Each key type has a default reducer:
| Key Type | Default Reducer | Behavior |
|---|---|---|
NewKey[T]() |
ReplaceReducer |
Overwrites (last write wins) |
NewListKey[T]() |
AppendReducer |
Appends to existing slice |
NewCounterKey() |
SumReducer |
Adds values together |
NewMapKey[K,V]() |
MergeMapReducer |
Merges maps |
// Custom reducer example
var HighScoreKey = graph.NewKey[int]("high_score",
graph.WithReducer(graph.MaxReducer[int]{})) // Keep maximum value
var LogsKey = graph.NewListKey[string]("logs",
graph.WithReducer(graph.PrependReducer[string]{})) // Insert at front
See State Management: Reducers for the complete reducer reference.
MessageGraph convenience
For conversational agents, use message.NewGraphBuilder() which automatically includes the messages key:
// Creates a graph with MessagesKey pre-registered
g := message.NewGraphBuilder()
// MessagesKey is available
g.Node("chat", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
messages := message.GetMessages(scope)
// ... process messages
return graph.Set(message.MessagesKey, []message.Message{response}).To(graph.END), nil
}, graph.END)
Execution flow
AgentMesh executes graphs using Pregel-style bulk synchronous parallel (BSP) processing.
Supersteps
Execution proceeds in discrete supersteps:
- Identify ready nodes - Nodes with satisfied dependencies
- Execute in parallel - Ready nodes run concurrently
- Apply updates - State changes applied atomically
- Repeat - Until END node or max iterations
Parallel execution
Nodes can fan out to multiple parallel tasks:
g := graph.New(ResultKey)
// Entry node fans out to three parallel tasks
g.Node("start", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
return graph.To("fetch_a", "fetch_b", "fetch_c"), nil
}, "fetch_a", "fetch_b", "fetch_c")
// Each fetch task routes to aggregator
g.Node("fetch_a", fetchAFunc, "aggregator").
Node("fetch_b", fetchBFunc, "aggregator").
Node("fetch_c", fetchCFunc, "aggregator").
Node("aggregator", aggregateFunc, graph.END).
Start("start")
Cycles and loops
Unlike DAG-based systems, AgentMesh supports cycles for iterative workflows:
var (
DraftKey = graph.NewKey[string]("draft")
IterationKey = graph.NewCounterKey("iteration") // Sum reducer for incrementing
)
func writerFunc(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
iteration := graph.Get(scope, IterationKey)
draft := generateDraft(iteration)
return graph.Set(DraftKey, draft).
Set(IterationKey, iteration + 1).
To("evaluator"), nil
}
func evaluatorFunc(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
draft := graph.Get(scope, DraftKey)
iteration := graph.Get(scope, IterationKey)
if isGoodEnough(draft) || iteration >= 5 {
return graph.To(graph.END), nil
}
// Loop back for refinement
return graph.To("writer"), nil
}
g := graph.New(DraftKey, IterationKey)
g.Node("writer", writerFunc, "evaluator").
Node("evaluator", evaluatorFunc, graph.END, "writer"). // declares both targets
Start("writer")
Max iterations
Prevent infinite loops with run options:
for output, err := range compiled.Run(ctx, input,
graph.WithMaxIterations(10),
) {
// ...
}
Messages
Messages represent conversation turns between users, AI, and tools.
Message types
import "github.com/hupe1980/agentmesh/pkg/message"
// Human input (simple text)
humanMsg := message.NewHumanMessageFromText("What's the weather?")
// AI response (simple text)
aiMsg := message.NewAIMessageFromText("It's sunny and 72°F")
// System prompt (simple text)
systemMsg := message.NewSystemMessageFromText("You are a helpful assistant")
// For multi-part messages, use Parts slice
multiPart := message.NewHumanMessage([]message.Part{
message.TextPart{Text: "Describe this image:"},
message.FilePart{MimeType: "image/png", File: message.FileURI{URI: imageURL}},
})
// Tool call
toolCall := message.ToolCall{
ID: "call_123",
Name: "get_weather",
Type: "function",
Arguments: `{"location":"Paris"}`, // JSON string
}
aiWithTool := message.NewAIMessage(
[]message.Part{message.TextPart{Text: "Let me check"}},
message.WithToolCalls(toolCall),
)
// Tool result
toolMsg := message.NewToolMessage("call_123", "Sunny, 22°C")
Message parts
Messages can contain multiple parts:
aiMsg := message.NewAIMessage([]message.Part{
message.TextPart{Text: "Here's the weather"},
message.FilePart{
MimeType: "image/png",
File: message.FileURI{URI: imageURL},
},
})
// Quick text extraction using String()
fmt.Println(aiMsg.String()) // "Here's the weather[file: (image/png)]"
// Access individual parts when you need type-specific handling
for _, part := range aiMsg.Parts() {
switch p := part.(type) {
case message.TextPart:
fmt.Println("Text:", p.Text)
case message.FilePart:
fmt.Println("File:", p.Name, p.MimeType)
}
}
Error handling
AgentMesh uses sentinel errors with errors.Is() support for programmatic error checking.
Sentinel errors
import "github.com/hupe1980/agentmesh/pkg/graph"
for output, err := range compiled.Run(ctx, input) {
if err != nil {
switch {
case errors.Is(err, graph.ErrNotBuilt):
log.Error("Graph not compiled - call Build() first")
case errors.Is(err, graph.ErrNoEntryPoint):
log.Error("No entry point set - call Start()")
case errors.Is(err, graph.ErrNodeNotFound):
log.Error("Referenced node doesn't exist")
case errors.Is(err, graph.ErrDuplicateNode):
log.Error("Node name already used")
case errors.Is(err, graph.ErrDuplicateKey):
log.Error("State key name already registered")
default:
return err
}
}
}
Available sentinel errors
| Error | Description |
|---|---|
ErrNoEntryPoint |
No entry point defined (call Start()) |
ErrNodeNotFound |
Node not found in graph |
ErrDuplicateNode |
Duplicate node name |
ErrDuplicateKey |
Duplicate state key name |
ErrInvalidTarget |
Invalid target node reference |
ErrNotBuilt |
Graph not built (call Build() first) |
InterruptError
For human-in-the-loop workflows:
var interruptErr *graph.InterruptError
if errors.As(err, &interruptErr) {
fmt.Printf("Interrupted at node %s (before=%v)\n",
interruptErr.NodeName, interruptErr.Before)
// Resume with approval (WithApproval is a ResumeOption)
for output, err := range compiled.Resume(ctx, runID,
graph.WithApproval(interruptErr.NodeName, &graph.ApprovalResponse{
Decision: graph.ApprovalApproved,
}),
) {
// ...
}
}
Next steps
- Agents - Build ReAct, Supervisor, and RAG agents
- Tools - Create function tools for agent capabilities
- Checkpointing - State persistence and time travel debugging
- Streaming - Real-time execution events
- Observability - OpenTelemetry metrics and tracing
- Architecture - Understand Pregel BSP internals