Features
- Fluent API: Chain method calls for readable graph construction
- Type-Safe: Full Go generics with compile-time type checking
- Command Pattern: Combine state updates and routing in single expressions
- Typed Keys: Compile-time type safety for state access
Quick Start
Basic Usage
import (
"context"
"fmt"
"log"

"github.com/hupe1980/agentmesh/pkg/graph"
)
// Define typed state keys
var (
StatusKey = graph.NewKey[string]("status", "")
CountKey = graph.NewKey[int]("count", 0)
)
// Create a graph with typed input/output and keys
g := graph.New[string, string](StatusKey, CountKey)
// Add nodes with fluent API
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
count := graph.Get(view, CountKey)
return graph.Set(StatusKey, "done").
Set(CountKey, count+1).
To(graph.END), nil
}, graph.END)
// Set entry point
g.Start("process")
// Compile and run
compiled, err := g.Build()
if err != nil {
log.Fatal(err)
}
for result, err := range compiled.Run(context.Background(), "input") {
if err != nil {
log.Fatal(err)
}
fmt.Println(graph.Get(result, StatusKey)) // "done"
}
Message Graph for Agents
Use message.NewGraphBuilder() for agent workflows with built-in message handling:
import (
"github.com/hupe1980/agentmesh/pkg/graph"
"github.com/hupe1980/agentmesh/pkg/message"
)
// Create message graph (auto-includes message.MessagesKey)
g := message.NewGraphBuilder()
// Add agent node
g.Node("agent", func(ctx context.Context, view graph.View) (*graph.Command, error) {
messages := message.GetMessages(view)
// Process messages with model...
response := message.NewAIMessageFromText("Hello!")
return graph.Append(message.MessagesKey, response).To(graph.END), nil
}, graph.END)
g.Start("agent")
compiled, _ := g.Build()
// Run with messages
input := []message.Message{message.NewHumanMessageFromText("Hi")}
for msg, err := range compiled.Run(context.Background(), input) {
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Content())
}
Conditional Routing
var RouteKey = graph.NewKey[string]("route", "")
g := graph.New[string, string](RouteKey)
g.Node("router", func(ctx context.Context, view graph.View) (*graph.Command, error) {
route := graph.Get(view, RouteKey)
if route == "left" {
return graph.To("left"), nil
}
return graph.To("right"), nil
}, "left", "right")
g.Node("left", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(RouteKey, "went-left").To(graph.END), nil
}, graph.END)
g.Node("right", func(ctx context.Context, view graph.View) (*graph.Command, error) {
return graph.Set(RouteKey, "went-right").To(graph.END), nil
}, graph.END)
g.Start("router")
Adding Nodes
Basic Nodes
Add nodes with the Node() method:
g.Node("name", func(ctx context.Context, view graph.View) (*graph.Command, error) {
// Node logic here
return graph.To("next"), nil
}, "next")
The trailing arguments are the node's declared targets: every node that this node is allowed to route to.
Nodes with Retry
Add automatic retry behavior with NodeWithRetry():
g.NodeWithRetry("api_call",
func(ctx context.Context, view graph.View) (*graph.Command, error) {
result, err := callExternalAPI()
if err != nil {
return graph.Fail(err), nil // Will be retried
}
return graph.Set(ResultKey, result).To(graph.END), nil
},
graph.RetryPolicy{
MaxAttempts: 5,
InitialBackoff: time.Second,
BackoffFactor: 2.0,
},
graph.END,
)
Namespaced Nodes
Add namespace-scoped nodes for state isolation:
var agentNS = state.MustNamespace("agent")
var AgentStatusKey = state.TypedKey[string](agentNS, "status", "")
g.NamespacedNode("agent", agentNS,
func(ctx context.Context, view graph.View) (*graph.Command, error) {
// Can only access keys in "agent" namespace
return graph.Set(AgentStatusKey, "active").To(graph.END), nil
},
graph.END,
)
Subgraphs
Embed compiled graphs as nodes:
// Create and compile subgraph
sub := graph.New[string, string](ValueKey)
sub.Node("double", func(ctx context.Context, view graph.View) (*graph.Command, error) {
val := graph.Get(view, ValueKey)
return graph.Set(ValueKey, val*2).To(graph.END), nil
}, graph.END)
sub.Start("double")
compiledSub, _ := sub.Build()
// Embed in parent
parent := graph.New[string, string](ValueKey)
parent.Subgraph("doubler", compiledSub, graph.END)
parent.Start("doubler")
Commands
Commands combine state updates with routing in a single fluent expression.
Setting Values
// Set single value
return graph.Set(StatusKey, "done").To("next"), nil
// Set multiple values
return graph.Set(StatusKey, "done").
Set(CountKey, 42).
Set(ValidKey, true).
To("next"), nil
Appending to Lists
// Append single item
return graph.Append(TagsKey, "new-tag").To("next"), nil
// Append multiple items
return graph.Append(MessagesKey, msg1, msg2, msg3).To("next"), nil
Routing
// Route to single target
return graph.To("next"), nil
// Route to multiple targets (parallel execution)
return graph.To("worker1", "worker2", "worker3"), nil
// Route to END
return graph.To(graph.END), nil
Error Handling
// Signal failure
return graph.Fail(err), nil
// Conditional failure
if err != nil {
return graph.Fail(err), nil
}
return graph.To("next"), nil
Interrupts
// Pause for human intervention
return graph.Set(StatusKey, "awaiting_approval").Interrupt(), nil
Compilation
Basic Compilation
compiled, err := g.Build()
if err != nil {
log.Fatal(err)
}
With Checkpointing
import "github.com/hupe1980/agentmesh/pkg/checkpoint"
checkpointer := checkpoint.NewInMemory()
compiled, err := g.Build(
graph.WithCheckpointer(checkpointer),
)
With Callbacks
compiled, err := g.Build(
graph.WithCallbacks(myCallbackHandler),
)
Execution
Streaming Results
for result, err := range compiled.Run(ctx, input) {
if err != nil {
log.Printf("Error: %v", err)
continue
}
// Access state values from result view
status := graph.Get(result, StatusKey)
fmt.Println(status)
}
With Options
for result := range compiled.Run(ctx, input,
graph.WithRunID("workflow-123"),
graph.WithCheckpointInterval(1),
graph.WithMaxSteps(100),
) {
// Process results
}
API Reference
Graph Creation
| Function | Description |
|---|---|
| `graph.New[I, O](keys...)` | Create typed graph with state keys |
| `message.NewGraphBuilder(keys...)` | Create message-based graph for agents |
Graph Methods
| Method | Description |
|---|---|
| `g.Node(name, fn, targets...)` | Add node with function and declared targets |
| `g.NodeWithRetry(name, fn, policy, targets...)` | Add node with retry policy |
| `g.NamespacedNode(name, ns, fn, targets...)` | Add namespace-scoped node |
| `g.Subgraph(name, compiled, targets...)` | Embed subgraph as node |
| `g.Start(name)` | Set entry point |
| `g.Build(opts...)` | Compile graph |
State Keys
| Function | Description |
|---|---|
| `graph.NewKey[T](name, default)` | Create typed single-value key |
| `graph.NewListKey[T](name)` | Create typed list key |
| `graph.Get(view, key)` | Read value from view |
| `graph.GetList(view, key)` | Read list from view |
Commands
| Function | Description |
|---|---|
| `graph.Set(key, val)` | Set single value |
| `graph.Append(key, items...)` | Append to list |
| `graph.To(targets...)` | Route to targets |
| `graph.Fail(err)` | Signal failure |
| `cmd.Interrupt()` | Pause for human intervention |
Execution
| Function | Description |
|---|---|
| `compiled.Run(ctx, input, opts...)` | Returns `iter.Seq2[View, error]` for streaming results |
Examples
See the builder_api example for a complete working example.
Architecture
The Builder API provides a clean interface for graph construction:
    graph.New[I, O](keys...)
        │
        ├── g.Node("name", fn, targets...)
        ├── g.NodeWithRetry(...)
        ├── g.NamespacedNode(...)
        ├── g.Subgraph(...)
        │
        └── g.Build(opts...)
                │
                └── *Compiled[I, O]
                        │
                        └── Run(ctx, input, opts...) → iter.Seq2[View, error]
The graph handles the full lifecycle from construction to compilation to execution.