Overview
AgentMesh abstracts language models behind a common model.Model interface that uses Go 1.24+ iterators for unified streaming:
type Model interface {
    Generate(ctx context.Context, req *Request) iter.Seq2[*Response, error]
    Capabilities() Capabilities
}
The Request struct bundles messages, tools, instructions, and other options:
type Request struct {
    Messages     []message.Message    // Conversation history
    Tools        []tool.Tool          // Available tools for function calling
    Instructions string               // Per-request instructions
    OutputSchema *schema.OutputSchema // Structured output schema
    Stream       bool                 // Enable streaming mode
    Metadata     map[string]any       // Provider-specific options
}
The iterator-based API unifies streaming and blocking modes:
- Streaming: Iterate over partial responses as they arrive
- Blocking: Use model.Last() to get only the final response
- Batch collection: Use model.Collect() to gather all responses
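A minimal sketch of the three modes, assuming ctx, llm (any model.Model), and messages already exist:

req := &model.Request{Messages: messages}

// Streaming: handle partial responses as they arrive
for resp, err := range llm.Generate(ctx, req) {
    if err != nil {
        log.Fatal(err)
    }
    fmt.Print(resp.Message.String())
}

// Blocking: only the final response
final, err := model.Last(llm.Generate(ctx, req))
if err != nil {
    log.Fatal(err)
}
fmt.Println(final.Message.String())

// Batch collection: every chunk, in order
chunks, err := model.Collect(llm.Generate(ctx, req))
if err != nil {
    log.Fatal(err)
}
fmt.Printf("received %d chunks\n", len(chunks))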
Response Structure
Models return a *model.Response with rich metadata:
type Response struct {
    Message      message.Message // The actual message content
    Reasoning    string          // Native reasoning (o1/o3, Gemini 2.0, Claude)
    FinishReason string          // Why generation stopped
    Logprobs     *Logprobs       // Token probabilities (OpenAI)
    Usage        *UsageInfo      // Token consumption tracking
    Metadata     map[string]any  // Provider-specific metadata
    Partial      bool            // true for streaming chunks, false for final
}

type UsageInfo struct {
    PromptTokens     int // Input tokens
    CompletionTokens int // Output tokens
    ReasoningTokens  int // Reasoning tokens (o1/o3)
    TotalTokens      int // Sum of all tokens
}
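Because UsageInfo separates prompt, completion, and reasoning tokens, a rough per-request cost estimate is easy to compute. The sketch below is illustrative only; the per-1K-token prices are placeholders, not real rates:

// estimateCost converts token usage into an approximate dollar cost.
// The prices below are illustrative placeholders, not actual provider rates.
func estimateCost(u *model.UsageInfo) float64 {
    const (
        promptPricePer1K     = 0.0025
        completionPricePer1K = 0.0100
    )
    if u == nil {
        return 0
    }
    return float64(u.PromptTokens)/1000*promptPricePer1K +
        float64(u.CompletionTokens+u.ReasoningTokens)/1000*completionPricePer1K
}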
Model Capabilities
All models expose their features via Capabilities():
caps := model.Capabilities()

// Discover what the model supports
if caps.Tools {
    // Can pass tools in Request.Tools
}
if caps.NativeReasoning {
    // Response.Reasoning will be populated
}
if caps.Vision {
    // Can send message.FilePart with image MIME type
}
type Capabilities struct {
    Streaming           bool     // Supports incremental responses
    Tools               bool     // Supports function calling
    StructuredOutput    bool     // Supports JSON schema
    NativeReasoning     bool     // Exposes internal reasoning
    Logprobs            bool     // Provides token probabilities
    Vision              bool     // Accepts images
    Audio               bool     // Accepts audio
    MaxContextTokens    int      // Context window size
    MaxOutputTokens     int      // Max generation length
    SupportedModalities []string // Input types: "text", "image", "audio"
}
Checking Capabilities
Always check Capabilities() before using features to ensure the model supports them:
caps := model.Capabilities()
if caps.Tools {
    // Safe to pass tools in Request.Tools
}
if caps.Vision {
    // Safe to include image parts in messages
}
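One practical pattern, sketched below, is to derive the Request from the reported capabilities so unsupported features are never sent (the buildRequest helper is illustrative, not part of the API):

// buildRequest attaches tools and enables streaming only when the model
// actually supports them.
func buildRequest(m model.Model, messages []message.Message, tools []tool.Tool) *model.Request {
    req := &model.Request{Messages: messages}
    caps := m.Capabilities()
    if caps.Tools {
        req.Tools = tools
    }
    if caps.Streaming {
        req.Stream = true
    }
    return req
}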
Available models
OpenAI
The OpenAI adapter wraps the official openai-go SDK for Chat Completions:
import (
    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
)

model := openai.NewModel(
    openai.WithModel("gpt-4o"),
    openai.WithTemperature(0.7),
    openai.WithMaxCompletionTokens(1000),
)
compiled, err := agent.NewReAct(model, tools)
Configuration options:
openai.NewModel(
    openai.WithModel("gpt-4o-mini"),     // Model name
    openai.WithTemperature(0.2),         // Randomness (0-2)
    openai.WithMaxCompletionTokens(500), // Max output tokens
)
The adapter supports:
- ✅ Streaming responses
- ✅ Function calling via Request.Tools
- ✅ Parallel tool calls
- ✅ Vision models (pass message.FilePart with an image MIME type)
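Putting it together, a minimal blocking call against the OpenAI adapter might look like this sketch (ctx and messages assumed to be built elsewhere):

llm := openai.NewModel(
    openai.WithModel("gpt-4o-mini"),
    openai.WithTemperature(0.2),
)

resp, err := model.Last(llm.Generate(ctx, &model.Request{Messages: messages}))
if err != nil {
    log.Fatal(err)
}
fmt.Println(resp.Message.String())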
Anthropic
The Anthropic adapter integrates Claude models via the official SDK:
import "github.com/hupe1980/agentmesh/pkg/model/anthropic"
model := anthropic.NewModel(
    anthropic.WithModel("claude-3-5-sonnet-20241022"),
    anthropic.WithMaxTokens(1024),
    anthropic.WithTemperature(1.0),
)
compiled, err := agent.NewReAct(model, tools)
Configuration options:
anthropic.NewModel(
    anthropic.WithModel("claude-3-5-sonnet-20241022"),
    anthropic.WithMaxTokens(2048),
    anthropic.WithTemperature(0.5),
    anthropic.WithAPIKey("your-api-key"), // Optional if set in env
)
The adapter supports:
- ✅ Streaming responses
- ✅ Function calling via Request.Tools
- ✅ Vision models
- ✅ System prompts
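A short sketch of passing per-request instructions (which the adapter is assumed to forward as Claude's system prompt), with ctx and messages built elsewhere:

llm := anthropic.NewModel(
    anthropic.WithModel("claude-3-5-sonnet-20241022"),
    anthropic.WithMaxTokens(1024),
)

req := &model.Request{
    Messages:     messages,
    Instructions: "You are a terse assistant. Answer in one sentence.",
}

resp, err := model.Last(llm.Generate(ctx, req))
if err != nil {
    log.Fatal(err)
}
fmt.Println(resp.Message.String())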
Gemini
The Gemini adapter integrates Google’s Gemini models via the official SDK:
import "github.com/hupe1980/agentmesh/pkg/model/gemini"
model, err := gemini.NewModel(ctx,
    gemini.WithModel("gemini-2.0-flash-exp"),
    gemini.WithMaxOutputTokens(4096),
    gemini.WithTemperature(0.7),
)
if err != nil {
    log.Fatal(err)
}
compiled, err := agent.NewReAct(model, tools)
Configuration options:
gemini.NewModel(ctx,
    gemini.WithModel("gemini-2.0-flash-exp"), // Model name
    gemini.WithMaxOutputTokens(4096),         // Max output tokens
    gemini.WithTemperature(0.7),              // Randomness (0-1)
    gemini.WithTopP(0.95),                    // Nucleus sampling
    gemini.WithTopK(40),                      // Top-k sampling
    gemini.WithAPIKey("your-api-key"),        // Optional if set in env
)
The adapter supports:
- ✅ Streaming responses
- ✅ Function calling via Request.Tools
- ✅ Vision models (multimodal)
- ✅ Native reasoning (Gemini 2.0)
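Because Gemini 2.0 reports NativeReasoning, Response.Reasoning is populated on supported models; a short sketch (ctx and messages assumed to exist):

gem, err := gemini.NewModel(ctx, gemini.WithModel("gemini-2.0-flash-exp"))
if err != nil {
    log.Fatal(err)
}

resp, err := model.Last(gem.Generate(ctx, &model.Request{Messages: messages}))
if err != nil {
    log.Fatal(err)
}
if gem.Capabilities().NativeReasoning && resp.Reasoning != "" {
    fmt.Println("Reasoning:", resp.Reasoning)
}
fmt.Println(resp.Message.String())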
Ollama
The Ollama adapter enables local model execution with no API keys or cloud dependencies:
import "github.com/hupe1980/agentmesh/pkg/model/ollama"
// Connect to local Ollama instance (default: http://localhost:11434)
model := ollama.NewModel(
    ollama.WithModel("llama3.2"),
    ollama.WithTemperature(0.7),
    ollama.WithNumPredict(1000),
)
compiled, err := agent.NewReAct(model, tools)
Configuration options:
ollama.NewModel(
    ollama.WithModel("llama3.2"),  // Model name (llama3.2, mistral, codellama, etc.)
    ollama.WithTemperature(0.7),   // Randomness (0-2)
    ollama.WithNumPredict(1000),   // Max tokens (-1 = unlimited)
    ollama.WithTopK(40),           // Top-k sampling
    ollama.WithTopP(0.9),          // Nucleus sampling
    ollama.WithSeed(42),           // Reproducible output
)
The adapter supports:
- ✅ Streaming responses
- ✅ Function calling via Request.Tools
- ✅ Local execution (no API keys needed)
- ✅ Any Ollama-compatible model (llama3.2, mistral, codellama, gemma, phi, etc.)
- ❌ Structured output (JSON schema)
- ❌ Vision (model-dependent, e.g., llava)
Prerequisites:
- Install Ollama: https://ollama.ai
- Pull a model: ollama pull llama3.2
- Start the server: ollama serve
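Once the server is running, a quick smoke test confirms the adapter can reach it (a sketch; ctx and messages assumed to exist):

local := ollama.NewModel(ollama.WithModel("llama3.2"))

resp, err := model.Last(local.Generate(ctx, &model.Request{Messages: messages}))
if err != nil {
    log.Fatalf("is `ollama serve` running? %v", err)
}
fmt.Println(resp.Message.String())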
Custom Ollama host:
import "github.com/ollama/ollama/api"
client, _ := api.ClientFromEnvironment()
// Or point at a remote host explicitly:
//   base, _ := url.Parse("http://remote-host:11434")
//   client := api.NewClient(base, http.DefaultClient)

model, _ := ollama.NewModelFromClient(client,
    ollama.WithModel("llama3.2"),
)
Popular Ollama models:
- llama3.2 - Meta’s latest LLaMA (default)
- mistral - Mistral AI’s efficient 7B model
- codellama - Specialized for code generation
- gemma - Google’s open Gemma model
- phi - Microsoft’s compact Phi model
- llava - Multimodal vision model
See Ollama model library for the full list.
LangChainGo
Wrap any LangChainGo LLM to reuse existing integrations:
import (
    "github.com/hupe1980/agentmesh/pkg/model/langchaingo"
    "github.com/tmc/langchaingo/llms/openai"
)

llm, _ := openai.New(openai.WithModel("gpt-4"))
model, _ := langchaingo.NewModel(llm)
compiled, err := agent.NewReAct(model, tools)
This adapter enables:
- Integration with LangChainGo’s 50+ model providers
- Reuse of existing LangChainGo configurations
- Gradual migration from LangChainGo to AgentMesh
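As an illustration, an existing LangChainGo Ollama configuration can be wrapped the same way. This is a sketch: the lcollama import alias and its New/WithModel helpers are assumed to match that library's llms/ollama package, and tools is assumed to be defined elsewhere:

import (
    lcollama "github.com/tmc/langchaingo/llms/ollama"

    "github.com/hupe1980/agentmesh/pkg/model/langchaingo"
)

llm, err := lcollama.New(lcollama.WithModel("llama3.2"))
if err != nil {
    log.Fatal(err)
}
wrapped, err := langchaingo.NewModel(llm)
if err != nil {
    log.Fatal(err)
}
compiled, err := agent.NewReAct(wrapped, tools)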
Amazon Bedrock
The Amazon Bedrock adapter integrates foundation models from multiple providers (Anthropic Claude, Meta Llama, Amazon Nova, Mistral, etc.) via the AWS SDK’s Converse API:
import (
    "context"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
    "github.com/hupe1980/agentmesh/pkg/model/amazonbedrock"
)

cfg, _ := config.LoadDefaultConfig(context.Background())
client := bedrockruntime.NewFromConfig(cfg)

model := amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("eu.amazon.nova-pro-v1:0"),
    amazonbedrock.WithTemperature(0.7),
    amazonbedrock.WithMaxTokens(1024),
)
compiled, err := agent.NewReAct(model, tools)
Configuration options:
amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("eu.amazon.nova-pro-v1:0"), // Model or inference profile ID
    amazonbedrock.WithTemperature(0.7),                   // Randomness (0-1)
    amazonbedrock.WithMaxTokens(2048),                    // Max output tokens
    amazonbedrock.WithTopP(0.9),                          // Nucleus sampling
)
Cross-Region Inference Profiles (recommended for production):
Bedrock supports inference profiles that provide automatic cross-region load balancing. Use the region prefix (us., eu., etc.) with the model ID:
// EU region inference profiles
model := amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("eu.amazon.nova-pro-v1:0"),
)

// US region inference profiles
model := amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("us.anthropic.claude-3-5-sonnet-20240620-v1:0"),
)
Common inference profile IDs:
- eu.amazon.nova-pro-v1:0 / us.amazon.nova-pro-v1:0 (Nova Pro with tools & vision)
- eu.amazon.nova-lite-v1:0 / us.amazon.nova-lite-v1:0 (Nova Lite with tools & vision)
- eu.anthropic.claude-3-5-sonnet-20240620-v1:0 (Claude 3.5 Sonnet)
- eu.anthropic.claude-3-haiku-20240307-v1:0 (Claude 3 Haiku)
- eu.meta.llama3-2-3b-instruct-v1:0 (Llama 3.2)
To list available inference profiles in your account:
aws bedrock list-inference-profiles --query "inferenceProfileSummaries[].inferenceProfileId"
The adapter supports:
- ✅ Streaming responses (via ConverseStream API)
- ✅ Function calling via Request.Tools
- ✅ Vision models (Claude, Nova Pro/Lite)
- ✅ Multiple foundation model providers through unified API
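Consuming Bedrock output looks the same as any other adapter because ConverseStream is handled internally; a short sketch reusing the client from above (ctx and messages assumed to exist):

bedrock := amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("eu.amazon.nova-lite-v1:0"),
)

req := &model.Request{Messages: messages, Stream: true}
for resp, err := range bedrock.Generate(ctx, req) {
    if err != nil {
        log.Printf("bedrock error: %v", err)
        break
    }
    fmt.Print(resp.Message.String())
}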
Tool binding
Tools are passed to models via the Request.Tools field:
// Create tools
searchTool, _ := tool.NewFuncTool("search", "Search the web", searchFunc)
calcTool, _ := tool.NewFuncTool("calculator", "Perform calculations", calcFunc)

// Pass tools in the request (llm is any model.Model)
req := &model.Request{
    Messages: messages,
    Tools:    []tool.Tool{searchTool, calcTool},
}

for resp, err := range llm.Generate(ctx, req) {
    // Handle tool calls in the response
}
Agent constructors handle tool binding automatically:
// Tools are passed automatically via agent options
compiled, err := agent.NewReAct(
    openai.NewModel(),
    agent.WithTools(searchTool, calcTool),
)
Streaming
All models support streaming through the unified iterator API:
// Stream model responses directly with full metadata access
// (llm is any model.Model, messages a []message.Message)
req := &model.Request{Messages: messages, Stream: true}
for resp, err := range llm.Generate(ctx, req) {
    if err != nil {
        log.Printf("Error: %v", err)
        break
    }

    // Print partial content as it arrives
    fmt.Print(resp.Message.String())

    // Access streaming reasoning (if supported)
    if resp.Reasoning != "" {
        fmt.Printf("\n[Reasoning: %s]\n", resp.Reasoning)
    }
}
For blocking (non-streaming) mode, use model.Last():
// Get only the final response with metadata
resp, err := model.Last(llm.Generate(ctx, &model.Request{Messages: messages}))
if err != nil {
    log.Fatal(err)
}

// Access message content
fmt.Println(resp.Message.String())

// Access reasoning (for o1/o3, Gemini 2.0, Claude)
if resp.Reasoning != "" {
    fmt.Println("Reasoning:", resp.Reasoning)
}

// Track token usage
if resp.Usage != nil {
    fmt.Printf("Total tokens: %d (prompt: %d, completion: %d, reasoning: %d)\n",
        resp.Usage.TotalTokens,
        resp.Usage.PromptTokens,
        resp.Usage.CompletionTokens,
        resp.Usage.ReasoningTokens)
}

// Check finish reason
fmt.Println("Finish reason:", resp.FinishReason)
Collect all intermediate responses:
// Gather all responses (useful for debugging streaming)
responses, err := model.Collect(llm.Generate(ctx, &model.Request{Messages: messages}))
if err != nil {
    log.Fatal(err)
}
for i, resp := range responses {
    fmt.Printf("Chunk %d: %s\n", i, resp.Message.String())
}
When using graph streaming, agents automatically handle the iterator:
seq := compiled.Run(ctx, messages)
for event, err := range seq {
    if err != nil {
        log.Printf("Error: %v", err)
        continue
    }
    if event.Node == "model" {
        // Each event contains exactly one message
        fmt.Print(event.Message.String())
    }
}
Accessing Token Probabilities
OpenAI models support token-level probability analysis:
llm := openai.NewModel(
    openai.WithLogprobs(true, 5), // Request top 5 alternatives per token
)

resp, err := model.Last(llm.Generate(ctx, &model.Request{Messages: messages}))
if err != nil {
    log.Fatal(err)
}

if resp.Logprobs != nil {
    for _, tokenInfo := range resp.Logprobs.Content {
        // Main token chosen
        fmt.Printf("Token: %s, Log Probability: %.3f\n",
            tokenInfo.Token, tokenInfo.Logprob)

        // Alternative tokens considered
        for _, alt := range tokenInfo.TopLogprobs {
            fmt.Printf("  Alt: %s (%.3f)\n", alt.Token, alt.Logprob)
        }
    }
}
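Logprobs also make simple confidence metrics easy to derive. As a sketch, perplexity over the generated tokens can be computed from the Content entries shown above (uses the standard math package):

// perplexity computes exp(-mean log probability) over the generated tokens.
// Lower values indicate the model was more confident in its output.
func perplexity(lp *model.Logprobs) float64 {
    if lp == nil || len(lp.Content) == 0 {
        return 0
    }
    var sum float64
    for _, tok := range lp.Content {
        sum += tok.Logprob
    }
    mean := sum / float64(len(lp.Content))
    return math.Exp(-mean)
}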
Custom models
Implement the model.Model interface to integrate custom providers using the iterator pattern:
type CustomModel struct {
    client *CustomClient
}

func (m *CustomModel) Generate(ctx context.Context, req *model.Request) iter.Seq2[*model.Response, error] {
    return func(yield func(*model.Response, error) bool) {
        // Convert the request messages to the provider format
        providerReq := convertMessages(req.Messages)

        // For streaming providers, yield partial responses
        stream, err := m.client.CompleteStream(ctx, providerReq)
        if err != nil {
            yield(nil, err)
            return
        }

        for chunk := range stream {
            // Convert the chunk to AgentMesh format
            msg := message.NewAIMessageFromText(chunk.Text)

            // Build a response with metadata
            resp := &model.Response{
                Message:      msg,
                Reasoning:    chunk.Reasoning,    // If your provider supports it
                FinishReason: chunk.FinishReason, // e.g., "stop", "length"
                Usage: &model.UsageInfo{
                    PromptTokens:     chunk.PromptTokens,
                    CompletionTokens: chunk.CompletionTokens,
                    TotalTokens:      chunk.TotalTokens,
                },
            }

            // Yield the response; if yield returns false, stop streaming
            if !yield(resp, nil) {
                return
            }
        }

        // For non-streaming providers, yield a single final response instead:
        //
        //  resp, err := m.client.Complete(ctx, providerReq)
        //  if err != nil {
        //      yield(nil, err)
        //      return
        //  }
        //  yield(&model.Response{
        //      Message: message.NewAIMessage(message.NewTextPart(resp.Text)),
        //      Usage: &model.UsageInfo{
        //          PromptTokens:     resp.Usage.PromptTokens,
        //          CompletionTokens: resp.Usage.CompletionTokens,
        //          TotalTokens:      resp.Usage.TotalTokens,
        //      },
        //      FinishReason: resp.FinishReason,
        //  }, nil)
    }
}

// Capabilities reports what this model supports
func (m *CustomModel) Capabilities() model.Capabilities {
    return model.Capabilities{
        Streaming: true,
        Tools:     true, // Set based on provider support
        Vision:    false,
    }
}
Use your custom model like any other:
model := &CustomModel{client: myClient}
compiled, err := agent.NewReAct(model, tools)
The iterator pattern automatically supports both streaming and blocking modes through model.Last() and model.Collect() helpers.
Key Points for Custom Implementations:
- Return iter.Seq2[*model.Response, error] (note the pointer)
- Populate the Usage field for token tracking and cost monitoring
- Set FinishReason to indicate why generation stopped
- Set the Reasoning field if your provider exposes internal reasoning
- Use the Metadata map for provider-specific information
- Yield a nil error for successful chunks, a non-nil error to stop iteration
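A compile-time assertion is a cheap way to catch interface drift in custom implementations:

// Fails to compile if CustomModel stops satisfying model.Model.
var _ model.Model = (*CustomModel)(nil)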
Model Routing
AgentMesh provides a flexible model routing system to intelligently select models based on query characteristics, capabilities, and availability.
Router Interface
Routers implement a simple interface that selects the best model for a request:
type Router interface {
    Route(ctx context.Context, req *Request) (Model, error)
}
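Because Route is the only method, custom policies are straightforward to add. The hypothetical toolAwareRouter below (a sketch, not a built-in) sends tool-bearing requests to a stronger model:

// toolAwareRouter is a hypothetical custom router: requests that carry tools
// go to the strong model, everything else to the fast one.
type toolAwareRouter struct {
    fast, strong model.Model
}

func (r *toolAwareRouter) Route(ctx context.Context, req *model.Request) (model.Model, error) {
    if len(req.Tools) > 0 {
        return r.strong, nil
    }
    return r.fast, nil
}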
RoutedModel Wrapper
Wrap any router to make it transparent as a Model:
router := model.NewCostBasedRouter(cheapModel, expensiveModel)
routedModel := model.NewRoutedModel(router)

// Use it like any other model
compiled, _ := agent.NewReAct(routedModel, tools)
Cost-Based Routing
Route simple queries to cheaper models and complex queries to premium models:
cheapModel := openai.NewModel(openai.WithModel("gpt-4o-mini"))
premiumModel := openai.NewModel(openai.WithModel("gpt-4o"))

router := model.NewCostBasedRouter(cheapModel, premiumModel,
    model.WithComplexityThreshold(0.5), // 0.0-1.0 scale
)

// Simple queries  → gpt-4o-mini
// Complex queries → gpt-4o
The built-in HeuristicEstimator considers:
- Query length and word count
- Complexity keywords (“analyze”, “compare”, “explain why”)
- Multi-turn conversation context
- Tool binding presence
Capability-Based Routing
Automatically route requests to models with required capabilities:
textModel := openai.NewModel(openai.WithModel("gpt-4o-mini"))
visionModel := openai.NewModel(openai.WithModel("gpt-4o"))

router := model.NewCapabilityRouter(
    model.WithCapabilityModel(textModel),
    model.WithCapabilityModel(visionModel),
)

// Text-only requests   → gpt-4o-mini (cheaper)
// Requests with images → gpt-4o (has Vision capability)
Fallback Routing
Build resilient pipelines with circuit breaker pattern:
primary := openai.NewModel(openai.WithModel("gpt-4o"))
backup := anthropic.NewModel(anthropic.WithModel("claude-3-5-sonnet"))

router := model.NewFallbackRouter(primary, backup,
    model.WithFailureThreshold(5),          // Open circuit after 5 failures
    model.WithResetTimeout(30*time.Second), // Try primary again after 30s
)

// Automatic failover with health tracking
Circuit breaker states:
- Closed: All requests go to primary
- Open: All requests go to fallback (after threshold failures)
- Half-Open: Probe primary with single request to test recovery
Composite Routing
Chain multiple routing strategies:
// First check capabilities, then apply cost optimization
capabilityRouter := model.NewCapabilityRouter(textModel, visionModel)
costRouter := model.NewCostBasedRouter(cheapModel, expensiveModel)
composite := model.NewCompositeRouter(capabilityRouter, costRouter)
Conditional Routing
Route based on custom logic:
router := model.NewConditionalRouter(func(ctx context.Context, req *model.Request) model.Model {
    // Check metadata for routing hints
    if priority, ok := req.Metadata["priority"].(string); ok && priority == "high" {
        return premiumModel
    }
    return standardModel
})
Weighted Routing
Distribute load across models:
router := model.NewWeightedRouter(
    model.WeightedModel{Model: modelA, Weight: 70}, // 70% traffic
    model.WeightedModel{Model: modelB, Weight: 30}, // 30% traffic
)
Complete Example
package main

import (
    "context"
    "log"
    "time"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/model"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
)

func main() {
    ctx := context.Background()

    // Create models
    mini := openai.NewModel(openai.WithModel("gpt-4o-mini"))
    full := openai.NewModel(openai.WithModel("gpt-4o"))

    // Build the routing chain: cost → fallback → model
    costRouter := model.NewCostBasedRouter(mini, full,
        model.WithComplexityThreshold(0.5),
    )
    resilientRouter := model.NewFallbackRouter(
        model.NewRoutedModel(costRouter),
        full, // Always fall back to the full model
        model.WithFailureThreshold(3),
        model.WithResetTimeout(time.Minute),
    )

    // Use the routed model transparently (tools and messages defined elsewhere)
    routedModel := model.NewRoutedModel(resilientRouter)
    compiled, _ := agent.NewReAct(routedModel, tools)

    // Execute - routing happens automatically
    for result, err := range compiled.Run(ctx, messages) {
        if err != nil {
            log.Printf("error: %v", err)
            continue
        }
        _ = result // Handle streamed events here
    }
}
See the examples/model_router directory for a complete working example.
Related
- Middleware System - Extend model execution with caching, retries, rate limiting, and token counting
- Agents - Build agents using models with tool calling
- Streaming - Real-time response streaming