Go 1.24+ GitHub

Connect language models

Integrate OpenAI, Anthropic, Gemini, Ollama, LangChainGo, or custom providers with consistent streaming and tool calling.

Overview

AgentMesh abstracts language models behind a common model.Model interface that uses Go 1.24+ iterators for unified streaming:

type Model interface {
    Generate(ctx context.Context, req *Request) iter.Seq2[*Response, error]
    Capabilities() Capabilities
}

The Request struct bundles messages, tools, instructions, and other options:

type Request struct {
    Messages     []message.Message  // Conversation history
    Tools        []tool.Tool        // Available tools for function calling
    Instructions string             // Per-request instructions
    OutputSchema *schema.OutputSchema // Structured output schema
    Stream       bool               // Enable streaming mode
    Metadata     map[string]any     // Provider-specific options
}

The iterator-based API unifies streaming and blocking modes:

  • Streaming: Iterate over partial responses as they arrive
  • Blocking: Use model.Last() to get only the final response
  • Batch collection: Use model.Collect() to gather all responses

Response Structure

Models return a *model.Response with rich metadata:

type Response struct {
    Message      message.Message // The actual message content
    Reasoning    string          // Native reasoning (o1/o3, Gemini 2.0, Claude)
    FinishReason string          // Why generation stopped
    Logprobs     *Logprobs       // Token probabilities (OpenAI)
    Usage        *UsageInfo      // Token consumption tracking
    Metadata     map[string]any  // Provider-specific metadata
    Partial      bool            // true for streaming chunks, false for final
}

type UsageInfo struct {
    PromptTokens     int // Input tokens
    CompletionTokens int // Output tokens
    ReasoningTokens  int // Reasoning tokens (o1/o3)
    TotalTokens      int // Sum of all tokens
}

Model Capabilities

All models expose their features via Capabilities():

caps := model.Capabilities()

// Discover what the model supports
if caps.Tools {
    // Can pass tools in Request.Tools
}
if caps.NativeReasoning {
    // Response.Reasoning will be populated
}
if caps.Vision {
    // Can send message.FilePart with image MIME type
}

type Capabilities struct {
    Streaming           bool     // Supports incremental responses
    Tools               bool     // Supports function calling
    StructuredOutput    bool     // Supports JSON schema
    NativeReasoning     bool     // Exposes internal reasoning
    Logprobs            bool     // Provides token probabilities
    Vision              bool     // Accepts images
    Audio               bool     // Accepts audio
    MaxContextTokens    int      // Context window size
    MaxOutputTokens     int      // Max generation length
    SupportedModalities []string // Input types: "text", "image", "audio"
}

Checking Capabilities

Always check Capabilities() before using features to ensure the model supports them:

caps := model.Capabilities()

if caps.Tools {
    // Safe to pass tools in Request.Tools
}

if caps.Vision {
    // Safe to include image parts in messages
}

Available models

OpenAI

The OpenAI adapter wraps the official openai-go SDK for Chat Completions:

import (
    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
)

model := openai.NewModel(
    openai.WithModel("gpt-4o"),
    openai.WithTemperature(0.7),
    openai.WithMaxCompletionTokens(1000),
)

compiled, err := agent.NewReAct(model, tools)

Configuration options:

openai.NewModel(
    openai.WithModel("gpt-4o-mini"),           // Model name
    openai.WithTemperature(0.2),               // Randomness (0-2)
    openai.WithMaxCompletionTokens(500),       // Max output tokens
)

The adapter supports:

  • ✅ Streaming responses
  • ✅ Function calling via Request.Tools
  • ✅ Parallel tool calls
  • ✅ Vision models (pass message.FilePart with image MIME type)

Anthropic

The Anthropic adapter integrates Claude models via the official SDK:

import "github.com/hupe1980/agentmesh/pkg/model/anthropic"

model := anthropic.NewModel(
    anthropic.WithModel("claude-3-5-sonnet-20241022"),
    anthropic.WithMaxTokens(1024),
    anthropic.WithTemperature(1.0),
)

compiled, err := agent.NewReAct(model, tools)

Configuration options:

anthropic.NewModel(
    anthropic.WithModel("claude-3-5-sonnet-20241022"),
    anthropic.WithMaxTokens(2048),
    anthropic.WithTemperature(0.5),
    anthropic.WithAPIKey("your-api-key"), // Optional if set in env
)

The adapter supports:

  • ✅ Streaming responses
  • ✅ Function calling via Request.Tools
  • ✅ Vision models
  • ✅ System prompts

Gemini

The Gemini adapter integrates Google’s Gemini models via the official SDK:

import "github.com/hupe1980/agentmesh/pkg/model/gemini"

model, err := gemini.NewModel(ctx,
    gemini.WithModel("gemini-2.0-flash-exp"),
    gemini.WithMaxOutputTokens(4096),
    gemini.WithTemperature(0.7),
)
if err != nil {
    log.Fatal(err)
}

compiled, err := agent.NewReAct(model, tools)

Configuration options:

gemini.NewModel(ctx,
    gemini.WithModel("gemini-2.0-flash-exp"), // Model name
    gemini.WithMaxOutputTokens(4096),         // Max output tokens
    gemini.WithTemperature(0.7),              // Randomness (0-1)
    gemini.WithTopP(0.95),                    // Nucleus sampling
    gemini.WithTopK(40),                      // Top-k sampling
    gemini.WithAPIKey("your-api-key"),        // Optional if set in env
)

The adapter supports:

  • ✅ Streaming responses
  • ✅ Function calling via Request.Tools
  • ✅ Vision models (multimodal)
  • ✅ Native reasoning (Gemini 2.0)

Ollama

The Ollama adapter enables local model execution with no API keys or cloud dependencies:

import "github.com/hupe1980/agentmesh/pkg/model/ollama"

// Connect to local Ollama instance (default: http://localhost:11434)
model := ollama.NewModel(
    ollama.WithModel("llama3.2"),
    ollama.WithTemperature(0.7),
    ollama.WithNumPredict(1000),
)

compiled, err := agent.NewReAct(model, tools)

Configuration options:

ollama.NewModel(
    ollama.WithModel("llama3.2"),      // Model name (llama3.2, mistral, codellama, etc.)
    ollama.WithTemperature(0.7),       // Randomness (0-2)
    ollama.WithNumPredict(1000),       // Max tokens (-1 = unlimited)
    ollama.WithTopK(40),                // Top-k sampling
    ollama.WithTopP(0.9),               // Nucleus sampling
    ollama.WithSeed(42),                // Reproducible output
)

The adapter supports:

  • ✅ Streaming responses
  • ✅ Function calling via Request.Tools
  • ✅ Local execution (no API keys needed)
  • ✅ Any Ollama-compatible model (llama3.2, mistral, codellama, gemma, phi, etc.)
  • ❌ Structured output (JSON schema)
  • ❌ Vision (model-dependent, e.g., llava)

Prerequisites:

  1. Install Ollama: https://ollama.ai
  2. Pull a model: ollama pull llama3.2
  3. Start the server: ollama serve

Custom Ollama host:

import "github.com/ollama/ollama/api"

client, _ := api.ClientFromEnvironment()
// Or: client := api.NewClient("http://remote-host:11434", nil)

model, _ := ollama.NewModelFromClient(client,
    ollama.WithModel("llama3.2"),
)

Popular Ollama models:

  • llama3.2 - Meta’s latest LLaMA (default)
  • mistral - Mistral AI’s efficient 7B model
  • codellama - Specialized for code generation
  • gemma - Google’s open Gemma model
  • phi - Microsoft’s compact Phi model
  • llava - Multimodal vision model

See Ollama model library for the full list.

LangChainGo

Wrap any LangChainGo LLM to reuse existing integrations:

import (
    "github.com/hupe1980/agentmesh/pkg/model/langchaingo"
    "github.com/tmc/langchaingo/llms/openai"
)

llm, _ := openai.New(openai.WithModel("gpt-4"))
model, _ := langchaingo.NewModel(llm)

compiled, err := agent.NewReAct(model, tools)

This adapter enables:

  • Integration with LangChainGo’s 50+ model providers
  • Reuse of existing LangChainGo configurations
  • Gradual migration from LangChainGo to AgentMesh

Amazon Bedrock

The Amazon Bedrock adapter integrates foundation models from multiple providers (Anthropic Claude, Meta Llama, Amazon Nova, Mistral, etc.) via the AWS SDK’s Converse API:

import (
    "context"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/bedrockruntime"
    "github.com/hupe1980/agentmesh/pkg/model/amazonbedrock"
)

cfg, _ := config.LoadDefaultConfig(context.Background())
client := bedrockruntime.NewFromConfig(cfg)

model := amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("eu.amazon.nova-pro-v1:0"),
    amazonbedrock.WithTemperature(0.7),
    amazonbedrock.WithMaxTokens(1024),
)

compiled, err := agent.NewReAct(model, tools)

Configuration options:

amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("eu.amazon.nova-pro-v1:0"), // Model or inference profile ID
    amazonbedrock.WithTemperature(0.7),                    // Randomness (0-1)
    amazonbedrock.WithMaxTokens(2048),                     // Max output tokens
    amazonbedrock.WithTopP(0.9),                           // Nucleus sampling
)

Cross-Region Inference Profiles (recommended for production):

Bedrock supports inference profiles that provide automatic cross-region load balancing. Use the region prefix (us., eu., etc.) with the model ID:

// EU region inference profiles
model := amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("eu.amazon.nova-pro-v1:0"),
)

// US region inference profiles
model := amazonbedrock.NewModel(client,
    amazonbedrock.WithModelID("us.anthropic.claude-3-5-sonnet-20240620-v1:0"),
)

Common inference profile IDs:

  • eu.amazon.nova-pro-v1:0 / us.amazon.nova-pro-v1:0 (Nova Pro with tools & vision)
  • eu.amazon.nova-lite-v1:0 / us.amazon.nova-lite-v1:0 (Nova Lite with tools & vision)
  • eu.anthropic.claude-3-5-sonnet-20240620-v1:0 (Claude 3.5 Sonnet)
  • eu.anthropic.claude-3-haiku-20240307-v1:0 (Claude 3 Haiku)
  • eu.meta.llama3-2-3b-instruct-v1:0 (Llama 3.2)

To list available inference profiles in your account:

aws bedrock list-inference-profiles --query "inferenceProfileSummaries[].inferenceProfileId"

The adapter supports:

  • ✅ Streaming responses (via ConverseStream API)
  • ✅ Function calling via Request.Tools
  • ✅ Vision models (Claude, Nova Pro/Lite)
  • ✅ Multiple foundation model providers through unified API

Tool binding

Tools are passed to models via the Request.Tools field:

// Create tools
searchTool, _ := tool.NewFuncTool("search", "Search the web", searchFunc)
calcTool, _ := tool.NewFuncTool("calculator", "Perform calculations", calcFunc)

// Pass tools in request
req := &model.Request{
    Messages: messages,
    Tools:    []tool.Tool{searchTool, calcTool},
}

for resp, err := range model.Generate(ctx, req) {
    // Handle tool calls in response
}

Agent constructors handle tool binding automatically:

// Tools are passed automatically via agent options
compiled, err := agent.NewReAct(
    openai.NewModel(),
    agent.WithTools(searchTool, calcTool),
)

Streaming

All models support streaming through the unified iterator API:

// Stream model responses directly with full metadata access
for resp, err := range model.Generate(ctx, messages) {
    if err != nil {
        log.Printf("Error: %v", err)
        break
    }
    
    // Print partial content as it arrives
    fmt.Print(resp.Message.String())
    
    // Access streaming reasoning (if supported)
    if resp.Reasoning != "" {
        fmt.Printf("\n[Reasoning: %s]\n", resp.Reasoning)
    }
}

For blocking (non-streaming) mode, use model.Last():

// Get only the final response with metadata
resp, err := model.Last(model.Generate(ctx, messages))
if err != nil {
    log.Fatal(err)
}

// Access message content
fmt.Println(resp.Message.String())

// Access reasoning (for o1/o3, Gemini 2.0, Claude)
if resp.Reasoning != "" {
    fmt.Println("Reasoning:", resp.Reasoning)
}

// Track token usage
if resp.Usage != nil {
    fmt.Printf("Total tokens: %d (prompt: %d, completion: %d, reasoning: %d)\n",
        resp.Usage.TotalTokens,
        resp.Usage.PromptTokens,
        resp.Usage.CompletionTokens,
        resp.Usage.ReasoningTokens)
}

// Check finish reason
fmt.Println("Finish reason:", resp.FinishReason)

Collect all intermediate responses:

// Gather all responses (useful for debugging streaming)
responses, err := model.Collect(model.Generate(ctx, messages))
if err != nil {
    log.Fatal(err)
}

for i, resp := range responses {
    fmt.Printf("Chunk %d: %s\n", i, resp.Message.String())
}

When using graph streaming, agents automatically handle the iterator:

seq := compiled.Run(ctx, messages)
for event, err := range seq {
    if err != nil {
        log.Printf("Error: %v", err)
        continue
    }
    
    if event.Node == "model" {
        // Each event contains exactly one message
        fmt.Print(event.Message.String())
    }
}

Accessing Token Probabilities

OpenAI models support token-level probability analysis:

model := openai.NewModel(
    openai.WithLogprobs(true, 5), // Request top 5 alternatives per token
)

resp, err := model.Last(model.Generate(ctx, messages))
if err != nil {
    log.Fatal(err)
}

if resp.Logprobs != nil {
    for _, tokenInfo := range resp.Logprobs.Content {
        // Main token chosen
        fmt.Printf("Token: %s, Log Probability: %.3f\n",
            tokenInfo.Token, tokenInfo.Logprob)
        
        // Alternative tokens considered
        for _, alt := range tokenInfo.TopLogprobs {
            fmt.Printf("  Alt: %s (%.3f)\n", alt.Token, alt.Logprob)
        }
    }
}

Custom models

Implement the model.Model interface to integrate custom providers using the iterator pattern:

type CustomModel struct {
    client *CustomClient
}

func (m *CustomModel) Generate(ctx context.Context, messages []message.Message) iter.Seq2[*model.Response, error] {
    return func(yield func(*model.Response, error) bool) {
        // Convert messages to provider format
        req := convertMessages(messages)
        
        // For streaming providers, yield partial responses
        stream, err := m.client.CompleteStream(ctx, req)
        if err != nil {
            yield(nil, err)
            return
        }
        
        var totalTokens int
        for chunk := range stream {
            // Convert chunk to AgentMesh format
            msg := message.NewAIMessageFromText(chunk.Text)
            
            // Build response with metadata
            resp := &model.Response{
                Message:      msg,
                Reasoning:    chunk.Reasoning,     // If your provider supports it
                FinishReason: chunk.FinishReason,  // e.g., "stop", "length"
                Usage: &model.UsageInfo{
                    PromptTokens:     chunk.PromptTokens,
                    CompletionTokens: chunk.CompletionTokens,
                    TotalTokens:      chunk.TotalTokens,
                },
            }
            
            // Yield response; if false returned, stop streaming
            if !yield(resp, nil) {
                return
            }
        }
        
        // For non-streaming providers, yield single final response
        // resp, err := m.client.Complete(ctx, req)
        // if err != nil {
        //     yield(nil, err)
        //     return
        // }
        // 
        // yield(&model.Response{
        //     Message: message.NewAIMessage(message.NewTextPart(resp.Text)),
        //     Usage: &model.UsageInfo{
        //         PromptTokens:     resp.Usage.PromptTokens,
        //         CompletionTokens: resp.Usage.CompletionTokens,
        //         TotalTokens:      resp.Usage.TotalTokens,
        //     },
        //     FinishReason: resp.FinishReason,
        // }, nil)
    }
}

// Capabilities reports what this model supports
func (m *CustomModel) Capabilities() model.Capabilities {
    return model.Capabilities{
        Streaming: true,
        Tools:     true, // Set based on provider support
        Vision:    false,
    }
}

Use your custom model like any other:

model := &CustomModel{client: myClient}
compiled, err := agent.NewReAct(model, tools)

The iterator pattern automatically supports both streaming and blocking modes through model.Last() and model.Collect() helpers.

Key Points for Custom Implementations:

  • Return iter.Seq2[*model.Response, error] (note the pointer)
  • Populate Usage field for token tracking and cost monitoring
  • Set FinishReason to indicate why generation stopped
  • Set Reasoning field if your provider exposes internal reasoning
  • Use Metadata map for provider-specific information
  • Yield nil error for successful chunks, non-nil error to stop iteration

Model Routing

AgentMesh provides a flexible model routing system to intelligently select models based on query characteristics, capabilities, and availability.

Router Interface

Routers implement a simple interface that selects the best model for a request:

type Router interface {
    Route(ctx context.Context, req *Request) (Model, error)
}

RoutedModel Wrapper

Wrap any router to make it transparent as a Model:

router := model.NewCostBasedRouter(cheapModel, expensiveModel)
routedModel := model.NewRoutedModel(router)

// Use like any other model
agent, _ := agent.NewReAct(routedModel, tools)

Cost-Based Routing

Route simple queries to cheaper models and complex queries to premium models:

cheapModel := openai.NewModel(openai.WithModel("gpt-4o-mini"))
premiumModel := openai.NewModel(openai.WithModel("gpt-4o"))

router := model.NewCostBasedRouter(cheapModel, premiumModel,
    model.WithComplexityThreshold(0.5), // 0.0-1.0 scale
)

// Simple queries → gpt-4o-mini
// Complex queries → gpt-4o

The built-in HeuristicEstimator considers:

  • Query length and word count
  • Complexity keywords (“analyze”, “compare”, “explain why”)
  • Multi-turn conversation context
  • Tool binding presence

Capability-Based Routing

Automatically route requests to models with required capabilities:

textModel := openai.NewModel(openai.WithModel("gpt-4o-mini"))
visionModel := openai.NewModel(openai.WithModel("gpt-4o"))

router := model.NewCapabilityRouter(
    model.WithCapabilityModel(textModel),
    model.WithCapabilityModel(visionModel),
)

// Text-only requests → gpt-4o-mini (cheaper)
// Requests with images → gpt-4o (has Vision capability)

Fallback Routing

Build resilient pipelines with circuit breaker pattern:

primary := openai.NewModel(openai.WithModel("gpt-4o"))
backup := anthropic.NewModel(anthropic.WithModel("claude-3-5-sonnet"))

router := model.NewFallbackRouter(primary, backup,
    model.WithFailureThreshold(5),          // Open circuit after 5 failures
    model.WithResetTimeout(30*time.Second), // Try primary again after 30s
)

// Automatic failover with health tracking

Circuit breaker states:

  • Closed: All requests go to primary
  • Open: All requests go to fallback (after threshold failures)
  • Half-Open: Probe primary with single request to test recovery

Composite Routing

Chain multiple routing strategies:

// First check capabilities, then apply cost optimization
capabilityRouter := model.NewCapabilityRouter(textModel, visionModel)
costRouter := model.NewCostBasedRouter(cheapModel, expensiveModel)

composite := model.NewCompositeRouter(capabilityRouter, costRouter)

Conditional Routing

Route based on custom logic:

router := model.NewConditionalRouter(func(ctx context.Context, req *model.Request) Model {
    // Check metadata for routing hints
    if priority, ok := req.Metadata["priority"].(string); ok && priority == "high" {
        return premiumModel
    }
    return standardModel
})

Weighted Routing

Distribute load across models:

router := model.NewWeightedRouter(
    model.WeightedModel{Model: modelA, Weight: 70}, // 70% traffic
    model.WeightedModel{Model: modelB, Weight: 30}, // 30% traffic
)

Complete Example

package main

import (
    "context"
    "log"
    "time"

    "github.com/hupe1980/agentmesh/pkg/agent"
    "github.com/hupe1980/agentmesh/pkg/model"
    "github.com/hupe1980/agentmesh/pkg/model/openai"
)

func main() {
    ctx := context.Background()

    // Create models
    mini := openai.NewModel(openai.WithModel("gpt-4o-mini"))
    full := openai.NewModel(openai.WithModel("gpt-4o"))

    // Build routing chain: cost → fallback → model
    costRouter := model.NewCostBasedRouter(mini, full,
        model.WithComplexityThreshold(0.5),
    )
    
    resilientRouter := model.NewFallbackRouter(
        model.NewRoutedModel(costRouter),
        full, // Always fallback to full model
        model.WithFailureThreshold(3),
        model.WithResetTimeout(time.Minute),
    )

    // Use routed model transparently
    routedModel := model.NewRoutedModel(resilientRouter)
    agent, _ := agent.NewReAct(routedModel, tools)

    // Execute - routing happens automatically
    for result, err := range agent.Run(ctx, messages) {
        // ...
    }
}

See the examples/model_router directory for a complete working example.


  • Middleware System - Extend model execution with caching, retries, rate limiting, and token counting
  • Agents - Build agents using models with tool calling
  • Streaming - Real-time response streaming