Type Safety with Generics (Go 1.24+): AgentMesh provides full compile-time type safety through generic graph types. Use
graph.New[I, O](keys...)for building graphs andgraph.Build()to compile them. Usemessage.NewGraphBuilder()for conversational agents.
Runnable interface
The graph Run method is the core abstraction for executable workflows in AgentMesh. Only compiled graphs can be executed - this separation ensures graphs are validated before running.
Builder vs Graph
AgentMesh separates graph building from graph execution:
// Builder is for construction - define nodes and edges
b := graph.New[string, string](keys...)
b.Node("fetch", fetchFunc, "process")
b.Start("fetch")
// Graph is the compiled executor - run the workflow
compiled, err := b.Build() // Validates and compiles
if err != nil {
return err
}
// Only compiled Graph has Run()
for output, err := range compiled.Run(ctx, input) {
// process outputs...
}
Interface pattern
// Graph.Run returns an iterator of outputs
func (g *Graph[I, O]) Run(ctx context.Context, input I, opts ...RunOption) iter.Seq2[O, error]
Type parameters:
I- Input type (e.g.,[]message.Message,string, custom struct)O- Output type (e.g.,message.Message, custom result type)
Common type aliases
For conversational agents, AgentMesh provides:
// GraphBuilder is a builder for message-processing workflows
type GraphBuilder = graph.Builder[[]message.Message, message.Message]
// Graph is an executable message-processing workflow
type Graph = graph.Graph[[]message.Message, message.Message]
Usage example
All agent constructors return *message.Graph (already compiled):
import (
"github.com/hupe1980/agentmesh/pkg/agent"
"github.com/hupe1980/agentmesh/pkg/message"
)
// Agent constructors return *message.Graph (ready to run)
reactAgent, err := agent.NewReAct(model, agent.WithTools(tools...))
if err != nil {
return err
}
// Execute with iterator pattern - no Build() needed!
for msg, err := range reactAgent.Run(ctx, messages) {
if err != nil {
return err
}
fmt.Println(msg.Content())
}
Benefits
Compile-time type safety:
// ✅ Type-safe: message.Graph accepts []message.Message
reactAgent.Run(ctx, messages)
// ❌ Compile error: won't accept wrong input type
reactAgent.Run(ctx, "invalid input")
// ❌ Compile error: can't run uncompiled builder
b := message.NewGraphBuilder()
b.Run(ctx, messages) // Error: Builder has no Run method
Easy composition:
// All agents are *message.Graph - compose freely
worker1, _ := agent.NewReAct(model)
worker2, _ := agent.NewReAct(model)
supervisor, _ := agent.NewSupervisor(model,
agent.WithWorker("researcher", "Does research", worker1),
agent.WithWorker("writer", "Writes content", worker2),
)
Graphs and nodes
AgentMesh uses a directed graph model where computation flows through connected nodes.
What is a graph?
A graph consists of:
- Nodes - Computational units that process data
- Edges - Connections that define execution order (declared as node targets)
- State - Shared context accessible via typed keys
Building a graph
import "github.com/hupe1980/agentmesh/pkg/graph"
// Define typed state keys
var (
RawDataKey = graph.NewKey[string]("raw_data", "")
ProcessedDataKey = graph.NewKey[string]("processed_data", "")
StatusKey = graph.NewKey[string]("status", "pending")
)
// Create graph with state keys
g := graph.New[string, string](RawDataKey, ProcessedDataKey, StatusKey)
// Add nodes with fluent API - targets are declared inline
g.Node("fetch", fetchDataFunc, "process").
Node("process", processDataFunc, "save").
Node("save", saveDataFunc, graph.END).
Start("fetch")
// Compile into executable graph
compiled, err := g.Build()
if err != nil {
return err
}
// Run the graph
for output, err := range compiled.Run(ctx, "input data") {
if err != nil {
return err
}
fmt.Println(output)
}
Node functions
Nodes receive a read-only view of state and return a Command with updates and next targets:
// NodeFunc signature
type NodeFunc func(ctx context.Context, view View) (*Command, error)
// Example node function
func processDataFunc(ctx context.Context, view graph.View) (*graph.Command, error) {
// Read from state using typed keys
rawData := graph.Get(view, RawDataKey)
// Process the data
processed := strings.ToUpper(rawData)
// Return updates and next target using fluent API
return graph.Set(ProcessedDataKey, processed).
Set(StatusKey, "processed").
To("save"), nil
}
Special nodes
- START - Entry point (set via
Start()) - END - Terminal node constant (
graph.END)
Conditional routing
Dynamically route to different nodes based on state:
var CategoryKey = graph.NewKey[string]("category", "")
func classifierFunc(ctx context.Context, view graph.View) (*graph.Command, error) {
category := graph.Get(view, CategoryKey)
switch category {
case "urgent":
return graph.To("urgent_handler"), nil
case "normal":
return graph.To("normal_handler"), nil
default:
return graph.To("default_handler"), nil
}
}
// Node declares all possible targets
g.Node("classifier", classifierFunc, "urgent_handler", "normal_handler", "default_handler")
State management
State is shared across all nodes using typed keys for compile-time safety.
Defining state keys
import "github.com/hupe1980/agentmesh/pkg/graph"
// Key[T] - single value, overwrites on update
var (
CounterKey = graph.NewKey[int]("counter", 0) // with default
StatusKey = graph.NewKey[string]("status", "pending")
)
// ListKey[T] - append-only list
var MessagesKey = graph.NewListKey[message.Message]("messages")
Reading state
Nodes receive immutable state views with typed access:
func myNode(ctx context.Context, view graph.View) (*graph.Command, error) {
// Type-safe reads - returns the correct type
counter := graph.Get(view, CounterKey) // int
status := graph.Get(view, StatusKey) // string
messages := graph.GetList(view, MessagesKey) // []message.Message
return graph.To("next_node"), nil
}
Updating state
Use the fluent Command builder for type-safe updates:
func myNode(ctx context.Context, view graph.View) (*graph.Command, error) {
counter := graph.Get(view, CounterKey)
// Fluent, type-safe updates
return graph.Set(CounterKey, counter + 1).
Set(StatusKey, "processing").
To("next_node"), nil
}
// For list keys, use Append
func addMessageNode(ctx context.Context, view graph.View) (*graph.Command, error) {
newMsg := message.NewAIMessage(message.NewTextPart("Hello!"))
return graph.Append(MessagesKey, newMsg).
To("next_node"), nil
}
MessageGraph convenience
For conversational agents, use message.NewGraphBuilder() which automatically includes the messages key:
// Creates a graph with MessagesKey pre-registered
g := message.NewGraphBuilder()
// MessagesKey is available
g.Node("chat", func(ctx context.Context, view graph.View) (*graph.Command, error) {
messages := message.GetMessages(view)
// ... process messages
return graph.Append(message.MessagesKey, response).To(graph.END), nil
}, graph.END)
Execution flow
AgentMesh executes graphs using Pregel-style bulk synchronous parallel (BSP) processing.
Supersteps
Execution proceeds in discrete supersteps:
- Identify ready nodes - Nodes with satisfied dependencies
- Execute in parallel - Ready nodes run concurrently
- Apply updates - State changes applied atomically
- Repeat - Until END node or max iterations
Parallel execution
Nodes can fan out to multiple parallel tasks:
g := graph.New[string, string](ResultKey)
// Entry node fans out to three parallel tasks
g.Node("start", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.To("fetch_a", "fetch_b", "fetch_c"), nil
}, "fetch_a", "fetch_b", "fetch_c")
// Each fetch task routes to aggregator
g.Node("fetch_a", fetchAFunc, "aggregator").
Node("fetch_b", fetchBFunc, "aggregator").
Node("fetch_c", fetchCFunc, "aggregator").
Node("aggregator", aggregateFunc, graph.END).
Start("start")
Cycles and loops
Unlike DAG-based systems, AgentMesh supports cycles for iterative workflows:
var (
DraftKey = graph.NewKey[string]("draft", "")
IterationKey = graph.NewKey[int]("iteration", 0)
)
func writerFunc(ctx context.Context, view graph.View) (*graph.Command, error) {
iteration := graph.Get(view, IterationKey)
draft := generateDraft(iteration)
return graph.Set(DraftKey, draft).
Set(IterationKey, iteration + 1).
To("evaluator"), nil
}
func evaluatorFunc(ctx context.Context, view graph.View) (*graph.Command, error) {
draft := graph.Get(view, DraftKey)
iteration := graph.Get(view, IterationKey)
if isGoodEnough(draft) || iteration >= 5 {
return graph.To(graph.END), nil
}
// Loop back for refinement
return graph.To("writer"), nil
}
g := graph.New[string, string](DraftKey, IterationKey)
g.Node("writer", writerFunc, "evaluator").
Node("evaluator", evaluatorFunc, graph.END, "writer"). // declares both targets
Start("writer")
Max iterations
Prevent infinite loops with run options:
for output, err := range compiled.Run(ctx, input,
graph.WithMaxIterations(10),
) {
// ...
}
Messages
Messages represent conversation turns between users, AI, and tools.
Message types
import "github.com/hupe1980/agentmesh/pkg/message"
// Human input (simple text)
humanMsg := message.NewHumanMessageFromText("What's the weather?")
// AI response (simple text)
aiMsg := message.NewAIMessageFromText("It's sunny and 72°F")
// System prompt (simple text)
systemMsg := message.NewSystemMessageFromText("You are a helpful assistant")
// For multi-part messages, use Parts slice
multiPart := message.NewHumanMessage([]message.Part{
message.TextPart{Text: "Describe this image:"},
message.FilePart{MimeType: "image/png", File: message.FileURI{URI: imageURL}},
})
// Tool call
toolCall := message.ToolCall{
ID: "call_123",
Name: "get_weather",
Type: "function",
Arguments: `{"location":"Paris"}`, // JSON string
}
aiWithTool := message.NewAIMessage(
[]message.Part{message.TextPart{Text: "Let me check"}},
message.WithToolCalls(toolCall),
)
// Tool result
toolMsg := message.NewToolMessage("call_123", "Sunny, 22°C")
Message parts
Messages can contain multiple parts:
aiMsg := message.NewAIMessage([]message.Part{
message.TextPart{Text: "Here's the weather"},
message.FilePart{
MimeType: "image/png",
File: message.FileURI{URI: imageURL},
},
})
// Quick text extraction using String()
fmt.Println(aiMsg.String()) // "Here's the weather[file: (image/png)]"
// Access individual parts when you need type-specific handling
for _, part := range aiMsg.Parts() {
switch p := part.(type) {
case message.TextPart:
fmt.Println("Text:", p.Text)
case message.FilePart:
fmt.Println("File:", p.Name, p.MimeType)
}
}
Error handling
AgentMesh uses sentinel errors with errors.Is() support for programmatic error checking.
Sentinel errors
import "github.com/hupe1980/agentmesh/pkg/graph"
for output, err := range compiled.Run(ctx, input) {
if err != nil {
switch {
case errors.Is(err, graph.ErrNotBuilt):
log.Error("Graph not compiled - call Build() first")
case errors.Is(err, graph.ErrNoEntryPoint):
log.Error("No entry point set - call Start()")
case errors.Is(err, graph.ErrNodeNotFound):
log.Error("Referenced node doesn't exist")
case errors.Is(err, graph.ErrDuplicateNode):
log.Error("Node name already used")
case errors.Is(err, graph.ErrDuplicateKey):
log.Error("State key name already registered")
default:
return err
}
}
}
Available sentinel errors
| Error | Description |
|---|---|
ErrNoEntryPoint |
No entry point defined (call Start()) |
ErrNodeNotFound |
Node not found in graph |
ErrDuplicateNode |
Duplicate node name |
ErrDuplicateKey |
Duplicate state key name |
ErrInvalidTarget |
Invalid target node reference |
ErrNotBuilt |
Graph not built (call Build() first) |
InterruptError
For human-in-the-loop workflows:
var interruptErr *graph.InterruptError
if errors.As(err, &interruptErr) {
fmt.Printf("Interrupted at node %s (before=%v)\n",
interruptErr.NodeName, interruptErr.Before)
// Resume with approval
for output, err := range compiled.Run(ctx, input,
graph.WithApproval(interruptErr.NodeName, &graph.ApprovalResponse{
Decision: graph.ApprovalApproved,
}),
) {
// ...
}
}
Next steps
- Agents - Build ReAct, Supervisor, and RAG agents
- Tools - Create function tools for agent capabilities
- Checkpointing - State persistence and time travel debugging
- Streaming - Real-time execution events
- Observability - OpenTelemetry metrics and tracing
- Architecture - Understand Pregel BSP internals