Skip to main content

Middleware

The middleware system in @ag-ui/client provides a powerful way to transform, filter, and augment event streams flowing through agents. Middleware can intercept and modify events, add logging, implement authentication, filter tool calls, and more.
import { Middleware, MiddlewareFunction, FilterToolCallsMiddleware } from "@ag-ui/client"

Types

MiddlewareFunction

A function that transforms the event stream.
type MiddlewareFunction = (
  input: RunAgentInput,
  next: AbstractAgent
) => Observable<BaseEvent>

Middleware

Abstract base class for creating middleware.
abstract class Middleware {
  abstract run(
    input: RunAgentInput,
    next: AbstractAgent
  ): Observable<BaseEvent>
}

Function-Based Middleware

The simplest way to create middleware is with a function. Function middleware is ideal for stateless transformations.

Basic Example

const loggingMiddleware: MiddlewareFunction = (input, next) => {
  console.log(`[${new Date().toISOString()}] Starting run ${input.runId}`)

  return next.run(input).pipe(
    tap(event => console.log(`Event: ${event.type}`)),
    finalize(() => console.log(`Run ${input.runId} completed`))
  )
}

agent.use(loggingMiddleware)

Transforming Events

const prefixMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    map(event => {
      if (event.type === EventType.TEXT_MESSAGE_CHUNK) {
        return {
          ...event,
          delta: `[Assistant]: ${event.delta}`
        }
      }
      return event
    })
  )
}

Error Handling

const errorMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    catchError(error => {
      console.error("Agent error:", error)

      // Return error event
      return of({
        type: EventType.RUN_ERROR,
        message: error.message
      } as BaseEvent)
    })
  )
}

Class-Based Middleware

For stateful operations or complex logic, extend the Middleware class.

Basic Implementation

class CounterMiddleware extends Middleware {
  private totalEvents = 0

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    let runEvents = 0

    return next.run(input).pipe(
      tap(() => {
        runEvents++
        this.totalEvents++
      }),
      finalize(() => {
        console.log(`Run events: ${runEvents}, Total: ${this.totalEvents}`)
      })
    )
  }
}

agent.use(new CounterMiddleware())

Configuration-Based Middleware

class AuthMiddleware extends Middleware {
  constructor(
    private apiKey: string,
    private headerName: string = "Authorization"
  ) {
    super()
  }

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    // Add authentication to context
    const authenticatedInput = {
      ...input,
      context: [
        ...input.context,
        {
          type: "auth",
          [this.headerName]: `Bearer ${this.apiKey}`
        }
      ]
    }

    return next.run(authenticatedInput)
  }
}

agent.use(new AuthMiddleware(process.env.API_KEY))

Built-in Middleware

FilterToolCallsMiddleware

Filters tool calls based on allowed or disallowed lists.
import { FilterToolCallsMiddleware } from "@ag-ui/client"

Configuration

type FilterToolCallsConfig =
  | { allowedToolCalls: string[]; disallowedToolCalls?: never }
  | { disallowedToolCalls: string[]; allowedToolCalls?: never }

Allow Specific Tools

const allowFilter = new FilterToolCallsMiddleware({
  allowedToolCalls: ["search", "calculate", "summarize"]
})

agent.use(allowFilter)

Block Specific Tools

const blockFilter = new FilterToolCallsMiddleware({
  disallowedToolCalls: ["delete", "modify", "execute"]
})

agent.use(blockFilter)

Middleware Patterns

Timing Middleware

const timingMiddleware: MiddlewareFunction = (input, next) => {
  const startTime = performance.now()

  return next.run(input).pipe(
    finalize(() => {
      const duration = performance.now() - startTime
      console.log(`Execution time: ${duration.toFixed(2)}ms`)
    })
  )
}

Rate Limiting

class RateLimitMiddleware extends Middleware {
  private lastCall = 0

  constructor(private minInterval: number) {
    super()
  }

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    const now = Date.now()
    const elapsed = now - this.lastCall

    if (elapsed < this.minInterval) {
      // Delay the execution
      return timer(this.minInterval - elapsed).pipe(
        switchMap(() => {
          this.lastCall = Date.now()
          return next.run(input)
        })
      )
    }

    this.lastCall = now
    return next.run(input)
  }
}

// Limit to one request per second
agent.use(new RateLimitMiddleware(1000))

Retry Logic

const retryMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    retry({
      count: 3,
      delay: (error, retryCount) => {
        console.log(`Retry attempt ${retryCount}`)
        return timer(1000 * retryCount) // Exponential backoff
      }
    })
  )
}

Caching

class CacheMiddleware extends Middleware {
  private cache = new Map<string, BaseEvent[]>()

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    const cacheKey = this.getCacheKey(input)

    if (this.cache.has(cacheKey)) {
      console.log("Cache hit")
      return from(this.cache.get(cacheKey)!)
    }

    const events: BaseEvent[] = []

    return next.run(input).pipe(
      tap(event => events.push(event)),
      finalize(() => {
        this.cache.set(cacheKey, events)
      })
    )
  }

  private getCacheKey(input: RunAgentInput): string {
    // Create a cache key from the input
    return JSON.stringify({
      messages: input.messages,
      tools: input.tools.map(t => t.name)
    })
  }
}

Chaining Middleware

Multiple middleware can be combined to create sophisticated processing pipelines.
// Create middleware instances
const logger = loggingMiddleware
const auth = new AuthMiddleware(apiKey)
const filter = new FilterToolCallsMiddleware({
  allowedToolCalls: ["search"]
})
const rateLimit = new RateLimitMiddleware(1000)

// Apply middleware in order
agent.use(
  logger,      // First: Log all events
  auth,        // Second: Add authentication
  rateLimit,   // Third: Apply rate limiting
  filter       // Fourth: Filter tool calls
)

// Execution flow:
// logger → auth → rateLimit → filter → agent → filter → rateLimit → auth → logger

Advanced Usage

Conditional Middleware

const debugMiddleware: MiddlewareFunction = (input, next) => {
  const isDebug = input.context.some(c => c.type === "debug")

  if (!isDebug) {
    return next.run(input)
  }

  return next.run(input).pipe(
    tap(event => {
      console.debug("[DEBUG]", JSON.stringify(event, null, 2))
    })
  )
}

Event Filtering

const filterEventsMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    filter(event => {
      // Only allow specific event types
      return [
        EventType.RUN_STARTED,
        EventType.TEXT_MESSAGE_CHUNK,
        EventType.RUN_FINISHED
      ].includes(event.type)
    })
  )
}

Stream Manipulation

const bufferMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    // Buffer text chunks and emit them in batches
    bufferWhen(() =>
      interval(100).pipe(
        filter(() => true)
      )
    ),
    map(events => events.flat())
  )
}

Best Practices

  1. Single Responsibility: Each middleware should focus on one concern
  2. Error Handling: Always handle errors gracefully and consider recovery strategies
  3. Performance: Be mindful of processing overhead in high-throughput scenarios
  4. State Management: Use class-based middleware when state is required
  5. Testing: Write unit tests for each middleware independently
  6. Documentation: Document middleware behavior and side effects

TypeScript Support

The middleware system is fully typed for excellent IDE support:
import {
  Middleware,
  MiddlewareFunction,
  FilterToolCallsMiddleware
} from "@ag-ui/client"
import { RunAgentInput, BaseEvent, EventType } from "@ag-ui/core"

// Type-safe middleware function
const typedMiddleware: MiddlewareFunction = (
  input: RunAgentInput,
  next: AbstractAgent
): Observable<BaseEvent> => {
  return next.run(input)
}

// Type-safe middleware class
class TypedMiddleware extends Middleware {
  run(
    input: RunAgentInput,
    next: AbstractAgent
  ): Observable<BaseEvent> {
    return next.run(input)
  }
}