Middleware

The middleware system in @ag-ui/client lets you transform, filter, and augment the event streams flowing through agents. Middleware can intercept and modify events, add logging, attach authentication data, filter tool calls, and more.
import {
  AbstractAgent,
  BaseEvent,
  EventType,
  FilterToolCallsMiddleware,
  Message,
  Middleware,
  MiddlewareFunction,
  RunAgentInput
} from "@ag-ui/client"
import { Observable } from "rxjs"
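The examples below assume that agent is an existing agent instance and that the RxJS operators and creation functions they use (map, tap, filter, finalize, catchError, switchMap, timer, of, etc.) are imported from "rxjs" (or from "rxjs/operators" on older RxJS versions).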

Types

MiddlewareFunction

A function that transforms the event stream.
type MiddlewareFunction = (
  input: RunAgentInput,
  next: AbstractAgent
) => Observable<BaseEvent>
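Function middleware receives events exactly as emitted by the next middleware or agent; chunk events are not normalized. Use class-based middleware with runNext() when you need normalized events.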

Middleware

Abstract base class for creating middleware.
interface EventWithState {
  event: BaseEvent
  messages: Message[]
  state: unknown
}

abstract class Middleware {
  abstract run(
    input: RunAgentInput,
    next: AbstractAgent
  ): Observable<BaseEvent>

  protected runNext(
    input: RunAgentInput,
    next: AbstractAgent
  ): Observable<BaseEvent>

  protected runNextWithState(
    input: RunAgentInput,
    next: AbstractAgent
  ): Observable<EventWithState>
}
  • runNext() runs next.run(...) and normalizes chunk events into complete TEXT_MESSAGE_* / TOOL_CALL_* sequences.
  • runNextWithState() does the same and also provides accumulated messages and state after each event is applied.

Function-Based Middleware

The simplest way to create middleware is with a function. Function middleware is ideal for stateless transformations.

Basic Example

const loggingMiddleware: MiddlewareFunction = (input, next) => {
  console.log(`[${new Date().toISOString()}] Starting run ${input.runId}`)

  return next.run(input).pipe(
    tap(event => console.log(`Event: ${event.type}`)),
    finalize(() => console.log(`Run ${input.runId} completed`))
  )
}

agent.use(loggingMiddleware)

Transforming Events

const timestampMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    map(event => {
      if (event.type === EventType.RUN_STARTED) {
        return {
          ...event,
          timestamp: Date.now()
        }
      }
      return event
    })
  )
}

Error Handling

const errorMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
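    catchError((error: unknown) => {
      console.error("Agent error:", error)

      // Replace the failed stream with a single RUN_ERROR event.
      // Non-Error throwables (e.g. strings) have no .message, so fall back to String().
      return of({
        type: EventType.RUN_ERROR,
        message: error instanceof Error ? error.message : String(error)
      } as BaseEvent)
    })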
  )
}

Class-Based Middleware

For stateful operations or complex logic, extend the Middleware class.

Basic Implementation

class CounterMiddleware extends Middleware {
  private totalEvents = 0

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    let runEvents = 0

    return this.runNext(input, next).pipe(
      tap(() => {
        runEvents++
        this.totalEvents++
      }),
      finalize(() => {
        console.log(`Run events: ${runEvents}, Total: ${this.totalEvents}`)
      })
    )
  }
}

agent.use(new CounterMiddleware())

Configuration-Based Middleware

class AuthMiddleware extends Middleware {
  constructor(
    private apiKey: string,
    private headerName: string = "Authorization"
  ) {
    super()
  }

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    // Attach auth data in forwardedProps for downstream transport/agent logic
    const authenticatedInput: RunAgentInput = {
      ...input,
      forwardedProps: {
        ...input.forwardedProps,
        auth: {
          headerName: this.headerName,
          value: `Bearer ${this.apiKey}`
        }
      }
    }

    return this.runNext(authenticatedInput, next)
  }
}

const apiKey = process.env.API_KEY ?? ""
agent.use(new AuthMiddleware(apiKey))

Accumulator Helpers (Class Middleware)

Class middleware can use helper methods from Middleware to work with normalized events and accumulated state.

runNext()

runNext() forwards execution and normalizes chunk events into full TEXT_MESSAGE_* and TOOL_CALL_* events.
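
To make "normalizes chunk events" more concrete, the following is a minimal, dependency-free sketch of the idea for text messages. It is not the library's implementation: the event shapes are simplified, the chunk event name TEXT_MESSAGE_CHUNK and its fields are assumptions, plain arrays stand in for observables, and normalizeTextChunks is a hypothetical helper.

```typescript
// Simplified event shapes for illustration only (assumed, not the library's types).
type SimpleEvent =
  | { type: "TEXT_MESSAGE_CHUNK"; messageId: string; delta: string }
  | { type: "TEXT_MESSAGE_START"; messageId: string }
  | { type: "TEXT_MESSAGE_CONTENT"; messageId: string; delta: string }
  | { type: "TEXT_MESSAGE_END"; messageId: string }
  | { type: "RUN_FINISHED" }

// Hypothetical helper: expands chunk events into START / CONTENT / END sequences.
function normalizeTextChunks(events: SimpleEvent[]): SimpleEvent[] {
  const out: SimpleEvent[] = []
  let openId: string | null = null // message currently being streamed via chunks

  // Emit the END event for the open message, if there is one.
  const closeOpen = () => {
    if (openId !== null) {
      out.push({ type: "TEXT_MESSAGE_END", messageId: openId })
      openId = null
    }
  }

  for (const event of events) {
    if (event.type === "TEXT_MESSAGE_CHUNK") {
      if (openId !== event.messageId) {
        // A chunk for a new message: close the previous one and open this one.
        closeOpen()
        out.push({ type: "TEXT_MESSAGE_START", messageId: event.messageId })
        openId = event.messageId
      }
      out.push({ type: "TEXT_MESSAGE_CONTENT", messageId: event.messageId, delta: event.delta })
    } else {
      // Any non-chunk event ends the open message and passes through unchanged.
      closeOpen()
      out.push(event)
    }
  }
  closeOpen()
  return out
}

const normalized = normalizeTextChunks([
  { type: "TEXT_MESSAGE_CHUNK", messageId: "m1", delta: "Hel" },
  { type: "TEXT_MESSAGE_CHUNK", messageId: "m1", delta: "lo" },
  { type: "RUN_FINISHED" }
])
const normalizedTypes = normalized.map(e => e.type)
```

In this model, two chunks for the same message become one START, two CONTENT events, and one END, so downstream code only has to handle the complete sequence.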

runNextWithState()

runNextWithState() returns { event, messages, state }, where messages and state are the accumulated values after each event has been applied.
class MetricsWithStateMiddleware extends Middleware {
  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    return this.runNextWithState(input, next).pipe(
      tap(({ event, messages, state }) => {
        if (event.type === EventType.RUN_FINISHED) {
          const stateKeyCount =
            state && typeof state === "object" ? Object.keys(state).length : 0

          console.log("Assistant messages:", messages.filter(m => m.role === "assistant").length)
          console.log("Final state keys:", stateKeyCount)
        }
      }),
      map(({ event }) => event)
    )
  }
}
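
The phrase "accumulated values after each event has been applied" can be pictured as a running fold over the event stream. The sketch below models only text messages, with plain arrays instead of observables; the types and the withAccumulatedMessages helper are hypothetical and are not how the library is implemented.

```typescript
// Simplified shapes for illustration; not the library's actual types.
type TextEvent =
  | { type: "TEXT_MESSAGE_START"; messageId: string }
  | { type: "TEXT_MESSAGE_CONTENT"; messageId: string; delta: string }
  | { type: "TEXT_MESSAGE_END"; messageId: string }

interface SimpleMessage {
  id: string
  role: "assistant"
  content: string
}

interface Snapshot {
  event: TextEvent
  messages: SimpleMessage[]
}

// Hypothetical helper: pairs each event with the messages as they stand
// after that event has been applied.
function withAccumulatedMessages(events: TextEvent[]): Snapshot[] {
  let messages: SimpleMessage[] = []

  return events.map(event => {
    if (event.type === "TEXT_MESSAGE_START") {
      // A new, empty assistant message starts accumulating.
      messages = [...messages, { id: event.messageId, role: "assistant", content: "" }]
    } else if (event.type === "TEXT_MESSAGE_CONTENT") {
      const { messageId, delta } = event
      // Build a new array so earlier snapshots are not mutated.
      messages = messages.map(m =>
        m.id === messageId ? { ...m, content: m.content + delta } : m
      )
    }
    // TEXT_MESSAGE_END leaves the accumulated messages unchanged in this model.
    return { event, messages }
  })
}

const snapshots = withAccumulatedMessages([
  { type: "TEXT_MESSAGE_START", messageId: "m1" },
  { type: "TEXT_MESSAGE_CONTENT", messageId: "m1", delta: "Hi" },
  { type: "TEXT_MESSAGE_CONTENT", messageId: "m1", delta: "!" },
  { type: "TEXT_MESSAGE_END", messageId: "m1" }
])
const contentsPerStep = snapshots.map(s => s.messages[0].content)
```

Each snapshot reflects the conversation up to and including its own event, which is why a middleware like the one above can read the final message list when it sees RUN_FINISHED.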

Built-in Middleware

FilterToolCallsMiddleware

Filters tool calls using either an allowlist or a blocklist. FilterToolCallsMiddleware removes the emitted TOOL_CALL_* events for blocked calls (including their args and results) from the stream. It does not prevent tool execution in the upstream model/runtime.
import { FilterToolCallsMiddleware } from "@ag-ui/client"

Configuration

type FilterToolCallsConfig =
  | { allowedToolCalls: string[]; disallowedToolCalls?: never }
  | { disallowedToolCalls: string[]; allowedToolCalls?: never }

Allow Specific Tools

const allowFilter = new FilterToolCallsMiddleware({
  allowedToolCalls: ["search", "calculate", "summarize"]
})

agent.use(allowFilter)
You can also use disallowedToolCalls instead of allowedToolCalls.
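
As a rough illustration of how the two configuration shapes differ, here is a dependency-free sketch of allowlist versus blocklist filtering. It models the behavior described above with simplified event shapes and plain arrays; isToolAllowed and filterToolEvents are hypothetical helpers, not part of @ag-ui/client, and the built-in middleware may work differently internally.

```typescript
// Mirrors the configuration union shown above.
type FilterToolCallsConfig =
  | { allowedToolCalls: string[]; disallowedToolCalls?: never }
  | { disallowedToolCalls: string[]; allowedToolCalls?: never }

// Simplified event shapes for illustration only.
type ToolEvent =
  | { type: "TOOL_CALL_START"; toolCallId: string; toolCallName: string }
  | { type: "TOOL_CALL_ARGS"; toolCallId: string; delta: string }
  | { type: "TOOL_CALL_END"; toolCallId: string }

// Hypothetical helper: decides whether a tool name passes the filter.
function isToolAllowed(config: FilterToolCallsConfig, name: string): boolean {
  const allowed = config.allowedToolCalls
  const disallowed = config.disallowedToolCalls
  if (allowed) return allowed.includes(name) // allowlist: only listed tools pass
  return !(disallowed ?? []).includes(name)  // blocklist: everything except listed tools
}

// Hypothetical helper: drops every event that belongs to a blocked tool call.
function filterToolEvents(config: FilterToolCallsConfig, events: ToolEvent[]): ToolEvent[] {
  const blockedIds = new Set<string>()
  return events.filter(event => {
    if (event.type === "TOOL_CALL_START" && !isToolAllowed(config, event.toolCallName)) {
      // Remember the id so the matching ARGS and END events are dropped too.
      blockedIds.add(event.toolCallId)
    }
    return !blockedIds.has(event.toolCallId)
  })
}

const toolEvents: ToolEvent[] = [
  { type: "TOOL_CALL_START", toolCallId: "t1", toolCallName: "search" },
  { type: "TOOL_CALL_ARGS", toolCallId: "t1", delta: "{\"q\":\"rxjs\"}" },
  { type: "TOOL_CALL_END", toolCallId: "t1" },
  { type: "TOOL_CALL_START", toolCallId: "t2", toolCallName: "delete_file" },
  { type: "TOOL_CALL_END", toolCallId: "t2" }
]

const allowSearchOnly = filterToolEvents({ allowedToolCalls: ["search"] }, toolEvents)
const blockSearch = filterToolEvents({ disallowedToolCalls: ["search"] }, toolEvents)
```

With the allowlist, only the three events for the search call remain; with the blocklist, only the two delete_file events remain. An allowlist is usually the safer default because newly added tools stay hidden until they are listed.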

Middleware Patterns

Timing Middleware

const timingMiddleware: MiddlewareFunction = (input, next) => {
  const startTime = performance.now()

  return next.run(input).pipe(
    finalize(() => {
      const duration = performance.now() - startTime
      console.log(`Execution time: ${duration.toFixed(2)}ms`)
    })
  )
}

Rate Limiting

class RateLimitMiddleware extends Middleware {
  private lastCall = 0

  constructor(private minInterval: number) {
    super()
  }

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    const now = Date.now()
    // Reserve the next free slot up front so concurrent calls queue behind
    // each other instead of all firing when the same timer expires.
    const scheduledAt = Math.max(now, this.lastCall + this.minInterval)
    this.lastCall = scheduledAt

    if (scheduledAt > now) {
      // Delay the execution until the reserved slot
      return timer(scheduledAt - now).pipe(
        switchMap(() => this.runNext(input, next))
      )
    }

    return this.runNext(input, next)
  }
}

// Limit to one request per second
agent.use(new RateLimitMiddleware(1000))
Other common patterns include retry logic and response caching.
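
The retry and caching patterns can be sketched without any dependencies by modeling a run as a synchronous function that returns an array of events. This is only a conceptual model: SimpleInput, Run, withRetry, and withCache are hypothetical names, the cache key (the prompt text) is an arbitrary choice, and a real middleware would work on Observable<BaseEvent>, for example with the RxJS retry operator.

```typescript
// Stand-in types for illustration; the real API uses RunAgentInput and Observable<BaseEvent>.
type SimpleInput = { runId: string; prompt: string }
type SimpleEvent = { type: string }
type Run = (input: SimpleInput) => SimpleEvent[]

// Hypothetical retry wrapper: re-invokes `next` until it succeeds or attempts run out.
function withRetry(next: Run, maxAttempts: number): Run {
  return input => {
    let lastError: unknown = new Error("maxAttempts must be at least 1")
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return next(input)
      } catch (error) {
        lastError = error // remember the failure and try again
      }
    }
    throw lastError
  }
}

// Hypothetical cache wrapper: replays recorded events for a prompt it has already seen.
function withCache(next: Run): Run {
  const cache = new Map<string, SimpleEvent[]>()
  return input => {
    const cached = cache.get(input.prompt)
    if (cached !== undefined) return cached
    const events = next(input)
    cache.set(input.prompt, events) // only successful runs are cached
    return events
  }
}

// A flaky stand-in agent that fails on its first call only.
let calls = 0
const flakyAgent: Run = () => {
  calls++
  if (calls === 1) throw new Error("transient failure")
  return [{ type: "RUN_STARTED" }, { type: "RUN_FINISHED" }]
}

const resilient = withCache(withRetry(flakyAgent, 3))
const first = resilient({ runId: "r1", prompt: "hello" })  // retried once, then cached
const second = resilient({ runId: "r2", prompt: "hello" }) // served from the cache
```

Placing the cache outside the retry wrapper means a cached answer skips the retry logic entirely, and failed attempts are never stored.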

Chaining Middleware

Pass several middleware to agent.use() to build a processing pipeline. The first middleware is outermost: it sees the input first and the resulting events last.
const logger = loggingMiddleware
const auth = new AuthMiddleware(apiKey)
const filter = new FilterToolCallsMiddleware({ allowedToolCalls: ["search"] })

agent.use(logger, auth, filter)

// Execution flow:
// logger → auth → filter → agent → filter → auth → logger
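
The execution flow above follows from how wrappers nest. The sketch below reproduces that order in a dependency-free model where a run is a plain function returning event names; chain and named are hypothetical helpers used only for illustration and say nothing about how agent.use() is implemented.

```typescript
// Stand-in types: a run takes an input string and returns event names.
type Run = (input: string) => string[]
type SimpleMiddleware = (input: string, next: Run) => string[]

// Hypothetical helper: wraps `agent` so that the first middleware is outermost.
function chain(agent: Run, ...middlewares: SimpleMiddleware[]): Run {
  return middlewares.reduceRight<Run>(
    (next, middleware) => input => middleware(input, next),
    agent
  )
}

const trace: string[] = []

// Builds a middleware that records when it sees the request and when it sees the events.
const named = (name: string): SimpleMiddleware => (input, next) => {
  trace.push(`${name} (request)`)
  const events = next(input)
  trace.push(`${name} (events)`)
  return events
}

const agent: Run = () => {
  trace.push("agent")
  return ["RUN_STARTED", "RUN_FINISHED"]
}

const pipeline = chain(agent, named("logger"), named("auth"), named("filter"))
const events = pipeline("run-1")
// trace now reads: logger, auth, filter on the way in, then agent,
// then filter, auth, logger on the way out.
```

Because the logger wraps everything else, it observes the events only after the filter has already processed them. Reordering the arguments changes what each middleware can see.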

Advanced Usage

Conditional Middleware

const debugMiddleware: MiddlewareFunction = (input, next) => {
  const isDebug = input.forwardedProps?.debug === true

  if (!isDebug) {
    return next.run(input)
  }

  return next.run(input).pipe(
    tap(event => {
      console.debug("[DEBUG]", JSON.stringify(event, null, 2))
    })
  )
}

Lifecycle Notes

Middleware added with agent.use(...) runs in runAgent() (and the legacy bridge path). connectAgent() currently calls connect() directly and does not run middleware.

Best Practices

  1. Single Responsibility: Each middleware should focus on one concern
  2. Error Handling: Always handle errors gracefully and consider recovery strategies
  3. Performance: Be mindful of processing overhead in high-throughput scenarios
  4. State Management: Use class-based middleware when state is required
  5. Testing: Write unit tests for each middleware independently
  6. Documentation: Document middleware behavior and side effects

TypeScript Support

The middleware system is fully typed, so editors can offer autocompletion and catch type errors as you write middleware:
import {
  AbstractAgent,
  BaseEvent,
  MiddlewareFunction,
  RunAgentInput
} from "@ag-ui/client"
import { Observable } from "rxjs"

// Type-safe middleware function
const typedMiddleware: MiddlewareFunction = (
  input: RunAgentInput,
  next: AbstractAgent
): Observable<BaseEvent> => {
  return next.run(input)
}