In this tutorial, you’ll learn how to build an HTTP endpoint that is compatible
with the AG-UI protocol.
Prerequisites
Make sure you have Python (3.12 or later, to match the project constraint used
below) and Poetry installed.
Set Up a New Project with Poetry
First, let’s create a new project and set up Poetry for dependency management:
poetry new my-endpoint --python=">=3.12,<4.0" && cd my-endpoint
Install Dependencies
Now, let’s install the necessary packages:
poetry add ag-ui-protocol openai fastapi uvicorn
Create a Basic Endpoint with FastAPI
Create a new file called my_endpoint/main.py with the following code:
from fastapi import FastAPI
app = FastAPI(title="AG-UI Endpoint")
@app.post("/awp")
async def my_endpoint():
    # Placeholder response; later steps replace this with AG-UI events
    return {"message": "Hello World"}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Run and Test Your Endpoint
Start the server with:
poetry run uvicorn my_endpoint.main:app --reload
In another terminal, verify that your endpoint is running using curl:
curl -X POST http://localhost:8000/awp
You should see the following response:
{ "message": "Hello World" }
Next, let’s update our endpoint to properly parse the incoming AG-UI request
using the RunAgentInput Pydantic model:
from fastapi import FastAPI
from ag_ui.core import RunAgentInput
app = FastAPI(title="AG-UI Endpoint")
@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    # FastAPI parses the JSON body into a RunAgentInput instance
    thread_id = input_data.thread_id
    return {"message": f"Hello World from {thread_id}"}
FastAPI automatically validates the incoming request body against the
RunAgentInput schema. If the body doesn’t match the expected format, FastAPI
returns a 422 Unprocessable Entity response with details about which fields
failed validation.
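Because the handler now requires a JSON body, a bare curl -X POST is rejected with a 422. The sketch below builds a minimal payload to test with. The field names (threadId, runId, state, messages, tools, context, forwardedProps) are an assumption based on the camelCase shape of RunAgentInput, so check them against the version of ag-ui-protocol you have installed. The helper name build_run_agent_input is hypothetical.

```python
import json
import uuid


def build_run_agent_input(user_text: str) -> dict:
    """Build a minimal request body (hypothetical helper; field names are assumed)."""
    return {
        "threadId": str(uuid.uuid4()),
        "runId": str(uuid.uuid4()),
        "state": {},
        "messages": [
            {"id": str(uuid.uuid4()), "role": "user", "content": user_text}
        ],
        "tools": [],
        "context": [],
        "forwardedProps": {},
    }


if __name__ == "__main__":
    # Print the payload so it can be piped into curl
    print(json.dumps(build_run_agent_input("Hello!"), indent=2))
```

If this is saved as payload.py (a hypothetical file name), it can be piped into the endpoint with: python payload.py | curl -X POST http://localhost:8000/awp -H "Content-Type: application/json" -d @-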
Add Event Streaming
AG-UI supports streaming events using Server-Sent Events (SSE). Let’s modify our
/awp endpoint to stream events back to the client:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from ag_ui.core import RunAgentInput, EventType, RunStartedEvent, RunFinishedEvent
from ag_ui.encoder import EventEncoder
app = FastAPI(title="AG-UI Endpoint")
@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    async def event_generator():
        # Create an event encoder to properly format SSE events
        encoder = EventEncoder()
        # Send run started event
        yield encoder.encode(
            RunStartedEvent(
                type=EventType.RUN_STARTED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id
            )
        )
        # Send run finished event
        yield encoder.encode(
            RunFinishedEvent(
                type=EventType.RUN_FINISHED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id
            )
        )
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Awesome! We are already sending RunStartedEvent and RunFinishedEvent events,
which gives us a basic AG-UI compliant endpoint. Now let’s make it do something
useful.
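To make the wire format concrete, here is a rough sketch of what an SSE frame looks like. It assumes the encoder serializes each event as JSON inside a standard SSE data: field terminated by a blank line; the exact key order and spacing produced by EventEncoder may differ. The function name format_sse is hypothetical and not part of the SDK.

```python
import json


def format_sse(event: dict) -> str:
    """Serialize one event as an SSE frame: a 'data:' line followed by a blank line."""
    return f"data: {json.dumps(event)}\n\n"


# Two frames mirroring the events the endpoint sends (IDs are placeholder values)
frames = [
    format_sse({"type": "RUN_STARTED", "threadId": "t1", "runId": "r1"}),
    format_sse({"type": "RUN_FINISHED", "threadId": "t1", "runId": "r1"}),
]
print("".join(frames), end="")
```

The blank line after each data: line is what tells an SSE client that one event has ended and the next may begin.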
Implementing Basic Chat
Let’s enhance our endpoint to call OpenAI’s API and stream the responses back as
AG-UI events:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from ag_ui.core import (
    RunAgentInput,
    EventType,
    RunStartedEvent,
    RunFinishedEvent,
    TextMessageStartEvent,
    TextMessageContentEvent,
    TextMessageEndEvent
)
from ag_ui.encoder import EventEncoder
import uuid
from openai import OpenAI
app = FastAPI(title="AG-UI Endpoint")
@app.post("/awp")
async def my_endpoint(input_data: RunAgentInput):
    async def event_generator():
        # Create an event encoder to properly format SSE events
        encoder = EventEncoder()
        # Send run started event
        yield encoder.encode(
            RunStartedEvent(
                type=EventType.RUN_STARTED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id
            )
        )
        # Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
        client = OpenAI()
        # Convert AG-UI messages to the OpenAI chat format
        openai_messages = [
            {"role": msg.role, "content": msg.content or ""}
            for msg in input_data.messages
            if msg.role in ("user", "system", "assistant")
        ]
        # Generate a message ID for the assistant's response (as a string)
        message_id = str(uuid.uuid4())
        # Send text message start event
        yield encoder.encode(
            TextMessageStartEvent(
                type=EventType.TEXT_MESSAGE_START,
                message_id=message_id,
                role="assistant"
            )
        )
        # Create a streaming completion request
        # Note: this synchronous client blocks the event loop between chunks;
        # AsyncOpenAI is an alternative for concurrent workloads
        stream = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=openai_messages,
            stream=True
        )
        # Process the streaming response and send content events
        for chunk in stream:
            if hasattr(chunk.choices[0].delta, "content") and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                yield encoder.encode(
                    TextMessageContentEvent(
                        type=EventType.TEXT_MESSAGE_CONTENT,
                        message_id=message_id,
                        delta=content
                    )
                )
        # Send text message end event
        yield encoder.encode(
            TextMessageEndEvent(
                type=EventType.TEXT_MESSAGE_END,
                message_id=message_id
            )
        )
        # Send run finished event
        yield encoder.encode(
            RunFinishedEvent(
                type=EventType.RUN_FINISHED,
                thread_id=input_data.thread_id,
                run_id=input_data.run_id
            )
        )
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
You’ll need to set your OpenAI API key as an environment variable and then
restart the server:
export OPENAI_API_KEY=your-api-key
poetry run uvicorn my_endpoint.main:app --reload
This implementation creates a working AG-UI endpoint that forwards the
conversation to OpenAI and streams the response back in real time.
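On the client side, the streamed reply can be reassembled by concatenating the delta fields of the TEXT_MESSAGE_CONTENT events. The following is a minimal sketch, assuming each event arrives as a single data: line containing JSON with camelCase keys (type, messageId, delta). The helper collect_text is hypothetical, and the sample lines stand in for a real HTTP response body.

```python
import json
from typing import Iterable


def collect_text(lines: Iterable[str]) -> str:
    """Join the deltas of all TEXT_MESSAGE_CONTENT events found in SSE lines."""
    parts = []
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank separators and non-data fields
        event = json.loads(line[len("data:"):])
        if event.get("type") == "TEXT_MESSAGE_CONTENT":
            parts.append(event["delta"])
    return "".join(parts)


# Hand-written sample stream (placeholder IDs), not captured server output
sample = [
    'data: {"type": "RUN_STARTED", "threadId": "t1", "runId": "r1"}',
    "",
    'data: {"type": "TEXT_MESSAGE_START", "messageId": "m1", "role": "assistant"}',
    "",
    'data: {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "Hello, "}',
    "",
    'data: {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "world"}',
    "",
    'data: {"type": "TEXT_MESSAGE_END", "messageId": "m1"}',
    "",
    'data: {"type": "RUN_FINISHED", "threadId": "t1", "runId": "r1"}',
    "",
]
print(collect_text(sample))  # prints "Hello, world"
```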
Prerequisites
Make sure you have Node.js (v16 or later) and npm or yarn installed.
Set Up a New Project
First, let’s create a new project and set up npm with TypeScript:
mkdir awp-endpoint && cd awp-endpoint
npm init -y
npm install typescript ts-node @types/node @types/express --save-dev
npx tsc --init
Install Dependencies
Install the necessary packages:
npm install express openai @ag-ui/core @ag-ui/encoder uuid
npm install @types/uuid --save-dev
Create a Basic Endpoint with Express
Create a new file called src/server.ts with the following code:
import express, { Request, Response } from "express"
const app = express()
app.use(express.json())
app.post("/awp", (req: Request, res: Response) => {
  res.json({ message: "Hello World" })
})
app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
Run and Test Your Endpoint
Start the server with:
npx ts-node src/server.ts
In another terminal, verify that your endpoint is running using curl:
curl -X POST http://localhost:8000/awp
You should see the following response:
{ "message": "Hello World" }
Next, let’s update our endpoint to properly parse the incoming AG-UI request
using the RunAgentInput schema:
import express, { Request, Response } from "express"
import { RunAgentInputSchema, RunAgentInput } from "@ag-ui/core"
const app = express()
app.use(express.json())
app.post("/awp", (req: Request, res: Response) => {
  try {
    // Parse and validate the request body
    const input: RunAgentInput = RunAgentInputSchema.parse(req.body)
    res.json({ message: `Hello World from ${input.threadId}` })
  } catch (error) {
    res.status(422).json({ error: (error as Error).message })
  }
})
app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
RunAgentInputSchema is a zod schema, so parse() throws when the request body
does not conform to the AG-UI protocol format. The catch block turns that
failure into a 422 response with details about what went wrong.
Add Event Streaming
AG-UI supports streaming events using Server-Sent Events (SSE). Let’s modify our
/awp endpoint to stream events back to the client:
import express, { Request, Response } from "express"
import { RunAgentInputSchema, RunAgentInput, EventType } from "@ag-ui/core"
import { EventEncoder } from "@ag-ui/encoder"
const app = express()
app.use(express.json())
app.post("/awp", async (req: Request, res: Response) => {
  try {
    // Parse and validate the request body
    const input: RunAgentInput = RunAgentInputSchema.parse(req.body)
    // Set up SSE headers
    res.setHeader("Content-Type", "text/event-stream")
    res.setHeader("Cache-Control", "no-cache")
    res.setHeader("Connection", "keep-alive")
    // Create an event encoder
    const encoder = new EventEncoder()
    // Send run started event
    const runStarted = {
      type: EventType.RUN_STARTED,
      threadId: input.threadId,
      runId: input.runId,
    }
    res.write(encoder.encode(runStarted))
    // Send run finished event
    const runFinished = {
      type: EventType.RUN_FINISHED,
      threadId: input.threadId,
      runId: input.runId,
    }
    res.write(encoder.encode(runFinished))
    // End the response
    res.end()
  } catch (error) {
    res.status(422).json({ error: (error as Error).message })
  }
})
app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
Implementing Basic Chat
Let’s enhance our endpoint to call OpenAI’s API and stream the responses back as
AG-UI events:
import express, { Request, Response } from "express"
import {
  RunAgentInputSchema,
  RunAgentInput,
  EventType,
  Message,
} from "@ag-ui/core"
import { EventEncoder } from "@ag-ui/encoder"
import { OpenAI } from "openai"
import { v4 as uuidv4 } from "uuid"
const app = express()
app.use(express.json())
app.post("/awp", async (req: Request, res: Response) => {
  try {
    // Parse and validate the request body
    const input: RunAgentInput = RunAgentInputSchema.parse(req.body)
    // Set up SSE headers
    res.setHeader("Content-Type", "text/event-stream")
    res.setHeader("Cache-Control", "no-cache")
    res.setHeader("Connection", "keep-alive")
    // Create an event encoder
    const encoder = new EventEncoder()
    // Send run started event
    const runStarted = {
      type: EventType.RUN_STARTED,
      threadId: input.threadId,
      runId: input.runId,
    }
    res.write(encoder.encode(runStarted))
    // Initialize OpenAI client (reads OPENAI_API_KEY from the environment)
    const client = new OpenAI()
    // Convert AG-UI messages to OpenAI messages format
    const openaiMessages = input.messages
      .filter((msg: Message) =>
        ["user", "system", "assistant"].includes(msg.role)
      )
      .map((msg: Message) => ({
        role: msg.role as "user" | "system" | "assistant",
        content: msg.content || "",
      }))
    // Generate a message ID for the assistant's response
    const messageId = uuidv4()
    // Send text message start event
    const textMessageStart = {
      type: EventType.TEXT_MESSAGE_START,
      messageId,
      role: "assistant",
    }
    res.write(encoder.encode(textMessageStart))
    // Create a streaming completion request
    const stream = await client.chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: openaiMessages,
      stream: true,
    })
    // Process the streaming response and send content events
    for await (const chunk of stream) {
      if (chunk.choices[0]?.delta?.content) {
        const content = chunk.choices[0].delta.content
        const textMessageContent = {
          type: EventType.TEXT_MESSAGE_CONTENT,
          messageId,
          delta: content,
        }
        res.write(encoder.encode(textMessageContent))
      }
    }
    // Send text message end event
    const textMessageEnd = {
      type: EventType.TEXT_MESSAGE_END,
      messageId,
    }
    res.write(encoder.encode(textMessageEnd))
    // Send run finished event
    const runFinished = {
      type: EventType.RUN_FINISHED,
      threadId: input.threadId,
      runId: input.runId,
    }
    res.write(encoder.encode(runFinished))
    // End the response
    res.end()
  } catch (error) {
    if (res.headersSent) {
      // The SSE stream has already started, so the status code can no longer change
      res.end()
    } else {
      res.status(422).json({ error: (error as Error).message })
    }
  }
})
app.listen(8000, () => {
  console.log("Server running on http://localhost:8000")
})
You’ll need to set your OpenAI API key as an environment variable and then start
the server:
export OPENAI_API_KEY=your-api-key
npx ts-node src/server.ts
This implementation creates a working AG-UI endpoint that forwards the
conversation to OpenAI and streams the response back in real time.
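Whichever server you run, the chat endpoint above emits events in the same order: RUN_STARTED, TEXT_MESSAGE_START, zero or more TEXT_MESSAGE_CONTENT events, TEXT_MESSAGE_END, then RUN_FINISHED. The check is independent of the server language, so the sketch below is written in Python, matching the Python version of this tutorial. The function name is_valid_sequence is hypothetical, and it only covers the single-message flow shown here, not every sequence the protocol allows.

```python
from typing import List


def is_valid_sequence(types: List[str]) -> bool:
    """Check the single-message event order produced by the tutorial endpoint."""
    if len(types) < 4:
        return False  # need at least the four framing events
    if types[0] != "RUN_STARTED" or types[-1] != "RUN_FINISHED":
        return False
    if types[1] != "TEXT_MESSAGE_START" or types[-2] != "TEXT_MESSAGE_END":
        return False
    # Everything between start and end must be content chunks
    return all(t == "TEXT_MESSAGE_CONTENT" for t in types[2:-2])


observed = [
    "RUN_STARTED",
    "TEXT_MESSAGE_START",
    "TEXT_MESSAGE_CONTENT",
    "TEXT_MESSAGE_CONTENT",
    "TEXT_MESSAGE_END",
    "RUN_FINISHED",
]
print(is_valid_sequence(observed))  # prints True
```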
Connect Your Agent to a Frontend
Now that you’ve built your AG-UI compatible agent, it’s ready to be connected to
any frontend that supports the AG-UI protocol. One such frontend is
CopilotKit, which provides a rich set of UI
components designed to work seamlessly with AG-UI agents.
To connect your agent to CopilotKit:
- Follow the CopilotKit documentation to set up
your frontend
- Configure CopilotKit to connect to your agent using the AG-UI protocol
- Enjoy a fully functioning chat UI with streaming responses!
With this setup, you can focus on building powerful agent capabilities while
leveraging existing UI components that understand the AG-UI protocol,
significantly reducing development time and effort.