docs: add authentication example

This commit is contained in:
2026-03-09 13:44:51 -04:00
parent 92036d6b88
commit c3d1fe977d
9 changed files with 425 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Authentication

View File

@@ -0,0 +1,9 @@
from fastapi import FastAPI
from fastapi_toolsets.exceptions import init_exceptions_handlers
from .routes import router
app = FastAPI()
init_exceptions_handlers(app=app)
app.include_router(router=router)

View File

@@ -0,0 +1,9 @@
from fastapi_toolsets.crud import CrudFactory
from .models import OAuthAccount, OAuthProvider, Team, User, UserToken
TeamCrud = CrudFactory(model=Team)
UserCrud = CrudFactory(model=User)
UserTokenCrud = CrudFactory(model=UserToken)
OAuthProviderCrud = CrudFactory(model=OAuthProvider)
OAuthAccountCrud = CrudFactory(model=OAuthAccount)

View File

@@ -0,0 +1,15 @@
from fastapi import Depends
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from fastapi_toolsets.db import create_db_context, create_db_dependency
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/postgres"
engine = create_async_engine(url=DATABASE_URL, future=True)
async_session_maker = async_sessionmaker(bind=engine, expire_on_commit=False)
get_db = create_db_dependency(session_maker=async_session_maker)
get_db_context = create_db_context(session_maker=async_session_maker)
SessionDep = Depends(get_db)

View File

@@ -0,0 +1,105 @@
import enum
from datetime import datetime
from uuid import UUID
from sqlalchemy import (
Boolean,
DateTime,
Enum,
ForeignKey,
Integer,
String,
UniqueConstraint,
)
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from fastapi_toolsets.models import TimestampMixin, UUIDMixin
class Base(DeclarativeBase, UUIDMixin):
type_annotation_map = {
str: String(),
int: Integer(),
UUID: PG_UUID(as_uuid=True),
datetime: DateTime(timezone=True),
}
class UserRole(enum.Enum):
admin = "admin"
moderator = "moderator"
user = "user"
class Team(Base, TimestampMixin):
__tablename__ = "teams"
name: Mapped[str] = mapped_column(String, unique=True, index=True)
users: Mapped[list["User"]] = relationship(back_populates="team")
class User(Base, TimestampMixin):
__tablename__ = "users"
username: Mapped[str] = mapped_column(String, unique=True, index=True)
email: Mapped[str | None] = mapped_column(
String, unique=True, index=True, nullable=True
)
hashed_password: Mapped[str | None] = mapped_column(String, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
role: Mapped[UserRole] = mapped_column(Enum(UserRole), default=UserRole.user)
team_id: Mapped[UUID | None] = mapped_column(ForeignKey("teams.id"), nullable=True)
team: Mapped["Team | None"] = relationship(back_populates="users")
oauth_accounts: Mapped[list["OAuthAccount"]] = relationship(back_populates="user")
tokens: Mapped[list["UserToken"]] = relationship(back_populates="user")
class UserToken(Base, TimestampMixin):
"""API tokens for a user (multiple allowed)."""
__tablename__ = "user_tokens"
user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
# Store hashed token value
token_hash: Mapped[str] = mapped_column(String, unique=True, index=True)
name: Mapped[str | None] = mapped_column(String, nullable=True)
expires_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
user: Mapped["User"] = relationship(back_populates="tokens")
class OAuthProvider(Base, TimestampMixin):
"""Configurable OAuth2 / OpenID Connect provider."""
__tablename__ = "oauth_providers"
slug: Mapped[str] = mapped_column(String, unique=True, index=True)
name: Mapped[str] = mapped_column(String)
client_id: Mapped[str] = mapped_column(String)
client_secret: Mapped[str] = mapped_column(String)
discovery_url: Mapped[str] = mapped_column(String, nullable=False)
scopes: Mapped[str] = mapped_column(String, default="openid email profile")
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
accounts: Mapped[list["OAuthAccount"]] = relationship(back_populates="provider")
class OAuthAccount(Base, TimestampMixin):
"""OAuth2 / OpenID Connect account linked to a user."""
__tablename__ = "oauth_accounts"
__table_args__ = (
UniqueConstraint("provider_id", "subject", name="uq_oauth_provider_subject"),
)
user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
provider_id: Mapped[UUID] = mapped_column(ForeignKey("oauth_providers.id"))
# OAuth `sub` / OpenID subject identifier
subject: Mapped[str] = mapped_column(String)
user: Mapped["User"] = relationship(back_populates="oauth_accounts")
provider: Mapped["OAuthProvider"] = relationship(back_populates="accounts")

View File

@@ -0,0 +1,122 @@
from typing import Annotated
from uuid import UUID
import bcrypt
from fastapi import APIRouter, Form, HTTPException, Response, Security
from fastapi_toolsets.dependencies import PathDependency
from .crud import UserCrud, UserTokenCrud
from .db import SessionDep
from .models import OAuthProvider, User, UserToken
from .schemas import (
ApiTokenCreateRequest,
ApiTokenResponse,
RegisterRequest,
UserCreate,
UserResponse,
)
from .security import auth, cookie_auth, create_api_token
ProviderDep = PathDependency(
model=OAuthProvider,
field=OAuthProvider.slug,
session_dep=SessionDep,
param_name="slug",
)
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode(), hashed.encode())
router = APIRouter(prefix="/auth")
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(body: RegisterRequest, session: SessionDep):
existing = await UserCrud.first(
session=session, filters=[User.username == body.username]
)
if existing:
raise HTTPException(status_code=409, detail="Username already taken")
user = await UserCrud.create(
session=session,
obj=UserCreate(
username=body.username,
email=body.email,
hashed_password=hash_password(body.password),
),
)
return user
@router.post("/token", status_code=204)
async def login(
session: SessionDep,
response: Response,
username: Annotated[str, Form()],
password: Annotated[str, Form()],
):
user = await UserCrud.first(session=session, filters=[User.username == username])
if (
not user
or not user.hashed_password
or not verify_password(password, user.hashed_password)
):
raise HTTPException(status_code=401, detail="Invalid credentials")
if not user.is_active:
raise HTTPException(status_code=403, detail="Account disabled")
cookie_auth.set_cookie(response, str(user.id))
@router.post("/logout", status_code=204)
async def logout(response: Response):
cookie_auth.delete_cookie(response)
@router.get("/me", response_model=UserResponse)
async def me(user: User = Security(auth)):
return user
@router.post("/tokens", response_model=ApiTokenResponse, status_code=201)
async def create_token(
body: ApiTokenCreateRequest,
user: User = Security(auth),
):
raw, token_row = await create_api_token(
user.id, name=body.name, expires_at=body.expires_at
)
return ApiTokenResponse(
id=token_row.id,
name=token_row.name,
expires_at=token_row.expires_at,
created_at=token_row.created_at,
token=raw,
)
@router.delete("/tokens/{token_id}", status_code=204)
async def revoke_token(
session: SessionDep,
token_id: UUID,
user: User = Security(auth),
):
if not await UserTokenCrud.first(
session=session,
filters=[UserToken.id == token_id, UserToken.user_id == user.id],
):
raise HTTPException(status_code=404, detail="Token not found")
await UserTokenCrud.delete(
session=session,
filters=[UserToken.id == token_id, UserToken.user_id == user.id],
)

View File

@@ -0,0 +1,64 @@
from datetime import datetime
from uuid import UUID
from pydantic import EmailStr
from fastapi_toolsets.schemas import PydanticBase
class RegisterRequest(PydanticBase):
username: str
password: str
email: EmailStr | None = None
class UserResponse(PydanticBase):
id: UUID
username: str
email: str | None
role: str
is_active: bool
model_config = {"from_attributes": True}
class ApiTokenCreateRequest(PydanticBase):
name: str | None = None
expires_at: datetime | None = None
class ApiTokenResponse(PydanticBase):
id: UUID
name: str | None
expires_at: datetime | None
created_at: datetime
# Only populated on creation
token: str | None = None
model_config = {"from_attributes": True}
class OAuthProviderResponse(PydanticBase):
slug: str
name: str
model_config = {"from_attributes": True}
class UserCreate(PydanticBase):
username: str
email: str | None = None
hashed_password: str | None = None
class UserTokenCreate(PydanticBase):
user_id: UUID
token_hash: str
name: str | None = None
expires_at: datetime | None = None
class OAuthAccountCreate(PydanticBase):
user_id: UUID
provider_id: UUID
subject: str

View File

@@ -0,0 +1,100 @@
import hashlib
from datetime import datetime, timezone
from uuid import UUID
from fastapi import HTTPException
from sqlalchemy.orm import selectinload
from fastapi_toolsets.exceptions import UnauthorizedError
from fastapi_toolsets.security import (
APIKeyHeaderAuth,
BearerTokenAuth,
CookieAuth,
MultiAuth,
)
from .crud import UserCrud, UserTokenCrud
from .db import get_db_context
from .models import User, UserRole, UserToken
from .schemas import UserTokenCreate
SESSION_COOKIE = "session"
SECRET_KEY = "123456789"
def _hash_token(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()
async def _verify_token(token: str, role: UserRole | None = None) -> User:
async with get_db_context() as db:
user_token = await UserTokenCrud.first(
session=db,
filters=[UserToken.token_hash == _hash_token(token)],
load_options=[selectinload(UserToken.user)],
)
if user_token is None or not user_token.user.is_active:
raise UnauthorizedError()
if user_token.expires_at and user_token.expires_at < datetime.now(timezone.utc):
raise UnauthorizedError()
user = user_token.user
if role is not None and user.role != role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
async def _verify_cookie(user_id: str, role: UserRole | None = None) -> User:
async with get_db_context() as db:
user = await UserCrud.first(
session=db,
filters=[User.id == UUID(user_id)],
)
if not user or not user.is_active:
raise UnauthorizedError()
if role is not None and user.role != role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
bearer_auth = BearerTokenAuth(
validator=_verify_token,
prefix="ctf_",
)
header_auth = APIKeyHeaderAuth(
name="X-API-Key",
validator=_verify_token,
)
cookie_auth = CookieAuth(
name=SESSION_COOKIE,
validator=_verify_cookie,
secret_key=SECRET_KEY,
)
auth = MultiAuth(bearer_auth, header_auth, cookie_auth)
async def create_api_token(
user_id: UUID,
*,
name: str | None = None,
expires_at: datetime | None = None,
) -> tuple[str, UserToken]:
raw = bearer_auth.generate_token()
async with get_db_context() as db:
token_row = await UserTokenCrud.create(
session=db,
obj=UserTokenCreate(
user_id=user_id,
token_hash=_hash_token(raw),
name=name,
expires_at=expires_at,
),
)
return raw, token_row