Deployment, Monitoring, and Observability#
Implement production deployment with Docker, CI/CD, comprehensive monitoring, structured logging, and observability for a robust, scalable FastAPI application.
🎯 What you will learn#
Containerization with Docker
Production deployment (AWS, GCP, Azure)
CI/CD with GitHub Actions
Monitoring and metrics
Structured logging
Observability and tracing
Health checks and readiness
Backup and disaster recovery
1. Fundamental Concepts of Deployment and Monitoring#
1.1 What is Deployment?#
Deployment is the process of making an application available for use in a production environment. It involves:
Packaging: preparing the application and its dependencies
Distribution: moving the application to the production environment
Configuration: adjusting settings for the target environment
Startup: bringing the application into service
1.2 Deployment Environments#
# Example of per-environment configuration
import os
from enum import Enum
from pydantic import BaseSettings

class Environment(str, Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Settings(BaseSettings):
    environment: Environment = Environment.DEVELOPMENT
    debug: bool = False
    database_url: str
    redis_url: str
    secret_key: str

    class Config:
        # Select the env file from the ENVIRONMENT variable; the class body
        # cannot reference the instance field "environment" directly
        env_file = f".env.{os.getenv('ENVIRONMENT', 'development')}"

    @property
    def is_production(self) -> bool:
        return self.environment == Environment.PRODUCTION
1.3 Deployment Strategies#
Blue-Green Deployment#
Keeps two identical copies of the environment
Allows instant rollback
Zero downtime during updates
Rolling Deployment#
Updates instances gradually
Keeps the service available throughout the process
Allows problems to be detected early
Canary Deployment#
Routes a small percentage of traffic to the new version
Allows validation with real users
Reduces the risk of large-scale failures (see the sketch below)
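In practice the traffic split of a canary release lives in the load balancer or service mesh, not in the application. Purely as an illustration of the idea, the sketch below is a hypothetical FastAPI middleware that assigns a configurable fraction of requests to a canary cohort; the CANARY_FRACTION value and the X-Canary header are assumptions, not part of this project.
# Hypothetical illustration of a canary split; real deployments do this at the
# load balancer (nginx, service mesh), not inside the application
import random

from fastapi import FastAPI, Request

app = FastAPI()
CANARY_FRACTION = 0.05  # assumed: 5% of traffic goes to the canary build

@app.middleware("http")
async def canary_router(request: Request, call_next):
    # Decide once per request whether it belongs to the canary cohort
    request.state.is_canary = random.random() < CANARY_FRACTION
    response = await call_next(request)
    # Expose the decision so dashboards can compare canary vs. stable traffic
    response.headers["X-Canary"] = "true" if request.state.is_canary else "false"
    return response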
1.4 The Three Pillars of Observability#
Logs#
Records of application events
Essential for debugging and auditing
Should be structured and searchable
Metrics#
Quantitative data about performance
Enable automatic alerting
Essential for capacity planning
Traces#
Track requests as they flow through services
Identify bottlenecks in distributed systems
Show the complete path of an operation (see the tracing sketch below)
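Structured logs and Prometheus metrics are implemented later in this step; distributed tracing is not. Here is a minimal sketch of how tracing could be wired with OpenTelemetry, assuming the opentelemetry-sdk, opentelemetry-exporter-otlp, and opentelemetry-instrumentation-fastapi packages are installed and an OTLP collector is reachable at the endpoint below (both are assumptions, not part of this project):
# Minimal tracing sketch with OpenTelemetry (assumed dependency)
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

app = FastAPI()

# Identify the service and export spans to an OTLP-compatible collector
provider = TracerProvider(resource=Resource.create({"service.name": "fastapi-app"}))
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317", insecure=True))
)
trace.set_tracer_provider(provider)

# Automatically create a span for every HTTP request handled by FastAPI
FastAPIInstrumentor.instrument_app(app)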
2. Containerization with Docker#
2.1 Optimized Multi-stage Dockerfile#
# Dockerfile
FROM python:3.11-slim as base

# Base settings
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# Dependencies stage
FROM base as dependencies

# Install the system packages needed to build Python dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt requirements-dev.txt ./
RUN pip install --user --no-warn-script-location -r requirements.txt

# Development stage
FROM dependencies as development

# Install development dependencies
RUN pip install --user --no-warn-script-location -r requirements-dev.txt

# Copy the source code
COPY . /app
WORKDIR /app

# Non-root user; move the user-installed packages out of /root so appuser can read them
RUN groupadd -r appuser && useradd -r -g appuser appuser \
    && mkdir -p /home/appuser \
    && cp -a /root/.local /home/appuser/.local \
    && chown -R appuser:appuser /app /home/appuser
ENV PATH=/home/appuser/.local/bin:$PATH
USER appuser

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

# Production stage
FROM base as production

# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create the non-root user before copying files so ownership can be assigned
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Copy the Python packages from the dependencies stage into the non-root user's
# home (leaving them under /root would make them unreadable to appuser)
COPY --from=dependencies --chown=appuser:appuser /root/.local /home/appuser/.local

# Copy the application
COPY --chown=appuser:appuser . /app
WORKDIR /app

# Put the user-level package scripts on PATH
ENV PATH=/home/appuser/.local/bin:$PATH

# Run as the non-root user
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Expose the port
EXPOSE 8000

# Default command
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
2.2 Docker Compose for Development#
# docker-compose.yml
version: '3.8'
services:
app:
build:
context: .
target: development
ports:
- "8000:8000"
volumes:
- .:/app
- /app/__pycache__
environment:
- DATABASE_URL=postgresql://user:password@db:5432/fastapi_db
- REDIS_URL=redis://redis:6379/0
- ENVIRONMENT=development
depends_on:
- db
- redis
networks:
- app-network
db:
image: postgres:15-alpine
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: fastapi_db
volumes:
- postgres_data:/var/lib/postgresql/data
- ./scripts/init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
networks:
- app-network
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- app-network
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/ssl:/etc/nginx/ssl
depends_on:
- app
networks:
- app-network
volumes:
postgres_data:
redis_data:
networks:
app-network:
driver: bridge
2.3 Docker Compose for Production#
# docker-compose.prod.yml
version: '3.8'
services:
app:
build:
context: .
target: production
restart: unless-stopped
environment:
- DATABASE_URL=${DATABASE_URL}
- REDIS_URL=${REDIS_URL}
- SECRET_KEY=${SECRET_KEY}
- ENVIRONMENT=production
depends_on:
- db
- redis
networks:
- app-network
deploy:
replicas: 3
resources:
limits:
cpus: '0.5'
memory: 512M
reservations:
cpus: '0.25'
memory: 256M
db:
image: postgres:15-alpine
restart: unless-stopped
environment:
POSTGRES_USER: ${DB_USER}
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_DB: ${DB_NAME}
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- app-network
deploy:
resources:
limits:
cpus: '1'
memory: 1G
redis:
image: redis:7-alpine
restart: unless-stopped
volumes:
- redis_data:/data
networks:
- app-network
command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD}
nginx:
image: nginx:alpine
restart: unless-stopped
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx/nginx.prod.conf:/etc/nginx/nginx.conf
- ./nginx/ssl:/etc/nginx/ssl
- static_files:/app/static
depends_on:
- app
networks:
- app-network
volumes:
postgres_data:
redis_data:
static_files:
networks:
app-network:
driver: overlay
3. Nginx Configuration#
3.1 Nginx for Production#
# nginx/nginx.prod.conf
events {
worker_connections 2048;
}
http {
    # Performance settings
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
types_hash_max_size 2048;
# Gzip compression
gzip on;
gzip_vary on;
gzip_min_length 1024;
gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
# Rate limiting
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
limit_req_zone $binary_remote_addr zone=login:10m rate=1r/s;
# Upstream servers
upstream app {
least_conn;
server app_1:8000 max_fails=3 fail_timeout=30s;
server app_2:8000 max_fails=3 fail_timeout=30s;
server app_3:8000 max_fails=3 fail_timeout=30s;
}
# HTTP to HTTPS redirect
server {
listen 80;
server_name yourdomain.com www.yourdomain.com;
return 301 https://$server_name$request_uri;
}
# HTTPS server
server {
listen 443 ssl http2;
server_name yourdomain.com www.yourdomain.com;
# SSL configuration
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers off;
# Security headers
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload";
# Static files
location /static/ {
alias /app/static/;
expires 1y;
add_header Cache-Control "public, immutable";
}
# API endpoints with rate limiting
location /api/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# Login endpoint with stricter rate limiting
location /api/auth/login {
limit_req zone=login burst=5 nodelay;
proxy_pass http://app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# WebSocket connections
location /ws {
proxy_pass http://app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 86400;
}
# Health check
location /health {
access_log off;
proxy_pass http://app;
proxy_set_header Host $host;
}
}
}
4. CI/CD with GitHub Actions#
4.1 Main Workflow#
# .github/workflows/ci-cd.yml
name: CI/CD Pipeline
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
env:
REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: test_db
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Cache dependencies
uses: actions/cache@v3
with:
path: ~/.cache/pip
          key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements*.txt') }}
          restore-keys: |
            ${{ runner.os }}-pip-
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run linting
run: |
flake8 app tests
black --check app tests
isort --check-only app tests
- name: Run type checking
run: mypy app
- name: Run tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/test_db
REDIS_URL: redis://localhost:6379/0
SECRET_KEY: test-secret-key
run: |
pytest tests/ -v --cov=app --cov-report=xml --cov-report=html
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run security scan
uses: pypa/gh-action-pip-audit@v1.0.8
with:
inputs: requirements.txt
- name: Run Bandit security linter
run: |
pip install bandit
bandit -r app/
build:
needs: [test, security]
runs-on: ubuntu-latest
if: github.event_name == 'push'
steps:
- uses: actions/checkout@v4
- name: Log in to Container Registry
uses: docker/login-action@v3
with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=sha,prefix={{branch}}-
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
target: production
push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
deploy-staging:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/develop'
environment: staging
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
echo "Deploying to staging environment"
          # Here you would add your provider-specific deployment commands
deploy-production:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
environment: production
steps:
- uses: actions/checkout@v4
- name: Deploy to production
run: |
echo "Deploying to production environment"
          # Production deployment commands go here
5. Structured Logging#
5.1 Logging Configuration#
# app/core/logging.py
import json
import logging
import sys
from datetime import datetime
from typing import Any
from pythonjsonlogger import jsonlogger
class StructuredLogger:
def __init__(self, name: str, level: str = "INFO"):
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, level.upper()))
        # Avoid duplicate handlers
if not self.logger.handlers:
self._setup_handler()
def _setup_handler(self):
"""Configurar handler com formato JSON"""
handler = logging.StreamHandler(sys.stdout)
        # JSON format for production
formatter = jsonlogger.JsonFormatter(
fmt='%(asctime)s %(name)s %(levelname)s %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log(self, level: str, message: str, **kwargs):
"""Log estruturado com contexto adicional"""
extra_data = {
"timestamp": datetime.utcnow().isoformat(),
"service": "fastapi-app",
**kwargs
}
log_method = getattr(self.logger, level.lower())
log_method(message, extra=extra_data)
def info(self, message: str, **kwargs):
self.log("info", message, **kwargs)
def error(self, message: str, **kwargs):
self.log("error", message, **kwargs)
def warning(self, message: str, **kwargs):
self.log("warning", message, **kwargs)
def debug(self, message: str, **kwargs):
self.log("debug", message, **kwargs)
# Global instance
logger = StructuredLogger("fastapi-app")
5.2 Logging Middleware#
# app/middleware/logging.py
import time
import uuid
from typing import Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from app.core.logging import logger
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: Callable) -> Response:
        # Generate a unique ID for the request
request_id = str(uuid.uuid4())
request.state.request_id = request_id
        # Capture request information
start_time = time.time()
        # Log the incoming request
logger.info(
"Request received",
request_id=request_id,
method=request.method,
url=str(request.url),
user_agent=request.headers.get("user-agent"),
client_ip=request.client.host if request.client else None
)
try:
            # Process the request
response = await call_next(request)
            # Compute processing time
process_time = time.time() - start_time
            # Log the response
logger.info(
"Request completed",
request_id=request_id,
status_code=response.status_code,
process_time=process_time,
response_size=response.headers.get("content-length")
)
            # Add tracing headers
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = str(process_time)
return response
except Exception as e:
            # Log the error
process_time = time.time() - start_time
logger.error(
"Request failed",
request_id=request_id,
error=str(e),
error_type=type(e).__name__,
process_time=process_time
)
raise
6. Metrics with Prometheus#
6.1 Metrics Configuration#
# app/core/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info, generate_latest
from prometheus_client.core import CollectorRegistry
import psutil
import time
import sys
from typing import Any
class MetricsCollector:
def __init__(self):
self.registry = CollectorRegistry()
        # HTTP metrics
self.http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code'],
registry=self.registry
)
self.http_request_duration = Histogram(
'http_request_duration_seconds',
'HTTP request duration in seconds',
['method', 'endpoint'],
registry=self.registry
)
        # System metrics
self.cpu_usage = Gauge(
'system_cpu_usage_percent',
'System CPU usage percentage',
registry=self.registry
)
self.memory_usage = Gauge(
'system_memory_usage_bytes',
'System memory usage in bytes',
registry=self.registry
)
self.disk_usage = Gauge(
'system_disk_usage_percent',
'System disk usage percentage',
registry=self.registry
)
        # Application metrics
self.active_connections = Gauge(
'websocket_connections_active',
'Active WebSocket connections',
registry=self.registry
)
self.database_connections = Gauge(
'database_connections_active',
'Active database connections',
registry=self.registry
)
self.cache_hits = Counter(
'cache_hits_total',
'Total cache hits',
['cache_type'],
registry=self.registry
)
self.cache_misses = Counter(
'cache_misses_total',
'Total cache misses',
['cache_type'],
registry=self.registry
)
        # Application info
self.app_info = Info(
'app_info',
'Application information',
registry=self.registry
)
def record_http_request(self, method: str, endpoint: str, status_code: int, duration: float):
"""Registrar métricas de requisição HTTP"""
self.http_requests_total.labels(
method=method,
endpoint=endpoint,
status_code=status_code
).inc()
self.http_request_duration.labels(
method=method,
endpoint=endpoint
).observe(duration)
def update_system_metrics(self):
"""Atualizar métricas do sistema"""
self.cpu_usage.set(psutil.cpu_percent())
memory = psutil.virtual_memory()
self.memory_usage.set(memory.used)
disk = psutil.disk_usage('/')
self.disk_usage.set(disk.percent)
def set_active_connections(self, count: int):
"""Definir número de conexões WebSocket ativas"""
self.active_connections.set(count)
def set_database_connections(self, count: int):
"""Definir número de conexões de base de dados ativas"""
self.database_connections.set(count)
def record_cache_hit(self, cache_type: str):
"""Registrar cache hit"""
self.cache_hits.labels(cache_type=cache_type).inc()
def record_cache_miss(self, cache_type: str):
"""Registrar cache miss"""
self.cache_misses.labels(cache_type=cache_type).inc()
def set_app_info(self, version: str, environment: str):
"""Definir informações da aplicação"""
self.app_info.info({
'version': version,
'environment': environment,
            'python_version': sys.version
})
def get_metrics(self) -> str:
"""Obter métricas no formato Prometheus"""
self.update_system_metrics()
return generate_latest(self.registry)
# Global instance
metrics = MetricsCollector()
6.2 Metrics Middleware#
# app/middleware/metrics.py
import time
from typing import Callable
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from app.core.metrics import metrics
class MetricsMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next: Callable) -> Response:
start_time = time.time()
        # Extract a clean endpoint (without parameters)
endpoint = self._clean_endpoint(request.url.path)
try:
response = await call_next(request)
            # Compute duration
duration = time.time() - start_time
            # Record metrics
metrics.record_http_request(
method=request.method,
endpoint=endpoint,
status_code=response.status_code,
duration=duration
)
return response
except Exception as e:
            # Record the error
duration = time.time() - start_time
metrics.record_http_request(
method=request.method,
endpoint=endpoint,
status_code=500,
duration=duration
)
raise
def _clean_endpoint(self, path: str) -> str:
"""Limpar endpoint para evitar cardinalidade alta"""
        # Replace IDs with placeholders
import re
        # Common ID patterns
patterns = [
(r'/\d+', '/{id}'),
(r'/[a-f0-9-]{36}', '/{uuid}'),
(r'/[a-f0-9]{24}', '/{objectid}'),
]
for pattern, replacement in patterns:
path = re.sub(pattern, replacement, path)
return path
7. Advanced Health Checks#
7.1 Health Check System#
# app/core/health.py
import asyncio
import time
from typing import Any
from enum import Enum
from dataclasses import dataclass
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from redis.asyncio import Redis
from app.database import get_db
from app.core.redis import get_redis
class HealthStatus(str, Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
@dataclass
class HealthCheck:
name: str
status: HealthStatus
response_time: float
details: dict[str, Any] = None
error: str = None
class HealthMonitor:
def __init__(self):
self.checks = {}
self.last_check_time = None
        self.cache_duration = 30  # seconds
async def check_database(self) -> HealthCheck:
"""Verificar saúde da base de dados"""
start_time = time.time()
try:
async with get_db() as db:
                # Run a simple query
result = await db.execute("SELECT 1")
await result.fetchone()
response_time = time.time() - start_time
return HealthCheck(
name="database",
status=HealthStatus.HEALTHY,
response_time=response_time,
details={"connection": "active"}
)
except Exception as e:
response_time = time.time() - start_time
return HealthCheck(
name="database",
status=HealthStatus.UNHEALTHY,
response_time=response_time,
error=str(e)
)
async def check_redis(self) -> HealthCheck:
"""Verificar saúde do Redis"""
start_time = time.time()
try:
redis = await get_redis()
await redis.ping()
response_time = time.time() - start_time
            # Gather additional info
info = await redis.info()
return HealthCheck(
name="redis",
status=HealthStatus.HEALTHY,
response_time=response_time,
details={
"connected_clients": info.get("connected_clients"),
"used_memory": info.get("used_memory_human"),
"uptime": info.get("uptime_in_seconds")
}
)
except Exception as e:
response_time = time.time() - start_time
return HealthCheck(
name="redis",
status=HealthStatus.UNHEALTHY,
response_time=response_time,
error=str(e)
)
async def check_disk_space(self, threshold: float = 0.9) -> HealthCheck:
"""Verificar espaço em disco"""
import psutil
start_time = time.time()
try:
disk_usage = psutil.disk_usage('/')
usage_percent = disk_usage.used / disk_usage.total
response_time = time.time() - start_time
if usage_percent < threshold:
status = HealthStatus.HEALTHY
elif usage_percent < 0.95:
status = HealthStatus.DEGRADED
else:
status = HealthStatus.UNHEALTHY
return HealthCheck(
name="disk_space",
status=status,
response_time=response_time,
details={
"usage_percent": round(usage_percent * 100, 2),
"free_gb": round(disk_usage.free / (1024**3), 2),
"total_gb": round(disk_usage.total / (1024**3), 2)
}
)
except Exception as e:
response_time = time.time() - start_time
return HealthCheck(
name="disk_space",
status=HealthStatus.UNHEALTHY,
response_time=response_time,
error=str(e)
)
async def run_all_checks(self) -> dict[str, Any]:
"""Executar todos os health checks"""
current_time = time.time()
        # Use cached results when available
if (self.last_check_time and
current_time - self.last_check_time < self.cache_duration and
self.checks):
return self.checks
        # Run the checks in parallel
checks = await asyncio.gather(
self.check_database(),
self.check_redis(),
self.check_disk_space(),
return_exceptions=True
)
        # Process results
results = {}
overall_status = HealthStatus.HEALTHY
for check in checks:
if isinstance(check, Exception):
continue
results[check.name] = {
"status": check.status,
"response_time": check.response_time,
"details": check.details,
"error": check.error
}
            # Determine overall status
if check.status == HealthStatus.UNHEALTHY:
overall_status = HealthStatus.UNHEALTHY
elif check.status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
overall_status = HealthStatus.DEGRADED
self.checks = {
"status": overall_status,
"timestamp": current_time,
"checks": results
}
self.last_check_time = current_time
return self.checks
# Global instance
health_monitor = HealthMonitor()
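The monitor above backs the /monitoring endpoints in section 9. When running behind Kubernetes-style probes it is also common to expose a trivial liveness route separately from a readiness route that checks dependencies. A minimal sketch, assuming the route paths /health and /ready (chosen here to match the path used by the Dockerfile HEALTHCHECK and the nginx config):
# Liveness vs. readiness sketch wired to the HealthMonitor above
from fastapi import FastAPI, Response, status

from app.core.health import HealthStatus, health_monitor

app = FastAPI()

@app.get("/health")
async def liveness():
    # Liveness: only proves the process is responding; never touches dependencies
    return {"status": "alive"}

@app.get("/ready")
async def readiness(response: Response):
    # Readiness: only receive traffic when the database, Redis, and disk checks pass
    result = await health_monitor.run_all_checks()
    if result["status"] == HealthStatus.UNHEALTHY:
        response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
    return {"status": result["status"]}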
8. Alerting System#
8.1 Alert Configuration#
# app/core/alerts.py
import asyncio
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Any
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
import aiohttp

from app.core.logging import logger
class AlertSeverity(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Alert:
name: str
severity: AlertSeverity
message: str
details: dict[str, Any]
timestamp: datetime
resolved: bool = False
class AlertManager:
def __init__(self):
self.active_alerts: dict[str, Alert] = {}
self.alert_history: list[Alert] = []
self.notification_channels = []
        # Throttling configuration
self.alert_cooldown = {}
self.cooldown_duration = timedelta(minutes=15)
def add_notification_channel(self, channel):
"""Adicionar canal de notificação"""
self.notification_channels.append(channel)
async def trigger_alert(self, alert: Alert):
"""Disparar alerta"""
alert_key = f"{alert.name}_{alert.severity}"
        # Check the cooldown window
if self._is_in_cooldown(alert_key):
return
        # Add to the active alerts
self.active_alerts[alert_key] = alert
self.alert_history.append(alert)
        # Start the cooldown
self.alert_cooldown[alert_key] = datetime.utcnow()
        # Send notifications
await self._send_notifications(alert)
logger.error(
f"Alert triggered: {alert.name}",
alert_name=alert.name,
severity=alert.severity,
message=alert.message,
details=alert.details
)
async def resolve_alert(self, alert_name: str, severity: AlertSeverity):
"""Resolver alerta"""
alert_key = f"{alert_name}_{severity}"
if alert_key in self.active_alerts:
alert = self.active_alerts[alert_key]
alert.resolved = True
del self.active_alerts[alert_key]
            # Notify the resolution
await self._send_resolution_notification(alert)
logger.info(
f"Alert resolved: {alert_name}",
alert_name=alert_name,
severity=severity
)
def _is_in_cooldown(self, alert_key: str) -> bool:
"""Verificar se alerta está em cooldown"""
if alert_key not in self.alert_cooldown:
return False
last_alert = self.alert_cooldown[alert_key]
return datetime.utcnow() - last_alert < self.cooldown_duration
async def _send_notifications(self, alert: Alert):
"""Enviar notificações para todos os canais"""
tasks = []
for channel in self.notification_channels:
tasks.append(channel.send_alert(alert))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def _send_resolution_notification(self, alert: Alert):
"""Enviar notificação de resolução"""
tasks = []
for channel in self.notification_channels:
tasks.append(channel.send_resolution(alert))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
# Global instance
alert_manager = AlertManager()
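AlertManager only keeps a list of notification channels; no concrete channel is defined in this step. A minimal sketch of a webhook-based channel (the SlackChannel name, payload format, and webhook URL are assumptions for illustration):
# Hypothetical webhook notification channel for the AlertManager above
import aiohttp

from app.core.alerts import Alert, alert_manager

class SlackChannel:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def send_alert(self, alert: Alert):
        # Post a short message to the incoming-webhook URL
        payload = {"text": f":rotating_light: [{alert.severity}] {alert.name}: {alert.message}"}
        async with aiohttp.ClientSession() as session:
            await session.post(self.webhook_url, json=payload)

    async def send_resolution(self, alert: Alert):
        payload = {"text": f":white_check_mark: Resolved: {alert.name}"}
        async with aiohttp.ClientSession() as session:
            await session.post(self.webhook_url, json=payload)

# Registration; in practice the webhook URL would come from configuration
alert_manager.add_notification_channel(SlackChannel("https://hooks.slack.com/services/..."))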
9. Monitoring Endpoints#
9.1 Monitoring API#
# app/api/monitoring.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import JSONResponse, Response
from app.core.health import health_monitor, HealthStatus
from app.core.metrics import metrics
from app.core.logging import logger
import time

# Recorded at import time so /monitoring/info can report process uptime
APP_START_TIME = time.time()

monitoring_router = APIRouter(prefix="/monitoring", tags=["monitoring"])
@monitoring_router.get("/health")
async def health_check():
"""Health check básico para load balancers"""
try:
health_data = await health_monitor.run_all_checks()
if health_data["status"] == HealthStatus.UNHEALTHY:
return JSONResponse(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content={"status": "unhealthy"}
)
return {"status": "healthy"}
except Exception as e:
logger.error("Health check failed", error=str(e))
return JSONResponse(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
content={"status": "unhealthy", "error": str(e)}
)
@monitoring_router.get("/health/detailed")
async def detailed_health_check():
"""Health check detalhado"""
try:
health_data = await health_monitor.run_all_checks()
return health_data
except Exception as e:
logger.error("Detailed health check failed", error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Health check failed"
)
@monitoring_router.get("/metrics")
async def get_metrics():
"""Endpoint para métricas Prometheus"""
try:
metrics_data = metrics.get_metrics()
return Response(content=metrics_data, media_type="text/plain")
except Exception as e:
logger.error("Failed to get metrics", error=str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get metrics"
)
@monitoring_router.get("/info")
async def app_info():
"""Informações da aplicação"""
import os
import sys
return {
"name": "FastAPI Application",
"version": os.getenv("APP_VERSION", "unknown"),
"environment": os.getenv("ENVIRONMENT", "unknown"),
"python_version": sys.version,
"uptime": time.time() - start_time
}
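The middlewares, metrics, and monitoring router above are not wired together anywhere in this step. A minimal sketch of what app/main.py could look like, following the module paths used above (the lifespan body and the version/environment variables are assumptions):
# app/main.py — minimal wiring sketch for the components defined in this step
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.api.monitoring import monitoring_router
from app.core.metrics import metrics
from app.middleware.logging import LoggingMiddleware
from app.middleware.metrics import MetricsMiddleware

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Publish static application info once at startup
    metrics.set_app_info(
        version=os.getenv("APP_VERSION", "unknown"),
        environment=os.getenv("ENVIRONMENT", "development"),
    )
    yield

app = FastAPI(lifespan=lifespan)

# The middleware added last runs first (outermost layer)
app.add_middleware(MetricsMiddleware)
app.add_middleware(LoggingMiddleware)

# Exposes /monitoring/health, /monitoring/health/detailed, /monitoring/metrics, /monitoring/info
app.include_router(monitoring_router)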
10. External Tooling Configuration#
10.1 Docker Compose with a Monitoring Stack#
# docker-compose.monitoring.yml
version: '3.8'
services:
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--storage.tsdb.retention.time=200h'
- '--web.enable-lifecycle'
networks:
- monitoring
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
- ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
networks:
- monitoring
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
ports:
- "9200:9200"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
networks:
- monitoring
kibana:
image: docker.elastic.co/kibana/kibana:8.8.0
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- monitoring
volumes:
prometheus_data:
grafana_data:
elasticsearch_data:
networks:
monitoring:
driver: bridge
10.2 Prometheus Configuration#
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'fastapi-app'
static_configs:
- targets: ['app:8000']
metrics_path: '/monitoring/metrics'
scrape_interval: 10s
- job_name: 'node-exporter'
static_configs:
- targets: ['node-exporter:9100']
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']
- job_name: 'postgres'
static_configs:
- targets: ['postgres-exporter:9187']
11. Deployment Scripts#
11.1 Production Deploy Script#
#!/bin/bash
# scripts/deploy-production.sh
set -e
# Configuration
REGISTRY="ghcr.io"
IMAGE_NAME="your-org/fastapi-app"
TAG=${1:-latest}
COMPOSE_FILE="docker-compose.prod.yml"
echo "🚀 Starting production deployment..."
echo "📦 Image: $REGISTRY/$IMAGE_NAME:$TAG"
# Check that all required environment variables are set
required_vars=("DATABASE_URL" "REDIS_URL" "SECRET_KEY")
for var in "${required_vars[@]}"; do
if [ -z "${!var}" ]; then
echo "❌ Environment variable $var is not set"
exit 1
fi
done
# Back up the database (see the Python sketch after this script)
echo "💾 Creating database backup..."
./scripts/backup-database.sh
# Pull the new image
echo "📥 Pulling new image..."
docker pull $REGISTRY/$IMAGE_NAME:$TAG
# Run database migrations
echo "📊 Running database migrations..."
docker run --rm \
--network host \
-e DATABASE_URL="$DATABASE_URL" \
$REGISTRY/$IMAGE_NAME:$TAG \
alembic upgrade head
# Zero-downtime deploy
echo "🔄 Performing rolling update..."
docker-compose -f $COMPOSE_FILE pull
docker-compose -f $COMPOSE_FILE up -d --no-deps app
# Wait for the new version to become ready
echo "⏳ Waiting for new version to be ready..."
sleep 30
# Check application health
echo "🏥 Checking application health..."
for i in {1..10}; do
if curl -f https://yourdomain.com/health > /dev/null 2>&1; then
echo "✅ Application is healthy"
break
fi
if [ $i -eq 10 ]; then
echo "❌ Health check failed, rolling back..."
docker-compose -f $COMPOSE_FILE rollback
exit 1
fi
echo "⏳ Attempt $i/10 failed, retrying..."
sleep 10
done
# Clean up old images
echo "🧹 Cleaning up old images..."
docker image prune -f
echo "🎉 Production deployment completed successfully!"
Next Steps#
With this step complete, you have implemented:
Full containerization with multi-stage Docker builds
A robust CI/CD pipeline with GitHub Actions
Structured logging for efficient debugging
Prometheus metrics for performance monitoring
Advanced health checks for high availability
An alerting system for fast incident response
Nginx configuration for load balancing and SSL
Automated deployment scripts
Practical Exercises#
Full Deployment: set up a complete production environment
Advanced Monitoring: build custom Grafana dashboards
Custom Alerts: configure alerts specific to your domain
Automated Backups: implement a backup and recovery strategy
Horizontal Scaling: configure auto-scaling driven by metrics
Additional Resources#
Deploy Script (scripts/deploy.sh)#
#!/bin/bash
set -e
# Configuration
ENVIRONMENT=${1:-production}
IMAGE_TAG=${2:-latest}
REGISTRY="ghcr.io/your-username/fastapi-app"
echo "🚀 Deploying FastAPI app to $ENVIRONMENT"
# Check that Docker is running
if ! docker info > /dev/null 2>&1; then
echo "❌ Docker is not running"
exit 1
fi
# Pull the latest image
echo "📥 Pulling latest image..."
docker pull $REGISTRY:$IMAGE_TAG
# Stop old containers
echo "🛑 Stopping old containers..."
docker-compose -f docker-compose.$ENVIRONMENT.yml down
# Back up the database
if [ "$ENVIRONMENT" = "production" ]; then
echo "💾 Creating database backup..."
docker exec postgres pg_dump -U postgres fastapi_db > backup_$(date +%Y%m%d_%H%M%S).sql
fi
# Start the new containers
echo "🔄 Starting new containers..."
docker-compose -f docker-compose.$ENVIRONMENT.yml up -d
# Wait for the containers to become healthy
echo "⏳ Waiting for containers to be healthy..."
sleep 30
# Verify health checks
echo "🔍 Checking health status..."
for i in {1..10}; do
if curl -f http://localhost/health > /dev/null 2>&1; then
echo "✅ Application is healthy"
break
fi
if [ $i -eq 10 ]; then
echo "❌ Application failed health check"
exit 1
fi
echo "⏳ Waiting for application to be ready... ($i/10)"
sleep 10
done
# Run database migrations if needed
echo "🔄 Running database migrations..."
docker-compose -f docker-compose.$ENVIRONMENT.yml exec app alembic upgrade head
echo "🎉 Deployment completed successfully!"
# Clean up old images
echo "🧹 Cleaning up old images..."
docker image prune -f
echo "📊 Deployment summary:"
docker-compose -f docker-compose.$ENVIRONMENT.yml ps
📊 Monitoring and Observability#
Prometheus Configuration (monitoring/prometheus.yml)#
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'fastapi-app'
static_configs:
- targets: ['app:8000']
metrics_path: '/metrics'
scrape_interval: 10s
- job_name: 'postgres'
static_configs:
- targets: ['postgres-exporter:9187']
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']
- job_name: 'nginx'
static_configs:
- targets: ['nginx-exporter:9113']
- job_name: 'node'
static_configs:
- targets: ['node-exporter:9100']
Custom Metrics (core/metrics.py)#
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from prometheus_client.core import CollectorRegistry
from fastapi import Request, Response
from typing import Callable
import time
import psutil
import asyncio
# Custom registry
REGISTRY = CollectorRegistry()
# HTTP metrics
http_requests_total = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code'],
registry=REGISTRY
)
http_request_duration_seconds = Histogram(
'http_request_duration_seconds',
'HTTP request duration in seconds',
['method', 'endpoint'],
registry=REGISTRY
)
# Application metrics
active_connections = Gauge(
'websocket_active_connections',
'Number of active WebSocket connections',
registry=REGISTRY
)
database_connections = Gauge(
'database_connections_active',
'Number of active database connections',
registry=REGISTRY
)
cache_hits_total = Counter(
'cache_hits_total',
'Total cache hits',
['cache_type'],
registry=REGISTRY
)
cache_misses_total = Counter(
'cache_misses_total',
'Total cache misses',
['cache_type'],
registry=REGISTRY
)
# System metrics
system_cpu_usage = Gauge(
'system_cpu_usage_percent',
'System CPU usage percentage',
registry=REGISTRY
)
system_memory_usage = Gauge(
'system_memory_usage_bytes',
'System memory usage in bytes',
registry=REGISTRY
)
system_disk_usage = Gauge(
'system_disk_usage_bytes',
'System disk usage in bytes',
['device'],
registry=REGISTRY
)
class MetricsMiddleware:
"""Middleware para coletar métricas HTTP."""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
request = Request(scope, receive)
start_time = time.time()
        # Wrapper to capture the response status
response_info = {"status_code": 500}
async def send_wrapper(message):
if message["type"] == "http.response.start":
response_info["status_code"] = message["status"]
await send(message)
try:
await self.app(scope, receive, send_wrapper)
finally:
            # Record metrics
duration = time.time() - start_time
method = request.method
endpoint = self._get_endpoint(request)
status_code = str(response_info["status_code"])
http_requests_total.labels(
method=method,
endpoint=endpoint,
status_code=status_code
).inc()
http_request_duration_seconds.labels(
method=method,
endpoint=endpoint
).observe(duration)
def _get_endpoint(self, request: Request) -> str:
"""Extrair endpoint da request."""
if hasattr(request, 'url') and hasattr(request.url, 'path'):
path = request.url.path
            # Normalize paths containing IDs
import re
path = re.sub(r'/\d+', '/{id}', path)
return path
return "unknown"
async def update_system_metrics():
"""Atualizar métricas de sistema periodicamente."""
while True:
try:
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
system_cpu_usage.set(cpu_percent)
            # Memory
memory = psutil.virtual_memory()
system_memory_usage.set(memory.used)
            # Disk
for disk in psutil.disk_partitions():
try:
usage = psutil.disk_usage(disk.mountpoint)
system_disk_usage.labels(device=disk.device).set(usage.used)
except (PermissionError, OSError):
continue
except Exception as e:
print(f"Error updating system metrics: {e}")
        await asyncio.sleep(30)  # Update every 30 seconds
def get_metrics() -> str:
"""Obter métricas no formato Prometheus."""
return generate_latest(REGISTRY).decode('utf-8')
# Helper functions for custom metrics
def increment_cache_hit(cache_type: str):
"""Incrementar contador de cache hits."""
cache_hits_total.labels(cache_type=cache_type).inc()
def increment_cache_miss(cache_type: str):
"""Incrementar contador de cache misses."""
cache_misses_total.labels(cache_type=cache_type).inc()
def set_active_connections(count: int):
"""Definir número de conexões ativas."""
active_connections.set(count)
def set_database_connections(count: int):
"""Definir número de conexões de banco ativas."""
database_connections.set(count)
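update_system_metrics above is an infinite loop, so it has to be scheduled explicitly. A minimal sketch of starting it from a FastAPI lifespan handler; only the core.metrics import follows the file above, the lifespan wiring itself is an assumption.
# Run the system-metrics collector as a background task for the app's lifetime
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

from core.metrics import update_system_metrics

@asynccontextmanager
async def lifespan(app: FastAPI):
    task = asyncio.create_task(update_system_metrics())
    try:
        yield
    finally:
        # Stop the collector when the application shuts down
        task.cancel()

app = FastAPI(lifespan=lifespan)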
Health Checks (core/health.py)#
from fastapi import HTTPException
from sqlalchemy.orm import Session
from db.session import SessionLocal, engine
from core.redis import redis_client
from typing import Any
import asyncio
import time
import psutil
class HealthChecker:
"""Verificador de saúde da aplicação."""
def __init__(self):
self.checks = {
"database": self._check_database,
"redis": self._check_redis,
"disk_space": self._check_disk_space,
"memory": self._check_memory,
}
async def check_health(self) -> dict[str, Any]:
"""Executar todos os health checks."""
results = {}
overall_status = "healthy"
for check_name, check_func in self.checks.items():
try:
start_time = time.time()
result = await check_func()
duration = time.time() - start_time
results[check_name] = {
"status": "healthy" if result["healthy"] else "unhealthy",
"details": result.get("details", {}),
"duration_ms": round(duration * 1000, 2)
}
if not result["healthy"]:
overall_status = "unhealthy"
except Exception as e:
results[check_name] = {
"status": "error",
"error": str(e),
"duration_ms": 0
}
overall_status = "unhealthy"
return {
"status": overall_status,
"timestamp": time.time(),
"checks": results
}
async def _check_database(self) -> dict[str, Any]:
"""Verificar conexão com banco de dados."""
try:
db = SessionLocal()
try:
                # Run a simple query
result = db.execute("SELECT 1").fetchone()
                # Inspect the connection pool
pool = engine.pool
pool_status = {
"size": pool.size(),
"checked_in": pool.checkedin(),
"checked_out": pool.checkedout(),
"overflow": pool.overflow(),
"invalid": pool.invalid()
}
return {
"healthy": result is not None,
"details": {
"connection_pool": pool_status,
"query_result": result[0] if result else None
}
}
finally:
db.close()
except Exception as e:
return {
"healthy": False,
"details": {"error": str(e)}
}
async def _check_redis(self) -> dict[str, Any]:
"""Verificar conexão com Redis."""
try:
# Ping Redis
pong = await redis_client.ping()
            # Gather Redis server info
info = await redis_client.info()
return {
"healthy": pong,
"details": {
"ping": pong,
"connected_clients": info.get("connected_clients", 0),
"used_memory": info.get("used_memory", 0),
"uptime_in_seconds": info.get("uptime_in_seconds", 0)
}
}
except Exception as e:
return {
"healthy": False,
"details": {"error": str(e)}
}
async def _check_disk_space(self) -> dict[str, Any]:
"""Verificar espaço em disco."""
try:
disk_usage = psutil.disk_usage('/')
free_percent = (disk_usage.free / disk_usage.total) * 100
return {
"healthy": free_percent > 10, # Pelo menos 10% livre
"details": {
"total_gb": round(disk_usage.total / (1024**3), 2),
"used_gb": round(disk_usage.used / (1024**3), 2),
"free_gb": round(disk_usage.free / (1024**3), 2),
"free_percent": round(free_percent, 2)
}
}
except Exception as e:
return {
"healthy": False,
"details": {"error": str(e)}
}
async def _check_memory(self) -> dict[str, Any]:
"""Verificar uso de memória."""
try:
memory = psutil.virtual_memory()
available_percent = memory.available / memory.total * 100
return {
"healthy": available_percent > 10, # Pelo menos 10% disponível
"details": {
"total_gb": round(memory.total / (1024**3), 2),
"available_gb": round(memory.available / (1024**3), 2),
"used_gb": round(memory.used / (1024**3), 2),
"available_percent": round(available_percent, 2)
}
}
except Exception as e:
return {
"healthy": False,
"details": {"error": str(e)}
}
# Global instance
health_checker = HealthChecker()
Structured Logging (core/logging.py)#
import logging
import json
import sys
from datetime import datetime
from typing import Any
from pythonjsonlogger import jsonlogger
from core.config import settings
class CustomJsonFormatter(jsonlogger.JsonFormatter):
"""Formatter JSON customizado."""
def add_fields(self, log_record: dict[str, Any], record: logging.LogRecord, message_dict: dict[str, Any]):
super().add_fields(log_record, record, message_dict)
        # Add the timestamp
log_record['timestamp'] = datetime.utcnow().isoformat()
        # Add application information
log_record['service'] = 'fastapi-app'
log_record['version'] = getattr(settings, 'VERSION', '1.0.0')
log_record['environment'] = settings.ENVIRONMENT
        # Add the log level
log_record['level'] = record.levelname
        # Add module information
log_record['module'] = record.module
log_record['function'] = record.funcName
log_record['line'] = record.lineno
def setup_logging():
"""Configurar logging estruturado."""
    # Configure the formatter
formatter = CustomJsonFormatter(
'%(timestamp)s %(level)s %(service)s %(module)s %(funcName)s %(message)s'
)
    # Configure the stdout handler
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
    # Configure the root logger
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, settings.LOG_LEVEL.upper()))
root_logger.addHandler(handler)
    # Configure specific loggers
loggers = [
'uvicorn',
'uvicorn.access',
'sqlalchemy.engine',
'alembic',
'fastapi'
]
for logger_name in loggers:
logger = logging.getLogger(logger_name)
logger.setLevel(getattr(logging, settings.LOG_LEVEL.upper()))
logger.propagate = True
class StructuredLogger:
"""Logger estruturado para a aplicação."""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def info(self, message: str, **kwargs):
"""Log info com contexto adicional."""
extra = self._prepare_extra(**kwargs)
self.logger.info(message, extra=extra)
def warning(self, message: str, **kwargs):
"""Log warning com contexto adicional."""
extra = self._prepare_extra(**kwargs)
self.logger.warning(message, extra=extra)
def error(self, message: str, **kwargs):
"""Log error com contexto adicional."""
extra = self._prepare_extra(**kwargs)
self.logger.error(message, extra=extra)
def debug(self, message: str, **kwargs):
"""Log debug com contexto adicional."""
extra = self._prepare_extra(**kwargs)
self.logger.debug(message, extra=extra)
def _prepare_extra(self, **kwargs) -> dict[str, Any]:
"""Preparar dados extras para o log."""
extra = {}
for key, value in kwargs.items():
            # Serialize complex objects
if isinstance(value, (dict, list)):
extra[key] = json.dumps(value, default=str)
else:
extra[key] = str(value)
return extra
# Configure logging at import time
setup_logging()
# Default application logger
logger = StructuredLogger(__name__)
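A short usage example of the logger above; the event names and field values are illustrative only. Keyword arguments are serialized by _prepare_extra and show up as fields in the JSON output.
# Example usage; extra keyword arguments become fields in the JSON log record
from core.logging import logger

logger.info("Order created", order_id=123, user_id=42, total=99.90)
logger.error("Payment failed", order_id=123, provider="stripe", retryable=True)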
📈 Monitoring Dashboard#
Grafana Dashboard (monitoring/grafana-dashboard.json)#
{
"dashboard": {
"id": null,
"title": "FastAPI Application Dashboard",
"tags": ["fastapi", "python"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "HTTP Requests Rate",
"type": "graph",
"targets": [
{
"expr": "rate(http_requests_total[5m])",
"legendFormat": "{{method}} {{endpoint}}"
}
],
"yAxes": [
{
"label": "Requests/sec"
}
]
},
{
"id": 2,
"title": "Response Time",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))",
"legendFormat": "95th percentile"
},
{
"expr": "histogram_quantile(0.50, rate(http_request_duration_seconds_bucket[5m]))",
"legendFormat": "50th percentile"
}
]
},
{
"id": 3,
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "rate(http_requests_total{status_code=~\"4..|5..\"}[5m])",
"legendFormat": "Error rate"
}
]
},
{
"id": 4,
"title": "Active WebSocket Connections",
"type": "singlestat",
"targets": [
{
"expr": "websocket_active_connections",
"legendFormat": "Connections"
}
]
},
{
"id": 5,
"title": "Database Connections",
"type": "graph",
"targets": [
{
"expr": "database_connections_active",
"legendFormat": "Active connections"
}
]
},
{
"id": 6,
"title": "Cache Hit Rate",
"type": "graph",
"targets": [
{
"expr": "rate(cache_hits_total[5m]) / (rate(cache_hits_total[5m]) + rate(cache_misses_total[5m]))",
"legendFormat": "{{cache_type}} hit rate"
}
]
},
{
"id": 7,
"title": "System Resources",
"type": "graph",
"targets": [
{
"expr": "system_cpu_usage_percent",
"legendFormat": "CPU %"
},
{
"expr": "system_memory_usage_bytes / 1024 / 1024 / 1024",
"legendFormat": "Memory GB"
}
]
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "5s"
}
}
🚨 Alerts and Notifications#
Alert Rules (monitoring/alert_rules.yml)#
groups:
- name: fastapi_alerts
rules:
- alert: HighErrorRate
expr: rate(http_requests_total{status_code=~"5.."}[5m]) > 0.1
for: 2m
labels:
severity: critical
annotations:
summary: "High error rate detected"
description: "Error rate is {{ $value }} errors per second"
- alert: HighResponseTime
expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 2
for: 5m
labels:
severity: warning
annotations:
summary: "High response time detected"
description: "95th percentile response time is {{ $value }} seconds"
- alert: DatabaseConnectionsHigh
expr: database_connections_active > 80
for: 5m
labels:
severity: warning
annotations:
summary: "High number of database connections"
description: "{{ $value }} active database connections"
- alert: LowDiskSpace
expr: (node_filesystem_free_bytes / node_filesystem_size_bytes) * 100 < 10
for: 5m
labels:
severity: critical
annotations:
summary: "Low disk space"
description: "Disk space is {{ $value }}% full"
- alert: HighMemoryUsage
expr: (system_memory_usage_bytes / node_memory_MemTotal_bytes) * 100 > 90
for: 5m
labels:
severity: warning
annotations:
summary: "High memory usage"
description: "Memory usage is {{ $value }}%"
- alert: ApplicationDown
expr: up{job="fastapi-app"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "FastAPI application is down"
description: "The FastAPI application has been down for more than 1 minute"
🎯 Next Steps#
With deployment and monitoring in place, you have:
A complete application running in production
Comprehensive monitoring
Automated CI/CD
End-to-end observability
📝 Practical Exercises#
Exercise 1: Multi-Cloud Deployment#
Set up deployment on:
AWS ECS/EKS
Google Cloud Run
Azure Container Instances
Exercise 2: Disaster Recovery#
Implement:
Automated backups
Restore procedures
Automatic failover
Exercise 3: Performance Optimization#
Add:
APM (Application Performance Monitoring)
Distributed tracing
Custom metrics