Human-in-the-loop
Human-in-the-loop (HITL) adds people to the decision loop so agents can pause before risky actions, route questions to humans, and continue once someone signs off. In agentic systems this shows up everywhere:
- Compliance desks and policy reviews.
- Customer support escalations.
- Coding agents before creating, editing, or deleting files.
While the library does not have built-in HITL support, it provides the primitives to implement it yourself. One way to do this is to interrupt the agent run with an error when human input is needed, then replay the run once you have a decision.
At a high level:
- Stream the run and keep the emitted `AgentItem` history, even if an error occurs.
- When a tool needs approval, throw a domain-specific error, such as `RequireApprovalError`. It halts the stream and bubbles up to your app.
- Catch the error in your app, ask the human for a decision, stash that decision somewhere, and rerun the stream with the exact same transcript. Approved branches finish normally; denials surface an error result that the model can acknowledge in its reply.
This works because of the Resumability guarantee: the rerun reuses the same model response with the same tool calls, but this time tool execution succeeds or fails based on the recorded human decision.
sequenceDiagram participant Human participant App participant Agent participant LLM participant Tool Human->>App: submit request Note over App,Agent: Transcript = [user_message] App->>Agent: runStream([user_message], context) Agent->>LLM: request completion LLM-->>Agent: assistant_message(tool_call) Agent-->>App: emit assistant_message(tool_call) Note over App,Agent: Transcript = [user_message, assistant_message(tool_call)] Agent->>Tool: execute approval-gated tool Tool-->>Agent: throw RequireApprovalError Agent-->>App: bubble RequireApprovalError App->>Human: prompt for approval/denial Human-->>App: decision App->>App: persist decision in context App->>Agent: runStream([user_message, assistant_message(tool_call)], updated context) Agent->>Tool: execute with recorded decision Tool-->>Agent: tool_result (success or error) Agent-->>App: emit tool_result Agent->>LLM: request completion LLM-->>Agent: assistant_message Agent-->>App: emit assistant_message Agent-->>App: final response App-->>Human: deliver outcome
Implementation
The example implements this pattern using a minimal context that stores approval state in memory, a single tool that guards execution, a CLI prompt, and a loop that retries until the run finishes.
import { Agent, AgentToolExecutionError, type AgentItem, type AgentStreamResponseEvent,} from "@hoangvvo/llm-agent";import { zodTool } from "@hoangvvo/llm-agent/zod";import { stdin as input, stdout as output } from "node:process";import { createInterface } from "node:readline/promises";import { z } from "zod";import { getContentText } from "../src/utils.ts";import { getModel } from "./get-model.ts";
// Human-in-the-loop outline with agent primitives:// 1. Seed the run with a user `AgentItem` and call `Agent#runStream` so we capture// every emitted `AgentStreamEvent` (model messages, tool results, etc.).// 2. When the tool throws our user-land `RequireApprovalError`, collect the human// decision and persist it on the shared RunSession context.// 3. Repeat step (1) with the accumulated items and mutated context until the tool// succeeds or returns an error result that reflects the denial.
/**
 * Thrown by approval-gated tools to interrupt the agent run until a human
 * records a decision for the named artifact.
 */
class RequireApprovalError extends Error {
  /** Artifact whose release is awaiting a human decision. */
  readonly artifact: string;

  constructor(message: string, artifact: string) {
    super(message);
    this.name = "RequireApprovalError";
    this.artifact = artifact;
  }
}
/** Outcome a human supervisor can record for an artifact. */
type ApprovalStatus = "approved" | "denied";

/** Shared run context: approval decisions keyed by normalized artifact name. */
interface VaultContext {
  approvals: Map<string, ApprovalStatus>;
}

/** Single in-memory context instance, mutated between retries. */
const vaultContext: VaultContext = {
  approvals: new Map(),
};
// Single AgentTool that inspects the context map and interrupts the run.// Thrown errors become AgentToolExecutionError.const unlockArtifact = zodTool({ name: "unlock_artifact", description: "Unlock an artifact for release once a human supervisor has recorded their approval.", parameters: z.object({ artifact: z.string().min(1).describe("Name of the artifact to release."), }), async execute({ artifact }, ctx: VaultContext) { const artifactKey = artifact.trim().toLowerCase(); const status = ctx.approvals.get(artifactKey);
if (!status) { throw new RequireApprovalError( `Release of ${artifact} requires human approval before it can proceed.`, artifact, ); }
if (status === "denied") { return { content: [ { type: "text", text: `Release of ${artifact} remains blocked until a supervisor approves it.`, }, ], is_error: true, }; }
return { content: [ { type: "text", text: `${artifact} unlocked. Proceed with standard vault handling protocols.`, }, ], is_error: false, }; },});
const sentinel = new Agent<VaultContext>({ name: "VaultSentinel", model: getModel("openai", "gpt-4o"), instructions: [ "You supervise the Eon Vault, safeguarding experimental expedition technology.", ], tools: [unlockArtifact],});
// Seed transcript: a single user message kicks off the run.
const initialText =
  "We have an emergency launch window in four hours. Please unlock the Starlight Compass for the Horizon survey team.";

const allItems: AgentItem[] = [
  {
    type: "message",
    role: "user",
    content: [{ type: "text", text: initialText }],
  },
];

console.log(`[user] ${initialText}`);
async function run(context: VaultContext): Promise<AgentStreamResponseEvent> { const stream = sentinel.runStream({ context, input: [...allItems], });
for await (const event of stream) { if (event.event === "partial") { continue; }
if (event.event === "item") { // Persist generated items so later iterations operate on the full history. allItems.push(event.item); logItem(event.item); }
if (event.event === "response") { return event; } }
throw new Error("Agent stream completed without emitting a response.");}
function logItem(item: AgentItem) { switch (item.type) { case "message": { const text = getContentText(item); if (text !== "") { console.log(`\n[${item.role}] ${text}`); } break; } case "model": { const text = getContentText(item); if (text !== "") { console.log(`\n[assistant]\n${text}`); } break; } case "tool": { const toolOutput = getContentText({ content: item.output }); console.log( `\n[tool:${item.tool_name}] input=${JSON.stringify(item.input)} output=${JSON.stringify(toolOutput)}`, ); break; } }}
/**
 * Ask the human on stdin whether to unlock `artifact`.
 * Denial is the default: empty input, "n"/"no", or anything unrecognized.
 */
async function promptForApproval(artifact: string): Promise<ApprovalStatus> {
  const rl = createInterface({ input, output });
  try {
    const answer = await rl.question(
      `Grant approval to unlock ${artifact}? (y/N) `,
    );
    const decision = answer.trim().toLowerCase();

    if (/^y(es)?$/.test(decision)) {
      return "approved";
    }
    if (decision === "" || /^n(o)?$/.test(decision)) {
      return "denied";
    }
    console.log("Unrecognized response, treating as denied.");
    return "denied";
  } finally {
    // Always release the readline interface, even on early return.
    rl.close();
  }
}
for (;;) { try { const response = await run(vaultContext);
console.log("\nCompleted run."); console.dir(response.content, { depth: null }); break; } catch (err) { if ( err instanceof AgentToolExecutionError && err.cause instanceof RequireApprovalError ) { console.log(`\n[agent halted] err = ${err.cause.message}`);
const haltedArtifact = err.cause.artifact; const normalized = haltedArtifact.trim().toLowerCase(); // Store the decision so the tool sees the new approval status on retry. const decision = await promptForApproval(haltedArtifact);
vaultContext.approvals.set(normalized, decision);
continue; }
throw err; }}
use dotenvy::dotenv;use futures::{future::BoxFuture, StreamExt};use llm_agent::{ Agent, AgentError, AgentItem, AgentRequest, AgentStreamEvent, AgentTool, AgentToolResult,};use llm_sdk::{ openai::{OpenAIModel, OpenAIModelOptions}, Message, Part,};use serde::Deserialize;use serde_json::{json, Value};use std::{ collections::HashMap, env, error::Error, fmt, io::{self, Write}, sync::{Arc, Mutex},};
// Human-in-the-loop outline with agent primitives:// 1. Seed the run with a user `AgentItem` and call `Agent::run_stream` so we// capture every emitted `AgentStreamEvent` (model messages, tool results,// etc.).// 2. When the tool throws our user-land `RequireApprovalError`, collect the// human decision and persist it on the shared RunSession context.// 3. Repeat step (1) with the accumulated items and mutated context until the// tool succeeds or returns an error result that reflects the denial.#[derive(Clone, Copy, Debug, PartialEq, Eq)]enum ApprovalStatus { Approved, Denied,}
/// Shared, clonable run context: approval decisions keyed by the
/// lowercased artifact name.
#[derive(Clone, Default)]
struct VaultContext {
    approvals: Arc<Mutex<HashMap<String, ApprovalStatus>>>,
}

impl VaultContext {
    /// Look up the recorded decision for an already-normalized key.
    fn status(&self, key: &str) -> Option<ApprovalStatus> {
        let guard = self.approvals.lock().expect("approvals mutex poisoned");
        guard.get(key).copied()
    }

    /// Record a human decision under the lowercased artifact name.
    fn set_status(&self, artifact: &str, status: ApprovalStatus) {
        let mut guard = self.approvals.lock().expect("approvals mutex poisoned");
        guard.insert(artifact.to_lowercase(), status);
    }
}
/// User-land error a tool raises to halt the run until a human decides.
#[derive(Debug)]
struct RequireApprovalError {
    message: String,
    artifact: String,
}

impl RequireApprovalError {
    /// Build the error with its canonical message for `artifact`.
    fn new(artifact: String) -> Self {
        Self {
            message: format!(
                "Release of {artifact} requires human approval before it can proceed."
            ),
            artifact,
        }
    }
}

impl fmt::Display for RequireApprovalError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.message)
    }
}

impl Error for RequireApprovalError {}
// Single AgentTool that inspects the context map and interrupts the run without// touching the Agent implementation. Thrown errors become// AgentToolExecutionError.struct UnlockArtifactTool;
#[derive(Deserialize)]struct UnlockArtifactArgs { artifact: String,}
impl AgentTool<VaultContext> for UnlockArtifactTool { fn name(&self) -> String { "unlock_artifact".into() }
fn description(&self) -> String { "Unlock an artifact for release once a human supervisor has recorded their approval.".into() }
fn parameters(&self) -> llm_sdk::JSONSchema { json!({ "type": "object", "properties": { "artifact": { "type": "string", "description": "Name of the artifact to release.", "minLength": 1 } }, "required": ["artifact"], "additionalProperties": false }) }
fn execute<'a>( &'a self, args: Value, context: &'a VaultContext, _state: &llm_agent::RunState, ) -> BoxFuture<'a, Result<AgentToolResult, Box<dyn Error + Send + Sync>>> { Box::pin(async move { let params: UnlockArtifactArgs = serde_json::from_value(args) .map_err(|err| Box::new(err) as Box<dyn Error + Send + Sync>)?; let artifact = params.artifact.trim().to_string(); let artifact_key = artifact.to_lowercase(); match context.status(&artifact_key) { None => { Err(Box::new(RequireApprovalError::new(artifact)) as Box<dyn Error + Send + Sync>) } Some(ApprovalStatus::Denied) => Ok(AgentToolResult { content: vec![Part::text(format!( "Release of {artifact} remains blocked until a supervisor approves it." ))], is_error: true, }), Some(ApprovalStatus::Approved) => Ok(AgentToolResult { content: vec![Part::text(format!( "{artifact} unlocked. Proceed with standard vault handling protocols." ))], is_error: false, }), } }) }}
fn build_agent(model: Arc<dyn llm_sdk::LanguageModel + Send + Sync>) -> Agent<VaultContext> { Agent::builder("VaultSentinel", model) .add_instruction( "You supervise the Eon Vault, safeguarding experimental expedition technology.", ) .add_tool(UnlockArtifactTool) .build()}
/// Opening user request that seeds the transcript.
const INITIAL_PROMPT: &str = "We have an emergency launch window in four hours. Please unlock the Starlight Compass for the Horizon survey team.";

/// Transcript starts with just the user's request.
fn initial_transcript() -> Vec<AgentItem> {
    vec![AgentItem::Message(Message::user(vec![Part::text(
        INITIAL_PROMPT,
    )]))]
}
// Stream one pass of the agent, appending every AgentStreamItemEvent.async fn run_stream( agent: &Agent<VaultContext>, transcript: &mut Vec<AgentItem>, context: VaultContext,) -> Result<llm_agent::AgentResponse, AgentError> { let mut stream = agent .run_stream(AgentRequest { context, input: transcript.clone(), }) .await?;
while let Some(event) = stream.next().await { match event? { AgentStreamEvent::Partial(_) => {} AgentStreamEvent::Item(item_event) => { // Persist generated items so later iterations operate on the full history. transcript.push(item_event.item.clone()); log_item(&item_event.item); } AgentStreamEvent::Response(response) => return Ok(response), } }
Err(AgentError::Invariant( "agent stream completed without emitting a response".into(), ))}
fn log_item(item: &AgentItem) { match item { AgentItem::Message(message) => { let text = render_parts(message_content(message)); if !text.is_empty() { println!("\n[{}] {}", message_role(message), text); } } AgentItem::Model(response) => { let text = render_parts(&response.content); if !text.is_empty() { println!("\n[assistant]\n{text}"); } } AgentItem::Tool(tool) => { let input = serde_json::to_string(&tool.input).unwrap_or_else(|_| "{}".into()); println!("\n[tool:{}]", tool.tool_name); println!(" input={input}"); let output = render_parts(&tool.output); if !output.is_empty() { println!(" output={output}"); } } }}
/// Ask the human on stdin; anything other than y/yes counts as denial.
fn prompt_for_approval(artifact: &str) -> ApprovalStatus {
    print!("Grant approval to unlock {artifact}? (y/N) ");
    io::stdout().flush().expect("flush stdout");

    let mut line = String::new();
    if io::stdin().read_line(&mut line).is_err() {
        eprintln!("failed to read input; treating as denial");
        return ApprovalStatus::Denied;
    }

    let decision = line.trim().to_lowercase();
    match decision.as_str() {
        "y" | "yes" => ApprovalStatus::Approved,
        "" | "n" | "no" => ApprovalStatus::Denied,
        _ => {
            println!("Unrecognized response, treating as denied.");
            ApprovalStatus::Denied
        }
    }
}
fn render_parts(parts: &[Part]) -> String { parts .iter() .filter_map(|part| match part { Part::Text(text) => { let trimmed = text.text.trim(); if trimmed.is_empty() { None } else { Some(trimmed.to_string()) } } _ => None, }) .collect::<Vec<_>>() .join("\n")}
/// Borrow the content parts regardless of message role.
fn message_content(message: &Message) -> &Vec<Part> {
    match message {
        Message::User(m) => &m.content,
        Message::Assistant(m) => &m.content,
        Message::Tool(m) => &m.content,
    }
}

/// Human-readable role label for log output.
fn message_role(message: &Message) -> &'static str {
    match message {
        Message::User(_) => "user",
        Message::Assistant(_) => "assistant",
        Message::Tool(_) => "tool",
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
    dotenv().ok();

    let api_key = env::var("OPENAI_API_KEY")?;
    let model = Arc::new(OpenAIModel::new(
        "gpt-4o",
        OpenAIModelOptions {
            api_key,
            ..Default::default()
        },
    ));

    let agent = build_agent(model);
    let mut transcript = initial_transcript();
    println!("[user] {INITIAL_PROMPT}");

    let context = VaultContext::default();

    // Drive the run until it completes, pausing for a human decision whenever
    // the tool interrupts with RequireApprovalError.
    loop {
        match run_stream(&agent, &mut transcript, context.clone()).await {
            Ok(response) => {
                println!("\nCompleted run.");
                println!("{:#?}", response.content);
                break;
            }
            Err(AgentError::ToolExecution(inner)) => {
                if let Some(approval) = inner.downcast_ref::<RequireApprovalError>() {
                    println!("\n[agent halted] err = {}", approval.message);
                    // Store the decision so the tool sees it on retry.
                    let decision = prompt_for_approval(&approval.artifact);
                    context.set_status(&approval.artifact, decision);
                    continue;
                }
                // Any other tool failure is fatal.
                return Err(inner);
            }
            Err(err) => return Err(err.into()),
        }
    }

    Ok(())
}
package main
import ( "bufio" "bytes" "context" "encoding/json" "errors" "fmt" "log" "os" "strings"
llmagent "github.com/hoangvvo/llm-sdk/agent-go" llmsdk "github.com/hoangvvo/llm-sdk/sdk-go" "github.com/hoangvvo/llm-sdk/sdk-go/openai" "github.com/joho/godotenv" "github.com/sanity-io/litter")
// Human-in-the-loop outline with agent primitives:// 1. Seed the run with a user `AgentItem` and call `Agent#RunStream` so we capture// every emitted `AgentStreamEvent` (model messages, tool results, etc.).// 2. When the tool throws our user-land `requireApprovalError`, collect the human// decision and persist it on the shared RunSession context.// 3. Repeat step (1) with the accumulated items and mutated context until the tool// succeeds or returns an error result that reflects the denial.
// approvalStatus records a supervisor's decision for an artifact.
type approvalStatus string

const (
	statusApproved approvalStatus = "approved"
	statusDenied   approvalStatus = "denied"
)

// vaultContext is the shared run context: decisions keyed by the lowercased
// artifact name.
type vaultContext struct {
	approvals map[string]approvalStatus
}

// requireApprovalError interrupts the run until a human records a decision
// for the named artifact.
type requireApprovalError struct {
	message  string
	artifact string
}

func (e *requireApprovalError) Error() string { return e.message }
// Single AgentTool that inspects the context map and interrupts the run without// touching the Agent implementation. Thrown errors become AgentToolExecutionError.type unlockArtifactTool struct{}
func (unlockArtifactTool) Name() string { return "unlock_artifact" }func (unlockArtifactTool) Description() string { return "Unlock an artifact for release once a human supervisor has recorded their approval."}
func (unlockArtifactTool) Parameters() llmsdk.JSONSchema { return llmsdk.JSONSchema{ "type": "object", "properties": map[string]any{ "artifact": map[string]any{ "type": "string", "description": "Name of the artifact to release.", "minLength": 1, }, }, "required": []string{"artifact"}, "additionalProperties": false, }}
func (unlockArtifactTool) Execute(ctx context.Context, raw json.RawMessage, state *vaultContext, _ *llmagent.RunState) (llmagent.AgentToolResult, error) { var params struct { Artifact string `json:"artifact"` } if err := json.Unmarshal(raw, ¶ms); err != nil { return llmagent.AgentToolResult{}, err }
artifact := strings.TrimSpace(params.Artifact) artifactKey := strings.ToLower(artifact) status, ok := state.approvals[artifactKey] if !ok { return llmagent.AgentToolResult{}, &requireApprovalError{ message: fmt.Sprintf("Release of %s requires human approval before it can proceed.", artifact), artifact: artifact, } }
if status == statusDenied { return llmagent.AgentToolResult{ Content: []llmsdk.Part{llmsdk.NewTextPart(fmt.Sprintf("Release of %s remains blocked until a supervisor approves it.", artifact))}, IsError: true, }, nil }
return llmagent.AgentToolResult{ Content: []llmsdk.Part{llmsdk.NewTextPart(fmt.Sprintf("%s unlocked. Proceed with standard vault handling protocols.", artifact))}, IsError: false, }, nil}
func newAgent(model llmsdk.LanguageModel) *llmagent.Agent[*vaultContext] { instruction := "You supervise the Eon Vault, safeguarding experimental expedition technology." return llmagent.NewAgent[*vaultContext]( "VaultSentinel", model, llmagent.WithInstructions(llmagent.InstructionParam[*vaultContext]{String: &instruction}), llmagent.WithTools(unlockArtifactTool{}), )}
var transcript []llmagent.AgentItem
// Stream one pass of the agent, appending every AgentStreamItemEvent.func runStream(agent *llmagent.Agent[*vaultContext], ctxVal *vaultContext) (*llmagent.AgentResponse, error) { input := append([]llmagent.AgentItem(nil), transcript...)
stream, err := agent.RunStream(context.Background(), llmagent.AgentRequest[*vaultContext]{ Context: ctxVal, Input: input, }) if err != nil { return nil, err }
for stream.Next() { event := stream.Current() if event.Item != nil { transcript = append(transcript, event.Item.Item) logItem(event.Item.Item) } if event.Response != nil { return event.Response, nil } }
if err := stream.Err(); err != nil { return nil, err }
return nil, errors.New("agent stream completed without emitting a response")}
func logItem(item llmagent.AgentItem) { switch item.Type() { case llmagent.AgentItemTypeMessage: msg := item.Message if msg == nil { return } text := extractMessageText(*msg) if text != "" { fmt.Printf("\n[%s] %s\n", strings.ToLower(messageRole(*msg)), text) } case llmagent.AgentItemTypeModel: if item.Model == nil { return } text := renderParts(item.Model.Content) if text != "" { fmt.Printf("\n[assistant]\n%s\n", text) } case llmagent.AgentItemTypeTool: tool := item.Tool if tool == nil { return } fmt.Printf("\n[tool:%s]\n input=%s\n", tool.ToolName, indentJSON(tool.Input)) if output := renderParts(tool.Output); output != "" { fmt.Printf(" output=%s\n", output) } }}
func promptForApproval(reader *bufio.Reader, artifact string) approvalStatus { fmt.Printf("Grant approval to unlock %s? (y/N) ", artifact) line, err := reader.ReadString('\n') if err != nil { fmt.Printf("read input error: %v\n", err) return statusDenied } decision := strings.TrimSpace(strings.ToLower(line)) switch decision { case "y", "yes": return statusApproved case "", "n", "no": return statusDenied default: fmt.Println("Unrecognized response, treating as denied.") return statusDenied }}
func main() { if err := godotenv.Load("../.env"); err != nil && !errors.Is(err, os.ErrNotExist) { log.Fatalf("load env: %v", err) }
apiKey := os.Getenv("OPENAI_API_KEY") if apiKey == "" { log.Fatal("OPENAI_API_KEY environment variable must be set") }
model := openai.NewOpenAIModel("gpt-4o", openai.OpenAIModelOptions{APIKey: apiKey}) agent := newAgent(model)
initialText := "We have an emergency launch window in four hours. Please unlock the Starlight Compass for the Horizon survey team." transcript = []llmagent.AgentItem{ llmagent.NewAgentItemMessage(llmsdk.NewUserMessage(llmsdk.NewTextPart(initialText))), } fmt.Printf("[user] %s\n", initialText)
ctxVal := &vaultContext{approvals: map[string]approvalStatus{}} reader := bufio.NewReader(os.Stdin)
for { response, err := runStream(agent, ctxVal) if err == nil { fmt.Println("\nCompleted run.") litter.Dump(response.Content) break }
var approvalErr *requireApprovalError if errors.As(err, &approvalErr) { fmt.Printf("\n[agent halted] err = %s\n", approvalErr.Error()) decision := promptForApproval(reader, approvalErr.artifact) key := strings.ToLower(strings.TrimSpace(approvalErr.artifact)) if key != "" { ctxVal.approvals[key] = decision } continue }
log.Fatalf("run failed: %v", err) }}
func renderParts(parts []llmsdk.Part) string { if len(parts) == 0 { return "" } var lines []string for _, part := range parts { if part.TextPart == nil { continue } trimmed := strings.TrimSpace(part.TextPart.Text) if trimmed != "" { lines = append(lines, trimmed) } } return strings.Join(lines, "\n")}
func extractMessageText(message llmsdk.Message) string { if message.UserMessage != nil { return renderParts(message.UserMessage.Content) } if message.AssistantMessage != nil { return renderParts(message.AssistantMessage.Content) } if message.ToolMessage != nil { return renderParts(message.ToolMessage.Content) } return ""}
func messageRole(message llmsdk.Message) string { switch { case message.UserMessage != nil: return "user" case message.AssistantMessage != nil: return "assistant" case message.ToolMessage != nil: return "tool" default: return "unknown" }}
func indentJSON(raw json.RawMessage) string { if len(raw) == 0 { return "null" } var buf bytes.Buffer if err := json.Indent(&buf, raw, "", " "); err != nil { return string(raw) } return buf.String()}