Cache e Otimizações#
Implementar um sistema de cache robusto e otimizações de performance para melhorar a velocidade e a eficiência da API.
🎯 O que você vai aprender#
Cache em memória com Redis
Cache de queries de banco de dados
Cache de responses HTTP
Otimização de queries SQL
Compressão de responses
Rate limiting
Monitoramento de performance
📖 Conceitos Fundamentais de Cache e Performance#
O que é Cache?#
Cache é uma técnica de armazenamento temporário que mantém dados frequentemente acessados em uma localização de acesso rápido, reduzindo a necessidade de buscar os mesmos dados repetidamente de fontes mais lentas.
Por que usar Cache?#
1. Performance#
Reduz tempo de resposta
Diminui latência
Melhora experiência do usuário
2. Escalabilidade#
Reduz carga no banco de dados
Permite atender mais usuários
Otimiza uso de recursos
3. Disponibilidade#
Mantém dados disponíveis mesmo com falhas
Reduz dependência de serviços externos
Melhora resiliência
4. Economia#
Reduz custos de infraestrutura
Diminui uso de CPU e memória
Otimiza largura de banda
5. Experiência do Usuário#
Páginas carregam mais rápido
Menos tempo de espera
Interface mais responsiva
Tipos de Cache#
1. Cache em Memória (In-Memory)#
# Exemplo simples com dicionário Python
cache = {}
def get_user_expensive(user_id: int):
if user_id in cache:
print("Cache HIT!")
return cache[user_id]
print("Cache MISS - buscando no banco...")
# Simular operação custosa
import time
time.sleep(2)
user_data = {"id": user_id, "name": f"User {user_id}"}
cache[user_id] = user_data
return user_data
# Uso
user = get_user_expensive(1) # Demora 2s (MISS)
user = get_user_expensive(1) # Instantâneo (HIT)
2. Cache Distribuído (Redis/Memcached)#
import json
import redis
# Conexão Redis
r = redis.Redis(host='localhost', port=6379, db=0)
def get_user_redis(user_id: int):
cache_key = f"user:{user_id}"
# Tentar buscar no cache
cached_user = r.get(cache_key)
if cached_user:
print("Cache HIT!")
return json.loads(cached_user)
print("Cache MISS - buscando no banco...")
# Buscar no banco de dados
user_data = database.get_user(user_id)
# Armazenar no cache por 1 hora
r.setex(cache_key, 3600, json.dumps(user_data))
return user_data
3. Cache de Banco de Dados#
Query result cache
Buffer pool
Index cache
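Esses caches vivem dentro do próprio SGBD. Como referência, um esboço mínimo (assumindo PostgreSQL e a mesma URL de conexão usada neste projeto) para inspecionar o aproveitamento do buffer pool:

```python
# Esboço: medir o acerto do buffer pool do PostgreSQL via pg_statio_user_tables
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost:5432/fastapi_db")

query = text("""
    SELECT relname, heap_blks_hit, heap_blks_read
    FROM pg_statio_user_tables
    ORDER BY heap_blks_hit + heap_blks_read DESC
    LIMIT 5
""")

with engine.connect() as conn:
    for row in conn.execute(query):
        total = row.heap_blks_hit + row.heap_blks_read
        hit_pct = (row.heap_blks_hit / total * 100) if total else 0
        print(f"{row.relname}: {hit_pct:.1f}% de acerto no buffer pool")
```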
4. Cache CDN (Content Delivery Network)#
Cache de arquivos estáticos
Cache geográfico
Cache de imagens
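No contexto de uma API FastAPI, a cooperação com um CDN acontece principalmente via headers HTTP. Esboço ilustrativo (endpoint, caminho do arquivo e tempo de cache são hipotéticos):

```python
# Esboço: permitir que CDN/navegador cacheiem uma resposta estática via Cache-Control
from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/static/logo.png")
async def get_logo():
    with open("static/logo.png", "rb") as f:
        content = f.read()
    # max-age=86400: o CDN pode reutilizar esta resposta por até 24 horas
    return Response(
        content=content,
        media_type="image/png",
        headers={"Cache-Control": "public, max-age=86400"},
    )
```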
Estratégias de Cache#
1. Cache-Aside (Lazy Loading)#
def get_user_cache_aside(user_id: int):
# 1. Verificar cache primeiro
user = cache.get(f"user:{user_id}")
if user is None:
# 2. Se não estiver no cache, buscar no banco
user = database.get_user(user_id)
# 3. Armazenar no cache
cache.set(f"user:{user_id}", user, ttl=3600)
return user
def update_user_cache_aside(user_id: int, data: dict):
# 1. Atualizar no banco
user = database.update_user(user_id, data)
# 2. Invalidar cache
cache.delete(f"user:{user_id}")
return user
2. Cache-Aside com Invalidação#
def update_user_with_cache_invalidation(user_id: int, data: dict):
# 1. Atualizar no banco
user = database.update_user(user_id, data)
# 2. Invalidar caches relacionados
cache.delete(f"user:{user_id}")
cache.delete(f"user_profile:{user_id}")
cache.delete("active_users") # Lista que pode incluir este usuário
return user
3. Write-Through#
def update_user_write_through(user_id: int, data: dict):
# 1. Atualizar no banco
user = database.update_user(user_id, data)
# 2. Atualizar no cache imediatamente
cache.set(f"user:{user_id}", user, ttl=3600)
return user
4. Write-Behind (Write-Back)#
import asyncio
from collections import deque
class WriteBehindCache:
def __init__(self):
self.cache = {}
self.write_queue = deque()
self.dirty_keys = set()
def set(self, key: str, value: any):
# 1. Atualizar cache imediatamente
self.cache[key] = value
# 2. Marcar como "sujo" para escrita posterior
self.dirty_keys.add(key)
self.write_queue.append((key, value))
async def flush_to_database(self):
"""Escrever dados sujos no banco de dados."""
while self.write_queue:
key, value = self.write_queue.popleft()
if key in self.dirty_keys:
# Escrever no banco
await database.update(key, value)
self.dirty_keys.remove(key)
# Executar flush periodicamente
async def periodic_flush():
while True:
await cache.flush_to_database()
await asyncio.sleep(30) # Flush a cada 30 segundos
5. Refresh-Ahead#
import asyncio
from datetime import datetime, timedelta
class RefreshAheadCache:
def __init__(self):
self.cache = {}
self.refresh_times = {}
self.ttl = 3600 # 1 hora
self.refresh_threshold = 0.8 # Refresh quando 80% do TTL passou
async def get(self, key: str):
# Verificar se existe no cache
if key in self.cache:
# Verificar se precisa de refresh
if self._needs_refresh(key):
# Refresh em background
asyncio.create_task(self._refresh_key(key))
return self.cache[key]
# Cache miss - buscar e armazenar
value = await self._fetch_from_source(key)
self._store(key, value)
return value
def _needs_refresh(self, key: str) -> bool:
if key not in self.refresh_times:
return True
elapsed = datetime.now() - self.refresh_times[key]
return elapsed.total_seconds() > (self.ttl * self.refresh_threshold)
async def _refresh_key(self, key: str):
"""Refresh key em background."""
try:
new_value = await self._fetch_from_source(key)
self._store(key, new_value)
except Exception as e:
print(f"Erro ao fazer refresh de {key}: {e}")
def _store(self, key: str, value: any):
self.cache[key] = value
self.refresh_times[key] = datetime.now()
Invalidação de Cache#
1. TTL (Time To Live)#
# Cache com expiração automática
cache.set("user:123", user_data, ttl=3600) # Expira em 1 hora
# Verificar TTL restante
remaining_ttl = cache.ttl("user:123")
print(f"Cache expira em {remaining_ttl} segundos")
2. Invalidação Manual#
def invalidate_user_cache(user_id: int):
"""Invalidar todos os caches relacionados ao usuário."""
patterns = [
f"user:{user_id}",
f"user_profile:{user_id}",
f"user_orders:{user_id}",
f"user_preferences:{user_id}"
]
for pattern in patterns:
cache.delete(pattern)
Métricas de Cache#
1. Hit Rate (Taxa de Acerto)#
class CacheMetrics:
def __init__(self):
self.hits = 0
self.misses = 0
def record_hit(self):
self.hits += 1
def record_miss(self):
self.misses += 1
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return (self.hits / total * 100) if total > 0 else 0
@property
def miss_rate(self) -> float:
return 100 - self.hit_rate
# Uso
metrics = CacheMetrics()
def get_with_metrics(key: str):
if key in cache:
metrics.record_hit()
return cache[key]
else:
metrics.record_miss()
# Buscar dados...
return data
print(f"Hit Rate: {metrics.hit_rate:.2f}%")
2. Miss Rate (Taxa de Falha)#
Complemento do Hit Rate
Indica eficiência do cache
Meta: < 10% para caches críticos
3. Eviction Rate (Taxa de Remoção)#
Frequência de remoção de itens
Indica se cache está dimensionado corretamente
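No Redis, a taxa de remoção pode ser acompanhada pelo campo evicted_keys retornado pelo comando INFO. Esboço mínimo, assumindo um Redis local na porta padrão:

```python
# Esboço: consultar métricas de eviction e de hit/miss do Redis (seção "stats" do INFO)
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
stats = r.info("stats")

# evicted_keys: total de chaves removidas por falta de memória (maxmemory)
print("Chaves removidas (evictions):", stats.get("evicted_keys", 0))
print("Hits:", stats.get("keyspace_hits", 0), "| Misses:", stats.get("keyspace_misses", 0))
```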
Algoritmos de Remoção (Eviction)#
1. LRU (Least Recently Used)#
from collections import OrderedDict
class LRUCache:
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = OrderedDict()
def get(self, key: str):
if key in self.cache:
# Mover para o final (mais recente)
self.cache.move_to_end(key)
return self.cache[key]
return None
def set(self, key: str, value: any):
if key in self.cache:
# Atualizar e mover para o final
self.cache.move_to_end(key)
elif len(self.cache) >= self.capacity:
# Remover o menos recente (primeiro)
self.cache.popitem(last=False)
self.cache[key] = value
# Uso
lru = LRUCache(capacity=3)
lru.set("a", 1)
lru.set("b", 2)
lru.set("c", 3)
lru.get("a") # 'a' se torna o mais recente
lru.set("d", 4) # 'b' é removido (menos recente)
2. LFU (Least Frequently Used)#
Remove itens menos acessados
Mantém contador de acessos
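Esboço mínimo de um cache LFU em memória, apenas para ilustrar o contador de acessos (implementação ilustrativa, não faz parte do código do projeto):

```python
from collections import defaultdict

class LFUCache:
    """Esboço de cache LFU: remove a chave com o menor contador de acessos."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = {}
        self.access_count = defaultdict(int)

    def get(self, key):
        if key not in self.cache:
            return None
        self.access_count[key] += 1  # Cada leitura incrementa a frequência
        return self.cache[key]

    def set(self, key, value):
        if key not in self.cache and len(self.cache) >= self.capacity:
            # Remover a chave menos frequentemente usada
            lfu_key = min(self.access_count, key=self.access_count.get)
            del self.cache[lfu_key]
            del self.access_count[lfu_key]
        self.cache[key] = value
        self.access_count[key] += 1
```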
3. FIFO (First In, First Out)#
Remove itens mais antigos
Simples de implementar
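Esboço ilustrativo de um cache FIFO, usando uma fila para registrar a ordem de inserção (também fora do código do projeto):

```python
from collections import deque

class FIFOCache:
    """Esboço de cache FIFO: descarta sempre o item inserido há mais tempo."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.cache = {}
        self.order = deque()  # Ordem de inserção das chaves

    def set(self, key, value):
        if key not in self.cache:
            if len(self.cache) >= self.capacity:
                oldest = self.order.popleft()  # Primeiro a entrar, primeiro a sair
                del self.cache[oldest]
            self.order.append(key)
        self.cache[key] = value

    def get(self, key):
        return self.cache.get(key)
```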
4. Random#
Remove itens aleatoriamente
Útil quando padrão de acesso é imprevisível
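Para remoção aleatória, basta escolher uma vítima qualquer quando o limite é atingido (esboço):

```python
import random

def evict_random(cache: dict, max_size: int):
    """Esboço: remover chaves aleatórias até o cache caber em max_size."""
    while len(cache) > max_size:
        victim = random.choice(list(cache.keys()))
        del cache[victim]
```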
Cache Warming#
async def warm_cache():
"""Pré-carregar dados importantes no cache."""
# 1. Usuários mais ativos
active_users = await database.get_active_users(limit=100)
for user in active_users:
cache.set(f"user:{user.id}", user, ttl=3600)
# 2. Produtos mais vendidos
popular_products = await database.get_popular_products(limit=50)
for product in popular_products:
cache.set(f"product:{product.id}", product, ttl=7200)
# 3. Configurações do sistema
settings = await database.get_system_settings()
cache.set("system_settings", settings, ttl=86400) # 24 horas
print("Cache warming completed!")
# Executar durante inicialização da aplicação
await warm_cache()
Problemas Comuns de Cache#
1. Cache Stampede#
import asyncio
from typing import Dict
class StampedeProtection:
def __init__(self):
self.locks: Dict[str, asyncio.Lock] = {}
async def get_or_compute(self, key: str, compute_func, ttl: int = 3600):
# Verificar cache primeiro
cached_value = cache.get(key)
if cached_value is not None:
return cached_value
# Usar lock para evitar múltiplas computações
if key not in self.locks:
self.locks[key] = asyncio.Lock()
async with self.locks[key]:
# Verificar novamente após adquirir lock
cached_value = cache.get(key)
if cached_value is not None:
return cached_value
# Computar valor
value = await compute_func()
cache.set(key, value, ttl=ttl)
return value
# Uso
protection = StampedeProtection()
async def expensive_computation():
await asyncio.sleep(5) # Simular operação custosa
return "computed_value"
# Múltiplas chamadas simultâneas só executarão a função uma vez
value = await protection.get_or_compute("expensive_key", expensive_computation)
2. Hot Keys#
# Problema: Uma chave muito acessada sobrecarrega o servidor Redis
# Solução 1: Replicação local
local_cache = {}
async def get_hot_key(key: str):
# Verificar cache local primeiro
if key in local_cache:
return local_cache[key]
# Buscar no Redis
value = await redis_cache.get(key)
# Armazenar localmente por pouco tempo
local_cache[key] = value
# Limpar cache local periodicamente
asyncio.create_task(clear_local_cache_later(key, delay=60))
return value
# Solução 2: Sharding da chave quente (replicar em N chaves e ler uma aleatória)
import random

SHARD_COUNT = 10

async def set_with_sharding(key: str, value, ttl: int = 3600):
    # Gravar o mesmo valor em todas as réplicas para distribuir as leituras
    for shard in range(SHARD_COUNT):
        await redis_cache.set(f"{key}:shard:{shard}", value, ttl=ttl)

async def get_with_sharding(key: str):
    # Ler de uma réplica aleatória, espalhando a carga da chave quente
    shard = random.randint(0, SHARD_COUNT - 1)
    return await redis_cache.get(f"{key}:shard:{shard}")
3. Cache Penetration#
# Problema: Queries para dados inexistentes sempre vão ao banco
async def get_user_safe(user_id: int):
cache_key = f"user:{user_id}"
# Verificar cache
cached_value = await cache.get(cache_key)
if cached_value is not None:
# Verificar se é um "null" cacheado
if cached_value == "NULL":
return None
return cached_value
# Buscar no banco
user = await database.get_user(user_id)
if user is None:
# Cachear "null" por pouco tempo
await cache.set(cache_key, "NULL", ttl=300) # 5 minutos
return None
# Cachear resultado válido
await cache.set(cache_key, user, ttl=3600)
return user
Monitoramento de Cache#
class CacheMonitor:
def __init__(self):
self.metrics = {
"hits": 0,
"misses": 0,
"sets": 0,
"deletes": 0,
"errors": 0
}
def record_hit(self):
self.metrics["hits"] += 1
def record_miss(self):
self.metrics["misses"] += 1
def record_set(self):
self.metrics["sets"] += 1
def record_delete(self):
self.metrics["deletes"] += 1
def record_error(self):
self.metrics["errors"] += 1
def get_stats(self) -> dict:
total_requests = self.metrics["hits"] + self.metrics["misses"]
return {
"total_requests": total_requests,
"hit_rate": (self.metrics["hits"] / total_requests * 100) if total_requests > 0 else 0,
"miss_rate": (self.metrics["misses"] / total_requests * 100) if total_requests > 0 else 0,
"operations": {
"hits": self.metrics["hits"],
"misses": self.metrics["misses"],
"sets": self.metrics["sets"],
"deletes": self.metrics["deletes"],
"errors": self.metrics["errors"]
}
}
# Instância global
cache_monitor = CacheMonitor()
# Wrapper para monitoramento
async def monitored_cache_get(key: str):
try:
value = await cache.get(key)
if value is not None:
cache_monitor.record_hit()
else:
cache_monitor.record_miss()
return value
except Exception as e:
cache_monitor.record_error()
raise
🚀 Configuração Prática de Redis e Cache#
Docker Compose para Desenvolvimento#
# docker-compose.yml
version: '3.8'
services:
redis:
image: redis:7-alpine
container_name: fastapi_redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: >
redis-server
--appendonly yes
--maxmemory 256mb
--maxmemory-policy allkeys-lru
networks:
- app_network
restart: unless-stopped
app:
build: .
container_name: fastapi_app
ports:
- "8000:8000"
environment:
- REDIS_URL=redis://redis:6379/0
- DATABASE_URL=postgresql://user:password@postgres:5432/fastapi_db
depends_on:
- redis
- postgres
networks:
- app_network
restart: unless-stopped
postgres:
image: postgres:15-alpine
container_name: fastapi_postgres
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: fastapi_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
networks:
- app_network
restart: unless-stopped
volumes:
redis_data:
postgres_data:
networks:
app_network:
driver: bridge
Configuração de Ambiente#
# app/core/config.py
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
# Redis Configuration
REDIS_URL: str = "redis://localhost:6379/0"
REDIS_PASSWORD: Optional[str] = None
REDIS_MAX_CONNECTIONS: int = 20
REDIS_SOCKET_TIMEOUT: int = 5
REDIS_SOCKET_CONNECT_TIMEOUT: int = 5
# Cache TTL Settings (em segundos)
CACHE_TTL_USER: int = 3600 # 1 hora
CACHE_TTL_ITEM: int = 7200 # 2 horas
CACHE_TTL_SEARCH: int = 1800 # 30 minutos
CACHE_TTL_DEFAULT: int = 3600 # 1 hora
# Cache Configuration
CACHE_ENABLED: bool = True
CACHE_KEY_PREFIX: str = "fastapi"
# Database
DATABASE_URL: str = "postgresql://user:password@localhost:5432/fastapi_db"
class Config:
env_file = ".env"
settings = Settings()
Cliente Redis Assíncrono#
# app/core/cache.py
import redis.asyncio as redis
from typing import Optional, Any, Union, List
import json
import pickle
from datetime import timedelta
import logging
from app.core.config import settings
logger = logging.getLogger(__name__)
class RedisClient:
"""Cliente Redis assíncrono com funcionalidades avançadas."""
def __init__(self):
self.redis: Optional[redis.Redis] = None
self._connected = False
async def connect(self):
"""Conectar ao Redis."""
try:
self.redis = redis.from_url(
settings.REDIS_URL,
password=settings.REDIS_PASSWORD,
max_connections=settings.REDIS_MAX_CONNECTIONS,
socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
socket_connect_timeout=settings.REDIS_SOCKET_CONNECT_TIMEOUT,
decode_responses=False, # Para suportar pickle
retry_on_timeout=True
)
# Testar conexão
await self.redis.ping()
self._connected = True
logger.info("✅ Conectado ao Redis")
except Exception as e:
logger.error(f"❌ Falha ao conectar ao Redis: {e}")
self._connected = False
raise
async def disconnect(self):
"""Desconectar do Redis."""
if self.redis:
await self.redis.close()
self._connected = False
logger.info("Redis desconectado")
async def is_connected(self) -> bool:
"""Verificar se está conectado ao Redis."""
if not self._connected or not self.redis:
return False
try:
await self.redis.ping()
return True
except:
self._connected = False
return False
def _build_key(self, key: str) -> str:
"""Construir chave com prefixo."""
return f"{settings.CACHE_KEY_PREFIX}:{key}"
async def get(self, key: str, deserialize: str = "json") -> Optional[Any]:
"""Obter valor do Redis."""
if not await self.is_connected():
return None
try:
full_key = self._build_key(key)
value = await self.redis.get(full_key)
if value is None:
return None
if deserialize == "json":
return json.loads(value)
elif deserialize == "pickle":
return pickle.loads(value)
else:
return value.decode('utf-8')
except Exception as e:
logger.error(f"Erro ao obter chave {key}: {e}")
return None
async def set(
self,
key: str,
value: Any,
ttl: Optional[Union[int, timedelta]] = None,
serialize: str = "json"
) -> bool:
"""Definir valor no Redis."""
if not await self.is_connected():
return False
try:
full_key = self._build_key(key)
# Serializar valor
if serialize == "json":
serialized_value = json.dumps(value, default=str)
elif serialize == "pickle":
serialized_value = pickle.dumps(value)
else:
serialized_value = str(value)
# Definir TTL
if ttl is None:
ttl = settings.CACHE_TTL_DEFAULT
return await self.redis.setex(full_key, ttl, serialized_value)
except Exception as e:
logger.error(f"Erro ao definir chave {key}: {e}")
return False
async def delete(self, key: str) -> bool:
"""Deletar chave do Redis."""
if not await self.is_connected():
return False
try:
full_key = self._build_key(key)
result = await self.redis.delete(full_key)
return result > 0
except Exception as e:
logger.error(f"Erro ao deletar chave {key}: {e}")
return False
async def delete_pattern(self, pattern: str) -> int:
"""Deletar chaves que correspondem ao padrão."""
if not await self.is_connected():
return 0
try:
full_pattern = self._build_key(pattern)
keys = await self.redis.keys(full_pattern)
if keys:
return await self.redis.delete(*keys)
return 0
except Exception as e:
logger.error(f"Erro ao deletar padrão {pattern}: {e}")
return 0
async def exists(self, key: str) -> bool:
"""Verificar se chave existe."""
if not await self.is_connected():
return False
try:
full_key = self._build_key(key)
return await self.redis.exists(full_key) > 0
except Exception as e:
logger.error(f"Erro ao verificar chave {key}: {e}")
return False
async def ttl(self, key: str) -> int:
"""Obter TTL da chave."""
if not await self.is_connected():
return -1
try:
full_key = self._build_key(key)
return await self.redis.ttl(full_key)
except Exception as e:
logger.error(f"Erro ao obter TTL da chave {key}: {e}")
return -1
async def incr(self, key: str, amount: int = 1) -> Optional[int]:
"""Incrementar valor numérico."""
if not await self.is_connected():
return None
try:
full_key = self._build_key(key)
return await self.redis.incrby(full_key, amount)
except Exception as e:
logger.error(f"Erro ao incrementar chave {key}: {e}")
return None
async def expire(self, key: str, ttl: Union[int, timedelta]) -> bool:
"""Definir expiração para chave existente."""
if not await self.is_connected():
return False
try:
full_key = self._build_key(key)
return await self.redis.expire(full_key, ttl)
except Exception as e:
logger.error(f"Erro ao definir expiração para chave {key}: {e}")
return False
# Instância global
redis_client = RedisClient()
Decorador para Cache de Resultados#
# app/core/decorators.py
import functools
import hashlib
import inspect
from typing import Any, Callable, Optional, Union
from datetime import timedelta
from app.core.cache import redis_client
from app.core.config import settings
def cache_result(
ttl: Optional[Union[int, timedelta]] = None,
key_prefix: Optional[str] = None,
serialize: str = "json"
):
"""
Decorador para cachear resultados de funções.
Args:
ttl: Tempo de vida do cache
key_prefix: Prefixo personalizado para a chave
serialize: Método de serialização ('json' ou 'pickle')
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs) -> Any:
if not settings.CACHE_ENABLED:
return await func(*args, **kwargs)
# Construir chave do cache
cache_key = _build_cache_key(func, args, kwargs, key_prefix)
# Tentar obter do cache
cached_result = await redis_client.get(cache_key, deserialize=serialize)
if cached_result is not None:
return cached_result
# Executar função e cachear resultado
result = await func(*args, **kwargs)
if result is not None:
cache_ttl = ttl or settings.CACHE_TTL_DEFAULT
await redis_client.set(cache_key, result, ttl=cache_ttl, serialize=serialize)
return result
return wrapper
return decorator
def invalidate_cache(patterns: Union[str, list]):
"""
Decorador para invalidar cache após execução da função.
Args:
patterns: Padrão(ões) de chaves para invalidar
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs) -> Any:
result = await func(*args, **kwargs)
if settings.CACHE_ENABLED:
# Invalidar padrões especificados
if isinstance(patterns, str):
await redis_client.delete_pattern(patterns)
else:
for pattern in patterns:
await redis_client.delete_pattern(pattern)
return result
return wrapper
return decorator
def _build_cache_key(func: Callable, args: tuple, kwargs: dict, prefix: Optional[str] = None) -> str:
"""Construir chave única para cache baseada na função e argumentos."""
# Nome da função
func_name = f"{func.__module__}.{func.__qualname__}"
# Processar argumentos
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
# Criar string dos argumentos
args_str = ""
for param_name, param_value in bound_args.arguments.items():
# Pular 'self' e 'cls'
if param_name in ('self', 'cls'):
continue
# Converter valor para string
if hasattr(param_value, 'id'):
# Para objetos com ID (como modelos)
value_str = f"{param_value.__class__.__name__}:{param_value.id}"
else:
value_str = str(param_value)
args_str += f"{param_name}:{value_str}:"
# Criar hash dos argumentos
args_hash = hashlib.md5(args_str.encode()).hexdigest()[:8]
# Construir chave final
if prefix:
return f"{prefix}:{func_name}:{args_hash}"
else:
return f"func:{func_name}:{args_hash}"
Cache de Modelos Pydantic#
# app/core/model_cache.py
from typing import List, Optional, Type, TypeVar, Union
from datetime import timedelta
from pydantic import BaseModel
from app.core.cache import redis_client
from app.core.config import settings
import logging
logger = logging.getLogger(__name__)
T = TypeVar('T', bound=BaseModel)
class ModelCache:
"""Cache especializado para modelos Pydantic."""
@staticmethod
async def get_model(
key: str,
model_class: Type[T],
ttl_refresh: bool = False
) -> Optional[T]:
"""
Obter modelo do cache.
Args:
key: Chave do cache
model_class: Classe do modelo Pydantic
ttl_refresh: Se deve renovar TTL ao acessar
"""
try:
cached_data = await redis_client.get(key)
if cached_data is None:
return None
# Renovar TTL se solicitado
if ttl_refresh:
await redis_client.expire(key, settings.CACHE_TTL_DEFAULT)
# Validar e retornar modelo
return model_class.model_validate(cached_data)
except Exception as e:
logger.error(f"Erro ao obter modelo do cache {key}: {e}")
return None
@staticmethod
async def set_model(
key: str,
model: BaseModel,
ttl: Optional[Union[int, timedelta]] = None
) -> bool:
"""
Armazenar modelo no cache.
Args:
key: Chave do cache
model: Instância do modelo Pydantic
ttl: Tempo de vida do cache
"""
try:
# Converter modelo para dict
model_data = model.model_dump()
return await redis_client.set(key, model_data, ttl=ttl)
except Exception as e:
logger.error(f"Erro ao armazenar modelo no cache {key}: {e}")
return False
@staticmethod
async def get_model_list(
key: str,
model_class: Type[T]
) -> Optional[List[T]]:
"""Obter lista de modelos do cache."""
try:
cached_data = await redis_client.get(key)
if cached_data is None:
return None
# Validar cada item da lista
models = []
for item_data in cached_data:
try:
model = model_class.model_validate(item_data)
models.append(model)
except Exception as e:
logger.warning(f"Erro ao validar item da lista: {e}")
continue
return models
except Exception as e:
logger.error(f"Erro ao obter lista do cache {key}: {e}")
return None
@staticmethod
async def set_model_list(
key: str,
models: List[BaseModel],
ttl: Optional[Union[int, timedelta]] = None
) -> bool:
"""Armazenar lista de modelos no cache."""
try:
# Converter modelos para lista de dicts
models_data = [model.model_dump() for model in models]
return await redis_client.set(key, models_data, ttl=ttl)
except Exception as e:
logger.error(f"Erro ao armazenar lista no cache {key}: {e}")
return False
@staticmethod
async def invalidate_model(model_id: Union[int, str], model_name: str):
"""Invalidar cache de um modelo específico."""
patterns = [
f"{model_name}:{model_id}",
f"{model_name}:*:{model_id}",
f"list:{model_name}:*"
]
for pattern in patterns:
await redis_client.delete_pattern(pattern)
Implementação em Services#
# app/services/user_service.py
from typing import List, Optional
from sqlalchemy.orm import Session
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate, UserResponse
from app.repositories.user_repository import UserRepository
from app.core.decorators import cache_result, invalidate_cache
from app.core.model_cache import ModelCache
from app.core.config import settings
class UserService:
def __init__(self, db: Session):
self.db = db
self.repository = UserRepository(db)
@cache_result(ttl=settings.CACHE_TTL_USER, key_prefix="user")
async def get_user_by_id(self, user_id: int) -> Optional[UserResponse]:
"""Obter usuário por ID com cache."""
user = await self.repository.get_by_id(user_id)
if user:
return UserResponse.model_validate(user)
return None
@cache_result(ttl=settings.CACHE_TTL_USER, key_prefix="user_email")
async def get_user_by_email(self, email: str) -> Optional[UserResponse]:
"""Obter usuário por email com cache."""
user = await self.repository.get_by_email(email)
if user:
return UserResponse.model_validate(user)
return None
@cache_result(ttl=settings.CACHE_TTL_SEARCH, key_prefix="user_search")
async def search_users(
self,
query: str,
limit: int = 50,
offset: int = 0
) -> List[UserResponse]:
"""Buscar usuários com cache."""
users = await self.repository.search(query, limit, offset)
return [UserResponse.model_validate(user) for user in users]
@invalidate_cache(["user:*", "user_email:*", "user_search:*"])
async def create_user(self, user_data: UserCreate) -> UserResponse:
"""Criar usuário e invalidar cache."""
user = await self.repository.create(user_data)
user_response = UserResponse.model_validate(user)
# Cachear o novo usuário
await ModelCache.set_model(
f"user:{user.id}",
user_response,
ttl=settings.CACHE_TTL_USER
)
return user_response
@invalidate_cache(["user:*", "user_email:*", "user_search:*"])
async def update_user(self, user_id: int, user_data: UserUpdate) -> Optional[UserResponse]:
"""Atualizar usuário e invalidar cache."""
user = await self.repository.update(user_id, user_data)
if user:
user_response = UserResponse.model_validate(user)
# Atualizar cache
await ModelCache.set_model(
f"user:{user.id}",
user_response,
ttl=settings.CACHE_TTL_USER
)
return user_response
return None
@invalidate_cache(["user:*", "user_email:*", "user_search:*"])
async def delete_user(self, user_id: int) -> bool:
"""Deletar usuário e invalidar cache."""
return await self.repository.delete(user_id)
Cache para Queries Complexas#
# app/services/analytics_service.py
from typing import Dict, List, Any
from datetime import datetime, timedelta
from sqlalchemy import text
from sqlalchemy.orm import Session
from app.core.decorators import cache_result
from app.core.config import settings
class AnalyticsService:
def __init__(self, db: Session):
self.db = db
@cache_result(ttl=3600, key_prefix="analytics_daily") # Cache por 1 hora
async def get_daily_stats(self, date: datetime) -> Dict[str, Any]:
"""Estatísticas diárias com cache."""
# Query complexa que demora para executar
query = """
SELECT
COUNT(DISTINCT u.id) as active_users,
COUNT(o.id) as total_orders,
SUM(o.total_amount) as total_revenue,
AVG(o.total_amount) as avg_order_value
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
AND DATE(o.created_at) = :date
WHERE u.last_login >= :date
"""
        result = await self.db.execute(text(query), {"date": date.date()})
row = result.first()
return {
"date": date.date(),
"active_users": row.active_users or 0,
"total_orders": row.total_orders or 0,
"total_revenue": float(row.total_revenue or 0),
"avg_order_value": float(row.avg_order_value or 0)
}
@cache_result(ttl=7200, key_prefix="analytics_popular") # Cache por 2 horas
async def get_popular_items(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Itens mais populares com cache."""
query = """
SELECT
i.id,
i.name,
i.price,
COUNT(oi.id) as order_count,
SUM(oi.quantity) as total_quantity
FROM items i
JOIN order_items oi ON i.id = oi.item_id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= NOW() - INTERVAL '30 days'
GROUP BY i.id, i.name, i.price
ORDER BY order_count DESC, total_quantity DESC
LIMIT :limit
"""
        result = await self.db.execute(text(query), {"limit": limit})
return [
{
"id": row.id,
"name": row.name,
"price": float(row.price),
"order_count": row.order_count,
"total_quantity": row.total_quantity
}
for row in result
]
@cache_result(ttl=1800, key_prefix="analytics_segments") # Cache por 30 min
async def get_user_segments(self) -> Dict[str, int]:
"""Segmentação de usuários com cache."""
query = """
SELECT
CASE
WHEN total_spent >= 1000 THEN 'premium'
WHEN total_spent >= 500 THEN 'regular'
WHEN total_spent > 0 THEN 'occasional'
ELSE 'inactive'
END as segment,
COUNT(*) as user_count
FROM (
SELECT
u.id,
COALESCE(SUM(o.total_amount), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id
) user_totals
GROUP BY segment
"""
        result = await self.db.execute(text(query))
return {row.segment: row.user_count for row in result}
Middleware de Cache HTTP#
# app/middleware/http_cache_middleware.py
import hashlib
import json
from typing import Optional
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from app.core.cache import redis_client
from app.core.config import settings
class HTTPCacheMiddleware(BaseHTTPMiddleware):
"""Middleware para cache de respostas HTTP."""
def __init__(
self,
app,
default_ttl: int = 300, # 5 minutos
cache_methods: list = None,
cache_status_codes: list = None
):
super().__init__(app)
self.default_ttl = default_ttl
self.cache_methods = cache_methods or ["GET"]
self.cache_status_codes = cache_status_codes or [200, 201]
async def dispatch(self, request: Request, call_next):
# Verificar se deve cachear
if not self._should_cache_request(request):
return await call_next(request)
# Gerar chave do cache
cache_key = self._generate_cache_key(request)
# Tentar obter resposta do cache
cached_response = await self._get_cached_response(cache_key)
if cached_response:
return cached_response
# Executar request
response = await call_next(request)
# Cachear resposta se apropriado
if self._should_cache_response(response):
await self._cache_response(cache_key, response)
return response
def _should_cache_request(self, request: Request) -> bool:
"""Verificar se deve cachear o request."""
# Só cachear métodos específicos
if request.method not in self.cache_methods:
return False
# Não cachear se tiver parâmetros de cache
if "no-cache" in request.headers.get("cache-control", ""):
return False
# Não cachear requests autenticados (opcional)
if request.headers.get("authorization"):
return False
return True
def _should_cache_response(self, response: Response) -> bool:
"""Verificar se deve cachear a resposta."""
return response.status_code in self.cache_status_codes
def _generate_cache_key(self, request: Request) -> str:
"""Gerar chave única para o request."""
# Incluir método, path e query parameters
key_parts = [
request.method,
str(request.url.path),
str(request.url.query)
]
# Incluir headers relevantes
relevant_headers = ["accept", "accept-language", "user-agent"]
for header in relevant_headers:
if header in request.headers:
key_parts.append(f"{header}:{request.headers[header]}")
# Criar hash
key_string = "|".join(key_parts)
key_hash = hashlib.md5(key_string.encode()).hexdigest()
return f"http_cache:{key_hash}"
async def _get_cached_response(self, cache_key: str) -> Optional[Response]:
"""Obter resposta do cache."""
try:
cached_data = await redis_client.get(cache_key)
if cached_data is None:
return None
# Reconstruir resposta
response_data = json.loads(cached_data)
response = JSONResponse(
content=response_data["content"],
status_code=response_data["status_code"],
headers=response_data["headers"]
)
# Adicionar header indicando cache hit
response.headers["X-Cache"] = "HIT"
return response
except Exception as e:
# Log erro mas não falhe
print(f"Erro ao obter resposta do cache: {e}")
return None
async def _cache_response(self, cache_key: str, response: Response):
"""Cachear resposta."""
try:
# Ler conteúdo da resposta
body = b""
async for chunk in response.body_iterator:
body += chunk
# Recriar iterator
response.body_iterator = self._create_body_iterator(body)
# Preparar dados para cache
response_data = {
"content": json.loads(body.decode()) if body else None,
"status_code": response.status_code,
"headers": dict(response.headers)
}
# Cachear
await redis_client.set(
cache_key,
json.dumps(response_data),
ttl=self.default_ttl
)
# Adicionar header indicando cache miss
response.headers["X-Cache"] = "MISS"
except Exception as e:
# Log erro mas não falhe
print(f"Erro ao cachear resposta: {e}")
    async def _create_body_iterator(self, body: bytes):
        """Criar iterator assíncrono para o corpo da resposta (o Starlette o consome com async for)."""
        yield body
Configuração no FastAPI#
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.core.cache import redis_client
from app.middleware.http_cache_middleware import HTTPCacheMiddleware
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await redis_client.connect()
yield
# Shutdown
await redis_client.disconnect()
app = FastAPI(lifespan=lifespan)
# Adicionar middleware de cache HTTP
app.add_middleware(
HTTPCacheMiddleware,
default_ttl=300, # 5 minutos
cache_methods=["GET"],
cache_status_codes=[200, 201]
)
🚀 Otimização de Performance e Monitoramento#
Profiling de Queries de Banco de Dados#
# app/middleware/database_profiling_middleware.py
import time
import logging
from typing import Dict, List, Any
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from sqlalchemy import event
from sqlalchemy.engine import Engine
from collections import defaultdict
import re
logger = logging.getLogger(__name__)
class DatabaseProfilingMiddleware(BaseHTTPMiddleware):
"""Middleware para monitorar performance de queries do banco."""
def __init__(self, app, slow_query_threshold: float = 0.1):
super().__init__(app)
self.slow_query_threshold = slow_query_threshold
self.query_stats = defaultdict(lambda: {
'count': 0,
'total_time': 0.0,
'avg_time': 0.0,
'min_time': float('inf'),
'max_time': 0.0,
'times': []
})
# Registrar eventos do SQLAlchemy
self._setup_sqlalchemy_events()
def _setup_sqlalchemy_events(self):
"""Configurar eventos do SQLAlchemy para capturar queries."""
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total_time = time.time() - context._query_start_time
# Normalizar query (remover valores específicos)
normalized_query = self._normalize_query(statement)
# Atualizar estatísticas
stats = self.query_stats[normalized_query]
stats['count'] += 1
stats['total_time'] += total_time
stats['times'].append(total_time)
stats['min_time'] = min(stats['min_time'], total_time)
stats['max_time'] = max(stats['max_time'], total_time)
stats['avg_time'] = stats['total_time'] / stats['count']
# Calcular mediana
sorted_times = sorted(stats['times'])
n = len(sorted_times)
if n % 2 == 0:
stats['median_time'] = (sorted_times[n//2-1] + sorted_times[n//2]) / 2
else:
stats['median_time'] = sorted_times[n//2]
# Log queries lentas
if total_time > self.slow_query_threshold:
logger.warning(
f"Slow query detected: {total_time:.3f}s\n"
f"Query: {statement[:200]}..."
)
def _normalize_query(self, query: str) -> str:
"""Normalizar query removendo valores específicos."""
# Remover valores literais
normalized = re.sub(r"'[^']*'", "'?'", query)
normalized = re.sub(r'\b\d+\b', '?', normalized)
# Remover espaços extras e quebras de linha
normalized = re.sub(r'\s+', ' ', normalized.strip())
return normalized
async def dispatch(self, request: Request, call_next):
start_time = time.time()
response = await call_next(request)
total_time = time.time() - start_time
# Adicionar headers de performance
response.headers["X-Response-Time"] = f"{total_time:.3f}s"
return response
def get_query_stats(self) -> Dict[str, Any]:
"""Obter estatísticas das queries."""
return dict(self.query_stats)
def reset_stats(self):
"""Resetar estatísticas."""
self.query_stats.clear()
Otimização de Queries com Índices#
# app/models/optimized_models.py
from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey, Index, Text
from sqlalchemy.orm import relationship
from app.models.base import Base
class OptimizedUser(Base):
"""Modelo User otimizado com índices."""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
email = Column(String(100), unique=True, index=True, nullable=False)
full_name = Column(String(100), index=True)
is_active = Column(Boolean, default=True, index=True)
created_at = Column(DateTime, index=True)
last_login = Column(DateTime, index=True)
# Relacionamentos
orders = relationship("OptimizedOrder", back_populates="user")
# Índices compostos para queries comuns
__table_args__ = (
# Índice para buscar usuários ativos criados recentemente
Index('idx_users_active_created', 'is_active', 'created_at'),
# Índice para buscar por nome completo e email
Index('idx_users_search', 'full_name', 'email'),
# Índice para ordenação por último login
Index('idx_users_last_login', 'last_login', 'is_active'),
)
class OptimizedOrder(Base):
"""Modelo Order otimizado com índices."""
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), index=True)
status = Column(String(20), index=True)
total_amount = Column(Integer) # Em centavos
created_at = Column(DateTime, index=True)
updated_at = Column(DateTime, index=True)
# Relacionamentos
user = relationship("OptimizedUser", back_populates="orders")
# Índices compostos
__table_args__ = (
# Índice para buscar pedidos por usuário e data
Index('idx_orders_user_date', 'user_id', 'created_at'),
# Índice para relatórios de status
Index('idx_orders_status_date', 'status', 'created_at'),
# Índice para análise de receita
Index('idx_orders_amount_date', 'total_amount', 'created_at'),
)
Repository Otimizado#
# app/repositories/optimized_user_repository.py
from typing import List, Optional, Dict, Any
from datetime import datetime
from sqlalchemy.orm import Session, selectinload, joinedload
from sqlalchemy import func, and_, or_, desc
from app.models.optimized_models import OptimizedUser, OptimizedOrder
from app.schemas.user import UserCreate, UserUpdate
class OptimizedUserRepository:
"""Repository otimizado para operações de usuário."""
def __init__(self, db: Session):
self.db = db
async def get_active_users_with_orders(
self,
limit: int = 50,
offset: int = 0
) -> List[OptimizedUser]:
"""
Buscar usuários ativos com seus pedidos.
Usa selectinload para evitar problema N+1.
"""
return (
self.db.query(OptimizedUser)
.options(selectinload(OptimizedUser.orders))
.filter(OptimizedUser.is_active == True)
.order_by(desc(OptimizedUser.last_login))
.offset(offset)
.limit(limit)
.all()
)
async def search_users_optimized(
self,
query: str,
limit: int = 50,
offset: int = 0
) -> List[OptimizedUser]:
"""
Busca otimizada de usuários usando índices.
Aproveita o índice idx_users_search.
"""
search_filter = or_(
OptimizedUser.full_name.ilike(f"%{query}%"),
OptimizedUser.email.ilike(f"%{query}%"),
OptimizedUser.username.ilike(f"%{query}%")
)
return (
self.db.query(OptimizedUser)
.filter(
and_(
OptimizedUser.is_active == True,
search_filter
)
)
.order_by(OptimizedUser.full_name)
.offset(offset)
.limit(limit)
.all()
)
async def get_user_statistics(self) -> Dict[str, Any]:
"""
Obter estatísticas agregadas de usuários.
Usa queries otimizadas com agregação.
"""
# Query única para múltiplas estatísticas
stats = (
self.db.query(
func.count(OptimizedUser.id).label('total_users'),
func.count(
func.nullif(OptimizedUser.is_active, False)
).label('active_users'),
func.count(
func.nullif(OptimizedUser.last_login, None)
).label('users_with_login'),
func.avg(
func.extract('epoch',
func.now() - OptimizedUser.created_at
) / 86400
).label('avg_days_since_creation')
)
.first()
)
return {
'total_users': stats.total_users,
'active_users': stats.active_users,
'inactive_users': stats.total_users - stats.active_users,
'users_with_login': stats.users_with_login,
'avg_days_since_creation': float(stats.avg_days_since_creation or 0)
}
class OptimizedOrderRepository:
"""Repository otimizado para operações de pedidos."""
def __init__(self, db: Session):
self.db = db
async def get_orders_with_pagination(
self,
user_id: Optional[int] = None,
status: Optional[str] = None,
limit: int = 50,
offset: int = 0
) -> List[OptimizedOrder]:
"""
Buscar pedidos com paginação otimizada.
Usa joinedload para carregar usuário em uma query.
"""
query = (
self.db.query(OptimizedOrder)
.options(joinedload(OptimizedOrder.user))
)
# Aplicar filtros usando índices
if user_id:
query = query.filter(OptimizedOrder.user_id == user_id)
if status:
query = query.filter(OptimizedOrder.status == status)
return (
query
.order_by(desc(OptimizedOrder.created_at))
.offset(offset)
.limit(limit)
.all()
)
async def get_revenue_report_by_period(
self,
start_date: datetime,
end_date: datetime,
group_by: str = "day" # day, week, month
) -> List[Dict[str, Any]]:
"""
Relatório de receita por período com agregação otimizada.
Aproveita o índice idx_orders_amount_date.
"""
# Definir formato de agrupamento
date_formats = {
"day": func.date(OptimizedOrder.created_at),
"week": func.date_trunc('week', OptimizedOrder.created_at),
"month": func.date_trunc('month', OptimizedOrder.created_at)
}
date_group = date_formats.get(group_by, date_formats["day"])
results = (
self.db.query(
date_group.label('period'),
func.count(OptimizedOrder.id).label('order_count'),
func.sum(OptimizedOrder.total_amount).label('total_revenue'),
func.avg(OptimizedOrder.total_amount).label('avg_order_value'),
func.count(func.distinct(OptimizedOrder.user_id)).label('unique_customers')
)
.filter(
and_(
OptimizedOrder.created_at >= start_date,
OptimizedOrder.created_at <= end_date
)
)
.group_by(date_group)
.order_by(date_group)
.all()
)
return [
{
'period': result.period,
'order_count': result.order_count,
'total_revenue': float(result.total_revenue or 0) / 100, # Converter de centavos
'avg_order_value': float(result.avg_order_value or 0) / 100,
'unique_customers': result.unique_customers
}
for result in results
]
Middleware de Compressão#
# app/middleware/compression_middleware.py
import gzip
import brotli
from typing import Optional
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response as StarletteResponse
class CompressionMiddleware(BaseHTTPMiddleware):
"""Middleware para compressão de respostas HTTP."""
def __init__(
self,
app,
minimum_size: int = 500, # Tamanho mínimo para compressão
compressible_types: list = None
):
super().__init__(app)
self.minimum_size = minimum_size
self.compressible_types = compressible_types or [
"application/json",
"application/javascript",
"text/css",
"text/html",
"text/plain",
"text/xml",
"application/xml"
]
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Verificar se deve comprimir
if not self._should_compress(request, response):
return response
# Obter método de compressão preferido
compression_method = self._get_compression_method(request)
if not compression_method:
return response
# Comprimir resposta
compressed_response = await self._compress_response(response, compression_method)
return compressed_response
def _should_compress(self, request: Request, response: Response) -> bool:
"""Verificar se deve comprimir a resposta."""
# Verificar se já está comprimida
if response.headers.get("content-encoding"):
return False
# Verificar tipo de conteúdo
content_type = response.headers.get("content-type", "").split(";")[0]
if content_type not in self.compressible_types:
return False
# Verificar tamanho mínimo
content_length = response.headers.get("content-length")
if content_length and int(content_length) < self.minimum_size:
return False
return True
def _get_compression_method(self, request: Request) -> Optional[str]:
"""Obter método de compressão baseado no Accept-Encoding."""
accept_encoding = request.headers.get("accept-encoding", "")
# Priorizar brotli se disponível
if "br" in accept_encoding:
return "br"
elif "gzip" in accept_encoding:
return "gzip"
return None
async def _compress_response(self, response: Response, method: str) -> Response:
"""Comprimir o conteúdo da resposta."""
# Ler conteúdo da resposta
body = b""
async for chunk in response.body_iterator:
body += chunk
# Verificar tamanho mínimo
if len(body) < self.minimum_size:
# Recriar response original
response.body_iterator = self._create_body_iterator(body)
return response
# Comprimir baseado no método
if method == "gzip":
compressed_body = gzip.compress(body)
encoding = "gzip"
elif method == "br":
compressed_body = brotli.compress(body)
encoding = "br"
else:
# Fallback para não comprimido
response.body_iterator = self._create_body_iterator(body)
return response
# Verificar se compressão foi eficiente
compression_ratio = len(compressed_body) / len(body)
if compression_ratio > 0.9: # Menos de 10% de economia
response.body_iterator = self._create_body_iterator(body)
return response
# Criar nova resposta comprimida
compressed_response = StarletteResponse(
content=compressed_body,
status_code=response.status_code,
headers=dict(response.headers),
media_type=response.media_type
)
# Atualizar headers
compressed_response.headers["content-encoding"] = encoding
compressed_response.headers["content-length"] = str(len(compressed_body))
compressed_response.headers["vary"] = "Accept-Encoding"
return compressed_response
    async def _create_body_iterator(self, body: bytes):
        """Criar iterator assíncrono para o corpo da resposta (o Starlette o consome com async for)."""
        yield body
Rate Limiting#
# app/middleware/rate_limit_middleware.py
import time
import json
from typing import Dict, Optional, Tuple
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from app.core.cache import redis_client
import hashlib
class RateLimitMiddleware(BaseHTTPMiddleware):
"""Middleware para rate limiting usando sliding window."""
def __init__(
self,
app,
default_requests: int = 100,
default_window: int = 3600, # 1 hora
per_endpoint_limits: Dict[str, Tuple[int, int]] = None
):
super().__init__(app)
self.default_requests = default_requests
self.default_window = default_window
self.per_endpoint_limits = per_endpoint_limits or {}
async def dispatch(self, request: Request, call_next):
# Obter identificador do cliente
client_id = await self._get_client_id(request)
# Obter limites para o endpoint
endpoint = f"{request.method}:{request.url.path}"
requests_limit, window_size = self.per_endpoint_limits.get(
endpoint,
(self.default_requests, self.default_window)
)
# Verificar rate limit
allowed, remaining, reset_time = await self._check_rate_limit(
client_id, endpoint, requests_limit, window_size
)
        if not allowed:
            # Retornar 429 diretamente: HTTPException lançada dentro de um
            # middleware não passa pelos exception handlers do FastAPI
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={
                    "X-RateLimit-Limit": str(requests_limit),
                    "X-RateLimit-Remaining": "0",
                    "X-RateLimit-Reset": str(reset_time),
                    "Retry-After": str(int(reset_time - time.time()))
                }
            )
# Processar request
response = await call_next(request)
# Adicionar headers de rate limit
response.headers["X-RateLimit-Limit"] = str(requests_limit)
response.headers["X-RateLimit-Remaining"] = str(remaining)
response.headers["X-RateLimit-Reset"] = str(reset_time)
return response
async def _get_client_id(self, request: Request) -> str:
"""Obter identificador único do cliente."""
# Tentar obter user ID do JWT
auth_header = request.headers.get("authorization")
if auth_header and auth_header.startswith("Bearer "):
try:
# Aqui você decodificaria o JWT para obter o user_id
# Por simplicidade, vamos usar um hash do token
token = auth_header.split(" ")[1]
user_hash = hashlib.md5(token.encode()).hexdigest()[:8]
return f"user:{user_hash}"
except:
pass
# Fallback para IP
client_ip = request.client.host
return f"ip:{client_ip}"
async def _check_rate_limit(
self,
client_id: str,
endpoint: str,
requests_limit: int,
window_size: int
) -> Tuple[bool, int, int]:
"""
Verificar rate limit usando sliding window.
Returns:
(allowed, remaining_requests, reset_timestamp)
"""
current_time = int(time.time())
window_start = current_time - window_size
# Chave do Redis
key = f"rate_limit:{client_id}:{endpoint}"
try:
# Obter requests no window atual
requests_data = await redis_client.get(key)
if requests_data is None:
requests = []
else:
requests = json.loads(requests_data)
# Filtrar requests dentro do window
recent_requests = [
req_time for req_time in requests
if req_time > window_start
]
# Verificar se excedeu o limite
if len(recent_requests) >= requests_limit:
# Calcular quando o rate limit será resetado
oldest_request = min(recent_requests)
reset_time = oldest_request + window_size
return False, 0, reset_time
# Adicionar request atual
recent_requests.append(current_time)
# Salvar no Redis
await redis_client.set(
key,
json.dumps(recent_requests),
ttl=window_size
)
remaining = requests_limit - len(recent_requests)
reset_time = current_time + window_size
return True, remaining, reset_time
except Exception as e:
# Em caso de erro, permitir o request
print(f"Rate limit error: {e}")
return True, requests_limit, current_time + window_size
# Decorador para rate limiting específico por endpoint
def endpoint_rate_limit(requests: int, window: int):
"""
Decorador para aplicar rate limiting específico a um endpoint.
Args:
requests: Número de requests permitidos
window: Janela de tempo em segundos
"""
def decorator(func):
func._rate_limit = (requests, window)
return func
return decorator
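Exemplo hipotético de uso do decorador em uma rota. Observação: o middleware acima consulta apenas per_endpoint_limits; para que o atributo _rate_limit tenha efeito, seria necessário lê-lo ao registrar as rotas (por exemplo, preenchendo per_endpoint_limits a partir dos endpoints decorados).

```python
# Uso ilustrativo do decorador em um endpoint (rota e limites hipotéticos)
from fastapi import APIRouter

router = APIRouter()

@router.get("/relatorios/pesados")
@endpoint_rate_limit(requests=10, window=60)  # 10 requisições por minuto
async def gerar_relatorio_pesado():
    return {"status": "processando"}
```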
Sistema de Métricas e Monitoramento#
# app/monitoring/metrics.py
import time
import psutil
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from app.core.cache import redis_client
import asyncio
@dataclass
class PerformanceMetrics:
"""Estrutura para métricas de performance."""
timestamp: float
# Métricas do sistema
cpu_percent: float
memory_percent: float
disk_usage_percent: float
network_bytes_sent: int
network_bytes_recv: int
# Métricas da aplicação
active_requests: int
requests_per_second: float
avg_response_time: float
error_rate: float
uptime_seconds: float
# Métricas do cache
cache_hit_rate: float
cache_memory_usage: int
cache_connected_clients: int
# Métricas do banco de dados
db_connections: int
db_slow_queries: int
db_avg_query_time: float
class MetricsCollector:
"""Coletor de métricas do sistema e aplicação."""
def __init__(self):
self.start_time = time.time()
self.request_count = 0
self.error_count = 0
self.response_times = []
self.last_network_stats = psutil.net_io_counters()
async def collect_metrics(self) -> PerformanceMetrics:
"""Coletar todas as métricas."""
# Métricas do sistema
system_metrics = await self._collect_system_metrics()
# Métricas da aplicação
app_metrics = await self._collect_app_metrics()
# Métricas do cache
cache_metrics = await self._collect_cache_metrics()
# Métricas do banco (simuladas - implementar conforme seu setup)
db_metrics = await self._collect_db_metrics()
return PerformanceMetrics(
timestamp=time.time(),
**system_metrics,
**app_metrics,
**cache_metrics,
**db_metrics
)
async def _collect_system_metrics(self) -> Dict[str, Any]:
"""Coletar métricas do sistema."""
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
# Memória
memory = psutil.virtual_memory()
memory_percent = memory.percent
# Disco
disk = psutil.disk_usage('/')
disk_usage_percent = (disk.used / disk.total) * 100
# Rede
network_stats = psutil.net_io_counters()
network_bytes_sent = network_stats.bytes_sent - self.last_network_stats.bytes_sent
network_bytes_recv = network_stats.bytes_recv - self.last_network_stats.bytes_recv
self.last_network_stats = network_stats
return {
'cpu_percent': cpu_percent,
'memory_percent': memory_percent,
'disk_usage_percent': disk_usage_percent,
'network_bytes_sent': network_bytes_sent,
'network_bytes_recv': network_bytes_recv
}
async def _collect_app_metrics(self) -> Dict[str, Any]:
"""Coletar métricas da aplicação."""
current_time = time.time()
uptime = current_time - self.start_time
# Calcular RPS (requests por segundo)
rps = self.request_count / max(uptime, 1)
# Calcular tempo médio de resposta
avg_response_time = (
sum(self.response_times) / len(self.response_times)
if self.response_times else 0
)
# Calcular taxa de erro
error_rate = (
(self.error_count / self.request_count) * 100
if self.request_count > 0 else 0
)
return {
'active_requests': 0, # Implementar contador de requests ativos
'requests_per_second': rps,
'avg_response_time': avg_response_time,
'error_rate': error_rate,
'uptime_seconds': uptime
}
async def _collect_cache_metrics(self) -> Dict[str, Any]:
"""Coletar métricas do cache Redis."""
try:
if await redis_client.is_connected():
# Obter informações do Redis
info = await redis_client.redis.info()
# Calcular hit rate
hits = info.get('keyspace_hits', 0)
misses = info.get('keyspace_misses', 0)
total_requests = hits + misses
hit_rate = (hits / total_requests * 100) if total_requests > 0 else 0
return {
'cache_hit_rate': hit_rate,
'cache_memory_usage': info.get('used_memory', 0),
'cache_connected_clients': info.get('connected_clients', 0)
}
else:
return {
'cache_hit_rate': 0,
'cache_memory_usage': 0,
'cache_connected_clients': 0
}
except Exception as e:
print(f"Error collecting cache metrics: {e}")
return {
'cache_hit_rate': 0,
'cache_memory_usage': 0,
'cache_connected_clients': 0
}
async def _collect_db_metrics(self) -> Dict[str, Any]:
"""Coletar métricas do banco de dados."""
# Implementar baseado no seu setup de banco
# Por enquanto, retornar valores simulados
return {
'db_connections': 5,
'db_slow_queries': 0,
'db_avg_query_time': 0.05
}
def record_request(self, response_time: float, is_error: bool = False):
"""Registrar uma requisição."""
self.request_count += 1
self.response_times.append(response_time)
if is_error:
self.error_count += 1
# Manter apenas os últimos 1000 tempos de resposta
if len(self.response_times) > 1000:
self.response_times = self.response_times[-1000:]
# Instância global
metrics_collector = MetricsCollector()
Middleware de Métricas#
# app/middleware/metrics_middleware.py
import time
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from app.monitoring.metrics import metrics_collector
class MetricsMiddleware(BaseHTTPMiddleware):
"""Middleware para coletar métricas de requests."""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
try:
response = await call_next(request)
# Calcular tempo de resposta
response_time = time.time() - start_time
# Verificar se é erro
is_error = response.status_code >= 400
# Registrar métricas
metrics_collector.record_request(response_time, is_error)
return response
except Exception as e:
# Registrar como erro
response_time = time.time() - start_time
metrics_collector.record_request(response_time, is_error=True)
raise
API de Monitoramento#
# app/api/monitoring.py
from fastapi import APIRouter, HTTPException
from typing import Dict, Any, List
from datetime import datetime, timedelta
from dataclasses import asdict
from app.monitoring.metrics import metrics_collector, PerformanceMetrics
import asyncio
router = APIRouter(prefix="/monitoring", tags=["monitoring"])
@router.get("/metrics")
async def get_metrics() -> Dict[str, Any]:
"""Obter métricas atuais do sistema."""
try:
metrics = await metrics_collector.collect_metrics()
return asdict(metrics)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error collecting metrics: {e}")
@router.get("/health")
async def health_check() -> Dict[str, Any]:
"""Verificação detalhada de saúde do sistema."""
metrics = await metrics_collector.collect_metrics()
# Determinar status baseado nas métricas
status = "healthy"
issues = []
# Verificar CPU
if metrics.cpu_percent > 80:
status = "warning"
issues.append(f"High CPU usage: {metrics.cpu_percent:.1f}%")
# Verificar memória
if metrics.memory_percent > 85:
status = "critical" if metrics.memory_percent > 95 else "warning"
issues.append(f"High memory usage: {metrics.memory_percent:.1f}%")
# Verificar taxa de erro
if metrics.error_rate > 5:
status = "critical" if metrics.error_rate > 10 else "warning"
issues.append(f"High error rate: {metrics.error_rate:.1f}%")
# Verificar tempo de resposta
if metrics.avg_response_time > 1.0:
status = "warning"
issues.append(f"Slow response time: {metrics.avg_response_time:.3f}s")
return {
"status": status,
"timestamp": datetime.utcnow().isoformat(),
"uptime_seconds": metrics.uptime_seconds,
"issues": issues,
"metrics": asdict(metrics)
}
@router.get("/alerts")
async def get_alerts() -> List[Dict[str, Any]]:
"""Obter alertas baseados nas métricas."""
metrics = await metrics_collector.collect_metrics()
alerts = []
# Alertas críticos
if metrics.cpu_percent > 90:
alerts.append({
"level": "critical",
"message": f"CPU usage is critically high: {metrics.cpu_percent:.1f}%",
"metric": "cpu_percent",
"value": metrics.cpu_percent,
"threshold": 90
})
if metrics.memory_percent > 95:
alerts.append({
"level": "critical",
"message": f"Memory usage is critically high: {metrics.memory_percent:.1f}%",
"metric": "memory_percent",
"value": metrics.memory_percent,
"threshold": 95
})
if metrics.error_rate > 10:
alerts.append({
"level": "critical",
"message": f"Error rate is critically high: {metrics.error_rate:.1f}%",
"metric": "error_rate",
"value": metrics.error_rate,
"threshold": 10
})
# Alertas de warning
if 80 <= metrics.cpu_percent <= 90:
alerts.append({
"level": "warning",
"message": f"CPU usage is high: {metrics.cpu_percent:.1f}%",
"metric": "cpu_percent",
"value": metrics.cpu_percent,
"threshold": 80
})
if 85 <= metrics.memory_percent <= 95:
alerts.append({
"level": "warning",
"message": f"Memory usage is high: {metrics.memory_percent:.1f}%",
"metric": "memory_percent",
"value": metrics.memory_percent,
"threshold": 85
})
if metrics.avg_response_time > 1.0:
alerts.append({
"level": "warning",
"message": f"Average response time is slow: {metrics.avg_response_time:.3f}s",
"metric": "avg_response_time",
"value": metrics.avg_response_time,
"threshold": 1.0
})
return alerts
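A quick way to exercise these endpoints is FastAPI's TestClient; the sketch below assumes the router above is mounted on an app and that Redis and psutil are reachable when the metrics are collected:
# Smoke-testing the monitoring API (sketch)
from fastapi import FastAPI
from fastapi.testclient import TestClient
from app.api.monitoring import router as monitoring_router

app = FastAPI()
app.include_router(monitoring_router)

client = TestClient(app)
health = client.get("/monitoring/health").json()
print(health["status"], health["issues"])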
📊 Resumo do Step 5#
Neste step, implementamos um sistema completo de Cache e Otimização de Performance:
✅ O que foi coberto:#
Conceitos Fundamentais de Cache
Tipos de cache (in-memory, distribuído, database, CDN)
Estratégias de cache (Cache-Aside, Write-Through, Write-Behind, Refresh-Ahead)
Padrões de invalidação (TTL, manual, cache tags)
Algoritmos de eviction (LRU, LFU, FIFO, Random)
Problemas comuns (Cache Stampede, Hot Keys, Cache Penetration)
Implementação Prática com Redis
Configuração completa do Redis com Docker
Cliente Redis assíncrono com funcionalidades avançadas
Decoradores para cache automático de funções
Cache especializado para modelos Pydantic
Middleware de cache HTTP
Otimização de Performance
Profiling de queries de banco de dados
Índices compostos para otimização de queries
Compressão de respostas HTTP (gzip/brotli)
Rate limiting com sliding window
Sistema de métricas e monitoramento
Monitoramento e Observabilidade
Coleta de métricas do sistema, aplicação e cache
Dashboard de saúde da aplicação
Sistema de alertas baseado em thresholds
APIs para monitoramento em tempo real
🎯 Próximos passos:#
No Step 6, vamos implementar WebSockets e Comunicação em Tempo Real para criar funcionalidades interativas e colaborativas.
async def incr(self, key: str, amount: int = 1) -> int:
"""Increment a key's value."""
try:
return await self.redis.incr(key, amount)
except Exception as e:
print(f"Redis INCR error: {e}")
return 0
async def get_pattern(self, pattern: str) -> list:
"""Get all keys matching a pattern."""
try:
keys = await self.redis.keys(pattern)
return [key.decode('utf-8') if isinstance(key, bytes) else key for key in keys]
except Exception as e:
print(f"Redis KEYS error: {e}")
return []
async def delete_pattern(self, pattern: str) -> int:
"""Delete all keys matching a pattern."""
try:
keys = await self.get_pattern(pattern)
if keys:
return await self.delete(*keys)
return 0
except Exception as e:
print(f"Redis DELETE_PATTERN error: {e}")
return 0
# Instância global
redis_client = RedisClient()

# Dependency para FastAPI
async def get_redis() -> RedisClient:
    return redis_client
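A short usage sketch for these helpers, e.g. an atomic view counter and a bulk invalidation (the key names are illustrative):
# Example usage of the Redis helpers (sketch, illustrative key names)
from core.redis import redis_client

async def track_view(item_id: int) -> int:
    # INCR is atomic, so concurrent requests never lose counts
    return await redis_client.incr(f"views:item:{item_id}")

async def invalidate_item_cache() -> int:
    # Remove every cached item entry in one call
    return await redis_client.delete_pattern("cache:item:*")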
Configuração no Settings (core/config.py)#
class Settings(BaseSettings):
# ... outras configurações ...
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
REDIS_CACHE_TTL: int = 3600 # 1 hora
REDIS_SESSION_TTL: int = 86400 # 24 horas
# Cache
ENABLE_CACHE: bool = True
CACHE_DEFAULT_TTL: int = 300 # 5 minutos
CACHE_MAX_SIZE: int = 1000
🎯 Sistema de Cache#
Cache Manager (core/cache.py)#
from typing import Optional, Any, Callable, Union
from datetime import timedelta
import hashlib
import inspect
from functools import wraps
from core.redis import redis_client
from core.config import settings
class CacheManager:
def __init__(self, redis_client):
self.redis = redis_client
self.default_ttl = settings.CACHE_DEFAULT_TTL
def _generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
"""Generate a unique cache key."""
# Criar string única baseada nos argumentos
key_data = f"{prefix}:{args}:{sorted(kwargs.items())}"
# Hash para evitar keys muito longas
key_hash = hashlib.md5(key_data.encode()).hexdigest()
return f"cache:{prefix}:{key_hash}"
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache."""
if not settings.ENABLE_CACHE:
return None
return await self.redis.get(key)
async def set(
self,
key: str,
value: Any,
ttl: Optional[int] = None
) -> bool:
"""Set value in cache."""
if not settings.ENABLE_CACHE:
return False
expire_time = ttl or self.default_ttl
return await self.redis.set(key, value, expire=expire_time)
async def delete(self, key: str) -> bool:
"""Delete value from cache."""
return bool(await self.redis.delete(key))
async def clear_pattern(self, pattern: str) -> int:
"""Clear all cache keys matching pattern."""
return await self.redis.delete_pattern(f"cache:{pattern}*")
def cache_result(
self,
prefix: str,
ttl: Optional[int] = None,
key_builder: Optional[Callable] = None
):
"""Decorator to cache function results."""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
# Gerar chave do cache
if key_builder:
cache_key = key_builder(*args, **kwargs)
else:
cache_key = self._generate_cache_key(prefix, *args, **kwargs)
# Tentar obter do cache
cached_result = await self.get(cache_key)
if cached_result is not None:
return cached_result
# Executar a função (síncrona ou assíncrona) e cachear o resultado
result = await func(*args, **kwargs) if inspect.iscoroutinefunction(func) else func(*args, **kwargs)
await self.set(cache_key, result, ttl)
return result
@wraps(func)
def sync_wrapper(*args, **kwargs):
# Para funções síncronas, usar versão assíncrona internamente
import asyncio
return asyncio.run(async_wrapper(*args, **kwargs))
# Retornar wrapper apropriado baseado no tipo da função
if inspect.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
# Instância global
cache_manager = CacheManager(redis_client)
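Typical usage on a service-layer function might look like the sketch below; `compute_stats_from_db` is a hypothetical expensive query:
# Caching an expensive aggregation with the decorator (sketch)
from core.cache import cache_manager

@cache_manager.cache_result("dashboard_stats", ttl=120)
async def get_dashboard_stats(tenant_id: int) -> dict:
    # Only executed on cache misses; the result is reused for 120s
    return await compute_stats_from_db(tenant_id)  # hypothetical helper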
Cache de Repository (db/repository/cached_repository.py)#
import asyncio
from typing import Optional, List, Any, Dict
from sqlalchemy.orm import Session
from core.cache import cache_manager
from db.repository.base import BaseRepository
class CachedRepository(BaseRepository):
"""Repository base with caching capabilities."""
def __init__(self, db: Session, model, cache_prefix: str):
super().__init__(db, model)
self.cache_prefix = cache_prefix
self.cache_ttl = 300 # 5 minutos
async def get_cached(self, id: Any) -> Optional[Any]:
"""Get object by ID with caching."""
cache_key = f"{self.cache_prefix}:get:{id}"
# Tentar cache primeiro
cached_obj = await cache_manager.get(cache_key)
if cached_obj:
return cached_obj
# Buscar no banco
obj = self.get(id)
if obj:
# Cachear resultado
await cache_manager.set(cache_key, obj, self.cache_ttl)
return obj
async def get_multi_cached(
self,
skip: int = 0,
limit: int = 100,
filters: Optional[Dict] = None
) -> tuple[List[Any], int]:
"""Get multiple objects with caching."""
# Gerar chave baseada nos parâmetros
filter_str = str(sorted(filters.items())) if filters else "none"
cache_key = f"{self.cache_prefix}:list:{skip}:{limit}:{filter_str}"
# Tentar cache
cached_result = await cache_manager.get(cache_key)
if cached_result:
return cached_result
# Buscar no banco
objects, total = self.get_multi(skip=skip, limit=limit, filters=filters)
# Cachear resultado
result = (objects, total)
await cache_manager.set(cache_key, result, self.cache_ttl)
return result
async def invalidate_cache(self, id: Optional[Any] = None):
"""Invalidate cache for this repository."""
if id:
# Invalidar cache específico
cache_key = f"{self.cache_prefix}:get:{id}"
await cache_manager.delete(cache_key)
# Invalidar cache de listagens (as chaves de listagem não usam o prefixo "cache:",
# então aplicamos o padrão direto no cliente Redis)
await cache_manager.redis.delete_pattern(f"{self.cache_prefix}:list:*")
def create(self, obj_in) -> Any:
"""Create object and invalidate cache."""
obj = super().create(obj_in)
# Invalidar cache de listagens
asyncio.create_task(self.invalidate_cache())
return obj
def update(self, db_obj, obj_in) -> Any:
"""Update object and invalidate cache."""
obj = super().update(db_obj, obj_in)
# Invalidar cache específico e listagens
asyncio.create_task(self.invalidate_cache(obj.id))
return obj
def remove(self, id: Any) -> Any:
"""Remove object and invalidate cache."""
obj = super().remove(id)
# Invalidar cache específico e listagens
asyncio.create_task(self.invalidate_cache(id))
return obj
Item Repository com Cache (db/repository/item_repository.py)#
from typing import Optional, List, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_
from db.repository.cached_repository import CachedRepository
from db.models.item import Item
from schemas.item import ItemCreate, ItemUpdate
from core.cache import cache_manager
class ItemRepository(CachedRepository):
def __init__(self, db: Session):
super().__init__(db, Item, "item")
@cache_manager.cache_result("item_by_name", ttl=600)
async def get_by_name_cached(self, name: str) -> Optional[Item]:
"""Get item by name with caching."""
return self.db.query(Item).filter(Item.name == name).first()
@cache_manager.cache_result("items_by_category", ttl=300)
async def get_by_category_cached(self, category_id: int) -> List[Item]:
"""Get items by category with caching."""
return self.db.query(Item).filter(
Item.category_id == category_id,
Item.is_available == True
).all()
@cache_manager.cache_result("items_search", ttl=180)
async def search_items_cached(
self,
search_term: str,
skip: int = 0,
limit: int = 100
) -> Tuple[List[Item], int]:
"""Search items with caching."""
query = self.db.query(Item).filter(
or_(
Item.name.ilike(f"%{search_term}%"),
Item.description.ilike(f"%{search_term}%")
),
Item.is_available == True
)
total = query.count()
items = query.offset(skip).limit(limit).all()
return items, total
@cache_manager.cache_result("popular_items", ttl=1800) # 30 minutos
async def get_popular_items_cached(self, limit: int = 10) -> List[Item]:
"""Get popular items with caching."""
# Assumindo que temos um campo de popularidade ou vendas
return self.db.query(Item).filter(
Item.is_available == True
).order_by(Item.view_count.desc()).limit(limit).all()
async def increment_view_count(self, item_id: int):
"""Increment item view count and update cache."""
item = self.get(item_id)
if item:
item.view_count = (item.view_count or 0) + 1
self.db.commit()
# Invalidar caches relacionados
await self.invalidate_cache(item_id)
await cache_manager.clear_pattern("popular_items")
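Used from an endpoint, the cached repository looks like any other dependency; a sketch (the route path and the projected fields are illustrative):
# Example endpoint backed by the cached repository (sketch)
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from db.session import get_db
from db.repository.item_repository import ItemRepository

router = APIRouter()

@router.get("/items/popular")
async def popular_items(db: Session = Depends(get_db)):
    repo = ItemRepository(db)
    items = await repo.get_popular_items_cached(limit=10)
    # Project to plain dicts so the example serializes without a response_model
    return [{"id": i.id, "name": i.name} for i in items]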
🌐 Cache de Response HTTP#
Response Cache Middleware (middleware/cache_middleware.py)#
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response as StarletteResponse
import hashlib
import json
from core.cache import cache_manager
from core.config import settings
class ResponseCacheMiddleware(BaseHTTPMiddleware):
def __init__(self, app, cache_ttl: int = 300):
super().__init__(app)
self.cache_ttl = cache_ttl
self.cacheable_methods = {"GET"}
self.cacheable_status_codes = {200, 201}
def _should_cache(self, request: Request, response: Response) -> bool:
"""Determine if response should be cached."""
# Não cachear se cache está desabilitado
if not settings.ENABLE_CACHE:
return False
# Só cachear métodos específicos
if request.method not in self.cacheable_methods:
return False
# Só cachear status codes de sucesso
if response.status_code not in self.cacheable_status_codes:
return False
# Não cachear se há autenticação
if "authorization" in request.headers:
return False
# Não cachear endpoints específicos
excluded_paths = ["/api/v1/auth/", "/api/v1/admin/"]
if any(request.url.path.startswith(path) for path in excluded_paths):
return False
return True
def _generate_cache_key(self, request: Request) -> str:
"""Generate cache key for request."""
# Incluir método, path e query parameters
key_data = {
"method": request.method,
"path": request.url.path,
"query": str(request.query_params),
"headers": {
k: v for k, v in request.headers.items()
if k.lower() in ["accept", "accept-language", "content-type"]
}
}
key_string = json.dumps(key_data, sort_keys=True)
key_hash = hashlib.md5(key_string.encode()).hexdigest()
return f"response:{key_hash}"
async def dispatch(self, request: Request, call_next):
# Gerar chave do cache
cache_key = self._generate_cache_key(request)
# Tentar obter response do cache
if request.method in self.cacheable_methods:
cached_response = await cache_manager.get(cache_key)
if cached_response:
return StarletteResponse(
content=cached_response["content"],
status_code=cached_response["status_code"],
headers=cached_response["headers"],
media_type=cached_response.get("media_type")
)
# Processar request
response = await call_next(request)
# Cachear response se apropriado
if self._should_cache(request, response):
# Ler conteúdo da response
response_body = b""
async for chunk in response.body_iterator:
response_body += chunk
# Preparar dados para cache
cache_data = {
"content": response_body.decode(),
"status_code": response.status_code,
"headers": dict(response.headers),
"media_type": response.media_type
}
# Cachear response
await cache_manager.set(cache_key, cache_data, self.cache_ttl)
# Recriar response com o conteúdo lido
response = StarletteResponse(
content=response_body,
status_code=response.status_code,
headers=response.headers,
media_type=response.media_type
)
return response
Cache Headers Middleware (middleware/cache_headers.py)#
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from datetime import datetime, timedelta
class CacheHeadersMiddleware(BaseHTTPMiddleware):
def __init__(self, app):
super().__init__(app)
self.cache_rules = {
"/api/v1/items": {"max_age": 300, "public": True}, # 5 minutos
"/api/v1/categories": {"max_age": 1800, "public": True}, # 30 minutos
"/static": {"max_age": 86400, "public": True}, # 24 horas
}
def _get_cache_rule(self, path: str) -> dict:
"""Get cache rule for path."""
for pattern, rule in self.cache_rules.items():
if path.startswith(pattern):
return rule
return {}
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Aplicar headers de cache baseado no path
cache_rule = self._get_cache_rule(request.url.path)
if cache_rule and response.status_code == 200:
max_age = cache_rule.get("max_age", 0)
is_public = cache_rule.get("public", False)
if max_age > 0:
# Cache-Control header
cache_control = f"max-age={max_age}"
if is_public:
cache_control += ", public"
else:
cache_control += ", private"
response.headers["Cache-Control"] = cache_control
# Expires header
expires = datetime.utcnow() + timedelta(seconds=max_age)
response.headers["Expires"] = expires.strftime("%a, %d %b %Y %H:%M:%S GMT")
# ETag header (simples baseado no conteúdo)
if hasattr(response, 'body'):
import hashlib
etag = hashlib.md5(str(response.body).encode()).hexdigest()
response.headers["ETag"] = f'"{etag}"'
return response
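To confirm the headers are being applied, a quick check with TestClient (this assumes the middleware is registered in `main.py` and that an `/api/v1/items` route exists):
# Verifying the cache headers (sketch)
from fastapi.testclient import TestClient
from main import app  # assumes CacheHeadersMiddleware is added in main.py

client = TestClient(app)
resp = client.get("/api/v1/items")
print(resp.headers.get("Cache-Control"))  # expected: "max-age=300, public"
print(resp.headers.get("Expires"))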
⚡ Otimizações de Performance#
Query Optimization (db/optimizations.py)#
from sqlalchemy import event, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, joinedload, selectinload
from typing import List, Optional
import time
import logging
# Configurar logging para queries lentas
logging.basicConfig()
logger = logging.getLogger("sqlalchemy.engine")
logger.setLevel(logging.INFO)
# Event listener para monitorar queries lentas
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
total = time.time() - context._query_start_time
if total > 0.1: # Log queries que demoram mais de 100ms
logger.warning(f"Slow query: {total:.4f}s - {statement[:100]}...")
class QueryOptimizer:
"""Utility class for query optimizations."""
@staticmethod
def optimize_item_queries(db: Session):
"""Apply optimizations for item queries."""
# Eager loading para evitar N+1 queries
return db.query(Item).options(
joinedload(Item.category),
selectinload(Item.reviews)
)
@staticmethod
def optimize_user_queries(db: Session):
"""Apply optimizations for user queries."""
return db.query(User).options(
joinedload(User.role),
selectinload(User.permissions)
)
@staticmethod
def create_indexes(db: Session):
"""Create database indexes for better performance."""
indexes = [
# Items
"CREATE INDEX IF NOT EXISTS idx_items_category_id ON items(category_id)",
"CREATE INDEX IF NOT EXISTS idx_items_is_available ON items(is_available)",
"CREATE INDEX IF NOT EXISTS idx_items_name_search ON items USING gin(to_tsvector('portuguese', name))",
"CREATE INDEX IF NOT EXISTS idx_items_price ON items(price)",
# Users
"CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)",
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)",
"CREATE INDEX IF NOT EXISTS idx_users_is_active ON users(is_active)",
# Categories
"CREATE INDEX IF NOT EXISTS idx_categories_is_active ON categories(is_active)",
# Composite indexes
"CREATE INDEX IF NOT EXISTS idx_items_category_available ON items(category_id, is_available)",
]
for index_sql in indexes:
try:
db.execute(text(index_sql))
db.commit()
print(f"✅ Created index: {index_sql.split()[-1]}")
except Exception as e:
print(f"❌ Failed to create index: {e}")
db.rollback()
# Optimized repository methods
class OptimizedItemRepository(ItemRepository):
def get_with_relations(self, id: int) -> Optional[Item]:
"""Get item with all relations loaded."""
return self.db.query(Item).options(
joinedload(Item.category),
selectinload(Item.reviews).joinedload(Review.user)
).filter(Item.id == id).first()
def get_popular_with_stats(self, limit: int = 10) -> List[Item]:
"""Get popular items with aggregated stats."""
return self.db.query(Item).options(
joinedload(Item.category)
).filter(
Item.is_available == True
).order_by(
Item.view_count.desc(),
Item.created_at.desc()
).limit(limit).all()
def search_optimized(
self,
search_term: str,
category_id: Optional[int] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
skip: int = 0,
limit: int = 100
) -> tuple[List[Item], int]:
"""Optimized search with filters."""
query = self.db.query(Item).options(
joinedload(Item.category)
).filter(Item.is_available == True)
# Text search usando PostgreSQL full-text search
if search_term:
query = query.filter(
text("to_tsvector('portuguese', name || ' ' || description) @@ plainto_tsquery('portuguese', :search)")
).params(search=search_term)
# Filtros adicionais
if category_id:
query = query.filter(Item.category_id == category_id)
if min_price:
query = query.filter(Item.price >= min_price)
if max_price:
query = query.filter(Item.price <= max_price)
# Count otimizado
total = query.count()
# Resultados paginados
items = query.offset(skip).limit(limit).all()
return items, total
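Exposed through a route, the optimized search could be wired roughly as below (route and parameter names are illustrative):
# Search endpoint using the optimized repository (sketch)
from typing import Optional
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from db.session import get_db
from db.optimizations import OptimizedItemRepository

router = APIRouter()

@router.get("/items/search")
def search_items(
    q: str,
    category_id: Optional[int] = None,
    skip: int = 0,
    limit: int = 50,
    db: Session = Depends(get_db),
):
    repo = OptimizedItemRepository(db)
    items, total = repo.search_optimized(q, category_id=category_id, skip=skip, limit=limit)
    return {"total": total, "items": [{"id": i.id, "name": i.name} for i in items]}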
Connection Pooling (db/session.py)#
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
from core.config import settings
# Engine otimizado com connection pooling
engine = create_engine(
settings.DATABASE_URL,
poolclass=QueuePool,
pool_size=20, # Número de conexões permanentes
max_overflow=30, # Conexões extras quando necessário
pool_pre_ping=True, # Verificar conexões antes de usar
pool_recycle=3600, # Reciclar conexões a cada hora
echo=settings.DEBUG, # Log SQL queries em desenvolvimento
echo_pool=settings.DEBUG, # Log pool events
)
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
# Dependency otimizada
async def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
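During load tests it helps to watch pool saturation; SQLAlchemy's QueuePool exposes a simple status string:
# Inspecting connection pool usage (sketch)
from db.session import engine

# Prints something like:
# "Pool size: 20  Connections in pool: 3 Current Overflow: -17 Current Checked out connections: 0"
print(engine.pool.status())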
🛡️ Rate Limiting#
Rate Limiter (middleware/rate_limit.py)#
from fastapi import Request, HTTPException, status
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
from core.redis import redis_client
from core.config import settings
import time
from typing import Dict, Optional
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app,
calls: int = 100,
period: int = 60,
per_ip: bool = True
):
super().__init__(app)
self.calls = calls
self.period = period
self.per_ip = per_ip
# Rate limits específicos por endpoint
self.endpoint_limits = {
"/api/v1/auth/login": {"calls": 5, "period": 300}, # 5 tentativas por 5 min
"/api/v1/auth/register": {"calls": 3, "period": 3600}, # 3 registros por hora
"/api/v1/items/": {"calls": 1000, "period": 60}, # 1000 requests por minuto
}
def _get_client_id(self, request: Request) -> str:
"""Get client identifier for rate limiting."""
if self.per_ip:
# Usar IP real considerando proxies
forwarded_for = request.headers.get("X-Forwarded-For")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
return request.client.host
else:
# Usar user ID se autenticado
user_id = getattr(request.state, "user_id", None)
return str(user_id) if user_id else request.client.host
def _get_rate_limit(self, path: str) -> Dict[str, int]:
"""Get rate limit for specific endpoint."""
for endpoint, limits in self.endpoint_limits.items():
if path.startswith(endpoint):
return limits
return {"calls": self.calls, "period": self.period}
async def _is_rate_limited(
self,
client_id: str,
endpoint: str,
calls: int,
period: int
) -> tuple[bool, Dict[str, int]]:
"""Check if client is rate limited."""
key = f"rate_limit:{endpoint}:{client_id}"
# Usar sliding window com Redis
current_time = int(time.time())
window_start = current_time - period
# Remover entradas antigas
await redis_client.redis.zremrangebyscore(key, 0, window_start)
# Contar requests na janela atual
current_requests = await redis_client.redis.zcard(key)
# Informações para headers
info = {
"limit": calls,
"remaining": max(0, calls - current_requests),
"reset": current_time + period,
"retry_after": period if current_requests >= calls else 0
}
if current_requests >= calls:
return True, info
# Adicionar request atual
await redis_client.redis.zadd(key, {str(current_time): current_time})
await redis_client.redis.expire(key, period)
# Atualizar remaining
info["remaining"] = max(0, calls - current_requests - 1)
return False, info
async def dispatch(self, request: Request, call_next):
# Pular rate limiting para alguns paths
skip_paths = ["/health", "/metrics", "/docs", "/openapi.json"]
if any(request.url.path.startswith(path) for path in skip_paths):
return await call_next(request)
client_id = self._get_client_id(request)
endpoint = request.url.path
rate_limit = self._get_rate_limit(endpoint)
# Verificar rate limit
is_limited, info = await self._is_rate_limited(
client_id,
endpoint,
rate_limit["calls"],
rate_limit["period"]
)
if is_limited:
return JSONResponse(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
content={
"error": "Rate limit exceeded",
"message": f"Too many requests. Try again in {info['retry_after']} seconds.",
"retry_after": info["retry_after"]
},
headers={
"X-RateLimit-Limit": str(info["limit"]),
"X-RateLimit-Remaining": str(info["remaining"]),
"X-RateLimit-Reset": str(info["reset"]),
"Retry-After": str(info["retry_after"])
}
)
# Processar request
response = await call_next(request)
# Adicionar headers de rate limit
response.headers["X-RateLimit-Limit"] = str(info["limit"])
response.headers["X-RateLimit-Remaining"] = str(info["remaining"])
response.headers["X-RateLimit-Reset"] = str(info["reset"])
return response
# Decorator para rate limiting específico
def rate_limit(calls: int, period: int):
"""Decorator for endpoint-specific rate limiting."""
def decorator(func):
func._rate_limit = {"calls": calls, "period": period}
return func
return decorator
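Note that the decorator only attaches metadata (`func._rate_limit`); the middleware above would still have to look that attribute up on the matched route handler to honor it. A usage sketch:
# Per-endpoint rate limit via the decorator (sketch)
from fastapi import APIRouter

router = APIRouter()

@router.post("/password-reset")
@rate_limit(calls=3, period=3600)  # 3 requests per hour, read later by the middleware
async def request_password_reset(email: str):
    # ... trigger the reset e-mail here ...
    return {"status": "ok"}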
📊 Monitoramento de Performance#
Performance Monitor (core/monitoring.py)#
import time
import psutil
from typing import Dict, Any
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from core.redis import redis_client
import json
from datetime import datetime, timedelta
class PerformanceMonitor:
def __init__(self):
self.metrics_key = "performance_metrics"
async def record_request(
self,
method: str,
path: str,
status_code: int,
duration: float,
memory_usage: float,
cpu_usage: float
):
"""Record request metrics."""
timestamp = int(time.time())
metric = {
"timestamp": timestamp,
"method": method,
"path": path,
"status_code": status_code,
"duration": duration,
"memory_usage": memory_usage,
"cpu_usage": cpu_usage
}
# Armazenar métrica no Redis (com TTL de 24 horas)
key = f"{self.metrics_key}:{timestamp}"
await redis_client.set(key, metric, expire=86400)
async def get_metrics_summary(self, hours: int = 1) -> Dict[str, Any]:
"""Get performance metrics summary."""
end_time = int(time.time())
start_time = end_time - (hours * 3600)
# Buscar métricas no período
pattern = f"{self.metrics_key}:*"
keys = await redis_client.get_pattern(pattern)
metrics = []
for key in keys:
timestamp = int(key.split(":")[-1])
if start_time <= timestamp <= end_time:
metric = await redis_client.get(key)
if metric:
metrics.append(metric)
if not metrics:
return {"message": "No metrics found"}
# Calcular estatísticas
total_requests = len(metrics)
avg_duration = sum(m["duration"] for m in metrics) / total_requests
max_duration = max(m["duration"] for m in metrics)
min_duration = min(m["duration"] for m in metrics)
# Agrupar por status code
status_codes = {}
for metric in metrics:
code = metric["status_code"]
status_codes[code] = status_codes.get(code, 0) + 1
# Endpoints mais lentos
endpoint_stats = {}
for metric in metrics:
path = metric["path"]
if path not in endpoint_stats:
endpoint_stats[path] = {"count": 0, "total_duration": 0}
endpoint_stats[path]["count"] += 1
endpoint_stats[path]["total_duration"] += metric["duration"]
slowest_endpoints = sorted(
[
{
"path": path,
"avg_duration": stats["total_duration"] / stats["count"],
"count": stats["count"]
}
for path, stats in endpoint_stats.items()
],
key=lambda x: x["avg_duration"],
reverse=True
)[:10]
return {
"period_hours": hours,
"total_requests": total_requests,
"avg_response_time": round(avg_duration, 4),
"max_response_time": round(max_duration, 4),
"min_response_time": round(min_duration, 4),
"status_codes": status_codes,
"slowest_endpoints": slowest_endpoints,
"requests_per_hour": round(total_requests / hours, 2)
}
# Instância global
performance_monitor = PerformanceMonitor()
class PerformanceMiddleware(BaseHTTPMiddleware):
def __init__(self, app):
super().__init__(app)
self.process = psutil.Process()
async def dispatch(self, request: Request, call_next):
# Métricas iniciais
start_time = time.time()
start_memory = self.process.memory_info().rss / 1024 / 1024 # MB
start_cpu = self.process.cpu_percent()
# Processar request
response = await call_next(request)
# Métricas finais
end_time = time.time()
end_memory = self.process.memory_info().rss / 1024 / 1024 # MB
end_cpu = self.process.cpu_percent()
# Calcular métricas
duration = end_time - start_time
memory_usage = end_memory - start_memory
cpu_usage = end_cpu - start_cpu
# Registrar métricas
await performance_monitor.record_request(
method=request.method,
path=request.url.path,
status_code=response.status_code,
duration=duration,
memory_usage=memory_usage,
cpu_usage=cpu_usage
)
# Adicionar headers de performance
response.headers["X-Response-Time"] = f"{duration:.4f}s"
return response
Health Check Endpoint (api/v1/health.py)#
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from core.redis import redis_client
from db.session import get_db
from core.monitoring import performance_monitor
import psutil
import time
router = APIRouter()
@router.get("/health")
async def health_check(db: Session = Depends(get_db)):
"""Comprehensive health check."""
health_status = {
"status": "healthy",
"timestamp": int(time.time()),
"checks": {}
}
# Database check
try:
db.execute(text("SELECT 1"))
health_status["checks"]["database"] = {"status": "healthy"}
except Exception as e:
health_status["checks"]["database"] = {
"status": "unhealthy",
"error": str(e)
}
health_status["status"] = "unhealthy"
# Redis check
try:
await redis_client.redis.ping()
health_status["checks"]["redis"] = {"status": "healthy"}
except Exception as e:
health_status["checks"]["redis"] = {
"status": "unhealthy",
"error": str(e)
}
health_status["status"] = "unhealthy"
# System metrics
health_status["system"] = {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent
}
return health_status
@router.get("/metrics")
async def get_metrics(hours: int = 1):
"""Get performance metrics."""
return await performance_monitor.get_metrics_summary(hours)
🚀 Configuração Final#
Main Application (main.py)#
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from contextlib import asynccontextmanager
from core.config import settings
from core.redis import redis_client
from middleware.cache_middleware import ResponseCacheMiddleware
from middleware.cache_headers import CacheHeadersMiddleware
from middleware.rate_limit import RateLimitMiddleware
from core.monitoring import PerformanceMiddleware
from api.v1.api import api_router
from api.v1.health import router as health_router
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await redis_client.connect()
yield
# Shutdown
await redis_client.disconnect()
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
lifespan=lifespan
)
# Middlewares (ordem importa!)
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(PerformanceMiddleware)
app.add_middleware(RateLimitMiddleware, calls=1000, period=60)
app.add_middleware(ResponseCacheMiddleware, cache_ttl=300)
app.add_middleware(CacheHeadersMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_HOSTS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Routes
app.include_router(api_router, prefix="/api/v1")
app.include_router(health_router, prefix="/api/v1", tags=["health"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=settings.DEBUG,
workers=4 if not settings.DEBUG else 1
)
🎯 Próximos Passos#
Com cache e otimizações implementados, você pode:
Step 6: Implementar WebSockets e comunicação em tempo real
Step 7: Deploy, monitoramento e observabilidade
📝 Exercícios Práticos#
Exercício 1: Cache Personalizado#
Implemente cache personalizado para:
Resultados de busca complexa
Dados de usuário frequentemente acessados
Estatísticas do sistema
Exercício 2: Otimização de Queries#
Otimize queries para:
Relatórios complexos
Agregações pesadas
Buscas full-text
Exercício 3: Monitoramento Avançado#
Implemente:
Alertas para performance degradada
Métricas customizadas
Dashboard de monitoramento