Go 1.24+ GitHub

Graph Builder API

Construct graphs with a fluent, type-safe API.

Features

  • Fluent API: Chain method calls for readable graph construction
  • Type-Safe: Full Go generics with compile-time type checking
  • Command Pattern: Combine state updates and routing in single expressions
  • Typed Keys: Compile-time type safety for state access

Quick Start

Basic Usage

import (
    "context"
    "github.com/hupe1980/agentmesh/pkg/graph"
)

// Define typed state keys
var (
    StatusKey = graph.NewKey[string]("status", "")
    CountKey  = graph.NewKey[int]("count", 0)
)

// Create a graph with typed input/output and keys
g := graph.New[string, string](StatusKey, CountKey)

// Add nodes with fluent API
g.Node("process", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    count := graph.Get(view, CountKey)
    return graph.Set(StatusKey, "done").
        Set(CountKey, count+1).
        To(graph.END), nil
}, graph.END)

// Set entry point
g.Start("process")

// Compile and run
compiled, err := g.Build()
if err != nil {
    log.Fatal(err)
}

for result, err := range compiled.Run(context.Background(), "input") {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(graph.Get(result, StatusKey)) // "done"
}

Message Graph for Agents

Use message.NewGraphBuilder() for agent workflows with built-in message handling:

import (
    "github.com/hupe1980/agentmesh/pkg/graph"
    "github.com/hupe1980/agentmesh/pkg/message"
)

// Create message graph (auto-includes message.MessagesKey)
g := message.NewGraphBuilder()

// Add agent node
g.Node("agent", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    messages := message.GetMessages(view)
    
    // Process messages with model...
    response := message.NewAIMessageFromText("Hello!")
    
    return graph.Append(message.MessagesKey, response).To(graph.END), nil
}, graph.END)

g.Start("agent")

compiled, _ := g.Build()

// Run with messages
input := []message.Message{message.NewHumanMessageFromText("Hi")}
for msg, err := range compiled.Run(context.Background(), input) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}

Conditional Routing

var RouteKey = graph.NewKey[string]("route", "")

g := graph.New[string, string](RouteKey)

g.Node("router", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    route := graph.Get(view, RouteKey)
    if route == "left" {
        return graph.To("left"), nil
    }
    return graph.To("right"), nil
}, "left", "right")

g.Node("left", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    return graph.Set(RouteKey, "went-left").To(graph.END), nil
}, graph.END)

g.Node("right", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    return graph.Set(RouteKey, "went-right").To(graph.END), nil
}, graph.END)

g.Start("router")

Adding Nodes

Basic Nodes

Add nodes with the Node() method:

g.Node("name", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    // Node logic here
    return graph.To("next"), nil
}, "next")

The last arguments are the declared targets - all possible nodes this node can route to.

Nodes with Retry

Add automatic retry behavior with NodeWithRetry():

g.NodeWithRetry("api_call",
    func(ctx context.Context, view graph.View) (*graph.Command, error) {
        result, err := callExternalAPI()
        if err != nil {
            return graph.Fail(err) // Will be retried
        }
        return graph.Set(ResultKey, result).To(graph.END), nil
    },
    graph.RetryPolicy{
        MaxAttempts:    5,
        InitialBackoff: time.Second,
        BackoffFactor:  2.0,
    },
    graph.END,
)

Namespaced Nodes

Add namespace-scoped nodes for state isolation:

var agentNS = state.MustNamespace("agent")
var AgentStatusKey = state.TypedKey[string](agentNS, "status", "")

g.NamespacedNode("agent", agentNS,
    func(ctx context.Context, view graph.View) (*graph.Command, error) {
        // Can only access keys in "agent" namespace
        return graph.Set(AgentStatusKey, "active").To(graph.END), nil
    },
    graph.END,
)

Subgraphs

Embed compiled graphs as nodes:

// Create and compile subgraph
sub := graph.New[string, string](ValueKey)
sub.Node("double", func(ctx context.Context, view graph.View) (*graph.Command, error) {
    val := graph.Get(view, ValueKey)
    return graph.Set(ValueKey, val*2).To(graph.END), nil
}, graph.END)
sub.Start("double")
compiledSub, _ := sub.Build()

// Embed in parent
parent := graph.New[string, string](ValueKey)
parent.Subgraph("doubler", compiledSub, graph.END)
parent.Start("doubler")

Commands

Commands combine state updates with routing in a single fluent expression.

Setting Values

// Set single value
return graph.Set(StatusKey, "done").To("next"), nil

// Set multiple values
return graph.Set(StatusKey, "done").
    Set(CountKey, 42).
    Set(ValidKey, true).
    To("next"), nil

Appending to Lists

// Append single item
return graph.Append(TagsKey, "new-tag").To("next"), nil

// Append multiple items
return graph.Append(MessagesKey, msg1, msg2, msg3).To("next"), nil

Routing

// Route to single target
return graph.To("next"), nil

// Route to multiple targets (parallel execution)
return graph.To("worker1", "worker2", "worker3"), nil

// Route to END
return graph.To(graph.END), nil

Error Handling

// Signal failure
return graph.Fail(err), nil

// Conditional failure
if err != nil {
    return graph.Fail(err), nil
}
return graph.To("next"), nil

Interrupts

// Pause for human intervention
return graph.Set(StatusKey, "awaiting_approval").Interrupt(), nil

Compilation

Basic Compilation

compiled, err := g.Build()
if err != nil {
    log.Fatal(err)
}

With Checkpointing

import "github.com/hupe1980/agentmesh/pkg/checkpoint"

checkpointer := checkpoint.NewInMemory()

compiled, err := g.Build(
    graph.WithCheckpointer(checkpointer),
)

With Callbacks

compiled, err := g.Build(
    graph.WithCallbacks(myCallbackHandler),
)

Execution

Streaming Results

for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        log.Printf("Error: %v", err)
        continue
    }
    // Access state values from result view
    status := graph.Get(result, StatusKey)
    fmt.Println(status)
}

With Options

for result := range compiled.Run(ctx, input,
    graph.WithRunID("workflow-123"),
    graph.WithCheckpointInterval(1),
    graph.WithMaxSteps(100),
) {
    // Process results
}

API Reference

Graph Creation

Function Description
graph.New[I, O](keys...) Create typed graph with state keys
message.NewGraphBuilder(keys...) Create message-based graph for agents

Graph Methods

Method Description
g.Node(name, fn, targets...) Add node with function and declared targets
g.NodeWithRetry(name, fn, policy, targets...) Add node with retry policy
g.NamespacedNode(name, ns, fn, targets...) Add namespace-scoped node
g.Subgraph(name, compiled, targets...) Embed subgraph as node
g.Start(name) Set entry point
g.Build(opts...) Compile graph

State Keys

Function Description
graph.NewKey[T](name, default) Create typed single-value key
graph.NewListKey[T](name) Create typed list key
graph.Get(view, key) Read value from view
graph.GetList(view, key) Read list from view

Commands

Function Description
graph.Set(key, val) Set single value
graph.Append(key, items...) Append to list
graph.To(targets...) Route to targets
graph.Fail(err) Signal failure
cmd.Interrupt() Pause for human intervention

Execution

Function Description
compiled.Run(ctx, input, opts...) Returns iter.Seq2[View, error] for streaming results

Examples

See the builder_api example for a complete working example.

Architecture

The Builder API provides a clean interface for graph construction:

graph.New[I, O](keys...)
    │
    ├── g.Node("name", fn, targets...)
    ├── g.NodeWithRetry(...)
    ├── g.NamespacedNode(...)
    ├── g.Subgraph(...)
    │
    └── g.Build(opts...)
            │
            └── *Compiled[I, O]
                    │
                    └── Run(ctx, input, opts...) → iter.Seq2[View, error]

The graph handles the full lifecycle from construction to compilation to execution.