AgentMesh provides high-level agent constructors for common patterns like ReAct, RAG, and Supervisor multi-agent coordination, while also exposing the underlying graph builder for custom workflows. All agents are compiled into executable graphs that run on the Pregel BSP engine.
## ReAct agent
The ReAct (Reasoning and Acting) pattern creates an agent that iteratively:
- Reasons about the task
- Decides which tool to use
- Observes the result
- Repeats until the answer is found
This is the most common pattern for multi-step problem solving with tool use.
```go
import (
    "context"
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/message"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
    "github.com/hupe1980/agentmesh/pkg/tool"
)

// Create tools
searchTool, _ := tool.NewFuncTool("search", "Search the web", searchFunc)
calcTool, _ := tool.NewFuncTool("calculator", "Perform calculations", calcFunc)

// Create ReAct agent (returns *message.Graph)
reactAgent, err := agent.NewReAct(
    openai.NewModel(),
    agent.WithTools(searchTool, calcTool),
    agent.WithMaxIterations(5),
)
if err != nil {
    log.Fatal(err)
}

// Execute with iterator pattern
ctx := context.Background()
messages := []message.Message{
    message.NewHumanMessageFromText("What is the square root of the current year?"),
}
for msg, err := range reactAgent.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    // Process each message
    fmt.Println(msg.Content())
}
```
### Configuration options

```go
agent.NewReAct(model,
    agent.WithTools(searchTool, calcTool),     // Add tools
    agent.WithMaxIterations(10),               // Max reasoning-action cycles
    agent.WithInstructions("You are helpful"), // Instructions
    agent.WithOutputSchema(schema),            // Structured output
    agent.WithRunMiddleware(middleware...),    // Run middleware
    agent.WithModelMiddleware(middleware...),  // Model middleware
    agent.WithToolMiddleware(middleware...),   // Tool middleware
)
```

See also: Middleware System for caching, retries, rate limiting, circuit breakers, and more.
### Dynamic instructions
Instructions support Go text/template syntax with automatic state substitution. Placeholders are replaced with values from the graph state at runtime:
```go
// Define state keys
var UserNameKey = graph.NewKey[string]("userName")
var TaskKey = graph.NewKey[string]("task")

// Use placeholders in instructions - they resolve from graph state
agent.NewReAct(model,
    agent.WithInstructions("You are helping {{.userName}}. Current task: {{.task}}"),
)

// At runtime, set state values before invoking the agent
state.Set(UserNameKey, "Alice")
state.Set(TaskKey, "analyze sales data")

// Instructions resolve to: "You are helping Alice. Current task: analyze sales data"
```
Available template features:

- `{{.key}}` - Substitute value from graph state
- `{{.key | default "fallback"}}` - Use fallback if value is nil/empty
- `{{.key | upper}}` - Convert to uppercase
- `{{.key | lower}}` - Convert to lowercase
- `{{if .key}}...{{end}}` - Conditionals
Dynamic provider alternative:
For complex logic, use a provider function:
```go
agent.WithInstructionsFromFunc(func(ctx context.Context, scope graph.ReadOnlyScope) (string, error) {
    userName := graph.Get(scope, UserNameKey)
    if userName == "" {
        return "You are a helpful assistant.", nil
    }
    return fmt.Sprintf("You are helping %s.", userName), nil
})
```
### How it works
The ReAct agent compiles into a graph with a reasoning-action loop:
Architecture:
- Model node: Uses `model.Executor` to generate a response or tool calls
  - Delegates execution to the executor (handles observability, streaming)
  - Routes to “tool” (configurable via `WithToolTarget`; see the sketch below) if tool calls are present
  - Otherwise routes to END (configurable via `WithNextTarget`)
  - Optionally stores the final response to state (via `WithModelResponseKey`)
- Tool node: Uses `tool.Executor` to execute requested tools
  - Parallel execution via `ParallelExecutor` by default
  - Formats results as ToolMessages
  - Routes back to the model node
- Executor pattern benefits:
  - Clean separation: nodes handle orchestration, executors handle execution
  - Reusable: same executors work in graphs, chains, or direct calls
  - Extensible: custom executors (retry, caching) without modifying nodes
  - Efficient: arguments stay as JSON strings (no extra conversions)
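For example, when embedding a standalone model node in a larger graph you can rewire these routing targets. A minimal sketch, assuming `WithToolTarget` accepts the node ID to route to when tool calls are present (the option names come from the list above; the `my_tools` node ID is hypothetical):

```go
// Sketch: a model node with custom routing targets.
modelFn, _ := agent.NewModelNodeFunc(model,
    agent.WithModelInstructions("Answer the question, using tools when needed."),
    agent.WithToolTarget("my_tools"), // jump here when the model requests tools
    agent.WithNextTarget(graph.END),  // finish here on a final answer
)
```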
### Storing model responses in state
When building graphs with multiple model nodes, you may want to access a specific node’s response in downstream nodes. Use WithModelResponseKey to store the final response:
```go
// Define typed state keys
var (
    SummaryKey  = graph.NewKey[message.Message]("summary")
    AnalysisKey = graph.NewKey[message.Message]("analysis")
)

// Create model nodes that store their responses
summaryFn, _ := agent.NewModelNodeFunc(model,
    agent.WithModelInstructions("Summarize the following text"),
    agent.WithModelResponseKey(SummaryKey), // Store response in state
    agent.WithNextTarget("analyzer"),
)

analyzerFn, _ := agent.NewModelNodeFunc(model,
    agent.WithModelInstructions("Analyze sentiment"),
    agent.WithModelResponseKey(AnalysisKey),
    agent.WithNextTarget(graph.END),
)

// Build graph
g := graph.New(SummaryKey, AnalysisKey)
g.Node("summarize", summaryFn)
g.Node("analyzer", analyzerFn)
g.Start("summarize")
compiled, _ := g.Build()

// Execute and access stored responses
for result, err := range compiled.Run(ctx, input) {
    if err != nil {
        log.Fatal(err)
    }
    // Access stored responses from state
    summary := graph.Get(result.Scope, SummaryKey)
    analysis := graph.Get(result.Scope, AnalysisKey)
    fmt.Println(summary.Content(), analysis.Content())
}
```
Key points:

- Only stores responses without tool calls (final answers only)
- Use typed keys for compile-time safety: `graph.NewKey[message.Message]("key_name")`
- Access via `graph.Get(scope, key)` in downstream nodes
- Useful for multi-stage processing, logging, and conditional routing (see the sketch below)
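For the conditional-routing case, a downstream node can branch on a stored response. A sketch reusing `AnalysisKey` from the example above; the `escalate` node, the keyword check, and the standard `strings` import are illustrative:

```go
// Sketch: route based on the stored analysis message.
g.Node("route_on_sentiment", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    analysis := graph.Get(scope, AnalysisKey)
    if strings.Contains(strings.ToLower(analysis.Content()), "negative") {
        return graph.To("escalate"), nil // hypothetical escalation node
    }
    return graph.To(graph.END), nil
}, "escalate", graph.END)
```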
## Supervisor agent
The Supervisor Agent Pattern creates a coordinator that routes tasks to specialized worker agents based on the user’s query. This enables building complex multi-agent systems with clean separation of concerns.
```go
import (
    "context"
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/message"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
)

model := openai.NewModel()

// Create specialized worker agents
mathAgent, _ := agent.NewReAct(
    model,
    agent.WithInstructions("You are a math expert. Solve problems with clear steps."),
    agent.WithMaxIterations(5),
)

codeAgent, _ := agent.NewReAct(
    model,
    agent.WithInstructions("You are a programming expert. Write clean, documented code."),
    agent.WithMaxIterations(5),
)

historyAgent, _ := agent.NewReAct(
    model,
    agent.WithInstructions("You are a history expert. Provide factual answers with dates."),
    agent.WithMaxIterations(5),
)

// Create supervisor that routes to specialists
supervisor, err := agent.NewSupervisor(
    model,
    agent.WithWorker("math", "Expert in mathematics and calculations", mathAgent),
    agent.WithWorker("code", "Expert in programming and software development", codeAgent),
    agent.WithWorker("history", "Expert in historical facts and events", historyAgent),
    agent.WithSupervisorInstructions("Route queries to the appropriate specialist"),
    agent.WithSupervisorMaxIterations(10),
    agent.WithWorkerRetries(2),
)
if err != nil {
    log.Fatal(err)
}

// Execute with iterator pattern
ctx := context.Background()
for msg, err := range supervisor.Run(ctx, []message.Message{
    message.NewHumanMessageFromText("What is the derivative of x^2 + 3x?"),
}) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}
```
### Configuration options

```go
agent.NewSupervisor(model,
    agent.WithWorker(name, description, agent), // Add worker agents
    agent.WithSupervisorInstructions(prompt),   // Custom routing instructions
    agent.WithSupervisorMaxIterations(n),       // Max routing iterations
    agent.WithWorkerRetries(n),                 // Retry failed worker invocations
    agent.WithWorkerValidation(bool),           // Validate worker results
)
```
### How it works
The supervisor agent uses tool-based handoffs to delegate work:
Routing Logic"] end Supervisor -->|"handoff_to_math"| Math["Math Agent"] Supervisor -->|"handoff_to_code"| Code["Code Agent"] Supervisor -->|"handoff_to_history"| History["History Agent"] Math --> Result Code --> Result History --> Result Result["Result"] --> User2["User Response"] style User fill:#22c55e,stroke:#16a34a,color:#fff style Supervisor fill:#3b82f6,stroke:#2563eb,color:#fff style Math fill:#8b5cf6,stroke:#7c3aed,color:#fff style Code fill:#8b5cf6,stroke:#7c3aed,color:#fff style History fill:#8b5cf6,stroke:#7c3aed,color:#fff style Result fill:#f59e0b,stroke:#d97706,color:#fff style User2 fill:#22c55e,stroke:#16a34a,color:#fff
Key benefits:

- 🎯 Automatic routing: Supervisor intelligently routes to the right specialist
- 🔧 Automatic tool creation: Each worker gets a `handoff_to_<name>` tool
- 🔄 Fresh context: Workers can receive only the task, not the full conversation (configurable)
- ♻️ Retry logic: Configurable retries for robust execution
Tip: Add progress middleware to see which workers are invoked during execution. See Middleware for examples.
See examples/supervisor_agent for a basic demonstration and examples/blogwriter for a complete multi-agent workflow with progress tracking.
## Reflection agent
The Reflection Agent is a composable wrapper that adds self-critique and iterative refinement to ANY agent type. It wraps another agent and automatically improves its outputs through reflection loops.
```go
import (
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
)

model := openai.NewModel()

// Create a base agent (ReAct, RAG, Supervisor, or custom)
reactAgent, _ := agent.NewReAct(
    model,
    agent.WithInstructions("You are a helpful assistant."),
    agent.WithTools(searchTool, calcTool),
)

// Wrap with reflection capabilities
wrappedAgent, err := agent.NewReflection(
    reactAgent, // ANY agent type!
    model,      // Model for critique (can differ)
    agent.WithReflectionMaxIterations(2),         // Max refinement cycles
    agent.WithReflectionPromptTemplate(template), // Custom critique prompt
)
if err != nil {
    log.Fatal(err)
}

// Execute - answers are automatically refined
for msg, err := range wrappedAgent.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}
```
### Configuration options

```go
agent.NewReflection(baseAgent, reflectionModel,
    agent.WithReflectionMaxIterations(n),         // Max refinement iterations
    agent.WithReflectionPromptTemplate(template), // Custom critique prompt
    agent.WithReflectionModelMiddleware(...),     // Middleware for reflection
    agent.WithReflectionGraphMiddleware(...),     // Middleware for the graph
)
```
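The snippets above pass a `template` value without defining it. A minimal sketch, assuming the option accepts a plain string critique prompt (the prompt wording is illustrative; check the package docs for the exact template type):

```go
// Hypothetical critique prompt for the reflection node.
critiqueTemplate := "You are a strict reviewer. Critique the previous answer for " +
    "accuracy, completeness, and clarity, and list concrete improvements."

wrappedAgent, err := agent.NewReflection(reactAgent, model,
    agent.WithReflectionMaxIterations(2),
    agent.WithReflectionPromptTemplate(critiqueTemplate),
)
```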
### How it works
The reflection agent creates a wrapper graph with a critique loop:
Generate Answer"] Reflection["Reflection Node
Critique Answer"] Agent --> Decision{"Max iterations
reached?"} Decision -->|"No"| Reflection Reflection --> Agent Decision -->|"Yes"| Result end Result["Final Answer"] --> User2["User Response"] style User fill:#22c55e,stroke:#16a34a,color:#fff style Agent fill:#3b82f6,stroke:#2563eb,color:#fff style Reflection fill:#8b5cf6,stroke:#7c3aed,color:#fff style Decision fill:#f59e0b,stroke:#d97706,color:#fff style Result fill:#22c55e,stroke:#16a34a,color:#fff style User2 fill:#22c55e,stroke:#16a34a,color:#fff
Key benefits:
- 🔄 Iterative improvement: Automatically refines answers through self-critique
- 🎯 Composable: Works with ANY agent type (ReAct, RAG, Supervisor, custom)
- 🧠 Meta-reasoning: Agent reasons about its own reasoning
- 🔧 Flexible models: Use different models for generation and critique
- 📊 Transparent: Critique feedback visible in message stream
Use cases:
- Complex explanations requiring clarity and completeness
- Technical writing that needs accuracy verification
- Code generation with quality checks
- Research tasks requiring thoroughness
See examples/reflection_agent for a complete demonstration.
## RAG agent
The RAG (Retrieval-Augmented Generation) pattern creates an agent that:
- Automatically rephrases follow-up questions in conversations (enabled by default)
- Retrieves relevant context from a knowledge base
- Generates a response using both the query and retrieved context
This is ideal for question-answering over large document collections.
```go
import (
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
    "github.com/hupe1980/agentmesh/pkg/retrieval/langchaingo"
)

// Create retriever from vector store
retriever := langchaingo.NewRetrieverFromVectorStore(vectorStore, func(o *langchaingo.Options) {
    o.NumDocuments = 5
})

// Create RAG agent
ragAgent, err := agent.NewRAG(
    openai.NewModel(),
    retriever,
)
if err != nil {
    log.Fatal(err)
}

// Execute with iterator pattern
for msg, err := range ragAgent.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}
```
### Configuration options

```go
agent.NewRAG(model, retriever,
    agent.WithContextPrompt(template),  // Custom document formatting prompt
    agent.WithRephrasePrompt(template), // Custom query rephrasing prompt
    agent.WithSkipRephrasing(),         // Disable automatic query rephrasing
)
```
| Option | Description | Default |
|---|---|---|
| `WithContextPrompt(t)` | Prompt for formatting retrieved documents | Built-in template |
| `WithRephrasePrompt(t)` | Prompt for rephrasing follow-up queries | Built-in template |
| `WithSkipRephrasing()` | Disable automatic query rephrasing | Enabled |
### How it works
The RAG agent compiles into a graph with automatic conversational context detection:
START → rephrase → retrieve → generate → END
- Rephrase node: Automatically detects conversational context and rephrases follow-up questions to be standalone queries (e.g., “What about their pricing?” → “What is Acme Corp’s pricing?”). Skips rephrasing for standalone queries (zero overhead).
- Retrieve node: Fetches relevant documents based on the (rephrased) query
- Generate node: Creates a prompt with the query and retrieved context, then generates the response
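If every query is standalone (no conversation history), you can drop the rephrase step with the `WithSkipRephrasing` option shown above; a minimal sketch:

```go
// Sketch: a RAG agent without the rephrase node, for standalone queries.
// The graph then reduces to START → retrieve → generate → END.
ragAgent, err := agent.NewRAG(model, retriever,
    agent.WithSkipRephrasing(),
)
if err != nil {
    log.Fatal(err)
}
```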
### Conversational RAG
When combined with the Conversational wrapper, RAG automatically handles follow-up questions:
```go
// Create RAG agent (query rephrasing enabled by default)
ragAgent, _ := agent.NewRAG(model, retriever)

// Wrap with conversational memory
chatAgent, _ := agent.NewConversational(ragAgent, memory)

// First query: "Tell me about Acme Corp"
// Follow-up:   "What about their pricing?"
// RAG automatically rephrases to: "What is Acme Corp's pricing?"
```
Without rephrasing, the query “What about their pricing?” would fail to retrieve relevant documents because the vector search has no context that “their” refers to “Acme Corp”.
## Conversational agent
The Conversational Agent is a composable wrapper that adds long-term memory to ANY agent type. It automatically recalls relevant context from memory before the agent runs and stores the conversation after completion.
```go
import (
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/graph"
    "github.com/hupe1980/agentmesh/pkg/memory"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
)

model := openai.NewModel()

// Create embedder for semantic memory
embedder := openai.NewEmbedder(client)

// Create vector memory for semantic search
mem := memory.NewVectorMemory(embedder)

// Create a base agent (ReAct, RAG, Supervisor, or custom)
reactAgent, _ := agent.NewReAct(
    model,
    agent.WithInstructions("You are a helpful assistant."),
    agent.WithTools(tools...),
)

// Wrap with conversational memory
// Uses dual-memory approach: short-term (recent) + long-term (semantic)
chatAgent, err := agent.NewConversational(
    reactAgent, // ANY agent type!
    mem,
    agent.WithShortTermMessages(5),    // Last 5 messages for immediate context
    agent.WithLongTermMessages(5),     // 5 semantically similar messages
    agent.WithMinSimilarityScore(0.5), // Threshold for long-term recall
    agent.WithFailOnStoreError(false), // Don't fail if memory store fails
)
if err != nil {
    log.Fatal(err)
}

// Execute with a session ID (required)
for msg, err := range chatAgent.Run(ctx, messages,
    graph.WithInitialValue(agent.SessionIDKey, "user-123-session"),
) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Content())
}
```
### Configuration options
The conversational agent uses a dual-memory approach:
- Short-term memory: Recent N messages (recency-based) - immediate context
- Long-term memory: Semantically similar messages from history (relevance-based)
```go
agent.NewConversational(baseAgent, memory,
    agent.WithShortTermMessages(5),    // Recent messages (default: 5)
    agent.WithLongTermMessages(5),     // Semantic search results (default: 5)
    agent.WithMinSimilarityScore(0.5), // Threshold for long-term recall (default: 0.5)
    agent.WithFailOnStoreError(false), // Fail if memory storage fails (default: false)
)
```
| Option | Description | Default |
|---|---|---|
| `WithShortTermMessages(n)` | Number of recent messages to always include | 5 |
| `WithLongTermMessages(n)` | Number of semantically similar messages | 5 |
| `WithMinSimilarityScore(s)` | Minimum similarity for long-term recall | 0.5 |
| `WithFailOnStoreError(b)` | Fail if memory storage fails | false |
### Session ID

A session ID must be provided at runtime using `graph.WithInitialValue`:
```go
// Each user/session gets its own memory context
chatAgent.Run(ctx, messages,
    graph.WithInitialValue(agent.SessionIDKey, "user-123-session"),
)
```
### How it works
The conversational agent creates a wrapper graph with memory integration:
Semantic Search"] Recall --> Agent["Wrapped Agent
ReAct/RAG/etc."] Agent --> Store["Memory Store
Save Exchange"] Store --> END((END)) style START fill:#22c55e,stroke:#16a34a,color:#fff style END fill:#ef4444,stroke:#dc2626,color:#fff style Recall fill:#8b5cf6,stroke:#7c3aed,color:#fff style Agent fill:#3b82f6,stroke:#2563eb,color:#fff style Store fill:#8b5cf6,stroke:#7c3aed,color:#fff
Key benefits:
- 🧠 Semantic recall: Finds relevant past messages by meaning, not keywords
- 🔄 Composable: Works with ANY agent type (ReAct, RAG, Supervisor, Reflection)
- 📝 Automatic storage: Stores conversation exchanges after each interaction
- 🔒 Session isolation: Each session has its own memory context
- ⚡ Non-blocking: Memory errors don’t fail the agent by default
Use cases:
- Multi-turn conversations requiring context
- Personalized assistants that remember user preferences
- Customer support with conversation history
- Long-running agent sessions
See the Memory Guide for more on memory types and configuration.
## Utility functions
The agent package provides utility functions for working with messages and detecting conversational context:
### Message utilities

```go
import "github.com/hupe1980/agentmesh/pkg/agent"

// Get all messages from scope
messages := agent.GetMessages(scope)

// Get the last message (or nil if empty)
lastMsg := agent.LastMessage(scope)
```
### Conversational context detection
Detect when an agent is operating in a conversational context (useful for custom agents):
```go
// Check if conversation history exists
if agent.IsConversationalContext(scope) {
    // Handle as follow-up question
    // - Has AI responses (prior exchange)
    // - Has multiple human messages
    // - Has memory context from Conversational wrapper
} else {
    // Handle as standalone query
}

// Get conversation history (excludes current query)
history := agent.GetConversationHistory(messages)
// Returns prior messages for context-aware processing
```
Use cases:
- Custom agents that need context-aware behavior
- Query rephrasing for retrieval systems
- Detecting multi-turn conversations
- Building custom memory integrations
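As a concrete example, a custom node can branch on conversational context using these helpers together with the custom-graph APIs described later on this page. A sketch; the node names (`rephrase`, `retrieve`) are illustrative:

```go
// Sketch: context-aware routing inside a custom graph node.
g.Node("detect_context", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    if agent.IsConversationalContext(scope) {
        // Follow-up question: prior turns are available for rephrasing.
        history := agent.GetConversationHistory(agent.GetMessages(scope))
        _ = history // e.g., feed into a rephrasing prompt
        return graph.To("rephrase"), nil
    }
    // Standalone query: go straight to retrieval.
    return graph.To("retrieve"), nil
}, "rephrase", "retrieve")
```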
## Structured output
Agents can return structured JSON responses matching a defined schema. This is useful when you need predictable, type-safe output from your agent.
### Native structured output

If your model supports native structured output (like OpenAI’s `response_format`), the agent uses it directly:
```go
import (
    "log"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/schema"
)

// Define your output structure
type AnalysisResult struct {
    Sentiment  string   `json:"sentiment" jsonschema:"required,enum=positive,neutral,negative"`
    Confidence float64  `json:"confidence" jsonschema:"required,minimum=0,maximum=1"`
    Keywords   []string `json:"keywords" jsonschema:"required"`
}

// Create output schema
outputSchema, err := schema.NewOutputSchema("analysis_result", AnalysisResult{})
if err != nil {
    log.Fatal(err)
}

// Create agent with structured output
reactAgent, err := agent.NewReAct(model,
    agent.WithOutputSchema(&outputSchema),
)
if err != nil {
    log.Fatal(err)
}
```
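To consume the structured response, decode the final message content. A minimal sketch, assuming the last message yielded by `Run` (here `lastMsg`) carries the raw JSON payload in `Content()`; uses the standard `encoding/json` package:

```go
// Sketch: unmarshal the agent's final JSON answer into the typed struct.
var result AnalysisResult
if err := json.Unmarshal([]byte(lastMsg.Content()), &result); err != nil {
    log.Fatal(err)
}
fmt.Printf("sentiment=%s confidence=%.2f keywords=%v\n",
    result.Sentiment, result.Confidence, result.Keywords)
```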
### OpenAI strict mode requirements
OpenAI’s Structured Output API has non-standard JSON Schema requirements when using strict mode (the default). AgentMesh automatically handles these for you:
OpenAI’s requirements:
- All properties must be in the `required` array - even optional fields
- Optional fields use nullable types - `type: ["string", "null"]` instead of `type: "string"`
- `additionalProperties` must be `false` - at all nesting levels
How AgentMesh handles this:
When you use `schema.NewOutputSchema()` with OpenAI and strict mode is enabled (the default), AgentMesh automatically transforms your schema:
```go
// Your Go struct with an optional field
type Person struct {
    Name string `json:"name" jsonschema:"required"`
    Age  int    `json:"age,omitempty"` // Optional field
}

// Create schema - strict mode is enabled by default
schema, err := schema.NewOutputSchema("person", Person{})

// Standard JSON Schema generated:
// {
//   "properties": { "name": {...}, "age": {...} },
//   "required": ["name"]
// }

// AgentMesh automatically transforms to OpenAI-compatible:
// {
//   "properties": {
//     "name": {"type": "string"},
//     "age": {"type": ["integer", "null"]}  // Made nullable
//   },
//   "required": ["name", "age"],            // All fields required
//   "additionalProperties": false
// }
```
Disabling strict mode:
If you prefer standard JSON Schema behavior (not recommended for OpenAI), disable strict mode:
schema, err := schema.NewOutputSchema("person", Person{},
schema.WithStrict(false),
)
Note: Other providers (Anthropic, Gemini, Ollama, Bedrock) use tool calling for structured output and follow standard JSON Schema - no transformation is applied.
### Tool-based fallback

For models that don’t support native structured output but do support tool calling, AgentMesh automatically uses a tool-based fallback. It injects a `set_model_response` tool that instructs the model to return structured data via tool calling:
```go
// This works even with models that don't have native structured output support,
// as long as they support tool calling
reactAgent, err := agent.NewReAct(model,
    agent.WithOutputSchema(&outputSchema),
    agent.WithTools(searchTool, calcTool), // Your other tools work alongside
)
```
How it works:

- Agent checks model capabilities at creation time
- If `StructuredOutput: false` but `Tools: true`, injects `SetModelResponseTool`
- The tool instructs the model to call it with the final response matching the schema
- The tool validates and returns the structured response
Behavior matrix:

| Model Capabilities | Behavior |
|---|---|
| `StructuredOutput: true` | Uses native structured output |
| `StructuredOutput: false`, `Tools: true` | Uses `set_model_response` tool fallback |
| `StructuredOutput: false`, `Tools: false` | Schema passed to model (may not work) |
### Custom `set_model_response` tool

If you need custom behavior, you can provide your own `set_model_response` tool - the agent won’t add a duplicate:
import "github.com/hupe1980/agentmesh/pkg/tool"
// Create custom set_model_response tool
customTool, err := tool.NewSetModelResponseTool(&outputSchema)
if err != nil {
log.Fatal(err)
}
// Agent uses your tool instead of creating one
reactAgent, err := agent.NewReAct(model,
agent.WithOutputSchema(&outputSchema),
agent.WithTools(customTool, searchTool),
)
## Custom graphs
For complete control over workflow logic, build custom graphs using the graph API:
```go
import (
    "context"
    "fmt"
    "log"

    "github.com/hupe1980/agentmesh/pkg/graph"
    "github.com/hupe1980/agentmesh/pkg/message"
)

// Define typed keys
var CategoryKey = graph.NewKey[string]("category")

// Create message graph for agent workflows
g := message.NewGraphBuilder(CategoryKey)

// Add nodes using fluent API
g.Node("classify", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    messages := message.GetMessages(scope)
    category := classifyIntent(messages)
    if category == "support" {
        return graph.Set(CategoryKey, category).To("handle_support"), nil
    }
    return graph.Set(CategoryKey, category).To("handle_sales"), nil
}, "handle_support", "handle_sales")

g.Node("handle_support", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    response := message.NewAIMessageFromText("Support response...")
    return graph.Set(message.MessagesKey, []message.Message{response}).To(graph.END), nil
}, graph.END)

g.Node("handle_sales", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    response := message.NewAIMessageFromText("Sales response...")
    return graph.Set(message.MessagesKey, []message.Message{response}).To(graph.END), nil
}, graph.END)

g.Start("classify")

// Compile and execute
compiled, _ := g.Build()
for result, err := range compiled.Run(ctx, messages) {
    if err != nil {
        log.Fatal(err)
    }
    // Process results
    fmt.Println(result.Content())
}
```
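The example assumes a `classifyIntent` helper. A trivial keyword-based stand-in (the keyword and the standard `strings` import are illustrative):

```go
// Trivial stand-in for the classifyIntent helper used above.
func classifyIntent(messages []message.Message) string {
    if len(messages) == 0 {
        return "sales"
    }
    last := messages[len(messages)-1]
    if strings.Contains(strings.ToLower(last.Content()), "help") {
        return "support"
    }
    return "sales"
}
```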
### Node functions

Nodes receive a `Scope` (which embeds `ReadOnlyScope`) and return a `Command`:
g.Node("process", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
// Read state with typed keys
previousValue := graph.Get(scope, MyKey)
messages := message.GetMessages(scope)
// Process...
// Return updates and routing
return graph.Set(MyKey, newValue).
Append(message.MessagesKey, newMessage).
To("next_node"), nil
}, "next_node")
### Conditional routing
Direct execution flow dynamically using commands:
g.Node("router", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
action := graph.Get(scope, ActionKey)
switch action {
case "approve":
return graph.To("approver"), nil
case "reject":
return graph.To("rejector"), nil
case "escalate":
return graph.To("human_review"), nil
default:
return graph.To("default_handler"), nil
}
}, "approver", "rejector", "human_review", "default_handler")
Nodes can route to multiple targets for parallel execution:
g.Node("fanout", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
// Route to all three analysts in parallel
return graph.To("analyst_a", "analyst_b", "analyst_c"), nil
}, "analyst_a", "analyst_b", "analyst_c")
### Parallel execution
Nodes can fan out to parallel execution by routing to multiple targets:
```go
// Entry node fans out to three concurrent tasks
g.Node("start", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    return graph.To("fetch_data_a", "fetch_data_b", "fetch_data_c"), nil
}, "fetch_data_a", "fetch_data_b", "fetch_data_c")

// Each fetch task routes to aggregator
g.Node("fetch_data_a", fetchAFunc, "aggregator")
g.Node("fetch_data_b", fetchBFunc, "aggregator")
g.Node("fetch_data_c", fetchCFunc, "aggregator")

g.Node("aggregator", aggregateFunc, graph.END)
g.Start("start")
```
The aggregator waits for all incoming nodes to complete before executing.
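A sketch of what those node functions might look like, with hypothetical `DataAKey`/`DataBKey`/`DataCKey` state keys (only `fetchAFunc` is shown; `fetchBFunc` and `fetchCFunc` follow the same shape):

```go
// Hypothetical typed keys for the fan-out results.
var (
    DataAKey = graph.NewKey[string]("dataA")
    DataBKey = graph.NewKey[string]("dataB")
    DataCKey = graph.NewKey[string]("dataC")
)

func fetchAFunc(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    // Fetch from source A (stubbed) and store the result in state.
    return graph.Set(DataAKey, "result A").To("aggregator"), nil
}

func aggregateFunc(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    // Runs once all three branches have completed; combine their results.
    combined := graph.Get(scope, DataAKey) + " | " +
        graph.Get(scope, DataBKey) + " | " +
        graph.Get(scope, DataCKey)
    response := message.NewAIMessageFromText(combined)
    return graph.Set(message.MessagesKey, []message.Message{response}).To(graph.END), nil
}
```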
### Subgraphs

Compose complex workflows from reusable graph components using `graph.Subgraph()`:
```go
// Create a research subgraph
researchSub := createResearchGraph()
compiledResearch, _ := researchSub.Build()

// Create parent graph
parent := message.NewGraphBuilder()

// Embed subgraph as a node using graph.Subgraph with mappers
parent.Node("research", graph.Subgraph(
    compiledResearch,
    // InputMapper: transform parent messages to subgraph input
    func(ctx context.Context, scope graph.Scope) ([]message.Message, error) {
        messages := message.GetMessages(scope)
        // Filter or transform messages for the research subgraph
        return messages, nil
    },
    // OutputMapper: merge research results back to parent
    func(ctx context.Context, output message.Message) (graph.Updates, error) {
        return graph.Updates{message.MessagesKey.Name(): []message.Message{output}}, nil
    },
), "synthesize")

parent.Node("synthesize", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    messages := message.GetMessages(scope)
    // Synthesize the research results into a final summary message
    summary := message.NewAIMessageFromText(fmt.Sprintf("Synthesis of %d research messages", len(messages)))
    return graph.Set(message.MessagesKey, []message.Message{summary}).To(graph.END), nil
}, graph.END)

parent.Start("research")
compiled, _ := parent.Build()
```
Type Safety: The `InputMapper[SI]` and `OutputMapper[SO]` type aliases provide clear signatures for state transformation functions.
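The example also assumes a `createResearchGraph` helper. A minimal sketch, assuming it returns the same builder type produced by `message.NewGraphBuilder` (the node logic is illustrative):

```go
// Sketch: a one-node research subgraph builder.
func createResearchGraph() *message.GraphBuilder {
    g := message.NewGraphBuilder()
    g.Node("research", func(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
        note := message.NewAIMessageFromText("Research findings...")
        return graph.Set(message.MessagesKey, []message.Message{note}).To(graph.END), nil
    }, graph.END)
    g.Start("research")
    return g
}
```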
See examples/subgraph for a complete demonstration.