Compare commits

...

8 Commits

30 changed files with 2874 additions and 32 deletions

View File

@@ -34,26 +34,17 @@ jobs:
MAJOR=$(echo "$VERSION" | cut -d. -f1)
DEPLOY_VERSION="v$(echo "$VERSION" | cut -d. -f1-2)"
# On new major: consolidate previous major's feature versions into vX
# On new major: keep only the latest feature version of the previous major
PREV_MAJOR=$((MAJOR - 1))
OLD_FEATURE_VERSIONS=$(uv run mike list 2>/dev/null | grep -oE "^v${PREV_MAJOR}\.[0-9]+" || true)
if [ -n "$OLD_FEATURE_VERSIONS" ]; then
LATEST_PREV_TAG=$(git tag -l "v${PREV_MAJOR}.*" | sort -V | tail -1)
if [ -n "$LATEST_PREV_TAG" ]; then
git checkout "$LATEST_PREV_TAG" -- docs/ docs_src/ src/ zensical.toml
if ! grep -q '\[project\.extra\.version\]' zensical.toml; then
printf '\n[project.extra.version]\nprovider = "mike"\ndefault = "stable"\nalias = true\n' >> zensical.toml
fi
uv run mike deploy "v${PREV_MAJOR}"
git checkout HEAD -- docs/ docs_src/ src/ zensical.toml
fi
# Delete old feature versions
LATEST_PREV=$(echo "$OLD_FEATURE_VERSIONS" | sort -t. -k2 -n | tail -1)
echo "$OLD_FEATURE_VERSIONS" | while read -r OLD_V; do
if [ "$OLD_V" != "$LATEST_PREV" ]; then
echo "Deleting $OLD_V"
uv run mike delete "$OLD_V"
fi
done
fi

View File

@@ -0,0 +1 @@
# Authentication

267
docs/module/security.md Normal file
View File

@@ -0,0 +1,267 @@
# Security
Composable authentication helpers for FastAPI that use `Security()` for OpenAPI documentation and accept user-provided validator functions with full type flexibility.
## Overview
The `security` module provides four auth source classes and a `MultiAuth` factory. Each class wraps a FastAPI security scheme for OpenAPI and accepts a validator function called as:
```python
await validator(credential, **kwargs)
```
where `kwargs` are the extra keyword arguments provided at instantiation (roles, permissions, enums, etc.). The validator returns the authenticated identity (e.g. a `User` model) which becomes the route dependency value.
```python
from fastapi import Security
from fastapi_toolsets.security import BearerTokenAuth
async def verify_token(token: str, *, role: str) -> User:
user = await db.get_by_token(token)
if not user or user.role != role:
raise UnauthorizedError()
return user
bearer_admin = BearerTokenAuth(verify_token, role="admin")
@app.get("/admin")
async def admin_route(user: User = Security(bearer_admin)):
return user
```
## Auth sources
### [`BearerTokenAuth`](../reference/security.md#fastapi_toolsets.security.BearerTokenAuth)
Reads the `Authorization: Bearer <token>` header. Wraps `HTTPBearer` for OpenAPI.
```python
from fastapi_toolsets.security import BearerTokenAuth
bearer = BearerTokenAuth(validator=verify_token)
@app.get("/me")
async def me(user: User = Security(bearer)):
return user
```
#### Token prefix
The optional `prefix` parameter restricts a `BearerTokenAuth` instance to tokens
that start with a given string. The prefix is **kept** in the value passed to the
validator — store and compare tokens with their prefix included.
This lets you deploy multiple `BearerTokenAuth` instances in the same application
and disambiguate them efficiently in `MultiAuth`:
```python
user_bearer = BearerTokenAuth(verify_user, prefix="user_") # matches "Bearer user_..."
org_bearer = BearerTokenAuth(verify_org, prefix="org_") # matches "Bearer org_..."
```
Use [`generate_token()`](#token-generation) to create correctly-prefixed tokens.
#### Token generation
`BearerTokenAuth.generate_token()` produces a secure random token ready to store
in your database and return to the client. If a prefix is configured it is
prepended automatically:
```python
bearer = BearerTokenAuth(verify_token, prefix="user_")
token = bearer.generate_token() # e.g. "user_Xk3mN..."
await db.store_token(user_id, token)
return {"access_token": token, "token_type": "bearer"}
```
The client sends `Authorization: Bearer user_Xk3mN...` and the validator receives
the full token (prefix included) to compare against the stored value.
### [`CookieAuth`](../reference/security.md#fastapi_toolsets.security.CookieAuth)
Reads a named cookie. Wraps `APIKeyCookie` for OpenAPI.
```python
from fastapi_toolsets.security import CookieAuth
cookie_auth = CookieAuth("session", validator=verify_session)
@app.get("/me")
async def me(user: User = Security(cookie_auth)):
return user
```
### [`OAuth2Auth`](../reference/security.md#fastapi_toolsets.security.OAuth2Auth)
Reads the `Authorization: Bearer <token>` header and registers the token endpoint
in OpenAPI via `OAuth2PasswordBearer`.
```python
from fastapi_toolsets.security import OAuth2Auth
oauth2_auth = OAuth2Auth(token_url="/token", validator=verify_token)
@app.get("/me")
async def me(user: User = Security(oauth2_auth)):
return user
```
### [`OpenIDAuth`](../reference/security.md#fastapi_toolsets.security.OpenIDAuth)
Reads the `Authorization: Bearer <token>` header and registers the OpenID Connect
discovery URL in OpenAPI via `OpenIdConnect`. Token validation is fully delegated
to your validator — use any OIDC / JWT library (`authlib`, `python-jose`, `PyJWT`).
```python
from fastapi_toolsets.security import OpenIDAuth
async def verify_google_token(token: str, *, audience: str) -> User:
payload = jwt.decode(token, google_public_keys, algorithms=["RS256"],
audience=audience)
return User(email=payload["email"], name=payload["name"])
google_auth = OpenIDAuth(
"https://accounts.google.com/.well-known/openid-configuration",
verify_google_token,
audience="my-client-id",
)
@app.get("/me")
async def me(user: User = Security(google_auth)):
return user
```
The discovery URL is used **only for OpenAPI documentation** — no requests are made
to it by this class. You are responsible for fetching and caching the provider's
public keys in your validator.
Multiple providers work naturally with `MultiAuth`:
```python
multi = MultiAuth(google_auth, github_auth)
@app.get("/data")
async def data(user: User = Security(multi)):
return user
```
## Typed validator kwargs
All auth classes forward extra instantiation keyword arguments to the validator.
Arguments can be any type — enums, strings, integers, etc. The validator returns
the authenticated identity, which FastAPI injects directly into the route handler.
```python
async def verify_token(token: str, *, role: Role, permission: str) -> User:
user = await decode_token(token)
if user.role != role or permission not in user.permissions:
raise UnauthorizedError()
return user
bearer = BearerTokenAuth(verify_token, role=Role.ADMIN, permission="billing:read")
```
Each auth instance is self-contained — create a separate instance per distinct
requirement instead of passing requirements through `Security(scopes=[...])`.
### Using `.require()` inline
If declaring a new top-level variable per role feels verbose, use `.require()` to
create a configured clone directly in the route decorator. The original instance
is not mutated:
```python
bearer = BearerTokenAuth(verify_token)
@app.get("/admin/stats")
async def admin_stats(user: User = Security(bearer.require(role=Role.ADMIN))):
return {"message": f"Hello admin {user.name}"}
@app.get("/profile")
async def profile(user: User = Security(bearer.require(role=Role.USER))):
return {"id": user.id, "name": user.name}
```
`.require()` kwargs are merged over existing ones — new values win on conflict.
The `prefix` (for `BearerTokenAuth`) and cookie name (for `CookieAuth`) are
always preserved.
`.require()` instances work transparently inside `MultiAuth`:
```python
multi = MultiAuth(
user_bearer.require(role=Role.USER),
org_bearer.require(role=Role.ADMIN),
)
```
## MultiAuth
[`MultiAuth`](../reference/security.md#fastapi_toolsets.security.MultiAuth) combines
multiple auth sources into a single callable. Sources are tried in order; the
first one that finds a credential wins.
```python
from fastapi_toolsets.security import MultiAuth
multi = MultiAuth(user_bearer, org_bearer, cookie_auth)
@app.get("/data")
async def data_route(user = Security(multi)):
return user
```
### Using `.require()` on MultiAuth
`MultiAuth` also supports `.require()`, which propagates the kwargs to every
source that implements it. Sources that do not (e.g. custom `AuthSource`
subclasses) are passed through unchanged:
```python
multi = MultiAuth(bearer, cookie)
@app.get("/admin")
async def admin(user: User = Security(multi.require(role=Role.ADMIN))):
return user
```
This is equivalent to calling `.require()` on each source individually:
```python
# These two are identical
multi.require(role=Role.ADMIN)
MultiAuth(
bearer.require(role=Role.ADMIN),
cookie.require(role=Role.ADMIN),
)
```
### Prefix-based dispatch
Because `extract()` is pure string matching (no I/O), prefix-based source
selection is essentially free. Only the matching source's validator (which may
involve DB or network I/O) is ever called:
```python
user_bearer = BearerTokenAuth(verify_user, prefix="user_")
org_bearer = BearerTokenAuth(verify_org, prefix="org_")
multi = MultiAuth(user_bearer, org_bearer)
# "Bearer user_alice" → only verify_user runs, receives "user_alice"
# "Bearer org_acme" → only verify_org runs, receives "org_acme"
```
Tokens are stored and compared **with their prefix** — use `generate_token()` on
each source to issue correctly-prefixed tokens:
```python
user_token = user_bearer.generate_token() # "user_..."
org_token = org_bearer.generate_token() # "org_..."
```
---
[:material-api: API Reference](../reference/security.md)

View File

@@ -0,0 +1,28 @@
# `security`
Here's the reference for the authentication helpers provided by the `security` module.
You can import them directly from `fastapi_toolsets.security`:
```python
from fastapi_toolsets.security import (
AuthSource,
BearerTokenAuth,
CookieAuth,
OAuth2Auth,
OpenIDAuth,
MultiAuth,
)
```
## ::: fastapi_toolsets.security.AuthSource
## ::: fastapi_toolsets.security.BearerTokenAuth
## ::: fastapi_toolsets.security.CookieAuth
## ::: fastapi_toolsets.security.OAuth2Auth
## ::: fastapi_toolsets.security.OpenIDAuth
## ::: fastapi_toolsets.security.MultiAuth

View File

@@ -0,0 +1,9 @@
from fastapi import FastAPI
from fastapi_toolsets.exceptions import init_exceptions_handlers
from .routes import router
app = FastAPI()
init_exceptions_handlers(app=app)
app.include_router(router=router)

View File

@@ -0,0 +1,9 @@
from fastapi_toolsets.crud import CrudFactory
from .models import OAuthAccount, OAuthProvider, Team, User, UserToken
TeamCrud = CrudFactory(model=Team)
UserCrud = CrudFactory(model=User)
UserTokenCrud = CrudFactory(model=UserToken)
OAuthProviderCrud = CrudFactory(model=OAuthProvider)
OAuthAccountCrud = CrudFactory(model=OAuthAccount)

View File

@@ -0,0 +1,15 @@
from fastapi import Depends
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from fastapi_toolsets.db import create_db_context, create_db_dependency
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/postgres"
engine = create_async_engine(url=DATABASE_URL, future=True)
async_session_maker = async_sessionmaker(bind=engine, expire_on_commit=False)
get_db = create_db_dependency(session_maker=async_session_maker)
get_db_context = create_db_context(session_maker=async_session_maker)
SessionDep = Depends(get_db)

View File

@@ -0,0 +1,105 @@
import enum
from datetime import datetime
from uuid import UUID
from sqlalchemy import (
Boolean,
DateTime,
Enum,
ForeignKey,
Integer,
String,
UniqueConstraint,
)
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from fastapi_toolsets.models import TimestampMixin, UUIDMixin
class Base(DeclarativeBase, UUIDMixin):
type_annotation_map = {
str: String(),
int: Integer(),
UUID: PG_UUID(as_uuid=True),
datetime: DateTime(timezone=True),
}
class UserRole(enum.Enum):
admin = "admin"
moderator = "moderator"
user = "user"
class Team(Base, TimestampMixin):
__tablename__ = "teams"
name: Mapped[str] = mapped_column(String, unique=True, index=True)
users: Mapped[list["User"]] = relationship(back_populates="team")
class User(Base, TimestampMixin):
__tablename__ = "users"
username: Mapped[str] = mapped_column(String, unique=True, index=True)
email: Mapped[str | None] = mapped_column(
String, unique=True, index=True, nullable=True
)
hashed_password: Mapped[str | None] = mapped_column(String, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
role: Mapped[UserRole] = mapped_column(Enum(UserRole), default=UserRole.user)
team_id: Mapped[UUID | None] = mapped_column(ForeignKey("teams.id"), nullable=True)
team: Mapped["Team | None"] = relationship(back_populates="users")
oauth_accounts: Mapped[list["OAuthAccount"]] = relationship(back_populates="user")
tokens: Mapped[list["UserToken"]] = relationship(back_populates="user")
class UserToken(Base, TimestampMixin):
"""API tokens for a user (multiple allowed)."""
__tablename__ = "user_tokens"
user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
# Store hashed token value
token_hash: Mapped[str] = mapped_column(String, unique=True, index=True)
name: Mapped[str | None] = mapped_column(String, nullable=True)
expires_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
user: Mapped["User"] = relationship(back_populates="tokens")
class OAuthProvider(Base, TimestampMixin):
"""Configurable OAuth2 / OpenID Connect provider."""
__tablename__ = "oauth_providers"
slug: Mapped[str] = mapped_column(String, unique=True, index=True)
name: Mapped[str] = mapped_column(String)
client_id: Mapped[str] = mapped_column(String)
client_secret: Mapped[str] = mapped_column(String)
discovery_url: Mapped[str] = mapped_column(String, nullable=False)
scopes: Mapped[str] = mapped_column(String, default="openid email profile")
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
accounts: Mapped[list["OAuthAccount"]] = relationship(back_populates="provider")
class OAuthAccount(Base, TimestampMixin):
"""OAuth2 / OpenID Connect account linked to a user."""
__tablename__ = "oauth_accounts"
__table_args__ = (
UniqueConstraint("provider_id", "subject", name="uq_oauth_provider_subject"),
)
user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
provider_id: Mapped[UUID] = mapped_column(ForeignKey("oauth_providers.id"))
# OAuth `sub` / OpenID subject identifier
subject: Mapped[str] = mapped_column(String)
user: Mapped["User"] = relationship(back_populates="oauth_accounts")
provider: Mapped["OAuthProvider"] = relationship(back_populates="accounts")

View File

@@ -0,0 +1,122 @@
from typing import Annotated
from uuid import UUID
import bcrypt
from fastapi import APIRouter, Form, HTTPException, Response, Security
from fastapi_toolsets.dependencies import PathDependency
from .crud import UserCrud, UserTokenCrud
from .db import SessionDep
from .models import OAuthProvider, User, UserToken
from .schemas import (
ApiTokenCreateRequest,
ApiTokenResponse,
RegisterRequest,
UserCreate,
UserResponse,
)
from .security import auth, cookie_auth, create_api_token
ProviderDep = PathDependency(
model=OAuthProvider,
field=OAuthProvider.slug,
session_dep=SessionDep,
param_name="slug",
)
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode(), hashed.encode())
router = APIRouter(prefix="/auth")
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(body: RegisterRequest, session: SessionDep):
existing = await UserCrud.first(
session=session, filters=[User.username == body.username]
)
if existing:
raise HTTPException(status_code=409, detail="Username already taken")
user = await UserCrud.create(
session=session,
obj=UserCreate(
username=body.username,
email=body.email,
hashed_password=hash_password(body.password),
),
)
return user
@router.post("/token", status_code=204)
async def login(
session: SessionDep,
response: Response,
username: Annotated[str, Form()],
password: Annotated[str, Form()],
):
user = await UserCrud.first(session=session, filters=[User.username == username])
if (
not user
or not user.hashed_password
or not verify_password(password, user.hashed_password)
):
raise HTTPException(status_code=401, detail="Invalid credentials")
if not user.is_active:
raise HTTPException(status_code=403, detail="Account disabled")
cookie_auth.set_cookie(response, str(user.id))
@router.post("/logout", status_code=204)
async def logout(response: Response):
cookie_auth.delete_cookie(response)
@router.get("/me", response_model=UserResponse)
async def me(user: User = Security(auth)):
return user
@router.post("/tokens", response_model=ApiTokenResponse, status_code=201)
async def create_token(
body: ApiTokenCreateRequest,
user: User = Security(auth),
):
raw, token_row = await create_api_token(
user.id, name=body.name, expires_at=body.expires_at
)
return ApiTokenResponse(
id=token_row.id,
name=token_row.name,
expires_at=token_row.expires_at,
created_at=token_row.created_at,
token=raw,
)
@router.delete("/tokens/{token_id}", status_code=204)
async def revoke_token(
session: SessionDep,
token_id: UUID,
user: User = Security(auth),
):
if not await UserTokenCrud.first(
session=session,
filters=[UserToken.id == token_id, UserToken.user_id == user.id],
):
raise HTTPException(status_code=404, detail="Token not found")
await UserTokenCrud.delete(
session=session,
filters=[UserToken.id == token_id, UserToken.user_id == user.id],
)

View File

@@ -0,0 +1,64 @@
from datetime import datetime
from uuid import UUID
from pydantic import EmailStr
from fastapi_toolsets.schemas import PydanticBase
class RegisterRequest(PydanticBase):
username: str
password: str
email: EmailStr | None = None
class UserResponse(PydanticBase):
id: UUID
username: str
email: str | None
role: str
is_active: bool
model_config = {"from_attributes": True}
class ApiTokenCreateRequest(PydanticBase):
name: str | None = None
expires_at: datetime | None = None
class ApiTokenResponse(PydanticBase):
id: UUID
name: str | None
expires_at: datetime | None
created_at: datetime
# Only populated on creation
token: str | None = None
model_config = {"from_attributes": True}
class OAuthProviderResponse(PydanticBase):
slug: str
name: str
model_config = {"from_attributes": True}
class UserCreate(PydanticBase):
username: str
email: str | None = None
hashed_password: str | None = None
class UserTokenCreate(PydanticBase):
user_id: UUID
token_hash: str
name: str | None = None
expires_at: datetime | None = None
class OAuthAccountCreate(PydanticBase):
user_id: UUID
provider_id: UUID
subject: str

View File

@@ -0,0 +1,100 @@
import hashlib
from datetime import datetime, timezone
from uuid import UUID
from fastapi import HTTPException
from sqlalchemy.orm import selectinload
from fastapi_toolsets.exceptions import UnauthorizedError
from fastapi_toolsets.security import (
APIKeyHeaderAuth,
BearerTokenAuth,
CookieAuth,
MultiAuth,
)
from .crud import UserCrud, UserTokenCrud
from .db import get_db_context
from .models import User, UserRole, UserToken
from .schemas import UserTokenCreate
SESSION_COOKIE = "session"
SECRET_KEY = "123456789"
def _hash_token(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()
async def _verify_token(token: str, role: UserRole | None = None) -> User:
async with get_db_context() as db:
user_token = await UserTokenCrud.first(
session=db,
filters=[UserToken.token_hash == _hash_token(token)],
load_options=[selectinload(UserToken.user)],
)
if user_token is None or not user_token.user.is_active:
raise UnauthorizedError()
if user_token.expires_at and user_token.expires_at < datetime.now(timezone.utc):
raise UnauthorizedError()
user = user_token.user
if role is not None and user.role != role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
async def _verify_cookie(user_id: str, role: UserRole | None = None) -> User:
async with get_db_context() as db:
user = await UserCrud.first(
session=db,
filters=[User.id == UUID(user_id)],
)
if not user or not user.is_active:
raise UnauthorizedError()
if role is not None and user.role != role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
bearer_auth = BearerTokenAuth(
validator=_verify_token,
prefix="ctf_",
)
header_auth = APIKeyHeaderAuth(
name="X-API-Key",
validator=_verify_token,
)
cookie_auth = CookieAuth(
name=SESSION_COOKIE,
validator=_verify_cookie,
secret_key=SECRET_KEY,
)
auth = MultiAuth(bearer_auth, header_auth, cookie_auth)
async def create_api_token(
user_id: UUID,
*,
name: str | None = None,
expires_at: datetime | None = None,
) -> tuple[str, UserToken]:
raw = bearer_auth.generate_token()
async with get_db_context() as db:
token_row = await UserTokenCrud.create(
session=db,
obj=UserTokenCreate(
user_id=user_id,
token_hash=_hash_token(raw),
name=name,
expires_at=expires_at,
),
)
return raw, token_row

View File

@@ -1,6 +1,6 @@
[project]
name = "fastapi-toolsets"
version = "3.0.0"
version = "3.0.1"
description = "Production-ready utilities for FastAPI applications"
readme = "README.md"
license = "MIT"
@@ -66,6 +66,7 @@ manager = "fastapi_toolsets.cli.app:cli"
dev = [
{include-group = "tests"},
{include-group = "docs"},
{include-group = "docs-src"},
"fastapi-toolsets[all]",
"prek>=0.3.8",
"ruff>=0.1.0",
@@ -84,6 +85,9 @@ docs = [
"mkdocstrings-python>=2.0.2",
"zensical>=0.0.30",
]
docs-src = [
"bcrypt>=4.0.0",
]
[build-system]
requires = ["uv_build>=0.10,<0.12.0"]

View File

@@ -21,4 +21,4 @@ Example usage:
return Response(data={"user": user.username}, message="Success")
"""
__version__ = "3.0.0"
__version__ = "3.0.1"

View File

@@ -278,6 +278,18 @@ _EQUALITY_TYPES = (String, Integer, Numeric, Date, DateTime, Time, Enum, Uuid)
"""Column types that support equality / IN filtering in build_filter_by."""
def _coerce_bool(value: Any) -> bool:
"""Coerce a string value to a Python bool for Boolean column filtering."""
if isinstance(value, bool):
return value
if isinstance(value, str):
if value.lower() == "true":
return True
if value.lower() == "false":
return False
raise ValueError(f"Cannot coerce {value!r} to bool")
def build_filter_by(
filter_by: dict[str, Any],
facet_fields: Sequence[FacetFieldType],
@@ -324,16 +336,17 @@ def build_filter_by(
added_join_keys.add(rel_key)
col_type = column.property.columns[0].type
if isinstance(col_type, ARRAY):
if isinstance(col_type, Boolean):
coerce = _coerce_bool
if isinstance(value, list):
filters.append(column.in_([coerce(v) for v in value]))
else:
filters.append(column == coerce(value))
elif isinstance(col_type, ARRAY):
if isinstance(value, list):
filters.append(column.overlap(value))
else:
filters.append(column.any(value))
elif isinstance(col_type, Boolean):
if isinstance(value, list):
filters.append(column.in_(value))
else:
filters.append(column.is_(value))
elif isinstance(col_type, _EQUALITY_TYPES):
if isinstance(value, list):
filters.append(column.in_(value))

View File

@@ -0,0 +1,24 @@
"""Authentication helpers for FastAPI using Security()."""
from .abc import AuthSource
from .oauth import (
oauth_build_authorization_redirect,
oauth_decode_state,
oauth_encode_state,
oauth_fetch_userinfo,
oauth_resolve_provider_urls,
)
from .sources import APIKeyHeaderAuth, BearerTokenAuth, CookieAuth, MultiAuth
__all__ = [
"APIKeyHeaderAuth",
"AuthSource",
"BearerTokenAuth",
"CookieAuth",
"MultiAuth",
"oauth_build_authorization_redirect",
"oauth_decode_state",
"oauth_encode_state",
"oauth_fetch_userinfo",
"oauth_resolve_provider_urls",
]

View File

@@ -0,0 +1,53 @@
"""Abstract base class for authentication sources."""
import inspect
from abc import ABC, abstractmethod
from typing import Any, Callable
from fastapi import Request
from fastapi.security import SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
def _ensure_async(fn: Callable[..., Any]) -> Callable[..., Any]:
"""Wrap *fn* so it can always be awaited, caching the coroutine check at init time."""
if inspect.iscoroutinefunction(fn):
return fn
async def wrapper(*args: Any, **kwargs: Any) -> Any:
return fn(*args, **kwargs)
return wrapper
class AuthSource(ABC):
"""Abstract base class for authentication sources."""
def __init__(self) -> None:
"""Set up the default FastAPI dependency signature."""
source = self
async def _call(
request: Request,
security_scopes: SecurityScopes, # noqa: ARG001
) -> Any:
credential = await source.extract(request)
if credential is None:
raise UnauthorizedError()
return await source.authenticate(credential)
self._call_fn: Callable[..., Any] = _call
self.__signature__ = inspect.signature(_call)
@abstractmethod
async def extract(self, request: Request) -> str | None:
"""Extract the raw credential from the request without validating."""
@abstractmethod
async def authenticate(self, credential: str) -> Any:
"""Validate a credential and return the authenticated identity."""
async def __call__(self, **kwargs: Any) -> Any:
"""FastAPI dependency dispatch."""
return await self._call_fn(**kwargs)

View File

@@ -0,0 +1,140 @@
"""OAuth 2.0 / OIDC helper utilities."""
import base64
from typing import Any
from urllib.parse import urlencode
import httpx
from fastapi.responses import RedirectResponse
_discovery_cache: dict[str, dict] = {}
async def oauth_resolve_provider_urls(
discovery_url: str,
) -> tuple[str, str, str | None]:
"""Fetch the OIDC discovery document and return endpoint URLs.
Args:
discovery_url: URL of the provider's ``/.well-known/openid-configuration``.
Returns:
A ``(authorization_url, token_url, userinfo_url)`` tuple.
*userinfo_url* is ``None`` when the provider does not advertise one.
"""
if discovery_url not in _discovery_cache:
async with httpx.AsyncClient() as client:
resp = await client.get(discovery_url)
resp.raise_for_status()
_discovery_cache[discovery_url] = resp.json()
cfg = _discovery_cache[discovery_url]
return (
cfg["authorization_endpoint"],
cfg["token_endpoint"],
cfg.get("userinfo_endpoint"),
)
async def oauth_fetch_userinfo(
*,
token_url: str,
userinfo_url: str,
code: str,
client_id: str,
client_secret: str,
redirect_uri: str,
) -> dict[str, Any]:
"""Exchange an authorization code for tokens and return the userinfo payload.
Performs the two-step OAuth 2.0 / OIDC token exchange:
1. POSTs the authorization *code* to *token_url* to obtain an access token.
2. GETs *userinfo_url* using that access token as a Bearer credential.
Args:
token_url: Provider's token endpoint.
userinfo_url: Provider's userinfo endpoint.
code: Authorization code received from the provider's callback.
client_id: OAuth application client ID.
client_secret: OAuth application client secret.
redirect_uri: Redirect URI that was used in the authorization request.
Returns:
The JSON payload returned by the userinfo endpoint as a plain ``dict``.
"""
async with httpx.AsyncClient() as client:
token_resp = await client.post(
token_url,
data={
"grant_type": "authorization_code",
"code": code,
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
},
headers={"Accept": "application/json"},
)
token_resp.raise_for_status()
access_token = token_resp.json()["access_token"]
userinfo_resp = await client.get(
userinfo_url,
headers={"Authorization": f"Bearer {access_token}"},
)
userinfo_resp.raise_for_status()
return userinfo_resp.json()
def oauth_build_authorization_redirect(
authorization_url: str,
*,
client_id: str,
scopes: str,
redirect_uri: str,
destination: str,
) -> RedirectResponse:
"""Return an OAuth 2.0 authorization ``RedirectResponse``.
Args:
authorization_url: Provider's authorization endpoint.
client_id: OAuth application client ID.
scopes: Space-separated list of requested scopes.
redirect_uri: URI the provider should redirect back to after authorization.
destination: URL the user should be sent to after the full OAuth flow
completes (encoded as ``state``).
Returns:
A :class:`~fastapi.responses.RedirectResponse` to the provider's
authorization page.
"""
params = urlencode(
{
"client_id": client_id,
"response_type": "code",
"scope": scopes,
"redirect_uri": redirect_uri,
"state": oauth_encode_state(destination),
}
)
return RedirectResponse(f"{authorization_url}?{params}")
def oauth_encode_state(url: str) -> str:
"""Base64url-encode a URL to embed as an OAuth ``state`` parameter."""
return base64.urlsafe_b64encode(url.encode()).decode()
def oauth_decode_state(state: str | None, *, fallback: str) -> str:
"""Decode a base64url OAuth ``state`` parameter.
Handles missing padding (some providers strip ``=``).
Returns *fallback* if *state* is absent, the literal string ``"null"``,
or cannot be decoded.
"""
if not state or state == "null":
return fallback
try:
padded = state + "=" * (4 - len(state) % 4)
return base64.urlsafe_b64decode(padded).decode()
except Exception:
return fallback

View File

@@ -0,0 +1,8 @@
"""Built-in authentication source implementations."""
from .header import APIKeyHeaderAuth
from .bearer import BearerTokenAuth
from .cookie import CookieAuth
from .multi import MultiAuth
__all__ = ["APIKeyHeaderAuth", "BearerTokenAuth", "CookieAuth", "MultiAuth"]

View File

@@ -0,0 +1,120 @@
"""Bearer token authentication source."""
import inspect
import secrets
from typing import Annotated, Any, Callable
from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer, SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource, _ensure_async
class BearerTokenAuth(AuthSource):
"""Bearer token authentication source.
Wraps :class:`fastapi.security.HTTPBearer` for OpenAPI documentation.
The validator is called as ``await validator(credential, **kwargs)``
where ``kwargs`` are the extra keyword arguments provided at instantiation.
Args:
validator: Sync or async callable that receives the credential and any
extra keyword arguments, and returns the authenticated identity
(e.g. a ``User`` model). Should raise
:class:`~fastapi_toolsets.exceptions.UnauthorizedError` on failure.
prefix: Optional token prefix (e.g. ``"user_"``). If set, only tokens
whose value starts with this prefix are matched. The prefix is
**kept** in the value passed to the validator — store and compare
tokens with their prefix included. Use :meth:`generate_token` to
create correctly-prefixed tokens. This enables multiple
``BearerTokenAuth`` instances in the same app (e.g. ``"user_"``
for user tokens, ``"org_"`` for org tokens).
**kwargs: Extra keyword arguments forwarded to the validator on every
call (e.g. ``role=Role.ADMIN``).
"""
def __init__(
self,
validator: Callable[..., Any],
*,
prefix: str | None = None,
**kwargs: Any,
) -> None:
self._validator = _ensure_async(validator)
self._prefix = prefix
self._kwargs = kwargs
self._scheme = HTTPBearer(auto_error=False)
async def _call(
security_scopes: SecurityScopes, # noqa: ARG001
credentials: Annotated[
HTTPAuthorizationCredentials | None, Depends(self._scheme)
] = None,
) -> Any:
if credentials is None:
raise UnauthorizedError()
return await self._validate(credentials.credentials)
self._call_fn = _call
self.__signature__ = inspect.signature(_call)
async def _validate(self, token: str) -> Any:
"""Check prefix and call the validator."""
if self._prefix is not None and not token.startswith(self._prefix):
raise UnauthorizedError()
return await self._validator(token, **self._kwargs)
async def extract(self, request: Any) -> str | None:
"""Extract the raw credential from the request without validating.
Returns ``None`` if no ``Authorization: Bearer`` header is present,
the token is empty, or the token does not match the configured prefix.
The prefix is included in the returned value.
"""
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return None
token = auth[7:]
if not token:
return None
if self._prefix is not None and not token.startswith(self._prefix):
return None
return token
async def authenticate(self, credential: str) -> Any:
"""Validate a credential and return the identity.
Calls ``await validator(credential, **kwargs)`` where ``kwargs`` are
the extra keyword arguments provided at instantiation.
"""
return await self._validate(credential)
def require(self, **kwargs: Any) -> "BearerTokenAuth":
"""Return a new instance with additional (or overriding) validator kwargs."""
return BearerTokenAuth(
self._validator,
prefix=self._prefix,
**{**self._kwargs, **kwargs},
)
def generate_token(self, nbytes: int = 32) -> str:
"""Generate a secure random token for this auth source.
Returns a URL-safe random token. If a prefix is configured it is
prepended — the returned value is what you store in your database
and return to the client as-is.
Args:
nbytes: Number of random bytes before base64 encoding. The
resulting string is ``ceil(nbytes * 4 / 3)`` characters
(43 chars for the default 32 bytes). Defaults to 32.
Returns:
A ready-to-use token string (e.g. ``"user_Xk3..."``).
"""
token = secrets.token_urlsafe(nbytes)
if self._prefix is not None:
return f"{self._prefix}{token}"
return token

View File

@@ -0,0 +1,139 @@
"""Cookie-based authentication source."""
import base64
import hashlib
import hmac
import inspect
import json
import time
from typing import Annotated, Any, Callable
from fastapi import Depends, Request, Response
from fastapi.security import APIKeyCookie, SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource, _ensure_async
class CookieAuth(AuthSource):
"""Cookie-based authentication source.
Wraps :class:`fastapi.security.APIKeyCookie` for OpenAPI documentation.
Optionally signs the cookie with HMAC-SHA256 to provide stateless, tamper-
proof sessions without any database entry.
Args:
name: Cookie name.
validator: Sync or async callable that receives the cookie value
(plain, after signature verification when ``secret_key`` is set)
and any extra keyword arguments, and returns the authenticated
identity.
secret_key: When provided, the cookie is HMAC-SHA256 signed.
:meth:`set_cookie` embeds an expiry and signs the payload;
:meth:`extract` verifies the signature and expiry before handing
the plain value to the validator. When ``None`` (default), the raw
cookie value is passed to the validator as-is.
ttl: Cookie lifetime in seconds (default 24 h). Only used when
``secret_key`` is set.
**kwargs: Extra keyword arguments forwarded to the validator on every
call (e.g. ``role=Role.ADMIN``).
"""
def __init__(
self,
name: str,
validator: Callable[..., Any],
*,
secret_key: str | None = None,
ttl: int = 86400,
**kwargs: Any,
) -> None:
self._name = name
self._validator = _ensure_async(validator)
self._secret_key = secret_key
self._ttl = ttl
self._kwargs = kwargs
self._scheme = APIKeyCookie(name=name, auto_error=False)
async def _call(
security_scopes: SecurityScopes, # noqa: ARG001
value: Annotated[str | None, Depends(self._scheme)] = None,
) -> Any:
if value is None:
raise UnauthorizedError()
plain = self._verify(value)
return await self._validator(plain, **self._kwargs)
self._call_fn = _call
self.__signature__ = inspect.signature(_call)
def _hmac(self, data: str) -> str:
if self._secret_key is None:
raise RuntimeError("_hmac called without secret_key configured")
return hmac.new(
self._secret_key.encode(), data.encode(), hashlib.sha256
).hexdigest()
def _sign(self, value: str) -> str:
data = base64.urlsafe_b64encode(
json.dumps({"v": value, "exp": int(time.time()) + self._ttl}).encode()
).decode()
return f"{data}.{self._hmac(data)}"
def _verify(self, cookie_value: str) -> str:
"""Return the plain value, verifying HMAC + expiry when signed."""
if not self._secret_key:
return cookie_value
try:
data, sig = cookie_value.rsplit(".", 1)
except ValueError:
raise UnauthorizedError()
if not hmac.compare_digest(self._hmac(data), sig):
raise UnauthorizedError()
try:
payload = json.loads(base64.urlsafe_b64decode(data))
value: str = payload["v"]
exp: int = payload["exp"]
except Exception:
raise UnauthorizedError()
if exp < int(time.time()):
raise UnauthorizedError()
return value
async def extract(self, request: Request) -> str | None:
return request.cookies.get(self._name)
async def authenticate(self, credential: str) -> Any:
plain = self._verify(credential)
return await self._validator(plain, **self._kwargs)
def require(self, **kwargs: Any) -> "CookieAuth":
"""Return a new instance with additional (or overriding) validator kwargs."""
return CookieAuth(
self._name,
self._validator,
secret_key=self._secret_key,
ttl=self._ttl,
**{**self._kwargs, **kwargs},
)
def set_cookie(self, response: Response, value: str) -> None:
"""Attach the cookie to *response*, signing it when ``secret_key`` is set."""
cookie_value = self._sign(value) if self._secret_key else value
response.set_cookie(
self._name,
cookie_value,
httponly=True,
samesite="lax",
max_age=self._ttl,
)
def delete_cookie(self, response: Response) -> None:
"""Clear the session cookie (logout)."""
response.delete_cookie(self._name, httponly=True, samesite="lax")

View File

@@ -0,0 +1,67 @@
"""API key header authentication source."""
import inspect
from typing import Annotated, Any, Callable
from fastapi import Depends, Request
from fastapi.security import APIKeyHeader, SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource, _ensure_async
class APIKeyHeaderAuth(AuthSource):
"""API key header authentication source.
Wraps :class:`fastapi.security.APIKeyHeader` for OpenAPI documentation.
The validator is called as ``await validator(api_key, **kwargs)``
where ``kwargs`` are the extra keyword arguments provided at instantiation.
Args:
name: HTTP header name that carries the API key (e.g. ``"X-API-Key"``).
validator: Sync or async callable that receives the API key and any
extra keyword arguments, and returns the authenticated identity.
Should raise :class:`~fastapi_toolsets.exceptions.UnauthorizedError`
on failure.
**kwargs: Extra keyword arguments forwarded to the validator on every
call (e.g. ``role=Role.ADMIN``).
"""
def __init__(
self,
name: str,
validator: Callable[..., Any],
**kwargs: Any,
) -> None:
self._name = name
self._validator = _ensure_async(validator)
self._kwargs = kwargs
self._scheme = APIKeyHeader(name=name, auto_error=False)
async def _call(
security_scopes: SecurityScopes, # noqa: ARG001
api_key: Annotated[str | None, Depends(self._scheme)] = None,
) -> Any:
if api_key is None:
raise UnauthorizedError()
return await self._validator(api_key, **self._kwargs)
self._call_fn = _call
self.__signature__ = inspect.signature(_call)
async def extract(self, request: Request) -> str | None:
"""Extract the API key from the configured header."""
return request.headers.get(self._name) or None
async def authenticate(self, credential: str) -> Any:
"""Validate a credential and return the identity."""
return await self._validator(credential, **self._kwargs)
def require(self, **kwargs: Any) -> "APIKeyHeaderAuth":
"""Return a new instance with additional (or overriding) validator kwargs."""
return APIKeyHeaderAuth(
self._name,
self._validator,
**{**self._kwargs, **kwargs},
)

View File

@@ -0,0 +1,119 @@
"""MultiAuth: combine multiple authentication sources into a single callable."""
import inspect
from typing import Any, cast
from fastapi import Request
from fastapi.security import SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource
class MultiAuth:
"""Combine multiple authentication sources into a single callable.
Sources are tried in order; the first one whose
:meth:`~AuthSource.extract` returns a non-``None`` credential wins.
Its :meth:`~AuthSource.authenticate` is called and the result returned.
If a credential is found but the validator raises, the exception propagates
immediately — the remaining sources are **not** tried. This prevents
silent fallthrough on invalid credentials.
If no source provides a credential,
:class:`~fastapi_toolsets.exceptions.UnauthorizedError` is raised.
The :meth:`~AuthSource.extract` method of each source performs only
string matching (no I/O), so prefix-based dispatch is essentially free.
Any :class:`~AuthSource` subclass — including user-defined ones — can be
passed as a source.
Args:
*sources: Auth source instances to try in order.
Example::
user_bearer = BearerTokenAuth(verify_user, prefix="user_")
org_bearer = BearerTokenAuth(verify_org, prefix="org_")
cookie = CookieAuth("session", verify_session)
multi = MultiAuth(user_bearer, org_bearer, cookie)
@app.get("/data")
async def data_route(user = Security(multi)):
return user
# Apply a shared requirement to all sources at once
@app.get("/admin")
async def admin_route(user = Security(multi.require(role=Role.ADMIN))):
return user
"""
def __init__(self, *sources: AuthSource) -> None:
self._sources = sources
async def _call(
request: Request,
security_scopes: SecurityScopes, # noqa: ARG001
**kwargs: Any, # noqa: ARG001 — absorbs scheme values injected by FastAPI
) -> Any:
for source in self._sources:
credential = await source.extract(request)
if credential is not None:
return await source.authenticate(credential)
raise UnauthorizedError()
self._call_fn = _call
# Build a merged signature that includes the security-scheme Depends()
# parameters from every source so FastAPI registers them in OpenAPI docs.
seen: set[str] = {"request", "security_scopes"}
merged: list[inspect.Parameter] = [
inspect.Parameter(
"request",
inspect.Parameter.POSITIONAL_OR_KEYWORD,
annotation=Request,
),
inspect.Parameter(
"security_scopes",
inspect.Parameter.POSITIONAL_OR_KEYWORD,
annotation=SecurityScopes,
),
]
for i, source in enumerate(sources):
for name, param in inspect.signature(source).parameters.items():
if name in seen:
continue
merged.append(param.replace(name=f"_s{i}_{name}"))
seen.add(name)
self.__signature__ = inspect.Signature(merged, return_annotation=Any)
async def __call__(self, **kwargs: Any) -> Any:
return await self._call_fn(**kwargs)
def require(self, **kwargs: Any) -> "MultiAuth":
"""Return a new :class:`MultiAuth` with kwargs forwarded to each source.
Calls ``.require(**kwargs)`` on every source that supports it. Sources
that do not implement ``.require()`` (e.g. custom :class:`~AuthSource`
subclasses) are passed through unchanged.
New kwargs are merged over each source's existing kwargs — new values
win on conflict::
multi = MultiAuth(bearer, cookie)
@app.get("/admin")
async def admin(user = Security(multi.require(role=Role.ADMIN))):
return user
"""
new_sources = tuple(
cast(Any, source).require(**kwargs)
if hasattr(source, "require")
else source
for source in self._sources
)
return MultiAuth(*new_sources)

View File

@@ -15,7 +15,7 @@ ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
# CRUD type aliases
JoinType = list[tuple[type[DeclarativeBase], Any]]
JoinType = list[tuple[type[DeclarativeBase] | Any, Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]

View File

@@ -139,6 +139,17 @@ class Post(Base):
tags: Mapped[list[Tag]] = relationship(secondary=post_tags)
class Transfer(Base):
"""Test model with two FKs to the same table (users)."""
__tablename__ = "transfers"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
amount: Mapped[str] = mapped_column(String(50))
sender_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
receiver_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
class Article(Base):
"""Test article model with ARRAY and JSON columns."""
@@ -300,6 +311,23 @@ class ArticleRead(PydanticBase):
labels: list[str]
class TransferCreate(BaseModel):
"""Schema for creating a transfer."""
id: uuid.UUID | None = None
amount: str
sender_id: uuid.UUID
receiver_id: uuid.UUID
class TransferRead(PydanticBase):
"""Schema for reading a transfer."""
id: uuid.UUID
amount: str
TransferCrud = CrudFactory(Transfer)
ArticleCrud = CrudFactory(Article)
RoleCrud = CrudFactory(Role)
RoleCursorCrud = CrudFactory(Role, cursor_column=Role.id)

View File

@@ -38,6 +38,10 @@ from .conftest import (
Tag,
TagCreate,
TagCrud,
Transfer,
TransferCreate,
TransferCrud,
TransferRead,
User,
UserCreate,
UserCrud,
@@ -1282,6 +1286,128 @@ class TestCrudJoins:
assert users[0].username == "multi_join"
class TestCrudAliasedJoins:
"""Tests for CRUD operations with aliased joins (same table joined twice)."""
@pytest.mark.anyio
async def test_get_multi_with_aliased_joins(self, db_session: AsyncSession):
"""Aliased joins allow joining the same table twice."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="100", sender_id=alice.id, receiver_id=bob.id),
)
Sender = aliased(User)
Receiver = aliased(User)
results = await TransferCrud.get_multi(
db_session,
joins=[
(Sender, Transfer.sender_id == Sender.id),
(Receiver, Transfer.receiver_id == Receiver.id),
],
filters=[Sender.username == "alice", Receiver.username == "bob"],
)
assert len(results) == 1
assert results[0].amount == "100"
@pytest.mark.anyio
async def test_get_multi_aliased_no_match(self, db_session: AsyncSession):
"""Aliased joins correctly filter out non-matching rows."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="100", sender_id=alice.id, receiver_id=bob.id),
)
Sender = aliased(User)
Receiver = aliased(User)
# bob is receiver, not sender — should return nothing
results = await TransferCrud.get_multi(
db_session,
joins=[
(Sender, Transfer.sender_id == Sender.id),
(Receiver, Transfer.receiver_id == Receiver.id),
],
filters=[Sender.username == "bob", Receiver.username == "alice"],
)
assert len(results) == 0
@pytest.mark.anyio
async def test_paginate_with_aliased_joins(self, db_session: AsyncSession):
"""Aliased joins work with offset_paginate."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="50", sender_id=alice.id, receiver_id=bob.id),
)
await TransferCrud.create(
db_session,
TransferCreate(amount="75", sender_id=bob.id, receiver_id=alice.id),
)
Sender = aliased(User)
result = await TransferCrud.offset_paginate(
db_session,
joins=[(Sender, Transfer.sender_id == Sender.id)],
filters=[Sender.username == "alice"],
schema=TransferRead,
)
assert result.pagination.total_count == 1
assert result.data[0].amount == "50"
@pytest.mark.anyio
async def test_count_with_aliased_join(self, db_session: AsyncSession):
"""Aliased joins work with count."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="10", sender_id=alice.id, receiver_id=bob.id),
)
await TransferCrud.create(
db_session,
TransferCreate(amount="20", sender_id=alice.id, receiver_id=bob.id),
)
Sender = aliased(User)
count = await TransferCrud.count(
db_session,
joins=[(Sender, Transfer.sender_id == Sender.id)],
filters=[Sender.username == "alice"],
)
assert count == 2
class TestCrudFactoryM2M:
"""Tests for CrudFactory with m2m_fields parameter."""

View File

@@ -971,7 +971,7 @@ class TestFilterBy:
@pytest.mark.anyio
async def test_bool_filter_false(self, db_session: AsyncSession):
"""filter_by with a boolean False value correctly filters rows."""
"""filter_by with a string 'false' value correctly filters rows."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
@@ -982,7 +982,7 @@ class TestFilterBy:
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": False}, schema=UserRead
db_session, filter_by={"is_active": "false"}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
@@ -991,7 +991,7 @@ class TestFilterBy:
@pytest.mark.anyio
async def test_bool_filter_true(self, db_session: AsyncSession):
"""filter_by with a boolean True value correctly filters rows."""
"""filter_by with a string 'true' value correctly filters rows."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
@@ -1002,7 +1002,7 @@ class TestFilterBy:
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": True}, schema=UserRead
db_session, filter_by={"is_active": "true"}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
@@ -1011,7 +1011,7 @@ class TestFilterBy:
@pytest.mark.anyio
async def test_bool_filter_list(self, db_session: AsyncSession):
"""filter_by with a list of booleans produces an IN clause."""
"""filter_by with a list of string booleans produces an IN clause."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
@@ -1022,12 +1022,41 @@ class TestFilterBy:
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": [True, False]}, schema=UserRead
db_session, filter_by={"is_active": ["true", "false"]}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
@pytest.mark.anyio
async def test_bool_filter_native_bool(self, db_session: AsyncSession):
"""filter_by with a native Python bool passes through coercion."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": True}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
def test_bool_coerce_invalid_value(self):
"""_coerce_bool raises ValueError for non-bool, non-string values."""
from fastapi_toolsets.crud.search import _coerce_bool
with pytest.raises(ValueError, match="Cannot coerce"):
_coerce_bool(42)
def test_bool_coerce_invalid_string(self):
"""_coerce_bool raises ValueError for unrecognized string values."""
from fastapi_toolsets.crud.search import _coerce_bool
with pytest.raises(ValueError, match="Cannot coerce"):
_coerce_bool("maybe")
@pytest.mark.anyio
async def test_array_contains_single_value(self, db_session: AsyncSession):
"""filter_by on an ARRAY column with a scalar checks containment."""

1180
tests/test_security.py Normal file

File diff suppressed because it is too large Load Diff

78
uv.lock generated
View File

@@ -81,6 +81,76 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3c/d7/8fb3044eaef08a310acfe23dae9a8e2e07d305edc29a53497e52bc76eca7/asyncpg-0.31.0-cp314-cp314t-win_amd64.whl", hash = "sha256:bd4107bb7cdd0e9e65fae66a62afd3a249663b844fa34d479f6d5b3bef9c04c3", size = 706062, upload-time = "2025-11-24T23:26:44.086Z" },
]
[[package]]
name = "bcrypt"
version = "5.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d4/36/3329e2518d70ad8e2e5817d5a4cac6bba05a47767ec416c7d020a965f408/bcrypt-5.0.0.tar.gz", hash = "sha256:f748f7c2d6fd375cc93d3fba7ef4a9e3a092421b8dbf34d8d4dc06be9492dfdd", size = 25386, upload-time = "2025-09-25T19:50:47.829Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/13/85/3e65e01985fddf25b64ca67275bb5bdb4040bd1a53b66d355c6c37c8a680/bcrypt-5.0.0-cp313-cp313t-macosx_10_12_universal2.whl", hash = "sha256:f3c08197f3039bec79cee59a606d62b96b16669cff3949f21e74796b6e3cd2be", size = 481806, upload-time = "2025-09-25T19:49:05.102Z" },
{ url = "https://files.pythonhosted.org/packages/44/dc/01eb79f12b177017a726cbf78330eb0eb442fae0e7b3dfd84ea2849552f3/bcrypt-5.0.0-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:200af71bc25f22006f4069060c88ed36f8aa4ff7f53e67ff04d2ab3f1e79a5b2", size = 268626, upload-time = "2025-09-25T19:49:06.723Z" },
{ url = "https://files.pythonhosted.org/packages/8c/cf/e82388ad5959c40d6afd94fb4743cc077129d45b952d46bdc3180310e2df/bcrypt-5.0.0-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:baade0a5657654c2984468efb7d6c110db87ea63ef5a4b54732e7e337253e44f", size = 271853, upload-time = "2025-09-25T19:49:08.028Z" },
{ url = "https://files.pythonhosted.org/packages/ec/86/7134b9dae7cf0efa85671651341f6afa695857fae172615e960fb6a466fa/bcrypt-5.0.0-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:c58b56cdfb03202b3bcc9fd8daee8e8e9b6d7e3163aa97c631dfcfcc24d36c86", size = 269793, upload-time = "2025-09-25T19:49:09.727Z" },
{ url = "https://files.pythonhosted.org/packages/cc/82/6296688ac1b9e503d034e7d0614d56e80c5d1a08402ff856a4549cb59207/bcrypt-5.0.0-cp313-cp313t-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:4bfd2a34de661f34d0bda43c3e4e79df586e4716ef401fe31ea39d69d581ef23", size = 289930, upload-time = "2025-09-25T19:49:11.204Z" },
{ url = "https://files.pythonhosted.org/packages/d1/18/884a44aa47f2a3b88dd09bc05a1e40b57878ecd111d17e5bba6f09f8bb77/bcrypt-5.0.0-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:ed2e1365e31fc73f1825fa830f1c8f8917ca1b3ca6185773b349c20fd606cec2", size = 272194, upload-time = "2025-09-25T19:49:12.524Z" },
{ url = "https://files.pythonhosted.org/packages/0e/8f/371a3ab33c6982070b674f1788e05b656cfbf5685894acbfef0c65483a59/bcrypt-5.0.0-cp313-cp313t-manylinux_2_34_aarch64.whl", hash = "sha256:83e787d7a84dbbfba6f250dd7a5efd689e935f03dd83b0f919d39349e1f23f83", size = 269381, upload-time = "2025-09-25T19:49:14.308Z" },
{ url = "https://files.pythonhosted.org/packages/b1/34/7e4e6abb7a8778db6422e88b1f06eb07c47682313997ee8a8f9352e5a6f1/bcrypt-5.0.0-cp313-cp313t-manylinux_2_34_x86_64.whl", hash = "sha256:137c5156524328a24b9fac1cb5db0ba618bc97d11970b39184c1d87dc4bf1746", size = 271750, upload-time = "2025-09-25T19:49:15.584Z" },
{ url = "https://files.pythonhosted.org/packages/c0/1b/54f416be2499bd72123c70d98d36c6cd61a4e33d9b89562c22481c81bb30/bcrypt-5.0.0-cp313-cp313t-musllinux_1_1_aarch64.whl", hash = "sha256:38cac74101777a6a7d3b3e3cfefa57089b5ada650dce2baf0cbdd9d65db22a9e", size = 303757, upload-time = "2025-09-25T19:49:17.244Z" },
{ url = "https://files.pythonhosted.org/packages/13/62/062c24c7bcf9d2826a1a843d0d605c65a755bc98002923d01fd61270705a/bcrypt-5.0.0-cp313-cp313t-musllinux_1_1_x86_64.whl", hash = "sha256:d8d65b564ec849643d9f7ea05c6d9f0cd7ca23bdd4ac0c2dbef1104ab504543d", size = 306740, upload-time = "2025-09-25T19:49:18.693Z" },
{ url = "https://files.pythonhosted.org/packages/d5/c8/1fdbfc8c0f20875b6b4020f3c7dc447b8de60aa0be5faaf009d24242aec9/bcrypt-5.0.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:741449132f64b3524e95cd30e5cd3343006ce146088f074f31ab26b94e6c75ba", size = 334197, upload-time = "2025-09-25T19:49:20.523Z" },
{ url = "https://files.pythonhosted.org/packages/a6/c1/8b84545382d75bef226fbc6588af0f7b7d095f7cd6a670b42a86243183cd/bcrypt-5.0.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:212139484ab3207b1f0c00633d3be92fef3c5f0af17cad155679d03ff2ee1e41", size = 352974, upload-time = "2025-09-25T19:49:22.254Z" },
{ url = "https://files.pythonhosted.org/packages/10/a6/ffb49d4254ed085e62e3e5dd05982b4393e32fe1e49bb1130186617c29cd/bcrypt-5.0.0-cp313-cp313t-win32.whl", hash = "sha256:9d52ed507c2488eddd6a95bccee4e808d3234fa78dd370e24bac65a21212b861", size = 148498, upload-time = "2025-09-25T19:49:24.134Z" },
{ url = "https://files.pythonhosted.org/packages/48/a9/259559edc85258b6d5fc5471a62a3299a6aa37a6611a169756bf4689323c/bcrypt-5.0.0-cp313-cp313t-win_amd64.whl", hash = "sha256:f6984a24db30548fd39a44360532898c33528b74aedf81c26cf29c51ee47057e", size = 145853, upload-time = "2025-09-25T19:49:25.702Z" },
{ url = "https://files.pythonhosted.org/packages/2d/df/9714173403c7e8b245acf8e4be8876aac64a209d1b392af457c79e60492e/bcrypt-5.0.0-cp313-cp313t-win_arm64.whl", hash = "sha256:9fffdb387abe6aa775af36ef16f55e318dcda4194ddbf82007a6f21da29de8f5", size = 139626, upload-time = "2025-09-25T19:49:26.928Z" },
{ url = "https://files.pythonhosted.org/packages/f8/14/c18006f91816606a4abe294ccc5d1e6f0e42304df5a33710e9e8e95416e1/bcrypt-5.0.0-cp314-cp314t-macosx_10_12_universal2.whl", hash = "sha256:4870a52610537037adb382444fefd3706d96d663ac44cbb2f37e3919dca3d7ef", size = 481862, upload-time = "2025-09-25T19:49:28.365Z" },
{ url = "https://files.pythonhosted.org/packages/67/49/dd074d831f00e589537e07a0725cf0e220d1f0d5d8e85ad5bbff251c45aa/bcrypt-5.0.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:48f753100931605686f74e27a7b49238122aa761a9aefe9373265b8b7aa43ea4", size = 268544, upload-time = "2025-09-25T19:49:30.39Z" },
{ url = "https://files.pythonhosted.org/packages/f5/91/50ccba088b8c474545b034a1424d05195d9fcbaaf802ab8bfe2be5a4e0d7/bcrypt-5.0.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:f70aadb7a809305226daedf75d90379c397b094755a710d7014b8b117df1ebbf", size = 271787, upload-time = "2025-09-25T19:49:32.144Z" },
{ url = "https://files.pythonhosted.org/packages/aa/e7/d7dba133e02abcda3b52087a7eea8c0d4f64d3e593b4fffc10c31b7061f3/bcrypt-5.0.0-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:744d3c6b164caa658adcb72cb8cc9ad9b4b75c7db507ab4bc2480474a51989da", size = 269753, upload-time = "2025-09-25T19:49:33.885Z" },
{ url = "https://files.pythonhosted.org/packages/33/fc/5b145673c4b8d01018307b5c2c1fc87a6f5a436f0ad56607aee389de8ee3/bcrypt-5.0.0-cp314-cp314t-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:a28bc05039bdf3289d757f49d616ab3efe8cf40d8e8001ccdd621cd4f98f4fc9", size = 289587, upload-time = "2025-09-25T19:49:35.144Z" },
{ url = "https://files.pythonhosted.org/packages/27/d7/1ff22703ec6d4f90e62f1a5654b8867ef96bafb8e8102c2288333e1a6ca6/bcrypt-5.0.0-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:7f277a4b3390ab4bebe597800a90da0edae882c6196d3038a73adf446c4f969f", size = 272178, upload-time = "2025-09-25T19:49:36.793Z" },
{ url = "https://files.pythonhosted.org/packages/c8/88/815b6d558a1e4d40ece04a2f84865b0fef233513bd85fd0e40c294272d62/bcrypt-5.0.0-cp314-cp314t-manylinux_2_34_aarch64.whl", hash = "sha256:79cfa161eda8d2ddf29acad370356b47f02387153b11d46042e93a0a95127493", size = 269295, upload-time = "2025-09-25T19:49:38.164Z" },
{ url = "https://files.pythonhosted.org/packages/51/8c/e0db387c79ab4931fc89827d37608c31cc57b6edc08ccd2386139028dc0d/bcrypt-5.0.0-cp314-cp314t-manylinux_2_34_x86_64.whl", hash = "sha256:a5393eae5722bcef046a990b84dff02b954904c36a194f6cfc817d7dca6c6f0b", size = 271700, upload-time = "2025-09-25T19:49:39.917Z" },
{ url = "https://files.pythonhosted.org/packages/06/83/1570edddd150f572dbe9fc00f6203a89fc7d4226821f67328a85c330f239/bcrypt-5.0.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:7f4c94dec1b5ab5d522750cb059bb9409ea8872d4494fd152b53cca99f1ddd8c", size = 334034, upload-time = "2025-09-25T19:49:41.227Z" },
{ url = "https://files.pythonhosted.org/packages/c9/f2/ea64e51a65e56ae7a8a4ec236c2bfbdd4b23008abd50ac33fbb2d1d15424/bcrypt-5.0.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:0cae4cb350934dfd74c020525eeae0a5f79257e8a201c0c176f4b84fdbf2a4b4", size = 352766, upload-time = "2025-09-25T19:49:43.08Z" },
{ url = "https://files.pythonhosted.org/packages/d7/d4/1a388d21ee66876f27d1a1f41287897d0c0f1712ef97d395d708ba93004c/bcrypt-5.0.0-cp314-cp314t-win32.whl", hash = "sha256:b17366316c654e1ad0306a6858e189fc835eca39f7eb2cafd6aaca8ce0c40a2e", size = 152449, upload-time = "2025-09-25T19:49:44.971Z" },
{ url = "https://files.pythonhosted.org/packages/3f/61/3291c2243ae0229e5bca5d19f4032cecad5dfb05a2557169d3a69dc0ba91/bcrypt-5.0.0-cp314-cp314t-win_amd64.whl", hash = "sha256:92864f54fb48b4c718fc92a32825d0e42265a627f956bc0361fe869f1adc3e7d", size = 149310, upload-time = "2025-09-25T19:49:46.162Z" },
{ url = "https://files.pythonhosted.org/packages/3e/89/4b01c52ae0c1a681d4021e5dd3e45b111a8fb47254a274fa9a378d8d834b/bcrypt-5.0.0-cp314-cp314t-win_arm64.whl", hash = "sha256:dd19cf5184a90c873009244586396a6a884d591a5323f0e8a5922560718d4993", size = 143761, upload-time = "2025-09-25T19:49:47.345Z" },
{ url = "https://files.pythonhosted.org/packages/84/29/6237f151fbfe295fe3e074ecc6d44228faa1e842a81f6d34a02937ee1736/bcrypt-5.0.0-cp38-abi3-macosx_10_12_universal2.whl", hash = "sha256:fc746432b951e92b58317af8e0ca746efe93e66555f1b40888865ef5bf56446b", size = 494553, upload-time = "2025-09-25T19:49:49.006Z" },
{ url = "https://files.pythonhosted.org/packages/45/b6/4c1205dde5e464ea3bd88e8742e19f899c16fa8916fb8510a851fae985b5/bcrypt-5.0.0-cp38-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:c2388ca94ffee269b6038d48747f4ce8df0ffbea43f31abfa18ac72f0218effb", size = 275009, upload-time = "2025-09-25T19:49:50.581Z" },
{ url = "https://files.pythonhosted.org/packages/3b/71/427945e6ead72ccffe77894b2655b695ccf14ae1866cd977e185d606dd2f/bcrypt-5.0.0-cp38-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:560ddb6ec730386e7b3b26b8b4c88197aaed924430e7b74666a586ac997249ef", size = 278029, upload-time = "2025-09-25T19:49:52.533Z" },
{ url = "https://files.pythonhosted.org/packages/17/72/c344825e3b83c5389a369c8a8e58ffe1480b8a699f46c127c34580c4666b/bcrypt-5.0.0-cp38-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:d79e5c65dcc9af213594d6f7f1fa2c98ad3fc10431e7aa53c176b441943efbdd", size = 275907, upload-time = "2025-09-25T19:49:54.709Z" },
{ url = "https://files.pythonhosted.org/packages/0b/7e/d4e47d2df1641a36d1212e5c0514f5291e1a956a7749f1e595c07a972038/bcrypt-5.0.0-cp38-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:2b732e7d388fa22d48920baa267ba5d97cca38070b69c0e2d37087b381c681fd", size = 296500, upload-time = "2025-09-25T19:49:56.013Z" },
{ url = "https://files.pythonhosted.org/packages/0f/c3/0ae57a68be2039287ec28bc463b82e4b8dc23f9d12c0be331f4782e19108/bcrypt-5.0.0-cp38-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:0c8e093ea2532601a6f686edbc2c6b2ec24131ff5c52f7610dd64fa4553b5464", size = 278412, upload-time = "2025-09-25T19:49:57.356Z" },
{ url = "https://files.pythonhosted.org/packages/45/2b/77424511adb11e6a99e3a00dcc7745034bee89036ad7d7e255a7e47be7d8/bcrypt-5.0.0-cp38-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:5b1589f4839a0899c146e8892efe320c0fa096568abd9b95593efac50a87cb75", size = 275486, upload-time = "2025-09-25T19:49:59.116Z" },
{ url = "https://files.pythonhosted.org/packages/43/0a/405c753f6158e0f3f14b00b462d8bca31296f7ecfc8fc8bc7919c0c7d73a/bcrypt-5.0.0-cp38-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:89042e61b5e808b67daf24a434d89bab164d4de1746b37a8d173b6b14f3db9ff", size = 277940, upload-time = "2025-09-25T19:50:00.869Z" },
{ url = "https://files.pythonhosted.org/packages/62/83/b3efc285d4aadc1fa83db385ec64dcfa1707e890eb42f03b127d66ac1b7b/bcrypt-5.0.0-cp38-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:e3cf5b2560c7b5a142286f69bde914494b6d8f901aaa71e453078388a50881c4", size = 310776, upload-time = "2025-09-25T19:50:02.393Z" },
{ url = "https://files.pythonhosted.org/packages/95/7d/47ee337dacecde6d234890fe929936cb03ebc4c3a7460854bbd9c97780b8/bcrypt-5.0.0-cp38-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:f632fd56fc4e61564f78b46a2269153122db34988e78b6be8b32d28507b7eaeb", size = 312922, upload-time = "2025-09-25T19:50:04.232Z" },
{ url = "https://files.pythonhosted.org/packages/d6/3a/43d494dfb728f55f4e1cf8fd435d50c16a2d75493225b54c8d06122523c6/bcrypt-5.0.0-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:801cad5ccb6b87d1b430f183269b94c24f248dddbbc5c1f78b6ed231743e001c", size = 341367, upload-time = "2025-09-25T19:50:05.559Z" },
{ url = "https://files.pythonhosted.org/packages/55/ab/a0727a4547e383e2e22a630e0f908113db37904f58719dc48d4622139b5c/bcrypt-5.0.0-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:3cf67a804fc66fc217e6914a5635000259fbbbb12e78a99488e4d5ba445a71eb", size = 359187, upload-time = "2025-09-25T19:50:06.916Z" },
{ url = "https://files.pythonhosted.org/packages/1b/bb/461f352fdca663524b4643d8b09e8435b4990f17fbf4fea6bc2a90aa0cc7/bcrypt-5.0.0-cp38-abi3-win32.whl", hash = "sha256:3abeb543874b2c0524ff40c57a4e14e5d3a66ff33fb423529c88f180fd756538", size = 153752, upload-time = "2025-09-25T19:50:08.515Z" },
{ url = "https://files.pythonhosted.org/packages/41/aa/4190e60921927b7056820291f56fc57d00d04757c8b316b2d3c0d1d6da2c/bcrypt-5.0.0-cp38-abi3-win_amd64.whl", hash = "sha256:35a77ec55b541e5e583eb3436ffbbf53b0ffa1fa16ca6782279daf95d146dcd9", size = 150881, upload-time = "2025-09-25T19:50:09.742Z" },
{ url = "https://files.pythonhosted.org/packages/54/12/cd77221719d0b39ac0b55dbd39358db1cd1246e0282e104366ebbfb8266a/bcrypt-5.0.0-cp38-abi3-win_arm64.whl", hash = "sha256:cde08734f12c6a4e28dc6755cd11d3bdfea608d93d958fffbe95a7026ebe4980", size = 144931, upload-time = "2025-09-25T19:50:11.016Z" },
{ url = "https://files.pythonhosted.org/packages/5d/ba/2af136406e1c3839aea9ecadc2f6be2bcd1eff255bd451dd39bcf302c47a/bcrypt-5.0.0-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:0c418ca99fd47e9c59a301744d63328f17798b5947b0f791e9af3c1c499c2d0a", size = 495313, upload-time = "2025-09-25T19:50:12.309Z" },
{ url = "https://files.pythonhosted.org/packages/ac/ee/2f4985dbad090ace5ad1f7dd8ff94477fe089b5fab2040bd784a3d5f187b/bcrypt-5.0.0-cp39-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddb4e1500f6efdd402218ffe34d040a1196c072e07929b9820f363a1fd1f4191", size = 275290, upload-time = "2025-09-25T19:50:13.673Z" },
{ url = "https://files.pythonhosted.org/packages/e4/6e/b77ade812672d15cf50842e167eead80ac3514f3beacac8902915417f8b7/bcrypt-5.0.0-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7aeef54b60ceddb6f30ee3db090351ecf0d40ec6e2abf41430997407a46d2254", size = 278253, upload-time = "2025-09-25T19:50:15.089Z" },
{ url = "https://files.pythonhosted.org/packages/36/c4/ed00ed32f1040f7990dac7115f82273e3c03da1e1a1587a778d8cea496d8/bcrypt-5.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:f0ce778135f60799d89c9693b9b398819d15f1921ba15fe719acb3178215a7db", size = 276084, upload-time = "2025-09-25T19:50:16.699Z" },
{ url = "https://files.pythonhosted.org/packages/e7/c4/fa6e16145e145e87f1fa351bbd54b429354fd72145cd3d4e0c5157cf4c70/bcrypt-5.0.0-cp39-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:a71f70ee269671460b37a449f5ff26982a6f2ba493b3eabdd687b4bf35f875ac", size = 297185, upload-time = "2025-09-25T19:50:18.525Z" },
{ url = "https://files.pythonhosted.org/packages/24/b4/11f8a31d8b67cca3371e046db49baa7c0594d71eb40ac8121e2fc0888db0/bcrypt-5.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f8429e1c410b4073944f03bd778a9e066e7fad723564a52ff91841d278dfc822", size = 278656, upload-time = "2025-09-25T19:50:19.809Z" },
{ url = "https://files.pythonhosted.org/packages/ac/31/79f11865f8078e192847d2cb526e3fa27c200933c982c5b2869720fa5fce/bcrypt-5.0.0-cp39-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:edfcdcedd0d0f05850c52ba3127b1fce70b9f89e0fe5ff16517df7e81fa3cbb8", size = 275662, upload-time = "2025-09-25T19:50:21.567Z" },
{ url = "https://files.pythonhosted.org/packages/d4/8d/5e43d9584b3b3591a6f9b68f755a4da879a59712981ef5ad2a0ac1379f7a/bcrypt-5.0.0-cp39-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:611f0a17aa4a25a69362dcc299fda5c8a3d4f160e2abb3831041feb77393a14a", size = 278240, upload-time = "2025-09-25T19:50:23.305Z" },
{ url = "https://files.pythonhosted.org/packages/89/48/44590e3fc158620f680a978aafe8f87a4c4320da81ed11552f0323aa9a57/bcrypt-5.0.0-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:db99dca3b1fdc3db87d7c57eac0c82281242d1eabf19dcb8a6b10eb29a2e72d1", size = 311152, upload-time = "2025-09-25T19:50:24.597Z" },
{ url = "https://files.pythonhosted.org/packages/5f/85/e4fbfc46f14f47b0d20493669a625da5827d07e8a88ee460af6cd9768b44/bcrypt-5.0.0-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:5feebf85a9cefda32966d8171f5db7e3ba964b77fdfe31919622256f80f9cf42", size = 313284, upload-time = "2025-09-25T19:50:26.268Z" },
{ url = "https://files.pythonhosted.org/packages/25/ae/479f81d3f4594456a01ea2f05b132a519eff9ab5768a70430fa1132384b1/bcrypt-5.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:3ca8a166b1140436e058298a34d88032ab62f15aae1c598580333dc21d27ef10", size = 341643, upload-time = "2025-09-25T19:50:28.02Z" },
{ url = "https://files.pythonhosted.org/packages/df/d2/36a086dee1473b14276cd6ea7f61aef3b2648710b5d7f1c9e032c29b859f/bcrypt-5.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:61afc381250c3182d9078551e3ac3a41da14154fbff647ddf52a769f588c4172", size = 359698, upload-time = "2025-09-25T19:50:31.347Z" },
{ url = "https://files.pythonhosted.org/packages/c0/f6/688d2cd64bfd0b14d805ddb8a565e11ca1fb0fd6817175d58b10052b6d88/bcrypt-5.0.0-cp39-abi3-win32.whl", hash = "sha256:64d7ce196203e468c457c37ec22390f1a61c85c6f0b8160fd752940ccfb3a683", size = 153725, upload-time = "2025-09-25T19:50:34.384Z" },
{ url = "https://files.pythonhosted.org/packages/9f/b9/9d9a641194a730bda138b3dfe53f584d61c58cd5230e37566e83ec2ffa0d/bcrypt-5.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:64ee8434b0da054d830fa8e89e1c8bf30061d539044a39524ff7dec90481e5c2", size = 150912, upload-time = "2025-09-25T19:50:35.69Z" },
{ url = "https://files.pythonhosted.org/packages/27/44/d2ef5e87509158ad2187f4dd0852df80695bb1ee0cfe0a684727b01a69e0/bcrypt-5.0.0-cp39-abi3-win_arm64.whl", hash = "sha256:f2347d3534e76bf50bca5500989d6c1d05ed64b440408057a37673282c654927", size = 144953, upload-time = "2025-09-25T19:50:37.32Z" },
{ url = "https://files.pythonhosted.org/packages/8a/75/4aa9f5a4d40d762892066ba1046000b329c7cd58e888a6db878019b282dc/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:7edda91d5ab52b15636d9c30da87d2cc84f426c72b9dba7a9b4fe142ba11f534", size = 271180, upload-time = "2025-09-25T19:50:38.575Z" },
{ url = "https://files.pythonhosted.org/packages/54/79/875f9558179573d40a9cc743038ac2bf67dfb79cecb1e8b5d70e88c94c3d/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:046ad6db88edb3c5ece4369af997938fb1c19d6a699b9c1b27b0db432faae4c4", size = 273791, upload-time = "2025-09-25T19:50:39.913Z" },
{ url = "https://files.pythonhosted.org/packages/bc/fe/975adb8c216174bf70fc17535f75e85ac06ed5252ea077be10d9cff5ce24/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_34_aarch64.whl", hash = "sha256:dcd58e2b3a908b5ecc9b9df2f0085592506ac2d5110786018ee5e160f28e0911", size = 270746, upload-time = "2025-09-25T19:50:43.306Z" },
{ url = "https://files.pythonhosted.org/packages/e4/f8/972c96f5a2b6c4b3deca57009d93e946bbdbe2241dca9806d502f29dd3ee/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_34_x86_64.whl", hash = "sha256:6b8f520b61e8781efee73cba14e3e8c9556ccfb375623f4f97429544734545b4", size = 273375, upload-time = "2025-09-25T19:50:45.43Z" },
]
[[package]]
name = "certifi"
version = "2026.1.4"
@@ -251,7 +321,7 @@ wheels = [
[[package]]
name = "fastapi-toolsets"
version = "3.0.0"
version = "3.0.1"
source = { editable = "." }
dependencies = [
{ name = "asyncpg" },
@@ -282,6 +352,7 @@ pytest = [
[package.dev-dependencies]
dev = [
{ name = "bcrypt" },
{ name = "coverage" },
{ name = "fastapi-toolsets", extra = ["all"] },
{ name = "httpx" },
@@ -301,6 +372,9 @@ docs = [
{ name = "mkdocstrings-python" },
{ name = "zensical" },
]
docs-src = [
{ name = "bcrypt" },
]
tests = [
{ name = "coverage" },
{ name = "httpx" },
@@ -327,6 +401,7 @@ provides-extras = ["cli", "metrics", "pytest", "all"]
[package.metadata.requires-dev]
dev = [
{ name = "bcrypt", specifier = ">=4.0.0" },
{ name = "coverage", specifier = ">=7.0.0" },
{ name = "fastapi-toolsets", extras = ["all"] },
{ name = "httpx", specifier = ">=0.25.0" },
@@ -346,6 +421,7 @@ docs = [
{ name = "mkdocstrings-python", specifier = ">=2.0.2" },
{ name = "zensical", specifier = ">=0.0.30" },
]
docs-src = [{ name = "bcrypt", specifier = ">=4.0.0" }]
tests = [
{ name = "coverage", specifier = ">=7.0.0" },
{ name = "httpx", specifier = ">=0.25.0" },

View File

@@ -2,10 +2,15 @@
site_name = "FastAPI Toolsets"
site_description = "Production-ready utilities for FastAPI applications."
site_author = "d3vyce"
site_url = "https://fastapi-toolsets.d3vyce.fr"
site_url = "https://fastapi-toolsets.d3vyce.fr/"
copyright = "Copyright &copy; 2026 d3vyce"
repo_url = "https://github.com/d3vyce/fastapi-toolsets"
[project.extra.version]
provider = "mike"
default = "stable"
alias = true
[project.theme]
custom_dir = "docs/overrides"
language = "en"