Autenticação e Autorização#
Implementar um sistema completo de autenticação JWT com autorização baseada em roles, incluindo middleware de segurança e proteção de endpoints.
🎯 O que você vai aprender#
Implementar autenticação JWT
Criar sistema de roles e permissões
Configurar middleware de segurança
Proteger endpoints com dependências
Hash seguro de senhas
Refresh tokens e logout
Arquitetura Avançada: Clean Architecture, DDD, Repository Pattern e Use Cases
🏗️ Arquitetura Avançada e Design Patterns#
Clean Architecture com FastAPI#
A Clean Architecture organiza o código em camadas concêntricas, onde as dependências apontam sempre para dentro (das camadas externas para as internas).
graph TB
    subgraph "🌐 Presentation Layer"
        API[FastAPI Controllers]
        SCHEMAS[Pydantic Schemas]
        DEPS[Dependencies]
    end
    subgraph "📋 Application Layer"
        UC[Use Cases]
        INTERFACES[Interfaces]
    end
    subgraph "🎯 Domain Layer"
        ENTITIES[Entities]
        VO[Value Objects]
        REPO_INT[Repository Interfaces]
        EVENTS[Domain Events]
    end
    subgraph "🔧 Infrastructure Layer"
        DB[Database]
        REPO_IMPL[Repository Implementation]
        EXTERNAL[External Services]
        MESSAGING[Messaging]
    end
    API --> UC
    UC --> ENTITIES
    UC --> REPO_INT
    REPO_IMPL --> REPO_INT
    REPO_IMPL --> DB
Estrutura de Diretórios#
src/
├── domain/ # 🎯 Camada de Domínio
│ ├── entities/ # Entidades de negócio
│ ├── value_objects/ # Objetos de valor
│ ├── repositories/ # Interfaces dos repositórios
│ └── events/ # Eventos de domínio
├── application/ # 📋 Camada de Aplicação
│ ├── use_cases/ # Casos de uso
│ ├── interfaces/ # Interfaces de serviços
│ └── dto/ # Data Transfer Objects
├── infrastructure/ # 🔧 Camada de Infraestrutura
│ ├── database/ # Configuração do banco
│ ├── external/ # Serviços externos
│ ├── repositories/ # Implementação dos repositórios
│ └── messaging/ # Sistema de mensageria
└── presentation/ # 🌐 Camada de Apresentação
    ├── api/ # Controllers FastAPI
    ├── schemas/ # Schemas de entrada/saída
    └── dependencies/ # Injeção de dependências
🎯 Entidades de Domínio#
Entidade User#
# src/domain/entities/user.py
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
from enum import Enum
class UserStatus(Enum):
ACTIVE = "active"
INACTIVE = "inactive"
SUSPENDED = "suspended"
PENDING_VERIFICATION = "pending_verification"
@dataclass
class User:
"""Entidade User - representa um usuário no domínio"""
id: int | None
email: str
username: str
full_name: str
status: UserStatus
created_at: datetime | None = None
updated_at: datetime | None = None
last_login: datetime | None = None
def activate(self):
"""Regra de negócio: ativar usuário"""
if self.status == UserStatus.SUSPENDED:
raise ValueError("Cannot activate suspended user")
self.status = UserStatus.ACTIVE
self.updated_at = datetime.now()
def suspend(self, reason: str = None):
"""Regra de negócio: suspender usuário"""
if not self.is_active():
raise ValueError("Can only suspend active users")
self.status = UserStatus.SUSPENDED
self.updated_at = datetime.now()
# Emitir evento de domínio
from domain.events.user_events import UserSuspendedEvent
return UserSuspendedEvent(user_id=self.id, reason=reason)
def is_active(self) -> bool:
"""Verificar se usuário está ativo"""
return self.status == UserStatus.ACTIVE
def can_login(self) -> bool:
"""Regra de negócio: usuário pode fazer login?"""
return self.status in [UserStatus.ACTIVE]
def update_last_login(self):
"""Atualizar último login"""
self.last_login = datetime.now()
self.updated_at = datetime.now()
Value Objects#
# src/domain/value_objects/email.py
from dataclasses import dataclass
import re
@dataclass(frozen=True)
class Email:
    """Immutable value object wrapping a validated e-mail address."""

    value: str

    def __post_init__(self):
        """Validate ``value`` on construction.

        Raises:
            ValueError: If ``value`` is not a string matching the e-mail pattern.
        """
        if not self._is_valid_email(self.value):
            raise ValueError(f"Invalid email: {self.value}")

    def _is_valid_email(self, email: str) -> bool:
        """Return True when ``email`` is a string that matches the pattern in full."""
        if not isinstance(email, str):
            return False
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        # fullmatch rather than match: with match, "$" also accepts a
        # trailing newline, so "a@b.com\n" would pass validation.
        return re.fullmatch(pattern, email) is not None

    def domain(self) -> str:
        """Return the part of the address after the ``@``."""
        return self.value.split('@')[1]

    def is_corporate(self) -> bool:
        """Return True when the domain is not one of the listed free-mail providers."""
        free_mail_domains = {'gmail.com', 'yahoo.com', 'hotmail.com'}
        # Compare case-insensitively so "Gmail.COM" is still recognised.
        return self.domain().casefold() not in free_mail_domains
# src/domain/value_objects/money.py
from dataclasses import dataclass
from decimal import Decimal
from typing import Union
@dataclass(frozen=True)
class Money:
    """Immutable monetary amount paired with a currency code (default "BRL")."""

    amount: Decimal
    currency: str = "BRL"

    def __post_init__(self):
        """Reject negative amounts and empty currencies.

        Raises:
            ValueError: If ``amount`` is below zero or ``currency`` is falsy.
        """
        # The amount is checked first, so it wins when both are invalid.
        is_negative = self.amount < 0
        if is_negative:
            raise ValueError("Money amount cannot be negative")
        has_currency = bool(self.currency)
        if not has_currency:
            raise ValueError("Currency is required")

    def add(self, other: 'Money') -> 'Money':
        """Return a new ``Money`` holding the sum of both amounts.

        Raises:
            ValueError: If the two values use different currencies.
        """
        if self.currency == other.currency:
            total = self.amount + other.amount
            return Money(total, self.currency)
        raise ValueError("Cannot add different currencies")

    def multiply(self, factor: Union[int, float, Decimal]) -> 'Money':
        """Return a new ``Money`` with the amount scaled by ``factor``."""
        # The factor goes through str() before becoming a Decimal.
        scale = Decimal(str(factor))
        scaled_amount = self.amount * scale
        return Money(scaled_amount, self.currency)

    def is_zero(self) -> bool:
        """Return True when the amount equals zero."""
        amount = self.amount
        return amount == 0
🔧 Casos de Uso (Use Cases)#
User Use Cases#
# src/application/use_cases/user_use_cases.py
from abc import ABC, abstractmethod
from typing import Optional
from domain.entities.user import User, UserStatus
from domain.value_objects.email import Email
from domain.repositories.user_repository import UserRepositoryInterface
from application.interfaces.email_service import EmailServiceInterface
class CreateUserUseCase:
    """Use case: create a new user and send the verification e-mail."""

    def __init__(
        self,
        user_repository: UserRepositoryInterface,
        email_service: EmailServiceInterface
    ):
        """Store the repository and e-mail service this use case works with."""
        self.user_repository = user_repository
        self.email_service = email_service

    async def execute(self, email: str, username: str, full_name: str) -> User:
        """Create, persist and return a user in PENDING_VERIFICATION status.

        Args:
            email: Address of the new user; validated through ``Email``.
            username: Desired username; must not be taken.
            full_name: Full name stored on the entity.

        Returns:
            The user as returned by the repository's ``save``.

        Raises:
            ValueError: If the e-mail is invalid, the e-mail is already
                registered, or the username is already taken.
        """
        # Built only for its validation side effect (raises ValueError on a
        # malformed address); the instance itself is not needed afterwards.
        Email(email)

        if await self.user_repository.find_by_email(email):
            raise ValueError("User with this email already exists")
        if await self.user_repository.find_by_username(username):
            raise ValueError("Username already taken")

        user = User(
            id=None,
            email=email,
            username=username,
            full_name=full_name,
            status=UserStatus.PENDING_VERIFICATION
        )
        saved_user = await self.user_repository.save(user)
        # The verification e-mail is sent after the user has been persisted.
        await self.email_service.send_verification_email(saved_user)
        return saved_user
class AuthenticateUserUseCase:
    """Use case: authenticate a user by username."""

    def __init__(self, user_repository: UserRepositoryInterface):
        """Store the repository used to look up and persist users."""
        self.user_repository = user_repository

    async def execute(self, username: str, password: str) -> User:
        """Look up the user, check login eligibility and record the login.

        Args:
            username: Username passed to the repository lookup.
            password: Supplied password. NOTE: not verified yet, see below.

        Returns:
            The authenticated user. Every failure path raises, so ``None``
            is never returned.

        Raises:
            ValueError: If no such user exists or the user cannot log in.
        """
        user = await self.user_repository.find_by_username(username)
        if not user:
            raise ValueError("Invalid credentials")

        if not user.can_login():
            raise ValueError("User cannot login")

        # SECURITY TODO: the password is not checked here, so any password is
        # accepted for an existing, active username. A hash comparison (e.g.
        # against a stored password hash) must be added before real use; the
        # User entity shown with this use case has no password field yet.

        user.update_last_login()
        await self.user_repository.save(user)
        return user
class SuspendUserUseCase:
    """Use case: suspend a user."""

    def __init__(self, user_repository: UserRepositoryInterface):
        """Store the repository used to load and persist the user."""
        self.user_repository = user_repository

    async def execute(self, user_id: int, reason: str | None = None) -> User:
        """Suspend the user with the given id and persist the change.

        Args:
            user_id: Identifier of the user to suspend.
            reason: Optional reason forwarded to ``User.suspend``.

        Returns:
            The user as returned by the repository's ``save``.

        Raises:
            ValueError: If the user does not exist, or if ``User.suspend``
                rejects the transition.
        """
        user = await self.user_repository.find_by_id(user_id)
        if not user:
            raise ValueError("User not found")

        # The entity enforces the business rule and hands back a domain event.
        event = user.suspend(reason)  # noqa: F841 - kept for the TODO below

        updated_user = await self.user_repository.save(user)

        # TODO: publish `event` once an event publisher is injected here.
        return updated_user
🗄️ Repository Pattern#
Interface do Repositório#
# src/domain/repositories/user_repository.py
from abc import ABC, abstractmethod
from typing import Optional
from domain.entities.user import User
class UserRepositoryInterface(ABC):
    """Abstract contract every user repository implementation must fulfil."""

    @abstractmethod
    async def save(self, user: User) -> User:
        """Persist ``user`` and return the stored entity."""

    @abstractmethod
    async def find_by_id(self, user_id: int) -> User | None:
        """Return the user with this id, or None."""

    @abstractmethod
    async def find_by_email(self, email: str) -> User | None:
        """Return the user with this e-mail, or None."""

    @abstractmethod
    async def find_by_username(self, username: str) -> User | None:
        """Return the user with this username, or None."""

    @abstractmethod
    async def find_all(self, skip: int = 0, limit: int = 100) -> list[User]:
        """Return one page of users, skipping ``skip`` and taking up to ``limit``."""

    @abstractmethod
    async def delete(self, user_id: int) -> bool:
        """Delete the user with this id; the bool reports the outcome."""

    @abstractmethod
    async def count(self) -> int:
        """Return the total number of users."""
Implementação SQLAlchemy#
# src/infrastructure/repositories/sqlalchemy_user_repository.py
from sqlalchemy.orm import Session
from sqlalchemy import func
from typing import Optional
from domain.repositories.user_repository import UserRepositoryInterface
from domain.entities.user import User, UserStatus
from infrastructure.database.models import UserModel
class SQLAlchemyUserRepository(UserRepositoryInterface):
    """User repository backed by a SQLAlchemy ``Session``.

    The methods are declared ``async`` to satisfy the interface, but they use
    the synchronous ``Session`` API, so nothing is awaited inside them.
    """

    def __init__(self, session: Session):
        """Keep the session used for every query."""
        self.session = session

    def _get_model(self, user_id: int) -> Optional[UserModel]:
        """Return the ``UserModel`` row with this id, or None."""
        return self.session.query(UserModel).filter(
            UserModel.id == user_id
        ).first()

    async def save(self, user: User) -> User:
        """Insert the user when ``id`` is None, otherwise update the row.

        Returns:
            The domain entity rebuilt from the refreshed database row.

        Raises:
            ValueError: If ``user.id`` is set but no such row exists.
        """
        if user.id is None:
            db_user = UserModel(
                email=user.email,
                username=user.username,
                full_name=user.full_name,
                status=user.status.value
            )
            self.session.add(db_user)
        else:
            db_user = self._get_model(user.id)
            if db_user is None:
                # Previously this fell through to refresh(None) and failed
                # with an unrelated error after committing.
                raise ValueError("User not found")
            db_user.email = user.email
            db_user.username = user.username
            db_user.full_name = user.full_name
            db_user.status = user.status.value
            db_user.updated_at = user.updated_at
            db_user.last_login = user.last_login
        self.session.commit()
        # Refresh so database-populated columns are loaded on the row.
        self.session.refresh(db_user)
        return self._to_domain_entity(db_user)

    async def find_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with this id, or None."""
        db_user = self._get_model(user_id)
        return self._to_domain_entity(db_user) if db_user else None

    async def find_by_email(self, email: str) -> Optional[User]:
        """Return the user with this e-mail, or None."""
        db_user = self.session.query(UserModel).filter(
            UserModel.email == email
        ).first()
        return self._to_domain_entity(db_user) if db_user else None

    async def find_by_username(self, username: str) -> Optional[User]:
        """Return the user with this username, or None."""
        db_user = self.session.query(UserModel).filter(
            UserModel.username == username
        ).first()
        return self._to_domain_entity(db_user) if db_user else None

    async def find_all(self, skip: int = 0, limit: int = 100) -> list[User]:
        """Return up to ``limit`` users after skipping ``skip`` rows."""
        # Annotated with the builtin list: typing.List is not imported here.
        db_users = self.session.query(UserModel).offset(skip).limit(limit).all()
        return [self._to_domain_entity(db_user) for db_user in db_users]

    async def delete(self, user_id: int) -> bool:
        """Delete the user; return True if a row was removed, False if absent."""
        db_user = self._get_model(user_id)
        if db_user is None:
            return False
        self.session.delete(db_user)
        self.session.commit()
        return True

    async def count(self) -> int:
        """Return the number of user rows."""
        return self.session.query(func.count(UserModel.id)).scalar()

    def _to_domain_entity(self, db_user: UserModel) -> User:
        """Map a ``UserModel`` row onto a domain ``User``."""
        return User(
            id=db_user.id,
            email=db_user.email,
            username=db_user.username,
            full_name=db_user.full_name,
            status=UserStatus(db_user.status),
            created_at=db_user.created_at,
            updated_at=db_user.updated_at,
            last_login=db_user.last_login
        )
🌐 Controllers FastAPI#
User Controller#
# src/presentation/api/user_controller.py
from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional
from application.use_cases.user_use_cases import (
CreateUserUseCase,
AuthenticateUserUseCase,
SuspendUserUseCase
)
from presentation.schemas.user_schemas import (
UserCreateRequest,
UserResponse,
UserListResponse,
LoginRequest,
LoginResponse
)
from presentation.dependencies.user_dependencies import (
get_create_user_use_case,
get_authenticate_user_use_case,
get_suspend_user_use_case
)
# Router for the user endpoints; every route is served under the /users prefix.
router = APIRouter(prefix="/users", tags=["Users"])
@router.post(
    "/",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Criar usuário"
)
async def create_user(
    request: UserCreateRequest,
    use_case: CreateUserUseCase = Depends(get_create_user_use_case)
):
    """
    Create a new user in the system.

    - **email**: Unique e-mail of the user
    - **username**: Unique username
    - **full_name**: Full name of the user

    A `ValueError` raised by the use case is reported as HTTP 400 with the
    error message as detail.
    """
    try:
        user = await use_case.execute(
            email=request.email,
            username=request.username,
            full_name=request.full_name
        )
    except ValueError as exc:
        # Chain explicitly so the original error stays in the traceback.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc)
        ) from exc
    return UserResponse.from_domain_entity(user)
@router.post(
    "/login",
    response_model=LoginResponse,
    summary="Autenticar usuário"
)
async def login(
    request: LoginRequest,
    use_case: AuthenticateUserUseCase = Depends(get_authenticate_user_use_case)
):
    """
    Authenticate a user.

    - **username**: Username or e-mail
    - **password**: Password of the user

    A `ValueError` raised by the use case is reported as HTTP 401 with the
    error message as detail.
    """
    try:
        user = await use_case.execute(
            username=request.username,
            password=request.password
        )
    except ValueError as exc:
        # Chain explicitly so the original error stays in the traceback.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(exc)
        ) from exc

    # TODO: replace this placeholder with a real signed JWT; the constant
    # below grants nothing and must not reach production.
    token = "fake-jwt-token"
    return LoginResponse(
        access_token=token,
        token_type="bearer",
        user=UserResponse.from_domain_entity(user)
    )
@router.patch(
    "/{user_id}/suspend",
    response_model=UserResponse,
    summary="Suspender usuário"
)
async def suspend_user(
    user_id: int,
    reason: str | None = None,
    use_case: SuspendUserUseCase = Depends(get_suspend_user_use_case)
):
    """
    Suspend a user.

    - **user_id**: ID of the user to suspend
    - **reason**: Reason for the suspension (optional)

    A `ValueError` raised by the use case is reported as HTTP 400 with the
    error message as detail.
    """
    # NOTE(review): no authentication/authorization dependency is declared on
    # this route, so as written any caller can suspend any user - confirm
    # whether protection is applied elsewhere.
    try:
        user = await use_case.execute(user_id=user_id, reason=reason)
    except ValueError as exc:
        # Chain explicitly so the original error stays in the traceback.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc)
        ) from exc
    return UserResponse.from_domain_entity(user)
🔌 Dependency Injection#
Configuração de Dependências#
# src/presentation/dependencies/user_dependencies.py
from fastapi import Depends
from sqlalchemy.orm import Session
from application.use_cases.user_use_cases import (
CreateUserUseCase,
AuthenticateUserUseCase,
SuspendUserUseCase
)
from infrastructure.repositories.sqlalchemy_user_repository import SQLAlchemyUserRepository
from infrastructure.services.email_service import EmailService
from core.database import get_db
def get_user_repository(db: Session = Depends(get_db)) -> SQLAlchemyUserRepository:
    """Provide a user repository bound to the request's database session."""
    repository = SQLAlchemyUserRepository(session=db)
    return repository
def get_email_service() -> EmailService:
    """Provide a new e-mail service instance."""
    service = EmailService()
    return service
def get_create_user_use_case(
    user_repository: SQLAlchemyUserRepository = Depends(get_user_repository),
    email_service: EmailService = Depends(get_email_service)
) -> CreateUserUseCase:
    """Provide the create-user use case wired with its repository and e-mail service."""
    use_case = CreateUserUseCase(
        user_repository=user_repository,
        email_service=email_service,
    )
    return use_case
def get_authenticate_user_use_case(
    user_repository: SQLAlchemyUserRepository = Depends(get_user_repository)
) -> AuthenticateUserUseCase:
    """Provide the authentication use case wired with the user repository."""
    use_case = AuthenticateUserUseCase(user_repository=user_repository)
    return use_case
def get_suspend_user_use_case(
    user_repository: SQLAlchemyUserRepository = Depends(get_user_repository)
) -> SuspendUserUseCase:
    """Provide the suspend-user use case wired with the user repository."""
    use_case = SuspendUserUseCase(user_repository=user_repository)
    return use_case
📊 Schemas de Apresentação#
# src/presentation/schemas/user_schemas.py
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from datetime import datetime
from domain.entities.user import User, UserStatus
class UserCreateRequest(BaseModel):
    """Request body for creating a user."""
    # All three fields are required (Field(...)); length limits are enforced
    # on username (3-50) and full_name (2-200).
    email: EmailStr = Field(..., description="Email do usuário")
    username: str = Field(..., min_length=3, max_length=50, description="Nome de usuário")
    full_name: str = Field(..., min_length=2, max_length=200, description="Nome completo")
class LoginRequest(BaseModel):
    """Request body for logging in."""
    username: str = Field(..., description="Username ou email")
    # NOTE(review): min_length=8 applies at login time too, so a shorter
    # password is rejected by validation before reaching the use case.
    password: str = Field(..., min_length=8, description="Senha")
class UserResponse(BaseModel):
    """Response body describing a user."""
    id: int
    email: str
    username: str
    full_name: str
    status: UserStatus
    created_at: datetime
    updated_at: datetime | None = None
    last_login: datetime | None = None

    @classmethod
    def from_domain_entity(cls, user: User) -> 'UserResponse':
        """Build the response schema by copying the matching entity attributes."""
        copied_fields = (
            "id",
            "email",
            "username",
            "full_name",
            "status",
            "created_at",
            "updated_at",
            "last_login",
        )
        values = {name: getattr(user, name) for name in copied_fields}
        return cls(**values)

    class Config:
        from_attributes = True
class LoginResponse(BaseModel):
    """Response body returned after a successful login."""
    access_token: str
    # Defaults to "bearer" when not given explicitly.
    token_type: str = "bearer"
    user: UserResponse
class UserListResponse(BaseModel):
    """Response body for a paginated list of users."""
    # Builtin generic: typing.List is not imported in this module, so the
    # previous List[...] annotation raised NameError when the class was built.
    users: list[UserResponse]
    total: int
    skip: int
    limit: int
🧪 Testando a Arquitetura#
Teste de Use Case#
# tests/unit/test_user_use_cases.py
import pytest
from unittest.mock import Mock, AsyncMock
from application.use_cases.user_use_cases import CreateUserUseCase
from domain.entities.user import User, UserStatus
@pytest.mark.asyncio
async def test_create_user_success():
    """Creating a user with a free e-mail and username succeeds."""
    # Arrange
    stored_user = User(
        id=1,
        email="test@example.com",
        username="testuser",
        full_name="Test User",
        status=UserStatus.PENDING_VERIFICATION
    )
    user_repository = Mock(
        find_by_email=AsyncMock(return_value=None),
        find_by_username=AsyncMock(return_value=None),
        save=AsyncMock(return_value=stored_user),
    )
    email_service = Mock(send_verification_email=AsyncMock())
    use_case = CreateUserUseCase(user_repository, email_service)

    # Act
    result = await use_case.execute(
        email="test@example.com",
        username="testuser",
        full_name="Test User"
    )

    # Assert
    assert result.email == "test@example.com"
    assert result.username == "testuser"
    assert result.status == UserStatus.PENDING_VERIFICATION
    user_repository.save.assert_called_once()
    email_service.send_verification_email.assert_called_once()
@pytest.mark.asyncio
async def test_create_user_email_already_exists():
    """Creating a user fails when the e-mail is already registered."""
    # Arrange
    existing_user = User(
        id=1,
        email="test@example.com",
        username="existing",
        full_name="Existing User",
        status=UserStatus.ACTIVE
    )
    user_repository = Mock(find_by_email=AsyncMock(return_value=existing_user))
    email_service = Mock()
    use_case = CreateUserUseCase(user_repository, email_service)

    # Act & Assert
    with pytest.raises(ValueError, match="User with this email already exists"):
        await use_case.execute(
            email="test@example.com",
            username="testuser",
            full_name="Test User"
        )
🔐 Configuração de Segurança#
Dependências de Segurança#
pip install "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
Configurações de Segurança (core/security.py)#
from datetime import datetime, timedelta
from typing import Optional, Union, Any
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseSettings
import secrets
class SecuritySettings(BaseSettings):
    """Security-related settings: JWT, password hashing and CORS values."""
    # NOTE(review): BaseSettings is imported from `pydantic`; in pydantic v2
    # it lives in the separate pydantic-settings package - confirm which
    # version this project pins.

    # JWT Configuration
    # The default is generated when this class body runs, i.e. a new random
    # key per process unless a value is supplied. Tokens signed with one
    # default key will not verify under another - set SECRET_KEY explicitly
    # (presumably via the .env file configured below - confirm).
    SECRET_KEY: str = secrets.token_urlsafe(32)
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    # Password Configuration
    PWD_CONTEXT_SCHEMES: list = ["bcrypt"]
    PWD_CONTEXT_DEPRECATED: str = "auto"
    # Security Headers
    CORS_ORIGINS: list = ["http://localhost:3000", "http://localhost:8000"]
    CORS_ALLOW_CREDENTIALS: bool = True
    CORS_ALLOW_METHODS: list = ["*"]
    CORS_ALLOW_HEADERS: list = ["*"]
    class Config:
        env_file = ".env"
        case_sensitive = True
# Module-level settings instance, created when this module is imported.
security_settings = SecuritySettings()
# Password-hashing context built from the configured schemes.
pwd_context = CryptContext(
    schemes=security_settings.PWD_CONTEXT_SCHEMES,
    deprecated=security_settings.PWD_CONTEXT_DEPRECATED
)
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash via ``pwd_context``."""
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Hash a plain-text password via ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(
    data: dict,
    expires_delta: Optional[timedelta] = None
) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed. The dict is copied, not mutated; any "exp" or
            "type" key in it is overwritten.
        expires_delta: Token lifetime. When None, the configured
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used. An explicit zero
            ``timedelta`` is honoured (it used to fall back to the default
            because it is falsy).

    Returns:
        The encoded token string.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    if expires_delta is None:
        expires_delta = timedelta(
            minutes=security_settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )
    # Timezone-aware "now" instead of the deprecated datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(
        to_encode,
        security_settings.SECRET_KEY,
        algorithm=security_settings.ALGORITHM
    )
def create_refresh_token(data: dict) -> str:
    """Create a signed JWT refresh token.

    Args:
        data: Claims to embed. The dict is copied, not mutated; any "exp" or
            "type" key in it is overwritten.

    Returns:
        The encoded token string, expiring after the configured
        ``REFRESH_TOKEN_EXPIRE_DAYS``.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # Timezone-aware "now" instead of the deprecated datetime.utcnow().
    expire = datetime.now(timezone.utc) + timedelta(
        days=security_settings.REFRESH_TOKEN_EXPIRE_DAYS
    )
    to_encode = data.copy()
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(
        to_encode,
        security_settings.SECRET_KEY,
        algorithm=security_settings.ALGORITHM
    )
def verify_token(token: str, token_type: str = "access") -> Optional[dict]:
    """Decode a JWT and return its payload when it is of the expected type.

    Returns None when decoding raises ``JWTError`` or when the payload's
    "type" claim differs from ``token_type``.
    """
    try:
        claims = jwt.decode(
            token,
            security_settings.SECRET_KEY,
            algorithms=[security_settings.ALGORITHM]
        )
    except JWTError:
        return None
    # An access token must not pass as a refresh token, and vice versa.
    type_matches = claims.get("type") == token_type
    return claims if type_matches else None
👤 Modelos de Usuário e Roles#
Modelo de Role (db/models/role.py)#
from sqlalchemy import Column, String, Boolean, Text
from sqlalchemy.orm import relationship
from db.models.base import BaseModel
class Role(BaseModel):
    """ORM model for a role, stored in the "roles" table."""
    __tablename__ = "roles"
    # Unique, indexed role name.
    name = Column(String(50), unique=True, nullable=False, index=True)
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True, nullable=False)
    # Relationships
    users = relationship("User", back_populates="role")
    # Many-to-many link to permissions through the "role_permissions" table.
    permissions = relationship("Permission", secondary="role_permissions", back_populates="roles")
    def __repr__(self):
        """Return a short debug representation with id and name."""
        return f"<Role(id={self.id}, name='{self.name}')>"
Modelo de Permissão (db/models/permission.py)#
from sqlalchemy import Column, String, Text, Boolean, Table, ForeignKey
from sqlalchemy.orm import relationship
from db.models.base import BaseModel
from db.session import Base
# Many-to-many association table between roles and permissions;
# the two foreign keys together form the primary key.
role_permissions = Table(
    'role_permissions',
    Base.metadata,
    Column('role_id', ForeignKey('roles.id'), primary_key=True),
    Column('permission_id', ForeignKey('permissions.id'), primary_key=True)
)
class Permission(BaseModel):
    """ORM model for a permission, stored in the "permissions" table."""
    __tablename__ = "permissions"
    name = Column(String(100), unique=True, nullable=False, index=True)
    resource = Column(String(50), nullable=False) # users, items, orders
    action = Column(String(20), nullable=False) # create, read, update, delete
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True, nullable=False)
    # Relationships
    roles = relationship("Role", secondary=role_permissions, back_populates="permissions")
    def __repr__(self):
        """Return a short debug representation with id and name."""
        return f"<Permission(id={self.id}, name='{self.name}')>"
    @property
    def full_name(self):
        """Return the permission as "<resource>:<action>"."""
        return f"{self.resource}:{self.action}"
Modelo de Usuário Atualizado (db/models/user.py)#
from sqlalchemy import Column, String, Boolean, DateTime, Integer, ForeignKey
from sqlalchemy.orm import relationship
from db.models.base import BaseModel
class User(BaseModel):
    """ORM model for a user, stored in the "users" table, with permission helpers."""
    __tablename__ = "users"
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(50), unique=True, index=True, nullable=False)
    full_name = Column(String(100), nullable=True)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    is_superuser = Column(Boolean, default=False)
    last_login = Column(DateTime(timezone=True), nullable=True)
    # Foreign key to the user's role (optional)
    role_id = Column(Integer, ForeignKey("roles.id"), nullable=True)
    # Relationships
    role = relationship("Role", back_populates="users")

    def __repr__(self):
        """Return a short debug representation with id, username and e-mail."""
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"

    def has_permission(self, resource: str, action: str) -> bool:
        """Return True when the user may perform ``action`` on ``resource``.

        Superusers always pass. Otherwise the user needs an active role that
        holds an active permission matching both resource and action.
        """
        if self.is_superuser:
            return True
        role = self.role
        if not role or not role.is_active:
            return False
        return any(
            granted.resource == resource
            and granted.action == action
            and granted.is_active
            for granted in role.permissions
        )

    def get_permissions(self) -> list:
        """Return the user's permissions as "<resource>:<action>" strings.

        Superusers get the single wildcard entry "*:*"; users without an
        active role get an empty list.
        """
        if self.is_superuser:
            return ["*:*"]  # wildcard: superusers hold every permission
        role = self.role
        if not role or not role.is_active:
            return []
        granted_names = []
        for granted in role.permissions:
            if granted.is_active:
                granted_names.append(granted.full_name)
        return granted_names
📋 Schemas de Autenticação#
Schemas de Token (schemas/auth.py)#
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime
class Token(BaseModel):
    """Token pair returned to the client."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    # NOTE(review): callers in this file pass the access-token lifetime in seconds.
    expires_in: int
class TokenData(BaseModel):
    """Data extracted from a token; every field is optional."""
    username: str | None = None
    user_id: int | None = None
    permissions: list[str] = []
class LoginRequest(BaseModel):
    """Credentials submitted at login."""
    username: str # May be a username or an e-mail
    password: str
class RefreshTokenRequest(BaseModel):
    """Request body carrying the refresh token."""
    refresh_token: str
class ChangePasswordRequest(BaseModel):
    """Request body for changing a password."""
    current_password: str
    new_password: str
    # NOTE(review): nothing in this schema checks that confirm_password
    # equals new_password - confirm it is validated elsewhere.
    confirm_password: str
class UserLogin(BaseModel):
    """User details returned alongside the tokens at login."""
    id: int
    username: str
    email: EmailStr
    full_name: Optional[str]
    is_active: bool
    role_name: Optional[str]
    # Builtin generic: typing.List is not imported in this module, so the
    # previous List[str] annotation raised NameError when the class was built.
    permissions: list[str]
    last_login: Optional[datetime]

    class Config:
        from_attributes = True
Schemas de Usuário (schemas/user.py)#
from pydantic import BaseModel, EmailStr, validator
from typing import Optional
from datetime import datetime
class UserBase(BaseModel):
    """Fields shared by the user schemas."""
    email: EmailStr
    username: str
    full_name: Optional[str] = None
    is_active: bool = True
class UserCreate(UserBase):
    """Payload for registering a user, including the plain-text password."""
    password: str
    role_id: Optional[int] = None

    @validator('password')
    def validate_password(cls, v):
        """Enforce the password policy and return the value unchanged.

        Raises:
            ValueError: If the password has fewer than 8 characters or lacks
                an uppercase letter, a lowercase letter or a digit. Checks
                run in that order and the first failure is reported.
        """
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters long')
        character_rules = (
            (str.isupper, 'Password must contain at least one uppercase letter'),
            (str.islower, 'Password must contain at least one lowercase letter'),
            (str.isdigit, 'Password must contain at least one digit'),
        )
        for is_in_class, message in character_rules:
            if not any(is_in_class(ch) for ch in v):
                raise ValueError(message)
        return v
class UserUpdate(BaseModel):
    """Partial update payload; every field is optional and defaults to None."""
    email: EmailStr | None = None
    username: str | None = None
    full_name: str | None = None
    is_active: bool | None = None
    role_id: int | None = None
class User(UserBase):
    """User schema returned by the API, including role and permission data."""
    id: int
    is_superuser: bool
    last_login: datetime | None
    # role_name and permissions are not attributes of the ORM user (it exposes
    # `role` and `get_permissions()`), so they need defaults: without them
    # `User.from_orm(orm_user)` fails validation before callers can fill them in.
    role_name: str | None = None
    permissions: list[str] = []
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True
class UserInDB(User):
    """User schema extended with the stored password hash."""
    hashed_password: str
🔒 Dependências de Autenticação#
Dependências de Segurança (core/deps.py)#
from typing import Optional, List
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from core.security import verify_token
from db.session import get_db
from db.repository.user_repository import UserRepository
from schemas.auth import TokenData
from schemas.user import User
# HTTP Bearer authentication scheme used to read the request's credentials.
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the current user from the bearer access token.

    Returns:
        The user schema, with ``role_name`` and ``permissions`` filled in
        from the database user.

    Raises:
        HTTPException: 401 when the token is invalid or not an access token,
            lacks the "sub" or "user_id" claim, or the user does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    payload = verify_token(credentials.credentials, "access")
    if payload is None:
        raise credentials_exception

    username: str | None = payload.get("sub")
    user_id: int | None = payload.get("user_id")
    if username is None or user_id is None:
        raise credentials_exception

    user = UserRepository(db).get(user_id)
    if user is None:
        raise credentials_exception

    # Build the schema explicitly: `User.from_orm(user)` fails validation
    # because `role_name` and `permissions` are required schema fields that
    # do not exist as attributes on the database user.
    # Assumes the database user exposes created_at/updated_at, as the schema
    # already required - TODO confirm against db.models.base.
    return User(
        id=user.id,
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        is_active=user.is_active,
        is_superuser=user.is_superuser,
        last_login=user.last_login,
        role_name=user.role.name if user.role else None,
        permissions=user.get_permissions(),
        created_at=user.created_at,
        updated_at=user.updated_at,
    )
async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Return the current user, rejecting inactive accounts.

    Raises:
        HTTPException: 400 "Inactive user" when ``is_active`` is false.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user"
    )
def require_permissions(required_permissions: List[str]):
    """Build a dependency that demands every listed permission.

    The returned callable yields the current active user when the user is a
    superuser or holds all of ``required_permissions``; otherwise it raises
    HTTP 403 "Not enough permissions".
    """
    def permission_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if current_user.is_superuser:
            return current_user
        # Anything required but not held by the user blocks the request.
        missing = set(required_permissions) - set(current_user.permissions)
        if missing:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions"
            )
        return current_user
    return permission_checker
def require_role(required_role: str):
    """Build a dependency that demands a specific role.

    The returned callable yields the current active user when the user is a
    superuser or has ``required_role``; otherwise it raises HTTP 403.
    """
    def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        allowed = (
            current_user.is_superuser
            or current_user.role_name == required_role
        )
        if allowed:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Role '{required_role}' required"
        )
    return role_checker
# Ready-made dependencies for common role and permission checks.
RequireAdmin = require_role("admin")
RequireManager = require_role("manager")
RequireUser = require_role("user")
# Permission-based checks: every listed permission is required.
RequireUserRead = require_permissions(["users:read"])
RequireUserWrite = require_permissions(["users:create", "users:update"])
RequireItemManage = require_permissions(["items:create", "items:update", "items:delete"])
🔐 Serviços de Autenticação#
Serviço de Autenticação (services/auth_service.py)#
from typing import Optional, Tuple
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from core.security import (
verify_password,
get_password_hash,
create_access_token,
create_refresh_token,
verify_token,
security_settings
)
from db.repository.user_repository import UserRepository
from schemas.auth import Token, LoginRequest, UserLogin
from schemas.user import UserCreate, User
class AuthService:
def __init__(self, user_repository: UserRepository):
    """Store the user repository used by every operation of the service."""
    self.user_repository = user_repository
def authenticate_user(self, username: str, password: str) -> Optional[User]:
    """Return the user matching the credentials, or None.

    ``username`` is first looked up as a username and, if nothing is found,
    as an e-mail. None is returned when no user is found or the password
    does not match the stored hash.
    """
    repository = self.user_repository
    candidate = repository.get_by_username(username)
    if not candidate:
        # Fall back to treating the identifier as an e-mail address.
        candidate = repository.get_by_email(username)
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None
def login(self, login_data: LoginRequest) -> Tuple[Token, UserLogin]:
    """Authenticate the user and issue an access/refresh token pair.

    Returns:
        A tuple of the token response and the logged-in user's details.

    Raises:
        HTTPException: 401 when the credentials are wrong, 400 when the
            user is inactive.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    user = self.authenticate_user(login_data.username, login_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user"
        )

    # last_login is a timezone-aware column, so store an aware UTC timestamp
    # instead of the naive value from the deprecated datetime.utcnow().
    user.last_login = datetime.now(timezone.utc)
    self.user_repository.db.commit()

    token_data = {
        "sub": user.username,
        "user_id": user.id,
        "email": user.email
    }
    access_token = create_access_token(
        data=token_data,
        expires_delta=timedelta(
            minutes=security_settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )
    )
    refresh_token = create_refresh_token(data=token_data)

    token_response = Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        # Lifetime of the access token in seconds.
        expires_in=security_settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )
    user_response = UserLogin(
        id=user.id,
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        is_active=user.is_active,
        role_name=user.role.name if user.role else None,
        permissions=user.get_permissions(),
        last_login=user.last_login
    )
    return token_response, user_response
def refresh_access_token(self, refresh_token: str) -> Token:
"""Renovar access token usando refresh token"""
# Verificar refresh token
payload = verify_token(refresh_token, "refresh")
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token"
)
# Buscar usuário
user_id = payload.get("user_id")
user = self.user_repository.get(user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive"
)
# Criar novo access token
access_token_expires = timedelta(
minutes=security_settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
token_data = {
"sub": user.username,
"user_id": user.id,
"email": user.email
}
access_token = create_access_token(
data=token_data,
expires_delta=access_token_expires
)
return Token(
access_token=access_token,
refresh_token=refresh_token, # Manter o mesmo refresh token
token_type="bearer",
expires_in=security_settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
def register_user(self, user_data: UserCreate) -> User:
"""Registrar novo usuário"""
# Verificar se username já existe
if self.user_repository.get_by_username(user_data.username):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
)
# Verificar se email já existe
if self.user_repository.get_by_email(user_data.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Hash da senha
hashed_password = get_password_hash(user_data.password)
# Criar usuário
user_dict = user_data.dict()
user_dict.pop('password')
user_dict['hashed_password'] = hashed_password
user = self.user_repository.create(user_dict)
return User.from_orm(user)
def change_password(
self,
user_id: int,
current_password: str,
new_password: str
) -> bool:
"""Alterar senha do usuário"""
user = self.user_repository.get(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Verificar senha atual
if not verify_password(current_password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Incorrect current password"
)
# Atualizar senha
new_hashed_password = get_password_hash(new_password)
user.hashed_password = new_hashed_password
self.user_repository.db.commit()
return True
🛡️ Endpoints de Autenticação#
Router de Autenticação (api/v1/auth.py)#
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer
from sqlalchemy.orm import Session
from core.deps import get_current_active_user, get_db
from services.auth_service import AuthService
from db.repository.user_repository import UserRepository
from schemas.auth import (
Token,
LoginRequest,
RefreshTokenRequest,
ChangePasswordRequest,
UserLogin
)
from schemas.user import UserCreate, User
# Router that groups the authentication endpoints under the "/auth" prefix.
router = APIRouter(prefix="/auth", tags=["Authentication"])
# HTTP Bearer security scheme instance.
# NOTE(review): not referenced by any endpoint in this section — confirm it is still needed.
security = HTTPBearer()
def get_auth_service(db: Session = Depends(get_db)) -> AuthService:
    """Dependency that builds an ``AuthService`` bound to the request's DB session."""
    return AuthService(UserRepository(db))
@router.post("/login", response_model=dict)
async def login(
    login_data: LoginRequest,
    auth_service: AuthService = Depends(get_auth_service),
):
    """
    Log in and obtain access tokens.

    - **username**: Username or email of the user
    - **password**: Password of the user

    Returns access_token, refresh_token and the user's data.
    """
    issued_token, logged_user = auth_service.login(login_data)
    response_body = {
        "message": "Login successful",
        "token": issued_token,
        "user": logged_user,
    }
    return response_body
@router.post("/refresh", response_model=Token)
async def refresh_token(
    refresh_data: RefreshTokenRequest,
    auth_service: AuthService = Depends(get_auth_service),
):
    """
    Renew the access token using a refresh token.

    - **refresh_token**: Valid renewal token
    """
    renewed = auth_service.refresh_access_token(refresh_data.refresh_token)
    return renewed
@router.post("/register", response_model=User)
async def register(
    user_data: UserCreate,
    auth_service: AuthService = Depends(get_auth_service),
):
    """
    Register a new user.

    - **email**: Unique email of the user
    - **username**: Unique username
    - **password**: Password (min. 8 characters, uppercase, lowercase, digit)
    - **full_name**: Full name (optional)
    """
    created_user = auth_service.register_user(user_data)
    return created_user
@router.post("/change-password")
async def change_password(
    password_data: ChangePasswordRequest,
    current_user: User = Depends(get_current_active_user),
    auth_service: AuthService = Depends(get_auth_service)
):
    """
    Change the password of the current user.

    - **current_password**: Current password
    - **new_password**: New password
    - **confirm_password**: Confirmation of the new password
    """
    # Reject mismatched confirmation before calling the service.
    if password_data.new_password != password_data.confirm_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="New password and confirmation do not match"
        )
    # The service call's return value is not needed here, so it is not bound
    # to a local.
    auth_service.change_password(
        current_user.id,
        password_data.current_password,
        password_data.new_password
    )
    return {"message": "Password changed successfully"}
@router.get("/me", response_model=UserLogin)
async def get_current_user_info(
    current_user: User = Depends(get_current_active_user),
):
    """
    Return information about the current user.
    """
    # Copy the relevant fields of the resolved user into the response schema.
    profile = UserLogin(
        id=current_user.id,
        username=current_user.username,
        email=current_user.email,
        full_name=current_user.full_name,
        is_active=current_user.is_active,
        role_name=current_user.role_name,
        permissions=current_user.permissions,
        last_login=current_user.last_login,
    )
    return profile
@router.post("/logout")
async def logout(
    current_user: User = Depends(get_current_active_user),
):
    """
    Log out (tokens are invalidated on the client side).

    Note: a complete implementation would keep a blacklist of
    invalidated tokens in Redis or in the database.
    """
    # Nothing is revoked here; only a confirmation message is returned.
    confirmation = {"message": "Logout successful"}
    return confirmation
🔒 Endpoints Protegidos#
Exemplo de Endpoints com Autorização (api/v1/items.py)#
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from typing import List, Optional
from core.deps import (
get_current_active_user,
require_permissions,
RequireAdmin,
get_db
)
from services.item_service import ItemService
from db.repository.item_repository import ItemRepository
from schemas.item import Item, ItemCreate, ItemUpdate
from schemas.user import User
# Router that groups the item endpoints under the "/items" prefix.
router = APIRouter(prefix="/items", tags=["Items"])
def get_item_service(db: Session = Depends(get_db)) -> ItemService:
    """Dependency that builds an ``ItemService`` bound to the request's DB session."""
    return ItemService(ItemRepository(db))
@router.get("/", response_model=dict)
async def get_items(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    search: str | None = Query(None),
    category_id: int | None = Query(None),
    available_only: bool = Query(False),
    # No specific permission is requested: the endpoint only depends on
    # get_current_active_user.
    current_user: User = Depends(get_current_active_user),
    item_service: ItemService = Depends(get_item_service),
):
    """List items with filters"""
    filters = {
        "skip": skip,
        "limit": limit,
        "search": search,
        "category_id": category_id,
        "available_only": available_only,
    }
    page_items, total_count = item_service.get_items(**filters)
    # Echo the paging parameters back alongside the results.
    return {
        "items": page_items,
        "total": total_count,
        "skip": skip,
        "limit": limit,
    }
@router.get("/{item_id}", response_model=Item)
async def get_item(
    item_id: int,
    current_user: User = Depends(get_current_active_user),
    item_service: ItemService = Depends(get_item_service),
):
    """Get an item by ID"""
    found = item_service.get_item(item_id)
    if found:
        return found
    # A falsy result from the service is reported as 404.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Item not found",
    )
@router.post("/", response_model=Item)
async def create_item(
    item_data: ItemCreate,
    # Guarded by the "items:create" permission dependency.
    current_user: User = Depends(require_permissions(["items:create"])),
    item_service: ItemService = Depends(get_item_service),
):
    """Create a new item"""
    created = item_service.create_item(item_data)
    return created
@router.put("/{item_id}", response_model=Item)
async def update_item(
    item_id: int,
    item_data: ItemUpdate,
    # Guarded by the "items:update" permission dependency.
    current_user: User = Depends(require_permissions(["items:update"])),
    item_service: ItemService = Depends(get_item_service),
):
    """Update an existing item"""
    updated = item_service.update_item(item_id, item_data)
    if updated:
        return updated
    # A falsy result from the service is reported as 404.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Item not found",
    )
@router.delete("/{item_id}")
async def delete_item(
    item_id: int,
    # Guarded by the RequireAdmin dependency.
    current_user: User = Depends(RequireAdmin),
    item_service: ItemService = Depends(get_item_service),
):
    """Delete an item"""
    if item_service.delete_item(item_id):
        return {"message": "Item deleted successfully"}
    # A falsy result from the service is reported as 404.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Item not found",
    )
🛡️ Middleware de Segurança#
Middleware de Segurança (core/middleware.py)#
import asyncio
import logging
import time
from collections import defaultdict, deque
from typing import Set

from fastapi import Request, Response
from fastapi.responses import JSONResponse
# BaseHTTPMiddleware is provided by Starlette (the framework FastAPI is built
# on); FastAPI ships no `fastapi.middleware.base` module, so importing it from
# there fails with ModuleNotFoundError.
from starlette.middleware.base import BaseHTTPMiddleware

# Module-level logger shared by the middlewares defined in this file.
logger = logging.getLogger(__name__)
class SecurityMiddleware(BaseHTTPMiddleware):
    """Per-IP rate limiting, temporary IP blocking and security response headers.

    All state lives in memory on this middleware instance.
    """

    def __init__(self, app, max_requests_per_minute: int = 60):
        """Set up the limiter.

        Args:
            app: The ASGI application being wrapped.
            max_requests_per_minute: Requests accepted per client IP within
                any 60-second window.
        """
        super().__init__(app)
        self.max_requests_per_minute = max_requests_per_minute
        # Timestamps of accepted requests per client IP (last 60 seconds).
        self.request_counts: dict[str, deque] = defaultdict(deque)
        # Timestamps of rejected requests per client IP (last 60 seconds).
        # Accepted requests alone can never exceed the limit, so rejected
        # attempts must be counted for the block threshold to be reachable.
        self.rejected_counts: dict[str, deque] = defaultdict(deque)
        self.blocked_ips: Set[str] = set()
        # Strong references to pending unblock tasks: the event loop keeps
        # only weak references, so an unreferenced task may be
        # garbage-collected before it runs.
        self._unblock_tasks: Set[asyncio.Task] = set()

    async def dispatch(self, request: Request, call_next):
        """Apply blocking and rate limiting, then add security headers.

        Returns a 429 JSON response when the client IP is blocked or over the
        limit; otherwise the downstream response with extra headers.
        """
        client_ip = self.get_client_ip(request)

        # Blocked IPs are rejected without touching the rate-limit counters.
        if client_ip in self.blocked_ips:
            return JSONResponse(
                status_code=429,
                content={"detail": "IP blocked due to excessive requests"}
            )

        if not self.check_rate_limit(client_ip):
            logger.warning("Rate limit exceeded for IP: %s", client_ip)
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"}
            )

        # Time the downstream handling so it can be reported in a header.
        start_time = time.time()
        response = await call_next(request)
        process_time = time.time() - start_time

        # Security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["X-Process-Time"] = str(process_time)
        return response

    def get_client_ip(self, request: Request) -> str:
        """Return the client IP, preferring proxy headers over the socket peer.

        Order: first entry of ``X-Forwarded-For``, then ``X-Real-IP``, then
        ``request.client.host``; ``"unknown"`` when none is available.
        """
        # NOTE(review): both headers are client-controlled unless a trusted
        # proxy overwrites them; a spoofed value can dodge the limiter or get
        # another address blocked. Confirm the deployment sits behind a proxy.
        forwarded_for = request.headers.get("X-Forwarded-For")
        if forwarded_for:
            return forwarded_for.split(",")[0].strip()
        real_ip = request.headers.get("X-Real-IP")
        if real_ip:
            return real_ip
        return request.client.host if request.client else "unknown"

    @staticmethod
    def _drop_older_than(timestamps: deque, cutoff: float) -> None:
        """Remove leading timestamps that fall before ``cutoff``."""
        while timestamps and timestamps[0] < cutoff:
            timestamps.popleft()

    def _block_ip(self, client_ip: str, delay: int) -> None:
        """Block ``client_ip`` and schedule its unblocking after ``delay`` seconds."""
        if client_ip in self.blocked_ips:
            # Already blocked: an unblock task is pending, do not stack another.
            return
        self.blocked_ips.add(client_ip)
        task = asyncio.create_task(self.unblock_ip_after_delay(client_ip, delay))
        self._unblock_tasks.add(task)
        task.add_done_callback(self._unblock_tasks.discard)

    def check_rate_limit(self, client_ip: str) -> bool:
        """Record a request from ``client_ip`` and say whether it is allowed.

        Returns:
            ``True`` when the request fits in the 60-second window and was
            recorded; ``False`` when the limit is reached. When accepted plus
            rejected requests in the window exceed twice the limit, the IP is
            also blocked for 5 minutes.
        """
        now = time.time()
        window_start = now - 60

        accepted = self.request_counts[client_ip]
        self._drop_older_than(accepted, window_start)
        if len(accepted) < self.max_requests_per_minute:
            accepted.append(now)
            return True

        # Over the limit: remember the rejected attempt so sustained abuse
        # can be detected.
        rejected = self.rejected_counts[client_ip]
        self._drop_older_than(rejected, window_start)
        rejected.append(now)
        if len(accepted) + len(rejected) > self.max_requests_per_minute * 2:
            # The rejection log is no longer needed once the IP is blocked.
            self.rejected_counts.pop(client_ip, None)
            self._block_ip(client_ip, 300)
        return False

    async def unblock_ip_after_delay(self, ip: str, delay: int):
        """Remove ``ip`` from the blocked set after ``delay`` seconds."""
        await asyncio.sleep(delay)
        self.blocked_ips.discard(ip)
        logger.info("IP %s unblocked after %s seconds", ip, delay)
class LoggingMiddleware(BaseHTTPMiddleware):
    """Log one line per incoming request and one per outgoing response."""

    async def dispatch(self, request: Request, call_next):
        """Log the request, run the downstream handler, then log status and duration."""
        began_at = time.time()

        request_line = (
            f"Request: {request.method} {request.url.path} "
            f"from {request.client.host if request.client else 'unknown'}"
        )
        logger.info(request_line)

        response = await call_next(request)

        # Elapsed wall-clock time, measured from before the request log line.
        process_time = time.time() - began_at
        response_line = (
            f"Response: {response.status_code} "
            f"in {process_time:.4f}s"
        )
        logger.info(response_line)
        return response
🎯 Próximos Passos#
Com autenticação e autorização implementadas, você pode:
Step 4: Implementar testes automatizados
Step 5: Adicionar cache e otimizações
Step 6: Configurar WebSockets para tempo real
Step 7: Deploy e monitoramento
📝 Exercícios Práticos#
Exercício 1: Sistema de Roles#
Crie um sistema completo com roles:
Admin: todas as permissões
Manager: gerenciar itens e usuários
User: apenas visualizar
Exercício 2: Auditoria#
Implemente um sistema de auditoria que registre:
Quem fez a ação
Quando foi feita
Que dados foram alterados
Exercício 3: Rate Limiting Avançado#
Implemente rate limiting diferenciado por:
Tipo de usuário (administradores têm um limite maior de requisições)
Endpoint (alguns endpoints têm limites menores)
Horário (limites diferentes por período)
Anterior: Step 2: Banco de Dados | Próximo: Step 4: Testes