AbstractAgent

The AbstractAgent class provides the foundation for all agent implementations in the Agent User Interaction Protocol. It handles the core event stream processing, state management, and message history.
import { AbstractAgent } from "@ag-ui/client"

Configuration

Agents are configured by passing an optional AgentConfig object to the constructor.
interface AgentConfig {
  agentId?: string // The identifier of the agent
  description?: string // A description of the agent, used by the LLM
  threadId?: string // The conversation thread identifier
  initialMessages?: Message[] // An array of initial messages
  initialState?: State // The initial state of the agent
}

Adding Configuration Options in your Subclass

To add configuration options, extend the AgentConfig interface and pass the extended config to the super constructor from your subclass:
interface MyAgentConfig extends AgentConfig {
  myConfigOption: string
}

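class MyAgent extends AbstractAgent {
  private myConfigOption: string

  constructor(config: MyAgentConfig) {
    // Pass the extended config up so the base fields (agentId, threadId, ...) are set
    super(config)
    this.myConfigOption = config.myConfigOption
  }

  // A concrete subclass must also implement the abstract run() method (see Protected Methods)
}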

Core Methods

runAgent()

The primary method for executing an agent and processing the result.
runAgent(parameters?: RunAgentParameters, subscriber?: AgentSubscriber): Promise<RunAgentResult>

Parameters

interface RunAgentParameters {
  runId?: string // Unique ID for this execution run
  tools?: Tool[] // Available tools for the agent
  context?: Context[] // Contextual information
  forwardedProps?: Record<string, any> // Additional properties to forward
}
The optional subscriber parameter allows you to provide an AgentSubscriber for handling events during this specific run.

Return Value

interface RunAgentResult {
  result: any // The final result returned by the agent
  newMessages: Message[] // New messages added during this run
}

subscribe()

Adds an AgentSubscriber to handle events across multiple agent runs.
subscribe(subscriber: AgentSubscriber): { unsubscribe: () => void }
Returns an object with an unsubscribe() method to remove the subscriber when no longer needed.
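
The subscribe/unsubscribe contract described above can be illustrated with a small self-contained sketch. Everything in it is a hypothetical stand-in (SketchSubscriber, SubscriberRegistry, the onEvent callback and the event strings are assumptions made for illustration, not the @ag-ui/client types); it only shows how a handle with an unsubscribe() method stops further deliveries.

```typescript
// Hypothetical stand-ins: these are NOT the @ag-ui/client types.
interface SketchSubscriber {
  onEvent?: (eventType: string) => void; // hypothetical callback name
}

class SubscriberRegistry {
  private subscribers = new Set<SketchSubscriber>();

  // Mirrors the documented return shape: an object with unsubscribe().
  subscribe(subscriber: SketchSubscriber): { unsubscribe: () => void } {
    this.subscribers.add(subscriber);
    return {
      unsubscribe: () => {
        this.subscribers.delete(subscriber);
      },
    };
  }

  // Delivers an event to every subscriber that is still registered.
  emit(eventType: string): void {
    for (const subscriber of this.subscribers) {
      subscriber.onEvent?.(eventType);
    }
  }
}

const registry = new SubscriberRegistry();
const seen: string[] = [];

const subscription = registry.subscribe({
  onEvent: (eventType: string) => {
    seen.push(eventType);
  },
});

registry.emit("run-1 event"); // delivered: the subscriber is registered
subscription.unsubscribe();
registry.emit("run-2 event"); // not delivered: the subscriber was removed
```

With a real agent, the same pattern would presumably be to keep the object returned by subscribe() and call its unsubscribe() once the subscriber is no longer needed, for example when a UI component unmounts.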

use()

Adds middleware to the agent’s event processing pipeline.
use(...middlewares: (Middleware | MiddlewareFunction)[]): this
Middleware can be either:
  • Function middleware: Simple functions that transform the event stream
  • Class middleware: Instances of the Middleware class for stateful operations
// Function middleware
agent.use((input, next) => {
  console.log("Processing:", input.runId);
  return next.run(input);
});

// Class middleware
agent.use(new FilterToolCallsMiddleware({
  allowedToolCalls: ["search"]
}));

// Chain multiple middleware
agent.use(loggingMiddleware, authMiddleware, filterMiddleware);
Middleware executes in the order added, with each wrapping the next. See the Middleware documentation for more details.
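
To make "each wrapping the next" concrete, here is a simplified model of function middleware. It is a sketch under stated assumptions: events are plain strings, the stream is an array rather than an observable, and compose, SketchInput, SketchNext and SketchMiddleware are hypothetical names that do not exist in @ag-ui/client. It shows only the ordering: the first middleware added sees the input first and the returned events last.

```typescript
// Simplified model: events are strings and the "stream" is a plain array.
type SketchInput = { runId: string };
type SketchNext = { run: (input: SketchInput) => string[] };
type SketchMiddleware = (input: SketchInput, next: SketchNext) => string[];

// Builds the chain so that the first middleware added is the outermost wrapper.
function compose(middlewares: SketchMiddleware[], agent: SketchNext): SketchNext {
  return middlewares.reduceRight<SketchNext>(
    (next: SketchNext, middleware: SketchMiddleware): SketchNext => ({
      run: (input: SketchInput) => middleware(input, next),
    }),
    agent,
  );
}

const trace: string[] = [];

// Records when it is entered and when the inner chain has returned.
const logging: SketchMiddleware = (input, next) => {
  trace.push("logging:before");
  const events = next.run(input);
  trace.push("logging:after");
  return events;
};

// Transforms the events produced by whatever it wraps.
const tagging: SketchMiddleware = (input, next) => {
  trace.push("tagging:before");
  return next.run(input).map((event) => `[${input.runId}] ${event}`);
};

// Stand-in for the agent at the end of the chain.
const baseAgent: SketchNext = {
  run: () => {
    trace.push("agent:run");
    return ["hello"];
  },
};

const pipeline = compose([logging, tagging], baseAgent);
const output = pipeline.run({ runId: "run-1" });
// trace:  logging:before, tagging:before, agent:run, logging:after
// output: ["[run-1] hello"]
```

Because the outermost middleware brackets everything inside it, concerns such as logging or authentication are usually added first, and event filters closer to the agent.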

abortRun()

Cancels the current agent execution.
abortRun(): void

clone()

Creates a deep copy of the agent instance.
clone(): AbstractAgent

connectAgent()

Establishes a persistent connection with an agent that implements the connect() method.
connectAgent(parameters?: RunAgentParameters, subscriber?: AgentSubscriber): Promise<RunAgentResult>
Similar to runAgent() but uses the connect() method internally. The agent must implement connect() or this functionality must be provided by a framework like CopilotKit.

Observable Properties

events$

An observable stream of all events emitted during agent execution.
events$: Observable<BaseEvent>
This property provides direct access to the agent’s event stream. Events are stored in a ReplaySubject, so late subscribers receive all historical events.
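
The replay behaviour can be pictured with a minimal stand-in. TinyReplay below is a hypothetical class written for this illustration; it is neither RxJS's ReplaySubject nor part of @ag-ui/client, and the event strings are placeholders. It only demonstrates that a subscriber attached late is first handed everything emitted before it arrived.

```typescript
// Hypothetical stand-in for replay semantics; the real events$ is an RxJS Observable.
class TinyReplay<T> {
  private history: T[] = [];
  private listeners: Array<(value: T) => void> = [];

  // Stores the value, then forwards it to every current listener.
  next(value: T): void {
    this.history.push(value);
    for (const listener of this.listeners) {
      listener(value);
    }
  }

  // Replays the stored history to the new listener before registering it.
  subscribe(listener: (value: T) => void): void {
    for (const past of this.history) {
      listener(past);
    }
    this.listeners.push(listener);
  }
}

const replay = new TinyReplay<string>();
const early: string[] = [];
const late: string[] = [];

replay.subscribe((value) => {
  early.push(value);
});
replay.next("event-1");
replay.next("event-2");

// Subscribes after two events were emitted, yet still receives both.
replay.subscribe((value) => {
  late.push(value);
});
replay.next("event-3");
// early and late both hold: event-1, event-2, event-3
```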

Properties

  • agentId: Unique identifier for the agent instance
  • description: Human-readable description
  • threadId: Conversation thread identifier
  • messages: Array of conversation messages
  • state: Current agent state object
  • events$: Observable stream of all BaseEvent objects emitted during agent execution (replayed for late subscribers)

Protected Methods

These methods are meant to be implemented or extended by subclasses:

run()

Executes the agent and returns an observable event stream.
protected abstract run(input: RunAgentInput): RunAgent

connect()

Establishes a persistent connection and returns an observable event stream.
protected connect(input: RunAgentInput): RunAgent
Override this method to implement persistent connections. The default implementation throws a ConnectNotImplementedError.

apply()

Processes events from the run and updates the agent state.
protected apply(input: RunAgentInput): ApplyEvents

prepareRunAgentInput()

Prepares the input parameters for the agent execution.
protected prepareRunAgentInput(parameters?: RunAgentParameters): RunAgentInput

onError() and onFinalize()

Lifecycle hooks for error handling and cleanup operations.
protected onError(error: Error): void
protected onFinalize(): void