AgentSubscriber
The AgentSubscriber
interface provides a comprehensive event-driven system for
handling agent lifecycle events, message updates, and state mutations during
agent execution. It allows you to hook into various stages of the agent’s
operation and modify its behavior.
import { AgentSubscriber } from "@ag-ui/client"
Overview
AgentSubscriber
defines a collection of optional event handlers and lifecycle
hooks that can respond to different stages of agent execution. All methods in
the interface are optional, allowing you to implement only the events you need
to handle.
All subscriber methods can be either synchronous or asynchronous - if they
return a Promise, the agent will await their completion before proceeding.
Adding Subscribers to Agents
Subscribers can be added to agents in two ways:
Permanent Subscription
Use the subscribe()
method to add a subscriber that will persist across
multiple agent runs:
const agent = new HttpAgent({ url: "https://api.example.com/agent" })
const subscriber: AgentSubscriber = {
onTextMessageContentEvent: ({ textMessageBuffer }) => {
console.log("Streaming text:", textMessageBuffer)
},
}
// Add permanent subscriber
const subscription = agent.subscribe(subscriber)
// Later, remove the subscriber if needed
subscription.unsubscribe()
Temporary Subscription
Pass a subscriber directly to runAgent()
for one-time use:
const temporarySubscriber: AgentSubscriber = {
onRunFinishedEvent: ({ result }) => {
console.log("Run completed with result:", result)
},
}
// Use subscriber for this run only
await agent.runAgent({ tools: [myTool] }, temporarySubscriber)
Core Interfaces
AgentSubscriber
The main interface that defines all available event handlers and lifecycle
hooks. All methods in the interface are optional, allowing you to implement only
the events you need to handle.
AgentStateMutation
Event handlers can return an AgentStateMutation
object to modify the agent’s
state and control event processing:
interface AgentStateMutation {
messages?: Message[] // Update the message history
state?: State // Update the agent state
stopPropagation?: boolean // Prevent further subscribers from processing this event
}
When a subscriber returns a mutation:
- messages: Replaces the current message history
- state: Replaces the current agent state
- stopPropagation: If
true
, prevents subsequent subscribers from handling
the event (useful for overriding default behavior)
AgentSubscriberParams
Common parameters passed to most subscriber methods:
interface AgentSubscriberParams {
messages: Message[] // Current message history
state: State // Current agent state
agent: AbstractAgent // The agent instance
input: RunAgentInput // The original input parameters
}
Event Handlers
Lifecycle Events
onRunInitialized()
Called when the agent run is first initialized, before any processing begins.
onRunInitialized?(params: AgentSubscriberParams): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | void>
onRunFailed()
Called when the agent run encounters an error.
onRunFailed?(params: { error: Error } & AgentSubscriberParams): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | void>
onRunFinalized()
Called when the agent run completes, regardless of success or failure.
onRunFinalized?(params: AgentSubscriberParams): MaybePromise<Omit<AgentStateMutation, "stopPropagation"> | void>
Event Handlers
onEvent()
General event handler that receives all events during agent execution.
onEvent?(params: { event: BaseEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onRunStartedEvent()
Triggered when an agent run begins execution.
onRunStartedEvent?(params: { event: RunStartedEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onRunFinishedEvent()
Called when an agent run completes successfully.
onRunFinishedEvent?(params: { event: RunFinishedEvent; result?: any } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onRunErrorEvent()
Triggered when an agent run encounters an error.
onRunErrorEvent?(params: { event: RunErrorEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onStepStartedEvent()
Called when a step within an agent run begins.
onStepStartedEvent?(params: { event: StepStartedEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onStepFinishedEvent()
Triggered when a step within an agent run completes.
onStepFinishedEvent?(params: { event: StepFinishedEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
Message Events
onTextMessageStartEvent()
Triggered when a text message starts being generated.
onTextMessageStartEvent?(params: { event: TextMessageStartEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onTextMessageContentEvent()
Called for each chunk of text content as it’s generated.
onTextMessageContentEvent?(params: { event: TextMessageContentEvent; textMessageBuffer: string } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onTextMessageEndEvent()
Called when a text message generation is complete.
onTextMessageEndEvent?(params: { event: TextMessageEndEvent; textMessageBuffer: string } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
Triggered when a tool call begins.
onToolCallStartEvent?(params: { event: ToolCallStartEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
Called as tool call arguments are being parsed, providing both raw and parsed
argument data.
onToolCallArgsEvent?(params: { event: ToolCallArgsEvent; toolCallBuffer: string; toolCallName: string; partialToolCallArgs: Record<string, any> } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
Called when a tool call is complete with final arguments.
onToolCallEndEvent?(params: { event: ToolCallEndEvent; toolCallName: string; toolCallArgs: Record<string, any> } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
Triggered when a tool call result is received.
onToolCallResultEvent?(params: { event: ToolCallResultEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
State Events
onStateSnapshotEvent()
Called when a complete state snapshot is provided.
onStateSnapshotEvent?(params: { event: StateSnapshotEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onStateDeltaEvent()
Triggered when partial state changes are applied.
onStateDeltaEvent?(params: { event: StateDeltaEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onMessagesSnapshotEvent()
Called when a complete message history snapshot is provided.
onMessagesSnapshotEvent?(params: { event: MessagesSnapshotEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onRawEvent()
Handler for raw, unprocessed events.
onRawEvent?(params: { event: RawEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
onCustomEvent()
Handler for custom application-specific events.
onCustomEvent?(params: { event: CustomEvent } & AgentSubscriberParams): MaybePromise<AgentStateMutation | void>
State Change Handlers
onMessagesChanged()
Called when the agent’s message history is updated.
onMessagesChanged?(params: Omit<AgentSubscriberParams, "input"> & { input?: RunAgentInput }): MaybePromise<void>
onStateChanged()
Triggered when the agent’s state is modified.
onStateChanged?(params: Omit<AgentSubscriberParams, "input"> & { input?: RunAgentInput }): MaybePromise<void>
onNewMessage()
Called when a new message is added to the conversation.
onNewMessage?(params: { message: Message } & Omit<AgentSubscriberParams, "input"> & { input?: RunAgentInput }): MaybePromise<void>
Triggered when a new tool call is added to a message.
onNewToolCall?(params: { toolCall: ToolCall } & Omit<AgentSubscriberParams, "input"> & { input?: RunAgentInput }): MaybePromise<void>
Async Support
All subscriber methods support both synchronous and asynchronous execution:
const subscriber: AgentSubscriber = {
// Synchronous handler
onTextMessageContentEvent: ({ textMessageBuffer }) => {
updateUI(textMessageBuffer)
},
// Asynchronous handler
onStateChanged: async ({ state }) => {
await saveStateToDatabase(state)
},
// Async handler with mutation
onRunInitialized: async ({ messages, state }) => {
const enrichedState = await loadUserPreferences()
return {
state: { ...state, ...enrichedState },
}
},
}
Multiple Subscribers
Agents can have multiple subscribers, which are processed in the order they were
added:
// First subscriber modifies state
const stateEnricher: AgentSubscriber = {
onRunInitialized: ({ state }) => ({
state: { ...state, timestamp: new Date().toISOString() },
}),
}
// Second subscriber sees the modified state
const logger: AgentSubscriber = {
onRunInitialized: ({ state }) => {
console.log("State after enrichment:", state)
},
}
agent.subscribe(stateEnricher)
agent.subscribe(logger)
Integration with Agents
Basic usage pattern:
const agent = new HttpAgent({ url: "https://api.example.com/agent" })
// Add persistent subscriber
agent.subscribe({
onTextMessageContentEvent: ({ textMessageBuffer }) => {
updateStreamingUI(textMessageBuffer)
},
onRunFinishedEvent: ({ result }) => {
displayFinalResult(result)
},
})
// Run agent (subscriber will be called automatically)
const result = await agent.runAgent({
tools: [myTool],
})