Skip to content
Back to blog
Open Source

Real-Time AI Agent Streaming with WebSockets and FastAPI

Kacper Włodarczyk · · 6 min read · Updated on February 20, 2026
websockets fastapi streaming python ai-agents
Table of Contents

TL;DR: WebSocket streaming eliminates the 5-30 second loading spinner problem for AI agent responses by delivering tokens in real-time — the same experience as ChatGPT or Claude. The implementation in FastAPI with Pydantic AI uses structured event types (text_delta, tool_call, tool_result, complete) that enable rich frontend rendering where text streams token-by-token while tool calls show visual indicators. Authentication works via JWT tokens passed as query parameters since WebSocket does not support headers, validated during the connection handshake. Production patterns include a ConnectionManager class that tracks active connections with a configurable limit (e.g., 100 max), closes duplicate connections for the same user, and handles graceful disconnects. Conversation history persists to the database after each exchange using response.all_messages() and is loaded on reconnect. The full-stack-ai-agent-template includes all these patterns pre-configured — authenticated, persistent, error-handled WebSocket streaming with both backend and frontend hooks — selectable as a single configuration option.

Why WebSocket Streaming?

HTTP request-response is fine for quick API calls, but AI agent responses can take 5-30 seconds. Without streaming, users stare at a loading spinner. With WebSocket streaming, they see tokens appear in real-time — the same experience as ChatGPT or Claude.

Here’s how to implement it properly with FastAPI.

TL;DR

  • WebSockets beat HTTP for AI agents - users see tokens in real-time instead of staring at a spinner for 5-30 seconds.
  • Connection manager handles multiple clients - track active connections, broadcast events, and clean up disconnects automatically.
  • Structured events for the frontend - separate text_delta, tool_call, tool_result, and complete event types enable rich UI rendering.
  • Authentication works via query params - WebSocket doesn’t support headers, so pass JWT tokens as query parameters and validate on connect.
  • Production patterns matter - heartbeat pings, reconnection logic, message history persistence, and rate limiting prevent the issues that kill WebSocket apps at scale.

The Architecture

architecture
Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider
↑ ↑
│ token-by-token │ SSE/streaming
│ JSON messages │ response

The browser opens a persistent WebSocket connection. When the user sends a message, FastAPI streams the LLM response back token by token. No polling, no long-running HTTP requests.

Basic Implementation

1. FastAPI WebSocket Endpoint

backend/app/api/ws.py
from fastapi import WebSocket, WebSocketDisconnect
from pydantic_ai import Agent
agent = Agent("openai:gpt-4o")
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message = data["message"]
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({
"type": "chunk",
"content": chunk,
})
await websocket.send_json({"type": "done"})
except WebSocketDisconnect:
pass

2. Frontend Hook

frontend/src/hooks/useWebSocket.ts
import { useCallback, useRef, useState } from "react";
export function useChat() {
const ws = useRef<WebSocket | null>(null);
const [messages, setMessages] = useState<Message[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const connect = useCallback(() => {
ws.current = new WebSocket("ws://localhost:8000/ws/chat");
ws.current.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "chunk") {
setMessages((prev) => {
const last = prev[prev.length - 1];
if (last?.role === "assistant") {
return [...prev.slice(0, -1), { ...last, content: last.content + data.content }];
}
return [...prev, { role: "assistant", content: data.content }];
});
}
if (data.type === "done") {
setIsStreaming(false);
}
};
}, []);
const send = useCallback((message: string) => {
setIsStreaming(true);
setMessages((prev) => [...prev, { role: "user", content: message }]);
ws.current?.send(JSON.stringify({ message }));
}, []);
return { messages, send, connect, isStreaming };
}

Production Patterns

The basic version works, but production needs more. Here are the patterns we use in every deployment.

Authentication

Never accept anonymous WebSocket connections. Validate the token during the handshake:

backend/app/api/ws.py
from fastapi import WebSocket, Depends, status
async def get_ws_user(
websocket: WebSocket,
token: str = Query(...),
) -> User:
try:
return await verify_token(token)
except InvalidToken:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
raise
@app.websocket("/ws/chat")
async def chat(
websocket: WebSocket,
user: User = Depends(get_ws_user),
):
await websocket.accept()
# user is now authenticated

Conversation History

Store messages in the database and pass history to the agent:

backend/app/api/ws.py
from pydantic_ai.messages import ModelMessage
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)):
await websocket.accept()
# Load conversation history
history: list[ModelMessage] = await db.get_messages(user.id)
try:
while True:
data = await websocket.receive_json()
async with agent.run_stream(
data["message"],
message_history=history,
deps=Deps(user=user, db=db),
) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
# Update history with new messages
history = response.all_messages()
await db.save_messages(user.id, history)
except WebSocketDisconnect:
pass

Error Handling

LLM providers fail. Rate limits hit. Connections drop. Handle it gracefully:

backend/app/api/ws.py
from pydantic_ai.exceptions import ModelRetryError
try:
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
except ModelRetryError as e:
await websocket.send_json({
"type": "error",
"message": "The AI model is temporarily unavailable. Please try again.",
})
except Exception as e:
logger.error(f"Stream error: {e}")
await websocket.send_json({
"type": "error",
"message": "An unexpected error occurred.",
})

Connection Management

Track active connections for graceful shutdown and connection limits:

backend/app/core/connections.py
class ConnectionManager:
def __init__(self, max_connections: int = 100):
self.active: dict[str, WebSocket] = {}
self.max_connections = max_connections
async def connect(self, user_id: str, websocket: WebSocket):
if len(self.active) >= self.max_connections:
await websocket.close(code=1013) # Try Again Later
return False
# Close existing connection for same user
if user_id in self.active:
await self.active[user_id].close()
self.active[user_id] = websocket
return True
def disconnect(self, user_id: str):
self.active.pop(user_id, None)

Performance Tips

  1. Use delta=True — send only new tokens, not the full accumulated text
  2. Batch small chunks — if tokens arrive faster than the WebSocket can send, buffer them
  3. Set timeouts — disconnect idle connections after 5-10 minutes
  4. Use connection pooling — reuse LLM provider connections across requests
  5. Monitor with Logfire — trace every WebSocket message for debugging. See our post on AI agent observability for how we built an AI-powered assistant on top of Logfire.

Skip the Boilerplate

All of these patterns come pre-configured in the Full-Stack AI Agent Template. Select “WebSocket streaming” in the configurator and get authenticated, persistent, error-handled streaming out of the box. See our guides on shipping a production AI app fast and building a full-stack AI app with FastAPI and Next.js for the complete setup.

Share this article

Ready to ship your AI app?

Pick your frameworks, generate a production-ready project, and deploy. 75+ options, one command, zero config debt.

Need help building production AI agents?