WebSockets e Comunicação em Tempo Real#
Implementar comunicação bidirecional em tempo real usando WebSockets com FastAPI, incluindo autenticação, gerenciamento de conexões, chat, notificações push e integração com Redis para aplicações distribuídas.
🎯 O que você vai aprender#
Fundamentos de WebSockets e comunicação em tempo real
Implementação de WebSockets com FastAPI
Autenticação e autorização em WebSockets
Sistema de chat em tempo real
Notificações push
Gerenciamento de conexões distribuídas com Redis
Padrões de arquitetura para aplicações real-time
Monitoramento e debugging de WebSockets
🌐 Conceitos Fundamentais de WebSockets#
O que são WebSockets?#
WebSockets são um protocolo de comunicação que permite conexão bidirecional e full-duplex entre cliente e servidor sobre uma única conexão TCP. Diferente do HTTP tradicional, que segue o modelo request-response, WebSockets mantêm uma conexão persistente que permite comunicação em tempo real.
Características dos WebSockets#
Conexão Persistente: Uma vez estabelecida, a conexão permanece aberta
Bidirecional: Cliente e servidor podem enviar dados a qualquer momento
Full-Duplex: Dados podem fluir simultaneamente em ambas as direções
Baixa Latência: Sem overhead de headers HTTP em cada mensagem
Eficiência: Menos recursos de rede comparado a polling
HTTP vs WebSockets#
HTTP Tradicional (Request-Response)#
Cliente Servidor
| |
|-------- Request -------->|
| |
|<------- Response --------|
| |
|-------- Request -------->|
| |
|<------- Response --------|
Características:
Stateless (sem estado)
Unidirecional (cliente inicia)
Overhead de headers em cada request
Ideal para operações CRUD
WebSockets#
Cliente Servidor
| |
|------ Handshake -------->|
|<----- Handshake ---------|
| |
|<====== Mensagens ======>|
|<====== Bidirecionais ===>|
|<====== Full-Duplex =====>|
Características:
Stateful (mantém estado)
Bidirecional (ambos podem iniciar)
Baixo overhead após handshake
Ideal para tempo real
Casos de Uso para WebSockets#
1. Chat e Mensagens Instantâneas#
Aplicações de mensagens em tempo real
Salas de chat com múltiplos usuários
Indicadores de “usuário digitando”
Histórico de mensagens sincronizado
2. Notificações em Tempo Real#
Alertas do sistema
Notificações de atividade do usuário
Atualizações de status
Lembretes e avisos
3. Dashboards e Monitoramento#
Métricas em tempo real
Gráficos dinâmicos
Status de sistemas
Alertas operacionais
4. Colaboração em Tempo Real#
Editores colaborativos
Quadros compartilhados
Comentários em tempo real
Sincronização de cursores
5. Jogos e Aplicações Interativas#
Jogos multiplayer
Aplicações de desenho colaborativo
Votações em tempo real
Leilões online
🚀 Implementação com FastAPI#
Configuração Básica#
# main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import json
import logging
app = FastAPI(title="WebSocket Chat API")
logger = logging.getLogger(__name__)
# Gerenciador de conexões simples
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
logger.info(f"Nova conexão WebSocket. Total: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
logger.info(f"Conexão WebSocket removida. Total: {len(self.active_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception as e:
logger.error(f"Erro ao enviar mensagem: {e}")
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"Mensagem recebida: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
Estruturas de Dados para Mensagens#
# schemas/websocket.py
from pydantic import BaseModel
from typing import Optional, Dict, Any
from datetime import datetime
from enum import Enum
class MessageType(str, Enum):
CHAT = "chat"
NOTIFICATION = "notification"
SYSTEM = "system"
TYPING = "typing"
USER_JOIN = "user_join"
USER_LEAVE = "user_leave"
class WebSocketMessage(BaseModel):
type: MessageType
data: Dict[str, Any]
timestamp: datetime = datetime.now()
user_id: Optional[str] = None
room_id: Optional[str] = None
class ChatMessage(BaseModel):
message: str
user_id: str
username: str
room_id: str
timestamp: datetime = datetime.now()
class NotificationMessage(BaseModel):
title: str
content: str
severity: str = "info" # info, warning, error, success
action_url: Optional[str] = None
class TypingIndicator(BaseModel):
user_id: str
username: str
room_id: str
is_typing: bool
🔐 Autenticação WebSocket#
Middleware de Autenticação#
# core/websocket_auth.py
from fastapi import WebSocket, HTTPException, status
from fastapi.security import HTTPBearer
from typing import Optional
from core.auth import decode_access_token
from db.models.user import User
from db.session import SessionLocal
import logging
logger = logging.getLogger(__name__)
async def authenticate_websocket(websocket: WebSocket) -> User | None:
"""Authenticate WebSocket connection using token from query params or headers."""
token = None
# Tentar obter token dos query parameters
token = websocket.query_params.get("token")
# Se não encontrou, tentar dos headers
if not token:
auth_header = websocket.headers.get("authorization")
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ")[1]
if not token:
logger.warning("WebSocket connection without authentication token")
return None
try:
# Decodificar token
payload = decode_access_token(token)
if not payload:
logger.warning("Invalid WebSocket authentication token")
return None
user_id = payload.get("sub")
if not user_id:
logger.warning("Token without user ID")
return None
# Buscar usuário no banco
db = SessionLocal()
try:
user = db.query(User).filter(User.id == int(user_id)).first()
if not user or not user.is_active:
logger.warning(f"User {user_id} not found or inactive")
return None
return user
finally:
db.close()
except Exception as e:
logger.error(f"WebSocket authentication error: {e}")
return None
async def require_websocket_auth(websocket: WebSocket) -> User:
"""Require authentication for WebSocket connection."""
user = await authenticate_websocket(websocket)
if not user:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Authentication required")
raise HTTPException(status_code=401, detail="Authentication required")
return user
Gerenciador de Conexões Autenticadas#
# core/websocket_manager.py
from typing import Dict, Set, Optional, List
from fastapi import WebSocket
from db.models.user import User
from schemas.websocket import WebSocketMessage, MessageType
import json
import logging
logger = logging.getLogger(__name__)
class AuthenticatedConnectionManager:
def __init__(self):
# user_id -> websocket
self.user_connections: Dict[str, WebSocket] = {}
# room_id -> set of user_ids
self.room_members: Dict[str, Set[str]] = {}
# user_id -> set of room_ids
self.user_rooms: Dict[str, Set[str]] = {}
async def connect_user(self, websocket: WebSocket, user: User):
"""Connect authenticated user."""
user_id = str(user.id)
# Desconectar conexão anterior se existir
if user_id in self.user_connections:
try:
await self.user_connections[user_id].close()
except:
pass
self.user_connections[user_id] = websocket
# Inicializar rooms do usuário se não existir
if user_id not in self.user_rooms:
self.user_rooms[user_id] = set()
logger.info(f"User {user.username} ({user_id}) connected via WebSocket")
# Enviar mensagem de boas-vindas
await self.send_to_user(user_id, WebSocketMessage(
type=MessageType.SYSTEM,
data={"message": f"Bem-vindo, {user.username}!"}
))
def disconnect_user(self, user_id: str):
"""Disconnect user and clean up."""
if user_id in self.user_connections:
del self.user_connections[user_id]
# Remover usuário de todas as salas
if user_id in self.user_rooms:
for room_id in self.user_rooms[user_id].copy():
self.leave_room(user_id, room_id)
del self.user_rooms[user_id]
logger.info(f"User {user_id} disconnected")
async def join_room(self, user_id: str, room_id: str):
"""Add user to room."""
if room_id not in self.room_members:
self.room_members[room_id] = set()
self.room_members[room_id].add(user_id)
self.user_rooms[user_id].add(room_id)
# Notificar outros membros da sala
await self.broadcast_to_room(room_id, WebSocketMessage(
type=MessageType.USER_JOIN,
data={"user_id": user_id, "room_id": room_id},
user_id=user_id
), exclude_user=user_id)
logger.info(f"User {user_id} joined room {room_id}")
def leave_room(self, user_id: str, room_id: str):
"""Remove user from room."""
if room_id in self.room_members:
self.room_members[room_id].discard(user_id)
# Remover sala se vazia
if not self.room_members[room_id]:
del self.room_members[room_id]
if user_id in self.user_rooms:
self.user_rooms[user_id].discard(room_id)
logger.info(f"User {user_id} left room {room_id}")
async def send_to_user(self, user_id: str, message: WebSocketMessage):
"""Send message to specific user."""
if user_id in self.user_connections:
try:
websocket = self.user_connections[user_id]
await websocket.send_text(message.model_dump_json())
except Exception as e:
logger.error(f"Error sending message to user {user_id}: {e}")
self.disconnect_user(user_id)
async def broadcast_to_room(self, room_id: str, message: WebSocketMessage, exclude_user: Optional[str] = None):
"""Broadcast message to all users in room."""
if room_id not in self.room_members:
return
for user_id in self.room_members[room_id]:
if exclude_user and user_id == exclude_user:
continue
await self.send_to_user(user_id, message)
async def broadcast_to_all(self, message: WebSocketMessage):
"""Broadcast message to all connected users."""
for user_id in self.user_connections:
await self.send_to_user(user_id, message)
def get_room_members(self, room_id: str) -> List[str]:
"""Get list of user IDs in room."""
return list(self.room_members.get(room_id, set()))
def get_user_rooms(self, user_id: str) -> List[str]:
"""Get list of room IDs user is in."""
return list(self.user_rooms.get(user_id, set()))
def is_user_online(self, user_id: str) -> bool:
"""Check if user is currently connected."""
return user_id in self.user_connections
# Instância global
connection_manager = AuthenticatedConnectionManager()
💬 Sistema de Chat em Tempo Real#
Modelos de Dados#
# db/models/chat.py
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from db.base import Base
class ChatRoom(Base):
__tablename__ = "chat_rooms"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False)
description = Column(Text)
is_private = Column(Boolean, default=False)
created_by = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relacionamentos
creator = relationship("User", back_populates="created_rooms")
messages = relationship("ChatMessage", back_populates="room", cascade="all, delete-orphan")
members = relationship("ChatRoomMember", back_populates="room", cascade="all, delete-orphan")
class ChatMessage(Base):
__tablename__ = "chat_messages"
id = Column(Integer, primary_key=True, index=True)
content = Column(Text, nullable=False)
room_id = Column(Integer, ForeignKey("chat_rooms.id"))
user_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime(timezone=True), server_default=func.now())
edited_at = Column(DateTime(timezone=True))
is_deleted = Column(Boolean, default=False)
# Relacionamentos
room = relationship("ChatRoom", back_populates="messages")
user = relationship("User", back_populates="chat_messages")
class ChatRoomMember(Base):
__tablename__ = "chat_room_members"
id = Column(Integer, primary_key=True, index=True)
room_id = Column(Integer, ForeignKey("chat_rooms.id"))
user_id = Column(Integer, ForeignKey("users.id"))
joined_at = Column(DateTime(timezone=True), server_default=func.now())
is_admin = Column(Boolean, default=False)
# Relacionamentos
room = relationship("ChatRoom", back_populates="members")
user = relationship("User", back_populates="chat_memberships")
Endpoints WebSocket para Chat#
# api/websocket/chat.py
from fastapi import WebSocket, WebSocketDisconnect, Depends
from sqlalchemy.orm import Session
from db.session import get_db
from core.websocket_auth import require_websocket_auth
from core.websocket_manager import connection_manager
from schemas.websocket import WebSocketMessage, MessageType, ChatMessage
from services.chat_service import ChatService
import json
import logging
logger = logging.getLogger(__name__)
async def handle_chat_websocket(websocket: WebSocket, db: Session = Depends(get_db)):
"""Handle chat WebSocket connections."""
try:
# Autenticar usuário
user = await require_websocket_auth(websocket)
# Conectar usuário
await connection_manager.connect_user(websocket, user)
chat_service = ChatService(db)
while True:
# Receber mensagem
data = await websocket.receive_text()
message_data = json.loads(data)
message_type = message_data.get("type")
payload = message_data.get("data", {})
if message_type == "join_room":
await handle_join_room(user.id, payload, chat_service)
elif message_type == "leave_room":
await handle_leave_room(user.id, payload)
elif message_type == "send_message":
await handle_send_message(user, payload, chat_service)
elif message_type == "typing":
await handle_typing_indicator(user, payload)
else:
logger.warning(f"Unknown message type: {message_type}")
except WebSocketDisconnect:
connection_manager.disconnect_user(str(user.id))
except Exception as e:
logger.error(f"WebSocket error: {e}")
connection_manager.disconnect_user(str(user.id))
async def handle_join_room(user_id: int, payload: dict, chat_service: ChatService):
"""Handle user joining a chat room."""
room_id = payload.get("room_id")
if not room_id:
return
# Verificar se usuário pode entrar na sala
if await chat_service.can_user_join_room(user_id, room_id):
await connection_manager.join_room(str(user_id), str(room_id))
# Enviar histórico de mensagens
messages = await chat_service.get_room_messages(room_id, limit=50)
await connection_manager.send_to_user(str(user_id), WebSocketMessage(
type=MessageType.SYSTEM,
data={"messages": [msg.dict() for msg in messages]}
))
async def handle_leave_room(user_id: int, payload: dict):
"""Handle user leaving a chat room."""
room_id = payload.get("room_id")
if room_id:
connection_manager.leave_room(str(user_id), str(room_id))
async def handle_send_message(user, payload: dict, chat_service: ChatService):
"""Handle sending a chat message."""
room_id = payload.get("room_id")
content = payload.get("message")
if not room_id or not content:
return
# Salvar mensagem no banco
message = await chat_service.create_message(
user_id=user.id,
room_id=room_id,
content=content
)
# Broadcast para a sala
chat_message = ChatMessage(
message=content,
user_id=str(user.id),
username=user.username,
room_id=str(room_id)
)
await connection_manager.broadcast_to_room(str(room_id), WebSocketMessage(
type=MessageType.CHAT,
data=chat_message.dict(),
user_id=str(user.id)
))
async def handle_typing_indicator(user, payload: dict):
"""Handle typing indicator."""
room_id = payload.get("room_id")
is_typing = payload.get("is_typing", False)
if room_id:
await connection_manager.broadcast_to_room(str(room_id), WebSocketMessage(
type=MessageType.TYPING,
data={
"user_id": str(user.id),
"username": user.username,
"is_typing": is_typing
},
user_id=str(user.id)
), exclude_user=str(user.id))
🔔 Sistema de Notificações Push#
Gerenciador de Notificações#
# core/notification_manager.py
from typing import Dict, List, Optional
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from core.websocket_manager import connection_manager
from schemas.websocket import WebSocketMessage, MessageType, NotificationMessage
class NotificationPriority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
@dataclass
class Notification:
id: str
title: str
content: str
priority: NotificationPriority
user_id: str | None = None
action_url: str | None = None
created_at: datetime = datetime.now()
read: bool = False
class NotificationManager:
def __init__(self):
# user_id -> list of notifications
self.user_notifications: dict[str, list[Notification]] = {}
async def send_notification(
self,
user_id: str,
title: str,
content: str,
priority: NotificationPriority = NotificationPriority.MEDIUM,
action_url: str | None = None
):
"""Send notification to specific user."""
notification = Notification(
id=f"notif_{datetime.now().timestamp()}",
title=title,
content=content,
priority=priority,
user_id=user_id,
action_url=action_url
)
# Armazenar notificação
if user_id not in self.user_notifications:
self.user_notifications[user_id] = []
self.user_notifications[user_id].append(notification)
# Enviar via WebSocket se usuário estiver online
if connection_manager.is_user_online(user_id):
await connection_manager.send_to_user(user_id, WebSocketMessage(
type=MessageType.NOTIFICATION,
data=NotificationMessage(
title=title,
content=content,
severity=priority.value,
action_url=action_url
).dict()
))
async def broadcast_notification(
self,
title: str,
content: str,
priority: NotificationPriority = NotificationPriority.MEDIUM,
action_url: Optional[str] = None
):
"""Broadcast notification to all online users."""
await connection_manager.broadcast_to_all(WebSocketMessage(
type=MessageType.NOTIFICATION,
data=NotificationMessage(
title=title,
content=content,
severity=priority.value,
action_url=action_url
).dict()
))
def get_user_notifications(self, user_id: str, unread_only: bool = False) -> List[Notification]:
"""Get notifications for user."""
notifications = self.user_notifications.get(user_id, [])
if unread_only:
notifications = [n for n in notifications if not n.read]
return notifications
def mark_as_read(self, user_id: str, notification_id: str):
"""Mark notification as read."""
notifications = self.user_notifications.get(user_id, [])
for notification in notifications:
if notification.id == notification_id:
notification.read = True
break
# Instância global
notification_manager = NotificationManager()
Integração com Redis para Distribuição#
# core/redis_websocket.py
import redis.asyncio as redis
import json
import logging
from typing import Optional
from core.websocket_manager import connection_manager
from schemas.websocket import WebSocketMessage
logger = logging.getLogger(__name__)
class RedisWebSocketManager:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_url = redis_url
self.redis_client: redis.Redis | None = None
self.pubsub: redis.client.PubSub | None = None
async def connect(self):
"""Connect to Redis."""
self.redis_client = redis.from_url(self.redis_url)
self.pubsub = self.redis_client.pubsub()
# Subscribe to WebSocket channels
await self.pubsub.subscribe("websocket:broadcast", "websocket:user", "websocket:room")
logger.info("Connected to Redis for WebSocket distribution")
async def disconnect(self):
"""Disconnect from Redis."""
if self.pubsub:
await self.pubsub.unsubscribe()
await self.pubsub.close()
if self.redis_client:
await self.redis_client.close()
async def listen_for_messages(self):
"""Listen for Redis messages and distribute to local connections."""
if not self.pubsub:
return
async for message in self.pubsub.listen():
if message["type"] == "message":
try:
data = json.loads(message["data"])
channel = message["channel"].decode()
if channel == "websocket:broadcast":
await self._handle_broadcast(data)
elif channel == "websocket:user":
await self._handle_user_message(data)
elif channel == "websocket:room":
await self._handle_room_message(data)
except Exception as e:
logger.error(f"Error processing Redis message: {e}")
async def _handle_broadcast(self, data: dict):
"""Handle broadcast message."""
message = WebSocketMessage(**data["message"])
await connection_manager.broadcast_to_all(message)
async def _handle_user_message(self, data: dict):
"""Handle user-specific message."""
user_id = data["user_id"]
message = WebSocketMessage(**data["message"])
await connection_manager.send_to_user(user_id, message)
async def _handle_room_message(self, data: dict):
"""Handle room message."""
room_id = data["room_id"]
message = WebSocketMessage(**data["message"])
exclude_user = data.get("exclude_user")
await connection_manager.broadcast_to_room(room_id, message, exclude_user)
async def publish_broadcast(self, message: WebSocketMessage):
"""Publish broadcast message to Redis."""
if self.redis_client:
await self.redis_client.publish("websocket:broadcast", json.dumps({
"message": message.dict()
}))
async def publish_user_message(self, user_id: str, message: WebSocketMessage):
"""Publish user message to Redis."""
if self.redis_client:
await self.redis_client.publish("websocket:user", json.dumps({
"user_id": user_id,
"message": message.dict()
}))
async def publish_room_message(self, room_id: str, message: WebSocketMessage, exclude_user: Optional[str] = None):
"""Publish room message to Redis."""
if self.redis_client:
await self.redis_client.publish("websocket:room", json.dumps({
"room_id": room_id,
"message": message.dict(),
"exclude_user": exclude_user
}))
# Instância global
redis_websocket_manager = RedisWebSocketManager()
📊 Monitoramento e Métricas#
Métricas WebSocket#
# core/websocket_metrics.py
from prometheus_client import Counter, Gauge, Histogram
import time
from functools import wraps
# Métricas Prometheus
websocket_connections_total = Counter(
'websocket_connections_total',
'Total number of WebSocket connections',
['status'] # connected, disconnected, failed
)
websocket_active_connections = Gauge(
'websocket_active_connections',
'Number of active WebSocket connections'
)
websocket_messages_total = Counter(
'websocket_messages_total',
'Total number of WebSocket messages',
['type', 'direction'] # type: chat, notification, etc; direction: sent, received
)
websocket_message_duration = Histogram(
'websocket_message_duration_seconds',
'Time spent processing WebSocket messages',
['type']
)
def track_websocket_connection(func):
"""Decorator to track WebSocket connections."""
@wraps(func)
async def wrapper(*args, **kwargs):
websocket_connections_total.labels(status='connected').inc()
websocket_active_connections.inc()
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
websocket_connections_total.labels(status='failed').inc()
raise
finally:
websocket_connections_total.labels(status='disconnected').inc()
websocket_active_connections.dec()
return wrapper
def track_message_processing(message_type: str):
"""Decorator to track message processing time."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
websocket_messages_total.labels(type=message_type, direction='processed').inc()
return result
finally:
duration = time.time() - start_time
websocket_message_duration.labels(type=message_type).observe(duration)
return wrapper
return decorator
Health Checks para WebSocket#
# core/websocket_health.py
from fastapi import APIRouter, HTTPException
from core.websocket_manager import connection_manager
from core.redis_websocket import redis_websocket_manager
router = APIRouter(prefix="/health", tags=["health"])
@router.get("/websocket")
async def websocket_health():
"""Health check for WebSocket services."""
try:
# Verificar conexões ativas
active_connections = len(connection_manager.user_connections)
# Verificar Redis (se configurado)
redis_status = "not_configured"
if redis_websocket_manager.redis_client:
try:
await redis_websocket_manager.redis_client.ping()
redis_status = "healthy"
except Exception:
redis_status = "unhealthy"
return {
"status": "healthy",
"active_connections": active_connections,
"redis_status": redis_status,
"rooms": len(connection_manager.room_members)
}
except Exception as e:
raise HTTPException(status_code=503, detail=f"WebSocket service unhealthy: {str(e)}")
🧪 Testes para WebSockets#
Testes de Conexão#
# tests/test_websocket.py
import pytest
from fastapi.testclient import TestClient
from main import app
import json
@pytest.fixture
def client():
return TestClient(app)
def test_websocket_connection(client):
"""Test basic WebSocket connection."""
with client.websocket_connect("/ws") as websocket:
# Enviar mensagem de teste
websocket.send_text("Hello WebSocket")
# Receber resposta
data = websocket.receive_text()
assert "Hello WebSocket" in data
def test_websocket_authentication():
"""Test WebSocket authentication."""
# Teste sem token
with pytest.raises(Exception):
with TestClient(app).websocket_connect("/ws/chat"):
pass
# Teste com token válido
token = "valid_jwt_token"
with TestClient(app).websocket_connect(f"/ws/chat?token={token}") as websocket:
data = websocket.receive_text()
message = json.loads(data)
assert message["type"] == "system"
def test_chat_message_flow():
"""Test chat message flow."""
token = "valid_jwt_token"
with TestClient(app).websocket_connect(f"/ws/chat?token={token}") as websocket:
# Entrar em uma sala
websocket.send_text(json.dumps({
"type": "join_room",
"data": {"room_id": "1"}
}))
# Enviar mensagem
websocket.send_text(json.dumps({
"type": "send_message",
"data": {
"room_id": "1",
"message": "Hello, room!"
}
}))
# Verificar se mensagem foi processada
data = websocket.receive_text()
message = json.loads(data)
assert message["type"] == "chat"
assert message["data"]["message"] == "Hello, room!"
📝 Resumo#
Neste step, implementamos um sistema completo de WebSockets e comunicação em tempo real com:
✅ Funcionalidades Implementadas#
Fundamentos WebSocket
Conceitos e diferenças com HTTP
Casos de uso e padrões de comunicação
Protocolo e estados de conexão
Implementação FastAPI
Configuração básica de WebSockets
Gerenciador de conexões autenticadas
Estruturas de dados para mensagens
Autenticação e Segurança
Middleware de autenticação WebSocket
Validação de tokens JWT
Gerenciamento seguro de conexões
Sistema de Chat
Salas de chat com membros
Mensagens em tempo real
Indicadores de digitação
Histórico de mensagens
Notificações Push
Sistema de notificações em tempo real
Diferentes níveis de prioridade
Broadcast e mensagens direcionadas
Integração Redis
Distribuição de mensagens entre instâncias
Pub/Sub para escalabilidade
Gerenciamento distribuído de conexões
Monitoramento
Métricas Prometheus
Health checks específicos
Tracking de performance
Testes e Cliente
Testes automatizados para WebSockets
Cliente JavaScript completo
Exemplos de integração
🎯 Próximos Passos#
No próximo step, vamos abordar Deploy, Monitoramento e Observabilidade, incluindo:
Containerização com Docker
CI/CD com GitHub Actions
Monitoramento em produção
Logging estruturado
Observabilidade e tracing