Model Context Protocol (MCP)
Model Context Protocol (MCP) is an open standard for wiring assistants to external data sources, automation, and services without baking those integrations into your agent. MCP servers expose tools over a shared protocol, while clients (like this agent library) negotiate capabilities and relay tool calls. The MCP integration lets each run session attach to an MCP server, discover the tools it offers, and route tool invocations through the protocol using per-user credentials or transports.
Under the hood this integration is just an implementation of the Toolkit
primitive, so it shares the same lifecycle and composition rules as your other toolkits. The MCP connection opens during session creation and closes when the session ends.
Configuration types
Section titled “Configuration types”type MCPInit<TContext> = | MCPParams | ((context: TContext) => MCPParams | Promise<MCPParams>);
type MCPParams = MCPStdioParams | MCPStreamableHTTPParams;
interface MCPStdioParams { type: "stdio"; // Executable that implements the MCP server. command: string; // Optional arguments passed to the command. args?: string[];}
interface MCPStreamableHTTPParams { type: "streamable-http"; // Base URL for the MCP server. url: string; // Authorization header value; OAuth2 flows are not handled automatically so // callers must provide a token when required. authorization?: string;}
pub enum MCPInit<TCtx>where TCtx: Send + Sync + 'static,{ Params(MCPParams), Func(Arc<dyn Fn(&TCtx) -> Result<MCPParams, BoxedError> + Send + Sync>), AsyncFunc( Arc<dyn Fn(&TCtx) -> BoxFuture<'static, Result<MCPParams, BoxedError>> + Send + Sync>, ),}
pub enum MCPParams { Stdio(MCPStdioParams), StreamableHttp(MCPStreamableHTTPParams),}
pub struct MCPStdioParams { /// Executable to spawn (e.g. "uvx"). pub command: String, /// Optional arguments passed to the command. #[serde(default, skip_serializing_if = "Vec::is_empty")] pub args: Vec<String>,}
pub struct MCPStreamableHTTPParams { /// Base URL for the MCP server. pub url: String, /// Authorization header value when required. OAuth flows are not automated, /// so supply a token directly. #[serde(default, skip_serializing_if = "Option::is_none")] pub authorization: Option<String>,}
type MCPInit[C any] func(ctx context.Context, contextVal C) (MCPParams, error)
type MCPParams struct { stdio *MCPStdioParams streamableHTTP *MCPStreamableHTTPParams}
type MCPStdioParams struct { // Command is the executable to launch (e.g. "uvx"). Command string `json:"command"` // Args are optional arguments passed to the command. Args []string `json:"args,omitempty"`}
type MCPStreamableHTTPParams struct { // URL is the base endpoint of the MCP server. URL string `json:"url"` // Authorization is an optional header value. OAuth flows are not handled automatically, // so callers should supply a ready-to-use token when required. Authorization string `json:"authorization,omitempty"`}
MCPInit
lets you pass static parameters or derive them from the per-run context. Use synchronous or asynchronous resolvers to fetch tokens, endpoints, or feature flags on demand.MCPParams
chooses the transport (stdio
for local processes,streamable-http
for hosted servers).authorization
is forwarded as provided. Supply ready-to-use credentials because the library does not negotiate OAuth flows on your behalf.
Context-aware setup
Section titled “Context-aware setup”This library is typically deployed as a single agent service that serves many users. The agent itself stays stateless; the run context
carries caller-specific identifiers so Toolkit.create_session
can resolve per-session details before the conversation starts. Use that resolver to look up OAuth tokens, choose the correct MCP endpoint, or apply tenant-level feature flags. It keeps the agent reusable while still routing each session through the appropriate integration, matching the lifecycle in Agent vs Run session.
The snippets below show an init function that loads an OAuth token from storage based on the user ID in context and returns MCP parameters for that session.
import { Agent } from "@hoangvvo/llm-agent";import { mcpToolkit, type MCPParams } from "@hoangvvo/llm-agent/mcp";
interface SessionContext {tenantId: string;userId: string;}
async function resolveMcpParams(context: SessionContext): Promise<MCPParams> {const token = await fetchMcpTokenForUser(context.userId);const url = await resolveMcpEndpointForTenant(context.tenantId);return { type: "streamable-http", url, authorization: token,};}
const agent = new Agent<SessionContext>({name: "Transit Concierge",model,toolkits: [mcpToolkit(resolveMcpParams)],});
use llm_agent::mcp::{MCPInit, MCPParams, MCPStreamableHTTPParams, MCPToolkit};
let toolkit = MCPToolkit::new(MCPInit::from_async_fn(|context: &SessionContext| async move { let token = fetch_mcp_token_for_user(&context.user_id).await?; let url = resolve_mcp_endpoint_for_tenant(&context.tenant_id).await?; Ok(MCPParams::StreamableHttp(MCPStreamableHTTPParams { url, authorization: Some(token), }))}));
let agent = Agent::builder("Transit Concierge", model) .add_toolkit(toolkit) .build();
toolkit := llmmcp.NewMCPToolkit[*SessionContext](func(ctx context.Context, data *SessionContext) (llmmcp.MCPParams, error) { if data == nil { return llmmcp.MCPParams{}, fmt.Errorf("session context missing") }
token, err := fetchMCPTokenForUser(ctx, data.UserID) if err != nil { return llmmcp.MCPParams{}, err }
url, err := resolveMCPEndpointForTenant(ctx, data.TenantID) if err != nil { return llmmcp.MCPParams{}, err }
return llmmcp.NewMCPStreamableHTTPParams(url, token), nil})
agent := llmagent.NewAgent[*SessionContext]( "Transit Concierge", model, llmagent.WithToolkits(toolkit),)
Tool lifecycle
Section titled “Tool lifecycle”- At session start the toolkit calls
list_tools
and subscribes totool_list_changed
, so updates published by the server flow back automatically. - Each remote definition becomes an
AgentTool
, letting the agent runtime evaluate tool selection and error handling exactly the same way it does for local tools. - Tool responses are converted into SDK parts (text, image, audio) before being appended to the transcript.
Example: Streamable HTTP server
Section titled “Example: Streamable HTTP server”The full examples below stand up a minimal shuttle-planning MCP server, register it through the toolkit, and run a single conversation turn against it.
import { Agent } from "@hoangvvo/llm-agent";import { mcpToolkit } from "@hoangvvo/llm-agent/mcp";import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";import { randomUUID } from "node:crypto";import { once } from "node:events";import { createServer, type IncomingMessage } from "node:http";import { z } from "zod";
import { getModel } from "./get-model.ts";
// This example demonstrates:// 1. Launching a minimal streamable HTTP MCP server using the official TypeScript SDK.// 2. Registering that server through the MCP toolkit primitive.// 3. Having the agent call the remote tool during a conversation.
const PORT = 39813;const SERVER_URL = `http://127.0.0.1:${PORT}`;const AUTH_TOKEN = "transit-hub-secret";
interface SessionContext { riderName: string; authorization: string;}
async function main(): Promise<void> { const stopServer = await startStubMcpServer(); try { const model = getModel("openai", "gpt-4o-mini");
const agent = new Agent<SessionContext>({ name: "Sage", model, instructions: [ "You are Sage, the shuttle concierge for the Transit Hub.", "Lean on connected transit systems before guessing, and tailor advice to the rider's shift.", (context) => `You are assisting ${context.riderName} with tonight's shuttle planning.`, ], // The MCP toolkit primitive resolves transport params per session. Here we pull the rider-specific // authorization token from context so each agent session connects with the correct credentials. toolkits: [ mcpToolkit((context) => ({ type: "streamable-http", url: SERVER_URL, authorization: context.authorization, })), ], });
const session = await agent.createSession({ riderName: "Avery", authorization: AUTH_TOKEN, }); try { const turn = await session.run({ input: [ { type: "message", role: "user", content: [ { type: "text", text: "What shuttles are running tonight?" }, ], }, ], });
console.log("=== Agent Response ==="); const replyText = turn.content .filter( (part): part is { type: "text"; text: string } => part.type === "text", ) .map((part) => part.text) .join("\n"); console.log(replyText || JSON.stringify(turn.content, null, 2)); } finally { await session.close(); } } finally { await stopServer(); }}
await main().catch((error) => { console.error(error); process.exitCode = 1;});
function createShuttleServer(): McpServer { const server = new McpServer({ name: "shuttle-scheduler", version: "1.0.0", });
server.registerTool( "list_shuttles", { description: "List active shuttle routes for the selected shift", inputSchema: { shift: z .enum(["evening", "overnight"]) .describe( "Which operating window to query. OpenAI requires `additionalProperties: false` and every property listed in `required`, so this schema keeps a single required field.", ), }, }, async ({ shift }) => ({ content: [ { type: "text", text: shift === "overnight" ? "Harbor Express and Dawn Flyer are staged for the overnight shift." : "Midnight Loop and Harbor Express are on duty tonight.", }, ], }), );
return server;}
function isAuthorized(req: IncomingMessage): boolean { const header = req.headers.authorization; return typeof header === "string" && header === `Bearer ${AUTH_TOKEN}`;}
async function startStubMcpServer(): Promise<() => Promise<void>> { const server = createShuttleServer(); const transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), enableJsonResponse: true, });
await server.connect(transport);
const httpServer = createServer((req, res) => { if (req.url === "/status" && req.method === "GET") { res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ status: "ok" })); return; }
if (!isAuthorized(req)) { res.writeHead(401, { "Content-Type": "application/json" }); res.end( JSON.stringify({ error: "unauthorized", message: "Provide the shuttle access token.", }), ); return; }
if (req.url === "/" && req.method === "POST") { const chunks: Buffer[] = []; req.on("data", (chunk) => chunks.push(chunk as Buffer)); req.on("end", async () => { const body = Buffer.concat(chunks); await transport.handleRequest(req, res, JSON.parse(body.toString())); }); return; }
res.writeHead(404); res.end(); });
httpServer.listen(PORT); await once(httpServer, "listening");
return async () => { await new Promise<void>((resolve, reject) => { httpServer.close((err) => { if (err) reject(err); else resolve(); }); }); };}
use axum::{ extract::Request, http::{header, HeaderMap, Method, StatusCode}, middleware::{self, Next}, response::{IntoResponse, Response}, routing::get, Json, Router,};use dotenvy::dotenv;use llm_agent::{ mcp::{MCPParams, MCPStreamableHTTPParams, MCPToolkit}, Agent, AgentItem, AgentRequest, BoxedError,};use llm_sdk::{ openai::{OpenAIModel, OpenAIModelOptions}, Message, Part,};use rmcp::{ handler::server::{router::tool::ToolRouter, wrapper::Parameters}, model::{ServerCapabilities, ServerInfo}, schemars, tool, tool_handler, tool_router, transport::streamable_http_server::{ session::local::LocalSessionManager, tower::{StreamableHttpServerConfig, StreamableHttpService}, }, ServerHandler,};use serde::Deserialize;use serde_json::json;use std::{ env, io::{Error as IoError, ErrorKind}, sync::Arc, time::Duration,};use tokio::{net::TcpListener, sync::oneshot, task::JoinHandle, time::sleep};
// This example demonstrates:// 1. Launching a minimal streamable HTTP MCP server using the official Rust// SDK.// 2. Registering that server through the MCP toolkit primitive.// 3. Having the agent call the remote tool during a conversation.
const SERVER_ADDR: &str = "127.0.0.1:39811";const SERVER_URL: &str = "http://127.0.0.1:39811";const AUTH_TOKEN: &str = "transit-hub-secret";
#[derive(Clone)]struct SessionContext { rider_name: String, authorization: String,}
#[tokio::main]async fn main() -> Result<(), BoxedError> { dotenv().ok();
let server = start_stub_mcp_server().await?; let run_result = run_agent_demo().await; server.shutdown().await?; run_result}
async fn run_agent_demo() -> Result<(), BoxedError> { let api_key = env::var("OPENAI_API_KEY").map_err(|_| missing_env("OPENAI_API_KEY"))?;
let model = Arc::new(OpenAIModel::new( "gpt-4o-mini", OpenAIModelOptions { api_key, ..Default::default() }, ));
let agent = Agent::<SessionContext>::builder("Sage", model) .add_instruction("You are Sage, the shuttle concierge for the Transit Hub.") .add_instruction( "Lean on connected transit systems before guessing, and tailor advice to the rider's \ shift.", ) .add_instruction(|context: &SessionContext| { Ok(format!( "You are assisting {} with tonight's shuttle planning.", context.rider_name )) }) // The MCP toolkit primitive resolves transport params per session. Here we pull the // rider-specific authorization token from context so each agent session connects // with the correct credentials. .add_toolkit(MCPToolkit::new(|context: &SessionContext| { Ok(MCPParams::StreamableHttp(MCPStreamableHTTPParams { url: SERVER_URL.to_string(), authorization: Some(context.authorization.clone()), })) })) .build();
let request = AgentRequest { context: SessionContext { rider_name: "Avery".to_string(), authorization: AUTH_TOKEN.to_string(), }, input: vec![AgentItem::Message(Message::user(vec![Part::text( "What shuttles are running tonight?", )]))], };
let response = agent .run(request) .await .map_err(|err| Box::new(err) as BoxedError)?;
println!("=== Agent Response ==="); let reply = response.text(); if reply.is_empty() { println!("{:?}", response.content); } else { println!("{reply}"); }
Ok(())}
async fn start_stub_mcp_server() -> Result<ServerGuard, BoxedError> { let session_manager = Arc::new(LocalSessionManager::default()); let service: StreamableHttpService<ShuttleServer, _> = StreamableHttpService::new( || Ok(ShuttleServer::default()), session_manager, StreamableHttpServerConfig::default(), );
let app = Router::new() .route("/status", get(server_status)) .fallback_service(service) .layer(middleware::from_fn(authenticate));
let listener = TcpListener::bind(SERVER_ADDR) .await .map_err(|err| Box::new(err) as BoxedError)?; let (shutdown_tx, shutdown_rx) = oneshot::channel();
let handle = tokio::spawn(async move { let server = axum::serve(listener, app).with_graceful_shutdown(async { let _ = shutdown_rx.await; });
if let Err(err) = server.await { eprintln!("MCP server error: {err}"); } });
sleep(Duration::from_millis(200)).await;
Ok(ServerGuard { shutdown: Some(shutdown_tx), handle, })}
struct ShuttleServer { tool_router: ToolRouter<Self>,}
impl ShuttleServer { fn new() -> Self { Self { tool_router: Self::tool_router(), } }}
impl Default for ShuttleServer { fn default() -> Self { Self::new() }}
#[derive(Debug, Deserialize, schemars::JsonSchema)]#[serde(deny_unknown_fields)]struct ListShuttlesArgs { #[schemars(description = "Operating window to query")] shift: Shift,}
#[derive(Debug, Deserialize, schemars::JsonSchema)]#[serde(rename_all = "lowercase")]#[schemars(inline)]enum Shift { Evening, Overnight,}
#[tool_router]impl ShuttleServer { #[tool(description = "List active shuttle routes for the selected shift")] fn list_shuttles(&self, Parameters(args): Parameters<ListShuttlesArgs>) -> String { let _ = &self.tool_router; match args.shift { Shift::Evening => "Midnight Loop and Harbor Express are on duty tonight.".into(), Shift::Overnight => { "Harbor Express and Dawn Flyer are staged for the overnight shift.".into() } } }}
#[tool_handler(router = self.tool_router)]impl ServerHandler for ShuttleServer { fn get_info(&self) -> ServerInfo { ServerInfo { capabilities: ServerCapabilities::builder().enable_tools().build(), server_info: rmcp::model::Implementation { name: "shuttle-scheduler".into(), title: Some("Transit hub shuttle coordinator".into()), ..Default::default() }, instructions: Some( "Authenticate with the shuttle control token before calling tools.".into(), ), ..Default::default() } }}
async fn server_status() -> Json<serde_json::Value> { Json(json!({ "status": "ok" }))}
async fn authenticate(req: Request, next: Next) -> Result<Response, Response> { let method = req.method().clone(); let is_status = req.uri().path() == "/status"; let authorized = has_valid_token(req.headers(), &method); if is_status || authorized { Ok(next.run(req).await) } else { Err(( StatusCode::UNAUTHORIZED, Json(json!({ "error": "unauthorized", "message": "Provide the shuttle access token.", })), ) .into_response()) }}
fn has_valid_token(headers: &HeaderMap, method: &Method) -> bool { if let Some(token) = headers .get(header::AUTHORIZATION) .and_then(|value| value.to_str().ok()) { if token.trim() == format!("Bearer {AUTH_TOKEN}") { return true; } }
matches!(method, &Method::GET | &Method::DELETE) && headers.contains_key("mcp-session-id")}
struct ServerGuard { shutdown: Option<oneshot::Sender<()>>, handle: JoinHandle<()>,}
impl ServerGuard { async fn shutdown(self) -> Result<(), BoxedError> { if let Some(tx) = self.shutdown { let _ = tx.send(()); }
match self.handle.await { Ok(()) => Ok(()), Err(err) => Err(Box::new(err) as BoxedError), } }}
fn missing_env(var: &str) -> BoxedError { Box::new(IoError::new( ErrorKind::NotFound, format!("{var} environment variable must be set"), ))}
package main
import ( "context" "encoding/json" "errors" "fmt" "log" "net" "net/http" "os" "time"
llmagent "github.com/hoangvvo/llm-sdk/agent-go" llmmcp "github.com/hoangvvo/llm-sdk/agent-go/mcp" llmsdk "github.com/hoangvvo/llm-sdk/sdk-go" "github.com/hoangvvo/llm-sdk/sdk-go/openai" "github.com/joho/godotenv" "github.com/modelcontextprotocol/go-sdk/mcp")
// This example demonstrates:// 1. Launching a minimal streamable HTTP MCP server using the official Go SDK.// 2. Registering that server through the MCP toolkit primitive.// 3. Having the agent call the remote tool during a conversation.
const ( listenerAddr = "127.0.0.1:39812" serverURL = "http://" + listenerAddr authToken = "transit-hub-secret")
func main() { godotenv.Load("../.env")
apiKey := os.Getenv("OPENAI_API_KEY") if apiKey == "" { log.Fatal("OPENAI_API_KEY is required") }
stopServer := startStubMCPServer() defer stopServer()
model := openai.NewOpenAIModel("gpt-4o-mini", openai.OpenAIModelOptions{APIKey: apiKey})
// Build the agent and register the MCP toolkit so every run hydrates tools from the remote server. agent := llmagent.NewAgent[*sessionContext]( "Sage", model, llmagent.WithInstructions( llmagent.InstructionParam[*sessionContext]{String: stringPtr("You are Sage, the shuttle concierge for the Transit Hub.")}, llmagent.InstructionParam[*sessionContext]{String: stringPtr("Lean on connected transit systems before guessing, and tailor advice to the rider's shift.")}, llmagent.InstructionParam[*sessionContext]{Func: func(ctx context.Context, sc *sessionContext) (string, error) { if sc == nil { return "", fmt.Errorf("session context missing") } return fmt.Sprintf("You are assisting %s with tonight's shuttle planning.", sc.RiderName), nil }}, ), llmagent.WithToolkits( // The MCP toolkit primitive resolves transport params per session. Here we pull the rider-specific // authorization token from context so each agent session connects with the correct credentials. llmmcp.NewMCPToolkit[*sessionContext](func(_ context.Context, sc *sessionContext) (llmmcp.MCPParams, error) { if sc == nil { return llmmcp.MCPParams{}, fmt.Errorf("session context missing") } return llmmcp.NewMCPStreamableHTTPParams(serverURL, sc.Authorization), nil }), ), )
ctx := context.Background()
session, err := agent.CreateSession(ctx, &sessionContext{ RiderName: "Avery", Authorization: authToken, }) if err != nil { log.Fatalf("create session: %v", err) } defer func() { if cerr := session.Close(ctx); cerr != nil { log.Printf("session close: %v", cerr) } }()
transcript := []llmagent.AgentItem{ llmagent.NewAgentItemMessage(llmsdk.NewUserMessage(llmsdk.NewTextPart("What shuttles are running tonight?"))), }
response, err := session.Run(ctx, llmagent.RunSessionRequest{Input: transcript}) if err != nil { log.Fatalf("run: %v", err) }
fmt.Println("=== Agent Response ===") fmt.Println(response.Text())}
// sessionContext illustrates how MCP params can depend on the agent context.type sessionContext struct { RiderName string Authorization string}
func stringPtr(s string) *string { return &s }
// startStubMCPServer launches a minimal streamable HTTP MCP server using the official SDK.func startStubMCPServer() func() { handler := mcp.NewStreamableHTTPHandler(func(*http.Request) *mcp.Server { server := mcp.NewServer(&mcp.Implementation{ Name: "shuttle-scheduler", Version: "1.0.0", }, nil)
type listArgs struct { Shift string `json:"shift" jsonschema_description:"Operating window to query" jsonschema_enum:"evening,overnight"` }
mcp.AddTool(server, &mcp.Tool{ Name: "list_shuttles", Description: "List active shuttle routes for the selected shift", }, func(ctx context.Context, _ *mcp.CallToolRequest, args listArgs) (*mcp.CallToolResult, any, error) { _ = ctx summary := "Midnight Loop and Harbor Express are on duty tonight." if args.Shift == "overnight" { summary = "Harbor Express and Dawn Flyer are staged for the overnight shift." } return &mcp.CallToolResult{ Content: []mcp.Content{ &mcp.TextContent{Text: summary}, }, }, nil, nil })
return server }, &mcp.StreamableHTTPOptions{Stateless: true})
mux := http.NewServeMux() mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) }) mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !hasValidToken(r) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusUnauthorized) _ = json.NewEncoder(w).Encode(map[string]string{ "error": "unauthorized", "message": "Provide the shuttle access token.", }) return } handler.ServeHTTP(w, r) }))
srv := &http.Server{Addr: listenerAddr, Handler: mux} ln, err := net.Listen("tcp", listenerAddr) if err != nil { log.Fatalf("listen: %v", err) }
go func() { if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Printf("mcp server: %v", err) } }()
time.Sleep(200 * time.Millisecond)
return func() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() _ = srv.Shutdown(ctx) }}
func hasValidToken(r *http.Request) bool { return r.Header.Get("Authorization") == "Bearer "+authToken}
When you create a run session manually, call its close
method after the conversation so the MCP connection and any spawned processes shut down cleanly. The one-shot run
and run_stream
helpers already manage that lifecycle for you; see Agent vs Run session for a refresher.