Real-Time AI Agent Streaming with WebSockets and FastAPI
TL;DR: WebSocket streaming eliminates the 5-30 second loading spinner problem for AI agent responses by delivering tokens in real time, the same experience as ChatGPT or Claude. The implementation in FastAPI with Pydantic AI uses structured event types (text_delta, tool_call, tool_result, complete) that enable rich frontend rendering: text streams token by token while tool calls show visual indicators. Authentication works via JWT tokens passed as query parameters, since the browser WebSocket API cannot set custom headers, and the token is validated during the connection handshake. Production patterns include a ConnectionManager class that tracks active connections with a configurable limit (e.g., 100 max), closes duplicate connections for the same user, and handles graceful disconnects. Conversation history persists to the database after each exchange using response.all_messages() and is loaded on reconnect. The full-stack-ai-agent-template includes all these patterns pre-configured (authenticated, persistent, error-handled WebSocket streaming with both backend and frontend hooks), selectable as a single configuration option.
Why WebSocket Streaming?
HTTP request-response is fine for quick API calls, but AI agent responses can take 5-30 seconds. Without streaming, users stare at a loading spinner. With WebSocket streaming, they see tokens appear in real-time — the same experience as ChatGPT or Claude.
Here’s how to implement it properly with FastAPI.
TL;DR
- WebSockets beat HTTP for AI agents - users see tokens in real-time instead of staring at a spinner for 5-30 seconds.
- Connection manager handles multiple clients - track active connections, broadcast events, and clean up disconnects automatically.
- Structured events for the frontend - separate text_delta, tool_call, tool_result, and complete event types enable rich UI rendering.
- Authentication works via query params - the browser WebSocket API can't set custom headers, so pass JWT tokens as query parameters and validate on connect.
- Production patterns matter - heartbeat pings, reconnection logic, message history persistence, and rate limiting prevent the issues that kill WebSocket apps at scale.
The Architecture
    Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider
       ↑                       ↑
       │ token-by-token        │ SSE/streaming
       │ JSON messages         │ response

The browser opens a persistent WebSocket connection. When the user sends a message, FastAPI streams the LLM response back token by token. No polling, no long-running HTTP requests.
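To make the "JSON messages" in the diagram concrete, here is a minimal sketch of the four structured event types named in the TL;DR. The field names (`content`, `tool_name`, `args`, `result`) and the `encode_event` helper are assumptions for illustration, not the template's actual schema. The simplified examples later in this post use just two message types, `chunk` and `done`.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Union


@dataclass
class TextDelta:
    """One piece of streamed assistant text."""
    content: str
    type: str = "text_delta"


@dataclass
class ToolCall:
    """The agent has started calling a tool (hypothetical field names)."""
    tool_name: str
    args: Dict[str, Any] = field(default_factory=dict)
    type: str = "tool_call"


@dataclass
class ToolResult:
    """A tool has returned (hypothetical field names)."""
    tool_name: str
    result: Any = None
    type: str = "tool_result"


@dataclass
class Complete:
    """The response is finished; the frontend can stop its streaming state."""
    type: str = "complete"


Event = Union[TextDelta, ToolCall, ToolResult, Complete]


def encode_event(event: Event) -> str:
    """Serialize an event to the JSON text sent over the socket."""
    return json.dumps(asdict(event))
```

The frontend can then switch on `type`: append `text_delta` content to the current message, render an indicator for `tool_call` and `tool_result`, and clear the streaming flag on `complete`.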
Basic Implementation
1. FastAPI WebSocket Endpoint
    from fastapi import FastAPI, WebSocket, WebSocketDisconnect
    from pydantic_ai import Agent

    app = FastAPI()
    agent = Agent("openai:gpt-4o")


    @app.websocket("/ws/chat")
    async def chat(websocket: WebSocket):
        await websocket.accept()

        try:
            while True:
                data = await websocket.receive_json()
                message = data["message"]

                # Stream only the new text (delta=True), not the accumulated reply
                async with agent.run_stream(message) as response:
                    async for chunk in response.stream_text(delta=True):
                        await websocket.send_json({
                            "type": "chunk",
                            "content": chunk,
                        })

                # Tell the frontend this response is finished
                await websocket.send_json({"type": "done"})

        except WebSocketDisconnect:
            pass  # client went away; nothing to clean up here

2. Frontend Hook
    import { useCallback, useRef, useState } from "react";

    type Message = { role: "user" | "assistant"; content: string };

    export function useChat() {
      const ws = useRef<WebSocket | null>(null);
      const [messages, setMessages] = useState<Message[]>([]);
      const [isStreaming, setIsStreaming] = useState(false);

      const connect = useCallback(() => {
        ws.current = new WebSocket("ws://localhost:8000/ws/chat");

        ws.current.onmessage = (event) => {
          const data = JSON.parse(event.data);

          if (data.type === "chunk") {
            setMessages((prev) => {
              const last = prev[prev.length - 1];
              // Append to the in-progress assistant message if there is one
              if (last?.role === "assistant") {
                return [...prev.slice(0, -1), { ...last, content: last.content + data.content }];
              }
              // Otherwise this is the first chunk of a new assistant message
              return [...prev, { role: "assistant", content: data.content }];
            });
          }

          if (data.type === "done") {
            setIsStreaming(false);
          }
        };
      }, []);

      const send = useCallback((message: string) => {
        setIsStreaming(true);
        setMessages((prev) => [...prev, { role: "user", content: message }]);
        ws.current?.send(JSON.stringify({ message }));
      }, []);

      return { messages, send, connect, isStreaming };
    }

Production Patterns
The basic version works, but production needs more. Here are the patterns we use in every deployment.
Authentication
Never accept anonymous WebSocket connections. Validate the token during the handshake:
    from fastapi import Depends, Query, WebSocket, WebSocketException, status

    # User, verify_token, and InvalidToken come from your own auth module


    async def get_ws_user(
        websocket: WebSocket,
        token: str = Query(...),
    ) -> User:
        try:
            return await verify_token(token)
        except InvalidToken:
            # Reject the handshake with a policy-violation close code
            raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)


    @app.websocket("/ws/chat")
    async def chat(
        websocket: WebSocket,
        user: User = Depends(get_ws_user),
    ):
        await websocket.accept()
        # user is now authenticated

Conversation History
Store messages in the database and pass history to the agent:
    from pydantic_ai.messages import ModelMessage


    @app.websocket("/ws/chat")
    async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)):
        await websocket.accept()

        # Load conversation history
        history: list[ModelMessage] = await db.get_messages(user.id)

        try:
            while True:
                data = await websocket.receive_json()

                async with agent.run_stream(
                    data["message"],
                    message_history=history,
                    deps=Deps(user=user, db=db),
                ) as response:
                    async for chunk in response.stream_text(delta=True):
                        await websocket.send_json({"type": "chunk", "content": chunk})

                await websocket.send_json({"type": "done"})

                # Update history with new messages
                history = response.all_messages()
                await db.save_messages(user.id, history)

        except WebSocketDisconnect:
            pass

Error Handling
LLM providers fail. Rate limits hit. Connections drop. Handle it gracefully:
    import logging

    from pydantic_ai.exceptions import ModelHTTPError

    logger = logging.getLogger(__name__)

    try:
        async with agent.run_stream(message) as response:
            async for chunk in response.stream_text(delta=True):
                await websocket.send_json({"type": "chunk", "content": chunk})
        await websocket.send_json({"type": "done"})

    except ModelHTTPError:
        # The provider answered with a 4xx/5xx status (rate limit, outage, ...)
        await websocket.send_json({
            "type": "error",
            "message": "The AI model is temporarily unavailable. Please try again.",
        })

    except Exception:
        # Log the full traceback, but send the client only a generic message
        logger.exception("Stream error")
        await websocket.send_json({
            "type": "error",
            "message": "An unexpected error occurred.",
        })

Connection Management
Track active connections for graceful shutdown and connection limits:
    from fastapi import WebSocket


    class ConnectionManager:
        def __init__(self, max_connections: int = 100):
            self.active: dict[str, WebSocket] = {}
            self.max_connections = max_connections

        async def connect(self, user_id: str, websocket: WebSocket) -> bool:
            if len(self.active) >= self.max_connections:
                await websocket.close(code=1013)  # Try Again Later
                return False

            # Close existing connection for same user
            if user_id in self.active:
                await self.active[user_id].close()

            self.active[user_id] = websocket
            return True

        def disconnect(self, user_id: str, websocket: WebSocket) -> None:
            # Only remove the entry if it still points at this socket, so a
            # replaced connection's cleanup cannot evict its successor
            if self.active.get(user_id) is websocket:
                del self.active[user_id]

Performance Tips
- Use delta=True - send only new tokens, not the full accumulated text
- Batch small chunks - if tokens arrive faster than the WebSocket can send, buffer them
- Set timeouts - disconnect idle connections after 5-10 minutes
- Use connection pooling - reuse LLM provider connections across requests
- Monitor with Logfire - trace every WebSocket message for debugging. See our post on AI agent observability for how we built an AI-powered assistant on top of Logfire.
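The batching and timeout tips above can be sketched with plain asyncio. This is an illustration under stated assumptions, not the template's implementation: `batch_chunks`, `receive_or_timeout`, and the `max_chars` threshold are hypothetical names, and the batching here is purely size-based. A real deployment would probably also flush on a short timer so a slow stream never sits in the buffer.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, List, Optional


async def batch_chunks(
    chunks: AsyncIterator[str],
    max_chars: int = 64,
) -> AsyncIterator[str]:
    """Coalesce small text chunks into pieces of at least max_chars."""
    buffer: List[str] = []
    size = 0
    async for chunk in chunks:
        buffer.append(chunk)
        size += len(chunk)
        if size >= max_chars:
            yield "".join(buffer)
            buffer, size = [], 0
    if buffer:
        # Flush whatever is left when the stream ends
        yield "".join(buffer)


async def receive_or_timeout(
    receive: Callable[[], Awaitable[Any]],
    idle_seconds: float,
) -> Optional[Any]:
    """Await one message; return None if the client stays silent too long."""
    try:
        return await asyncio.wait_for(receive(), timeout=idle_seconds)
    except asyncio.TimeoutError:
        return None
```

In the chat endpoint, this would mean wrapping the stream as `batch_chunks(response.stream_text(delta=True))` and replacing the bare `websocket.receive_json()` call with `receive_or_timeout(websocket.receive_json, 600)`, closing the socket when it returns `None`.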
Skip the Boilerplate
All of these patterns come pre-configured in the Full-Stack AI Agent Template. Select “WebSocket streaming” in the configurator and get authenticated, persistent, error-handled streaming out of the box. See our guides on shipping a production AI app fast and building a full-stack AI app with FastAPI and Next.js for the complete setup.