
WebSocket Servers in Python with FastAPI or Starlette

Introduction

Real-time features such as live notifications, chat messaging, dashboards, and collaborative tools all rely on persistent, low-latency communication between client and server. Traditional HTTP request-response cycles introduce latency and overhead that make them unsuitable for these use cases. Python developers frequently choose FastAPI or Starlette when implementing WebSocket servers, as both frameworks provide clean, asynchronous interfaces for handling two-way communication.

In this guide, you will learn how WebSockets work, how FastAPI and Starlette each expose them, and which patterns help you build scalable, reliable real-time systems. By the end, you will understand how to handle connections, broadcast messages, manage errors, and scale WebSocket servers for production workloads.

Why WebSockets Matter

Many modern applications require immediate updates rather than periodic polling. WebSockets support this by maintaining a continuous connection between the client and the backend, enabling true real-time communication.

  • Real-time event updates: Push changes to connected clients instantly without waiting for requests
  • Lower latency: Eliminate the overhead of establishing new HTTP connections for each message
  • Reduced server load: Persistent connections avoid the cost of repeated connection handshakes
  • Better user experience: Enable responsive features for chat, games, live analytics, and collaboration
  • Bidirectional communication: Both client and server can initiate messages at any time

Python’s async ecosystem, especially FastAPI and Starlette, offers excellent foundations for building these real-time systems.

How WebSockets Work

WebSockets allow the server and client to communicate over a single, persistent TCP connection. This differs from HTTP, which is stateless and treats each request independently. Understanding the WebSocket lifecycle helps you design robust real-time applications.

The Handshake

The connection begins with an HTTP request containing an Upgrade header. The server responds with a 101 status code, and the connection switches from HTTP to the WebSocket protocol.

# Client sends:
GET /ws HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

# Server responds:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
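
The Sec-WebSocket-Accept value in the response is derived from the client's Sec-WebSocket-Key: RFC 6455 has the server append a fixed GUID to the key, hash the result with SHA-1, and base64-encode the digest. FastAPI and Starlette leave this to the ASGI server, so the following is only a sketch for understanding the exchange; the function name compute_accept_key is illustrative.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept_key(sec_websocket_key: str) -> str:
    """Derive the Sec-WebSocket-Accept header value from the client's key."""
    combined = (sec_websocket_key + WEBSOCKET_GUID).encode("ascii")
    digest = hashlib.sha1(combined).digest()
    return base64.b64encode(digest).decode("ascii")


# Using the key from the request above reproduces the server's response header.
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```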

Bidirectional Messaging

Once connected, both sides can send and receive messages at any time without waiting for a request-response cycle. Messages can be text or binary data.

Connection Lifecycle

Connections remain open until either side closes them. A client may disconnect voluntarily or lose network connectivity, and the server may terminate the session. Heartbeats keep connections alive through proxies with idle timeouts and help detect dead clients; they can be protocol-level ping/pong control frames or application-level messages.

WebSocket Support in FastAPI

FastAPI includes first-class WebSocket support, building directly on top of Starlette while adding type hints, dependency injection, and clean async design. This makes it ideal for applications that need both REST APIs and WebSocket endpoints.

Basic Echo Server

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")
    except WebSocketDisconnect:
        print("Client disconnected")

Clients can connect to /ws and send messages interactively. The server echoes each message back with a prefix.

Connection Manager for Broadcasting

For chat rooms, live feeds, or any feature that broadcasts to multiple clients, you need to track active connections.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Optional

app = FastAPI()

class ConnectionManager:
    # Note: one global list is kept, so broadcasts reach clients in every room.
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        # Guard against double removal (e.g. disconnect called twice)
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str, exclude: Optional[WebSocket] = None):
        # Iterate over a copy so disconnects during sending do not skip entries
        for connection in list(self.active_connections):
            if connection is not exclude:
                await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(websocket: WebSocket, room_id: str):
    await manager.connect(websocket)
    await manager.broadcast(f"User joined room {room_id}")
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Room {room_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"User left room {room_id}")
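
The manager above keeps a single list, so a message sent in one room reaches every connected client. One possible way to scope broadcasts is to key connections by room, as in the sketch below. RoomManager and FakeSocket are hypothetical names; the manager only assumes each connection exposes an async send_text method, which is why a small stand-in class can replace a real WebSocket for local experiments.

```python
import asyncio
from collections import defaultdict


class RoomManager:
    """Tracks connections per room; works with any object exposing `async send_text(str)`."""

    def __init__(self) -> None:
        self.rooms: dict[str, set] = defaultdict(set)

    def join(self, room_id: str, connection) -> None:
        self.rooms[room_id].add(connection)

    def leave(self, room_id: str, connection) -> None:
        self.rooms[room_id].discard(connection)
        if not self.rooms[room_id]:
            del self.rooms[room_id]  # drop empty rooms so the dict does not grow forever

    async def broadcast(self, room_id: str, message: str) -> int:
        """Send to every member of one room; remove members whose send fails."""
        members = list(self.rooms.get(room_id, ()))  # snapshot, safe against joins/leaves
        results = await asyncio.gather(
            *(member.send_text(message) for member in members),
            return_exceptions=True,
        )
        delivered = 0
        for member, result in zip(members, results):
            if isinstance(result, Exception):
                self.leave(room_id, member)
            else:
                delivered += 1
        return delivered


class FakeSocket:
    """Test double standing in for a WebSocket (illustration only)."""

    def __init__(self, fail: bool = False) -> None:
        self.sent: list[str] = []
        self.fail = fail

    async def send_text(self, message: str) -> None:
        if self.fail:
            raise RuntimeError("connection closed")
        self.sent.append(message)
```

In an endpoint, join would be called after websocket.accept() and leave from the WebSocketDisconnect handler. Sending with asyncio.gather means one slow or broken client does not hold up delivery to the others.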

Using Dependency Injection

FastAPI’s dependency injection works with WebSocket endpoints, allowing you to inject authentication, database sessions, or other dependencies.

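from fastapi import Depends, WebSocket, WebSocketException, status

async def get_current_user(websocket: WebSocket):
    token = websocket.query_params.get("token")
    if not token:
        # WebSocketException rejects the connection with a WebSocket close code;
        # HTTPException is intended for HTTP routes
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION, reason="No token provided")
    # validate_token is an application-defined coroutine returning a user or None
    user = await validate_token(token)
    if user is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid token")
    return user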

@app.websocket("/ws/secure")
async def secure_endpoint(websocket: WebSocket, user = Depends(get_current_user)):
    await websocket.accept()
    await websocket.send_text(f"Welcome, {user.name}!")
    # Continue handling messages...
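
The dependency above calls validate_token, which the application has to supply. As a rough illustration only, the sketch below validates a hypothetical token format (base64url payload plus an HMAC-SHA256 signature) using the standard library. The names SECRET_KEY, User, and issue_token are assumptions made for this example, and a production system would more likely use a JWT library with expiry checks.

```python
import asyncio
import base64
import hashlib
import hmac
from dataclasses import dataclass
from typing import Optional

SECRET_KEY = b"change-me"  # assumption: loaded from configuration in a real deployment


@dataclass
class User:
    name: str


def _sign(payload: str) -> str:
    return hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()


def issue_token(name: str) -> str:
    """Create a token of the form base64url(name).hex_signature."""
    payload = base64.urlsafe_b64encode(name.encode()).decode()
    return f"{payload}.{_sign(payload)}"


async def validate_token(token: str) -> Optional[User]:
    """Return the User for a valid token, or None if it is malformed or tampered with."""
    payload, separator, signature = token.partition(".")
    if not separator:
        return None
    # compare_digest runs in constant time, which avoids leaking signature prefixes
    if not hmac.compare_digest(signature.encode(), _sign(payload).encode()):
        return None
    try:
        return User(name=base64.urlsafe_b64decode(payload.encode()).decode())
    except ValueError:
        return None
```

The function is async only so that it can be awaited the same way a database-backed or network-backed validator would be.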

WebSocket Support in Starlette

Since FastAPI builds on Starlette, you can also use Starlette directly for lightweight WebSocket servers. The API is simple and minimal, making it ideal for microservices or when you want maximum control.

Functional Approach

from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from starlette.websockets import WebSocket, WebSocketDisconnect

async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # The client has already closed the connection, so there is nothing left to close
        pass

app = Starlette(routes=[
    WebSocketRoute("/ws", websocket_endpoint),
])

Class-Based Endpoints

Starlette also supports class-based WebSocket endpoints for better organization.

from starlette.applications import Starlette
from starlette.endpoints import WebSocketEndpoint
from starlette.routing import WebSocketRoute
from starlette.websockets import WebSocket

class ChatEndpoint(WebSocketEndpoint):
    encoding = "text"

    async def on_connect(self, websocket: WebSocket):
        await websocket.accept()
        print(f"Client connected: {websocket.client}")

    async def on_receive(self, websocket: WebSocket, data: str):
        await websocket.send_text(f"Message received: {data}")

    async def on_disconnect(self, websocket: WebSocket, close_code: int):
        print(f"Client disconnected with code: {close_code}")

app = Starlette(routes=[
    WebSocketRoute("/ws/chat", ChatEndpoint),
])

Handling Errors and Disconnects

WebSocket applications must handle broken connections, network failures, and malformed messages gracefully. Proper error handling prevents resource leaks and improves reliability.

from fastapi import WebSocket, WebSocketDisconnect
import asyncio

@app.websocket("/ws/robust")
async def robust_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            try:
                # Set a timeout for receiving messages
                data = await asyncio.wait_for(
                    websocket.receive_text(),
                    timeout=60.0  # 60 second timeout
                )
                # Validate and process message
                if not data or len(data) > 10000:
                    await websocket.send_text("Invalid message")
                    continue
                await websocket.send_text(f"Processed: {data}")
            except asyncio.TimeoutError:
                # No message for 60 seconds: probe the client with an application-level
                # "ping" text message (this is not a protocol ping frame)
                await websocket.send_text("ping")
    except WebSocketDisconnect:
        print("Client disconnected normally")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # Clean up resources
        print("Connection closed, cleaning up")
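
Validation tends to grow beyond a length check once messages carry structure. The sketch below assumes, purely for illustration, that clients send JSON objects with a string "type" field; parse_message and InvalidMessage are hypothetical names, and the size limit reuses the 10000 figure from the handler above, applied here to encoded bytes.

```python
import json

MAX_MESSAGE_BYTES = 10_000  # same numeric limit as the handler above


class InvalidMessage(ValueError):
    """Raised when an incoming message fails validation."""


def parse_message(raw: str) -> dict:
    """Parse and validate one incoming text message, or raise InvalidMessage."""
    if not raw:
        raise InvalidMessage("empty message")
    if len(raw.encode("utf-8")) > MAX_MESSAGE_BYTES:
        raise InvalidMessage("message too large")
    try:
        message = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise InvalidMessage(f"not valid JSON: {exc.msg}") from exc
    if not isinstance(message, dict) or not isinstance(message.get("type"), str):
        raise InvalidMessage("expected a JSON object with a string 'type' field")
    return message
```

A handler could catch InvalidMessage, send an error reply, and continue the loop, which keeps one malformed message from tearing down the connection.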

Implementing Heartbeats

import asyncio

async def heartbeat(websocket: WebSocket, interval: int = 30):
    """Send periodic pings to keep connection alive."""
    while True:
        try:
            await asyncio.sleep(interval)
            await websocket.send_json({"type": "ping"})
        except Exception:
            break

@app.websocket("/ws/with-heartbeat")
async def endpoint_with_heartbeat(websocket: WebSocket):
    await websocket.accept()
    heartbeat_task = asyncio.create_task(heartbeat(websocket))
    try:
        while True:
            data = await websocket.receive_json()
            if data.get("type") == "pong":
                continue  # Heartbeat response
            # Process other messages
            await websocket.send_json({"type": "response", "data": data})
    except WebSocketDisconnect:
        pass
    finally:
        heartbeat_task.cancel()

Scaling WebSocket Servers

Scaling WebSocket applications requires different strategies than traditional HTTP workloads because connections persist for extended periods. Each server instance only knows about its own connected clients.

Using Redis Pub/Sub for Distributed Broadcasting

When running multiple server instances, use Redis Pub/Sub to broadcast messages across all nodes.

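# The standalone aioredis package was merged into redis-py (4.2+) as redis.asyncio
from redis import asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()
redis = None

@app.on_event("startup")
async def startup():
    global redis
    # from_url returns a client immediately; the connection is opened on first use
    redis = aioredis.from_url("redis://localhost")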

class DistributedConnectionManager:
    def __init__(self):
        self.local_connections: dict[str, list[WebSocket]] = {}
        # Keep references to listener tasks so they are not garbage-collected
        self.listener_tasks: dict[str, asyncio.Task] = {}

    async def connect(self, channel: str, websocket: WebSocket):
        await websocket.accept()
        if channel not in self.local_connections:
            self.local_connections[channel] = []
            # Start listening to the Redis channel once per instance
            self.listener_tasks[channel] = asyncio.create_task(self._listen_redis(channel))
        self.local_connections[channel].append(websocket)

    def disconnect(self, channel: str, websocket: WebSocket):
        connections = self.local_connections.get(channel, [])
        if websocket in connections:
            connections.remove(websocket)

    async def broadcast(self, channel: str, message: str):
        # Publish to Redis so all instances receive it
        await redis.publish(channel, message)

    async def _listen_redis(self, channel: str):
        pubsub = redis.pubsub()
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            if message["type"] == "message":
                data = message["data"].decode()
                # Iterate over a copy: clients may disconnect while we are sending
                for ws in list(self.local_connections.get(channel, [])):
                    await ws.send_text(data)

manager = DistributedConnectionManager()
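
The publish and subscribe flow can be tried without a Redis server by swapping in an in-process stand-in. The sketch below is not Redis and does not work across processes; InMemoryBus and demo are hypothetical names, and each queue plays the role of one server instance subscribed to a channel.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Stand-in for Redis Pub/Sub: every subscriber queue receives each published message."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, channel: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    async def publish(self, channel: str, message: str) -> int:
        """Deliver the message to all subscribers and return how many received it."""
        for queue in self._subscribers[channel]:
            await queue.put(message)
        return len(self._subscribers[channel])


async def demo() -> list[str]:
    bus = InMemoryBus()
    # Two queues model two server instances subscribed to the same channel.
    instance_a = bus.subscribe("room:1")
    instance_b = bus.subscribe("room:1")
    await bus.publish("room:1", "hello")
    return [await instance_a.get(), await instance_b.get()]


print(asyncio.run(demo()))  # ['hello', 'hello']
```

Each instance then forwards what it reads from its queue to its own local connections, which mirrors the role of _listen_redis above.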

Running with ASGI Servers

Deploy WebSocket applications using ASGI servers optimized for async workloads:

# Uvicorn - recommended for development and production
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

# Hypercorn - alternative with HTTP/2 support
hypercorn main:app --bind 0.0.0.0:8000 --workers 4

# Gunicorn with Uvicorn workers
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker

Load Balancing Considerations

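Once established, a WebSocket connection stays on the backend that accepted the handshake, so the load balancer must pass the HTTP Upgrade through and tolerate long-lived connections (raise idle timeouts or rely on heartbeats). Sticky sessions matter mainly when reconnecting clients expect in-memory state on the same instance, or when a client library falls back to HTTP long-polling. Layer-4 (TCP) load balancing sidesteps Upgrade handling entirely.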

When to Choose FastAPI vs Starlette

Both frameworks support WebSockets well, but they excel in different scenarios.

Choose FastAPI When

  • Building full applications: You need REST APIs and WebSockets in the same project
  • Using dependency injection: Authentication, database sessions, or other dependencies
  • Type safety matters: You want IDE support and validation with Pydantic
  • Auto-generated docs: You need OpenAPI documentation for your HTTP endpoints

Choose Starlette When

  • Building microservices: Lightweight services with minimal overhead
  • Maximum control: You prefer explicit code over framework abstractions
  • WebSocket-only services: Dedicated real-time gateways without REST endpoints
  • Performance critical: Every microsecond counts and you want minimal layers

Common Pitfalls to Avoid

Not Handling Disconnects Properly

Always wrap WebSocket handling in try/except blocks and clean up resources in finally blocks. Unhandled disconnects cause resource leaks.

Blocking the Event Loop

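Never run synchronous blocking code in WebSocket handlers. Use asyncio.to_thread() for blocking I/O. For CPU-bound work, prefer a process pool (for example loop.run_in_executor() with a ProcessPoolExecutor), because threads share the GIL and will still starve the event loop.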
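
A minimal sketch of offloading a blocking call, assuming Python 3.9 or later for asyncio.to_thread. The function blocking_lookup is a hypothetical stand-in for something like a synchronous database driver.

```python
import asyncio
import time


def blocking_lookup(key: str) -> str:
    """Stand-in for a synchronous call such as a legacy database driver."""
    time.sleep(0.1)
    return key.upper()


async def handle_message(key: str) -> str:
    # Run the blocking call in a worker thread so the event loop stays responsive.
    return await asyncio.to_thread(blocking_lookup, key)


async def main() -> list[str]:
    # The three lookups overlap in separate threads rather than running back to back.
    return await asyncio.gather(*(handle_message(k) for k in ("a", "b", "c")))


print(asyncio.run(main()))  # ['A', 'B', 'C']
```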

Unbounded Connection Lists

Limit the number of concurrent connections per user or room to prevent memory exhaustion attacks.
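
One possible shape for such a limit is a small counter keyed by user or room. ConnectionLimiter is a hypothetical helper, and the default of five connections per key is an arbitrary assumption.

```python
from collections import Counter


class ConnectionLimiter:
    """Caps concurrent connections per key (a user ID or room ID, for example)."""

    def __init__(self, max_per_key: int = 5) -> None:
        self.max_per_key = max_per_key
        self._counts: Counter = Counter()

    def try_acquire(self, key: str) -> bool:
        """Reserve a slot; return False if the key is already at its limit."""
        if self._counts[key] >= self.max_per_key:
            return False
        self._counts[key] += 1
        return True

    def release(self, key: str) -> None:
        """Free a slot; intended to be called from the handler's finally block."""
        if self._counts[key] > 0:
            self._counts[key] -= 1
        if self._counts[key] == 0:
            del self._counts[key]  # Counter ignores deletion of missing keys
```

A handler would call try_acquire() before accepting the connection, reject the client when it returns False (for example with close code 1008), and call release() in its finally block.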

Missing Authentication

WebSocket endpoints need authentication. Validate tokens during the connection handshake, not after.

Conclusion

WebSocket support in FastAPI and Starlette enables Python developers to build fast, efficient, and fully interactive real-time applications. Both frameworks offer clean async interfaces, strong performance, and excellent integration with modern ASGI servers. FastAPI adds conveniences like dependency injection and type hints, while Starlette provides a minimal foundation for maximum control.

For building REST APIs alongside your WebSocket endpoints, read How to Build a REST API in Python Using FastAPI. To understand async patterns that power these frameworks, see Async Programming in Python: asyncio, Trio, and Curio. For handling authentication in your real-time applications, explore OAuth2, JWT, and Session Tokens Explained. Reference the official FastAPI WebSocket documentation and the Starlette WebSocket documentation for the latest APIs and best practices.
