Compare commits

..

6 Commits

31 changed files with 954 additions and 2681 deletions

View File

@@ -6,6 +6,9 @@ on:
pull_request:
branches: [main]
permissions:
contents: read
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

View File

@@ -1 +0,0 @@
# Authentication

View File

@@ -1,267 +0,0 @@
# Security
Composable authentication helpers for FastAPI that use `Security()` for OpenAPI documentation and accept user-provided validator functions with full type flexibility.
## Overview
The `security` module provides four auth source classes and a `MultiAuth` factory. Each class wraps a FastAPI security scheme for OpenAPI and accepts a validator function called as:
```python
await validator(credential, **kwargs)
```
where `kwargs` are the extra keyword arguments provided at instantiation (roles, permissions, enums, etc.). The validator returns the authenticated identity (e.g. a `User` model) which becomes the route dependency value.
```python
from fastapi import Security
from fastapi_toolsets.security import BearerTokenAuth
async def verify_token(token: str, *, role: str) -> User:
user = await db.get_by_token(token)
if not user or user.role != role:
raise UnauthorizedError()
return user
bearer_admin = BearerTokenAuth(verify_token, role="admin")
@app.get("/admin")
async def admin_route(user: User = Security(bearer_admin)):
return user
```
## Auth sources
### [`BearerTokenAuth`](../reference/security.md#fastapi_toolsets.security.BearerTokenAuth)
Reads the `Authorization: Bearer <token>` header. Wraps `HTTPBearer` for OpenAPI.
```python
from fastapi_toolsets.security import BearerTokenAuth
bearer = BearerTokenAuth(validator=verify_token)
@app.get("/me")
async def me(user: User = Security(bearer)):
return user
```
#### Token prefix
The optional `prefix` parameter restricts a `BearerTokenAuth` instance to tokens
that start with a given string. The prefix is **kept** in the value passed to the
validator — store and compare tokens with their prefix included.
This lets you deploy multiple `BearerTokenAuth` instances in the same application
and disambiguate them efficiently in `MultiAuth`:
```python
user_bearer = BearerTokenAuth(verify_user, prefix="user_") # matches "Bearer user_..."
org_bearer = BearerTokenAuth(verify_org, prefix="org_") # matches "Bearer org_..."
```
Use [`generate_token()`](#token-generation) to create correctly-prefixed tokens.
#### Token generation
`BearerTokenAuth.generate_token()` produces a secure random token ready to store
in your database and return to the client. If a prefix is configured it is
prepended automatically:
```python
bearer = BearerTokenAuth(verify_token, prefix="user_")
token = bearer.generate_token() # e.g. "user_Xk3mN..."
await db.store_token(user_id, token)
return {"access_token": token, "token_type": "bearer"}
```
The client sends `Authorization: Bearer user_Xk3mN...` and the validator receives
the full token (prefix included) to compare against the stored value.
### [`CookieAuth`](../reference/security.md#fastapi_toolsets.security.CookieAuth)
Reads a named cookie. Wraps `APIKeyCookie` for OpenAPI.
```python
from fastapi_toolsets.security import CookieAuth
cookie_auth = CookieAuth("session", validator=verify_session)
@app.get("/me")
async def me(user: User = Security(cookie_auth)):
return user
```
### [`OAuth2Auth`](../reference/security.md#fastapi_toolsets.security.OAuth2Auth)
Reads the `Authorization: Bearer <token>` header and registers the token endpoint
in OpenAPI via `OAuth2PasswordBearer`.
```python
from fastapi_toolsets.security import OAuth2Auth
oauth2_auth = OAuth2Auth(token_url="/token", validator=verify_token)
@app.get("/me")
async def me(user: User = Security(oauth2_auth)):
return user
```
### [`OpenIDAuth`](../reference/security.md#fastapi_toolsets.security.OpenIDAuth)
Reads the `Authorization: Bearer <token>` header and registers the OpenID Connect
discovery URL in OpenAPI via `OpenIdConnect`. Token validation is fully delegated
to your validator — use any OIDC / JWT library (`authlib`, `python-jose`, `PyJWT`).
```python
from fastapi_toolsets.security import OpenIDAuth
async def verify_google_token(token: str, *, audience: str) -> User:
payload = jwt.decode(token, google_public_keys, algorithms=["RS256"],
audience=audience)
return User(email=payload["email"], name=payload["name"])
google_auth = OpenIDAuth(
"https://accounts.google.com/.well-known/openid-configuration",
verify_google_token,
audience="my-client-id",
)
@app.get("/me")
async def me(user: User = Security(google_auth)):
return user
```
The discovery URL is used **only for OpenAPI documentation** — no requests are made
to it by this class. You are responsible for fetching and caching the provider's
public keys in your validator.
Multiple providers work naturally with `MultiAuth`:
```python
multi = MultiAuth(google_auth, github_auth)
@app.get("/data")
async def data(user: User = Security(multi)):
return user
```
## Typed validator kwargs
All auth classes forward extra instantiation keyword arguments to the validator.
Arguments can be any type — enums, strings, integers, etc. The validator returns
the authenticated identity, which FastAPI injects directly into the route handler.
```python
async def verify_token(token: str, *, role: Role, permission: str) -> User:
user = await decode_token(token)
if user.role != role or permission not in user.permissions:
raise UnauthorizedError()
return user
bearer = BearerTokenAuth(verify_token, role=Role.ADMIN, permission="billing:read")
```
Each auth instance is self-contained — create a separate instance per distinct
requirement instead of passing requirements through `Security(scopes=[...])`.
### Using `.require()` inline
If declaring a new top-level variable per role feels verbose, use `.require()` to
create a configured clone directly in the route decorator. The original instance
is not mutated:
```python
bearer = BearerTokenAuth(verify_token)
@app.get("/admin/stats")
async def admin_stats(user: User = Security(bearer.require(role=Role.ADMIN))):
return {"message": f"Hello admin {user.name}"}
@app.get("/profile")
async def profile(user: User = Security(bearer.require(role=Role.USER))):
return {"id": user.id, "name": user.name}
```
`.require()` kwargs are merged over existing ones — new values win on conflict.
The `prefix` (for `BearerTokenAuth`) and cookie name (for `CookieAuth`) are
always preserved.
`.require()` instances work transparently inside `MultiAuth`:
```python
multi = MultiAuth(
user_bearer.require(role=Role.USER),
org_bearer.require(role=Role.ADMIN),
)
```
## MultiAuth
[`MultiAuth`](../reference/security.md#fastapi_toolsets.security.MultiAuth) combines
multiple auth sources into a single callable. Sources are tried in order; the
first one that finds a credential wins.
```python
from fastapi_toolsets.security import MultiAuth
multi = MultiAuth(user_bearer, org_bearer, cookie_auth)
@app.get("/data")
async def data_route(user = Security(multi)):
return user
```
### Using `.require()` on MultiAuth
`MultiAuth` also supports `.require()`, which propagates the kwargs to every
source that implements it. Sources that do not (e.g. custom `AuthSource`
subclasses) are passed through unchanged:
```python
multi = MultiAuth(bearer, cookie)
@app.get("/admin")
async def admin(user: User = Security(multi.require(role=Role.ADMIN))):
return user
```
This is equivalent to calling `.require()` on each source individually:
```python
# These two are identical
multi.require(role=Role.ADMIN)
MultiAuth(
bearer.require(role=Role.ADMIN),
cookie.require(role=Role.ADMIN),
)
```
### Prefix-based dispatch
Because `extract()` is pure string matching (no I/O), prefix-based source
selection is essentially free. Only the matching source's validator (which may
involve DB or network I/O) is ever called:
```python
user_bearer = BearerTokenAuth(verify_user, prefix="user_")
org_bearer = BearerTokenAuth(verify_org, prefix="org_")
multi = MultiAuth(user_bearer, org_bearer)
# "Bearer user_alice" → only verify_user runs, receives "user_alice"
# "Bearer org_acme" → only verify_org runs, receives "org_acme"
```
Tokens are stored and compared **with their prefix** — use `generate_token()` on
each source to issue correctly-prefixed tokens:
```python
user_token = user_bearer.generate_token() # "user_..."
org_token = org_bearer.generate_token() # "org_..."
```
---
[:material-api: API Reference](../reference/security.md)

View File

@@ -1,28 +0,0 @@
# `security`
Here's the reference for the authentication helpers provided by the `security` module.
You can import them directly from `fastapi_toolsets.security`:
```python
from fastapi_toolsets.security import (
AuthSource,
BearerTokenAuth,
CookieAuth,
OAuth2Auth,
OpenIDAuth,
MultiAuth,
)
```
## ::: fastapi_toolsets.security.AuthSource
## ::: fastapi_toolsets.security.BearerTokenAuth
## ::: fastapi_toolsets.security.CookieAuth
## ::: fastapi_toolsets.security.OAuth2Auth
## ::: fastapi_toolsets.security.OpenIDAuth
## ::: fastapi_toolsets.security.MultiAuth

View File

@@ -1,9 +0,0 @@
from fastapi import FastAPI
from fastapi_toolsets.exceptions import init_exceptions_handlers
from .routes import router
app = FastAPI()
init_exceptions_handlers(app=app)
app.include_router(router=router)

View File

@@ -1,9 +0,0 @@
from fastapi_toolsets.crud import CrudFactory
from .models import OAuthAccount, OAuthProvider, Team, User, UserToken
TeamCrud = CrudFactory(model=Team)
UserCrud = CrudFactory(model=User)
UserTokenCrud = CrudFactory(model=UserToken)
OAuthProviderCrud = CrudFactory(model=OAuthProvider)
OAuthAccountCrud = CrudFactory(model=OAuthAccount)

View File

@@ -1,15 +0,0 @@
from fastapi import Depends
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from fastapi_toolsets.db import create_db_context, create_db_dependency
DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost:5432/postgres"
engine = create_async_engine(url=DATABASE_URL, future=True)
async_session_maker = async_sessionmaker(bind=engine, expire_on_commit=False)
get_db = create_db_dependency(session_maker=async_session_maker)
get_db_context = create_db_context(session_maker=async_session_maker)
SessionDep = Depends(get_db)

View File

@@ -1,105 +0,0 @@
import enum
from datetime import datetime
from uuid import UUID
from sqlalchemy import (
Boolean,
DateTime,
Enum,
ForeignKey,
Integer,
String,
UniqueConstraint,
)
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from fastapi_toolsets.models import TimestampMixin, UUIDMixin
class Base(DeclarativeBase, UUIDMixin):
type_annotation_map = {
str: String(),
int: Integer(),
UUID: PG_UUID(as_uuid=True),
datetime: DateTime(timezone=True),
}
class UserRole(enum.Enum):
admin = "admin"
moderator = "moderator"
user = "user"
class Team(Base, TimestampMixin):
__tablename__ = "teams"
name: Mapped[str] = mapped_column(String, unique=True, index=True)
users: Mapped[list["User"]] = relationship(back_populates="team")
class User(Base, TimestampMixin):
__tablename__ = "users"
username: Mapped[str] = mapped_column(String, unique=True, index=True)
email: Mapped[str | None] = mapped_column(
String, unique=True, index=True, nullable=True
)
hashed_password: Mapped[str | None] = mapped_column(String, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
role: Mapped[UserRole] = mapped_column(Enum(UserRole), default=UserRole.user)
team_id: Mapped[UUID | None] = mapped_column(ForeignKey("teams.id"), nullable=True)
team: Mapped["Team | None"] = relationship(back_populates="users")
oauth_accounts: Mapped[list["OAuthAccount"]] = relationship(back_populates="user")
tokens: Mapped[list["UserToken"]] = relationship(back_populates="user")
class UserToken(Base, TimestampMixin):
"""API tokens for a user (multiple allowed)."""
__tablename__ = "user_tokens"
user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
# Store hashed token value
token_hash: Mapped[str] = mapped_column(String, unique=True, index=True)
name: Mapped[str | None] = mapped_column(String, nullable=True)
expires_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
user: Mapped["User"] = relationship(back_populates="tokens")
class OAuthProvider(Base, TimestampMixin):
"""Configurable OAuth2 / OpenID Connect provider."""
__tablename__ = "oauth_providers"
slug: Mapped[str] = mapped_column(String, unique=True, index=True)
name: Mapped[str] = mapped_column(String)
client_id: Mapped[str] = mapped_column(String)
client_secret: Mapped[str] = mapped_column(String)
discovery_url: Mapped[str] = mapped_column(String, nullable=False)
scopes: Mapped[str] = mapped_column(String, default="openid email profile")
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
accounts: Mapped[list["OAuthAccount"]] = relationship(back_populates="provider")
class OAuthAccount(Base, TimestampMixin):
"""OAuth2 / OpenID Connect account linked to a user."""
__tablename__ = "oauth_accounts"
__table_args__ = (
UniqueConstraint("provider_id", "subject", name="uq_oauth_provider_subject"),
)
user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))
provider_id: Mapped[UUID] = mapped_column(ForeignKey("oauth_providers.id"))
# OAuth `sub` / OpenID subject identifier
subject: Mapped[str] = mapped_column(String)
user: Mapped["User"] = relationship(back_populates="oauth_accounts")
provider: Mapped["OAuthProvider"] = relationship(back_populates="accounts")

View File

@@ -1,122 +0,0 @@
from typing import Annotated
from uuid import UUID
import bcrypt
from fastapi import APIRouter, Form, HTTPException, Response, Security
from fastapi_toolsets.dependencies import PathDependency
from .crud import UserCrud, UserTokenCrud
from .db import SessionDep
from .models import OAuthProvider, User, UserToken
from .schemas import (
ApiTokenCreateRequest,
ApiTokenResponse,
RegisterRequest,
UserCreate,
UserResponse,
)
from .security import auth, cookie_auth, create_api_token
ProviderDep = PathDependency(
model=OAuthProvider,
field=OAuthProvider.slug,
session_dep=SessionDep,
param_name="slug",
)
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode(), hashed.encode())
router = APIRouter(prefix="/auth")
@router.post("/register", response_model=UserResponse, status_code=201)
async def register(body: RegisterRequest, session: SessionDep):
existing = await UserCrud.first(
session=session, filters=[User.username == body.username]
)
if existing:
raise HTTPException(status_code=409, detail="Username already taken")
user = await UserCrud.create(
session=session,
obj=UserCreate(
username=body.username,
email=body.email,
hashed_password=hash_password(body.password),
),
)
return user
@router.post("/token", status_code=204)
async def login(
session: SessionDep,
response: Response,
username: Annotated[str, Form()],
password: Annotated[str, Form()],
):
user = await UserCrud.first(session=session, filters=[User.username == username])
if (
not user
or not user.hashed_password
or not verify_password(password, user.hashed_password)
):
raise HTTPException(status_code=401, detail="Invalid credentials")
if not user.is_active:
raise HTTPException(status_code=403, detail="Account disabled")
cookie_auth.set_cookie(response, str(user.id))
@router.post("/logout", status_code=204)
async def logout(response: Response):
cookie_auth.delete_cookie(response)
@router.get("/me", response_model=UserResponse)
async def me(user: User = Security(auth)):
return user
@router.post("/tokens", response_model=ApiTokenResponse, status_code=201)
async def create_token(
body: ApiTokenCreateRequest,
user: User = Security(auth),
):
raw, token_row = await create_api_token(
user.id, name=body.name, expires_at=body.expires_at
)
return ApiTokenResponse(
id=token_row.id,
name=token_row.name,
expires_at=token_row.expires_at,
created_at=token_row.created_at,
token=raw,
)
@router.delete("/tokens/{token_id}", status_code=204)
async def revoke_token(
session: SessionDep,
token_id: UUID,
user: User = Security(auth),
):
if not await UserTokenCrud.first(
session=session,
filters=[UserToken.id == token_id, UserToken.user_id == user.id],
):
raise HTTPException(status_code=404, detail="Token not found")
await UserTokenCrud.delete(
session=session,
filters=[UserToken.id == token_id, UserToken.user_id == user.id],
)

View File

@@ -1,64 +0,0 @@
from datetime import datetime
from uuid import UUID
from pydantic import EmailStr
from fastapi_toolsets.schemas import PydanticBase
class RegisterRequest(PydanticBase):
username: str
password: str
email: EmailStr | None = None
class UserResponse(PydanticBase):
id: UUID
username: str
email: str | None
role: str
is_active: bool
model_config = {"from_attributes": True}
class ApiTokenCreateRequest(PydanticBase):
name: str | None = None
expires_at: datetime | None = None
class ApiTokenResponse(PydanticBase):
id: UUID
name: str | None
expires_at: datetime | None
created_at: datetime
# Only populated on creation
token: str | None = None
model_config = {"from_attributes": True}
class OAuthProviderResponse(PydanticBase):
slug: str
name: str
model_config = {"from_attributes": True}
class UserCreate(PydanticBase):
username: str
email: str | None = None
hashed_password: str | None = None
class UserTokenCreate(PydanticBase):
user_id: UUID
token_hash: str
name: str | None = None
expires_at: datetime | None = None
class OAuthAccountCreate(PydanticBase):
user_id: UUID
provider_id: UUID
subject: str

View File

@@ -1,100 +0,0 @@
import hashlib
from datetime import datetime, timezone
from uuid import UUID
from fastapi import HTTPException
from sqlalchemy.orm import selectinload
from fastapi_toolsets.exceptions import UnauthorizedError
from fastapi_toolsets.security import (
APIKeyHeaderAuth,
BearerTokenAuth,
CookieAuth,
MultiAuth,
)
from .crud import UserCrud, UserTokenCrud
from .db import get_db_context
from .models import User, UserRole, UserToken
from .schemas import UserTokenCreate
SESSION_COOKIE = "session"
SECRET_KEY = "123456789"
def _hash_token(token: str) -> str:
return hashlib.sha256(token.encode()).hexdigest()
async def _verify_token(token: str, role: UserRole | None = None) -> User:
async with get_db_context() as db:
user_token = await UserTokenCrud.first(
session=db,
filters=[UserToken.token_hash == _hash_token(token)],
load_options=[selectinload(UserToken.user)],
)
if user_token is None or not user_token.user.is_active:
raise UnauthorizedError()
if user_token.expires_at and user_token.expires_at < datetime.now(timezone.utc):
raise UnauthorizedError()
user = user_token.user
if role is not None and user.role != role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
async def _verify_cookie(user_id: str, role: UserRole | None = None) -> User:
async with get_db_context() as db:
user = await UserCrud.first(
session=db,
filters=[User.id == UUID(user_id)],
)
if not user or not user.is_active:
raise UnauthorizedError()
if role is not None and user.role != role:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return user
bearer_auth = BearerTokenAuth(
validator=_verify_token,
prefix="ctf_",
)
header_auth = APIKeyHeaderAuth(
name="X-API-Key",
validator=_verify_token,
)
cookie_auth = CookieAuth(
name=SESSION_COOKIE,
validator=_verify_cookie,
secret_key=SECRET_KEY,
)
auth = MultiAuth(bearer_auth, header_auth, cookie_auth)
async def create_api_token(
user_id: UUID,
*,
name: str | None = None,
expires_at: datetime | None = None,
) -> tuple[str, UserToken]:
raw = bearer_auth.generate_token()
async with get_db_context() as db:
token_row = await UserTokenCrud.create(
session=db,
obj=UserTokenCreate(
user_id=user_id,
token_hash=_hash_token(raw),
name=name,
expires_at=expires_at,
),
)
return raw, token_row

View File

@@ -1,6 +1,6 @@
[project]
name = "fastapi-toolsets"
version = "3.0.1"
version = "3.0.3"
description = "Production-ready utilities for FastAPI applications"
readme = "README.md"
license = "MIT"
@@ -66,7 +66,6 @@ manager = "fastapi_toolsets.cli.app:cli"
dev = [
{include-group = "tests"},
{include-group = "docs"},
{include-group = "docs-src"},
"fastapi-toolsets[all]",
"prek>=0.3.8",
"ruff>=0.1.0",
@@ -85,9 +84,6 @@ docs = [
"mkdocstrings-python>=2.0.2",
"zensical>=0.0.30",
]
docs-src = [
"bcrypt>=4.0.0",
]
[build-system]
requires = ["uv_build>=0.10,<0.12.0"]

View File

@@ -21,4 +21,4 @@ Example usage:
return Response(data={"user": user.username}, message="Success")
"""
__version__ = "3.0.1"
__version__ = "3.0.3"

View File

@@ -15,12 +15,13 @@ from ..types import (
OrderFieldType,
SearchFieldType,
)
from .factory import AsyncCrud, CrudFactory
from .factory import AsyncCrud, CrudFactory, lateral_load
from .search import SearchConfig, get_searchable_fields
__all__ = [
"AsyncCrud",
"CrudFactory",
"lateral_load",
"FacetFieldType",
"get_searchable_fields",
"InvalidFacetFilterError",

View File

@@ -10,15 +10,32 @@ from collections.abc import Awaitable, Callable, Sequence
from datetime import date, datetime
from decimal import Decimal
from enum import Enum
from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
from typing import Any, ClassVar, Generic, Literal, NamedTuple, Self, cast, overload
from fastapi import Query
from pydantic import BaseModel
from sqlalchemy import Date, DateTime, Float, Integer, Numeric, Uuid, and_, func, select
from sqlalchemy import (
Date,
DateTime,
Float,
Integer,
Numeric,
Uuid,
and_,
func,
select,
true,
)
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, QueryableAttribute, selectinload
from sqlalchemy.orm import (
DeclarativeBase,
QueryableAttribute,
RelationshipProperty,
contains_eager,
selectinload,
)
from sqlalchemy.sql.base import ExecutableOption
from sqlalchemy.sql.roles import WhereHavingRole
@@ -35,6 +52,7 @@ from ..schemas import (
from ..types import (
FacetFieldType,
JoinType,
LateralJoinType,
M2MFieldType,
ModelType,
OrderByClause,
@@ -115,6 +133,78 @@ def _apply_joins(q: Any, joins: JoinType | None, outer_join: bool) -> Any:
return q
class _ResolvedLateral(NamedTuple):
joins: LateralJoinType
eager: list[ExecutableOption]
class _LateralLoad:
"""Marker used inside ``default_load_options`` for lateral join loading.
Supports only Many:One and One:One relationships (single row per parent).
"""
__slots__ = ("rel_attr",)
def __init__(self, rel_attr: QueryableAttribute) -> None:
prop = rel_attr.property
if not isinstance(prop, RelationshipProperty):
raise TypeError(
f"lateral_load() requires a relationship attribute, got {type(prop).__name__}. "
"Example: lateral_load(User.team)"
)
if prop.secondary is not None:
raise ValueError(
f"lateral_load({rel_attr}) does not support Many:Many relationships. "
"Use selectinload() instead."
)
if prop.uselist:
raise ValueError(
f"lateral_load({rel_attr}) does not support One:Many relationships. "
"Use selectinload() instead."
)
self.rel_attr = rel_attr
def lateral_load(rel_attr: QueryableAttribute) -> _LateralLoad:
"""Mark a Many:One or One:One relationship for lateral join loading.
Raises ``ValueError`` for One:Many or Many:Many relationships.
"""
return _LateralLoad(rel_attr)
def _build_lateral_from_relationship(
rel_attr: QueryableAttribute,
) -> tuple[Any, Any, ExecutableOption]:
"""Introspect a Many:One relationship and build (lateral_subquery, true(), contains_eager)."""
prop = rel_attr.property
target_class = prop.mapper.class_
parent_class = prop.parent.class_
conditions = [
getattr(target_class, remote_col.key) == getattr(parent_class, local_col.key)
for local_col, remote_col in prop.local_remote_pairs
]
lateral_sub = (
select(target_class)
.where(and_(*conditions))
.correlate(parent_class)
.lateral(f"_lateral_{prop.key}")
)
return lateral_sub, true(), contains_eager(rel_attr, alias=lateral_sub)
def _apply_lateral_joins(q: Any, lateral_joins: LateralJoinType | None) -> Any:
"""Apply lateral subqueries as LEFT JOIN LATERAL to preserve all parent rows."""
if not lateral_joins:
return q
for subquery, condition in lateral_joins:
q = q.outerjoin(subquery, condition)
return q
def _apply_search_joins(q: Any, search_joins: list[Any]) -> Any:
"""Apply relationship-based outer joins (from search/filter_by) to a query."""
seen: set[str] = set()
@@ -132,12 +222,17 @@ class AsyncCrud(Generic[ModelType]):
Subclass this and set the `model` class variable, or use `CrudFactory`.
"""
_resolved_lateral: ClassVar[_ResolvedLateral | None] = None
model: ClassVar[type[DeclarativeBase]]
searchable_fields: ClassVar[Sequence[SearchFieldType] | None] = None
facet_fields: ClassVar[Sequence[FacetFieldType] | None] = None
order_fields: ClassVar[Sequence[OrderFieldType] | None] = None
m2m_fields: ClassVar[M2MFieldType | None] = None
default_load_options: ClassVar[Sequence[ExecutableOption] | None] = None
default_load_options: ClassVar[Sequence[ExecutableOption | _LateralLoad] | None] = (
None
)
lateral_joins: ClassVar[LateralJoinType | None] = None
cursor_column: ClassVar[Any | None] = None
@classmethod
@@ -161,14 +256,48 @@ class AsyncCrud(Generic[ModelType]):
):
cls.searchable_fields = [pk_col, *raw_fields]
raw_default_opts = cls.__dict__.get("default_load_options", None)
if raw_default_opts:
joins: LateralJoinType = []
eager: list[ExecutableOption] = []
clean: list[ExecutableOption] = []
for opt in raw_default_opts:
if isinstance(opt, _LateralLoad):
lat_sub, condition, eager_opt = _build_lateral_from_relationship(
opt.rel_attr
)
joins.append((lat_sub, condition))
eager.append(eager_opt)
else:
clean.append(opt)
if joins:
cls._resolved_lateral = _ResolvedLateral(joins=joins, eager=eager)
cls.default_load_options = clean or None
@classmethod
def _get_lateral_joins(cls) -> LateralJoinType | None:
"""Merge manual lateral_joins with ones resolved from default_load_options."""
resolved = cls._resolved_lateral
all_lateral = [
*(cls.lateral_joins or []),
*(resolved.joins if resolved else []),
]
return all_lateral or None
@classmethod
def _resolve_load_options(
cls, load_options: Sequence[ExecutableOption] | None
) -> Sequence[ExecutableOption] | None:
"""Return load_options if provided, else fall back to default_load_options."""
"""Return merged load options."""
if load_options is not None:
return load_options
return cls.default_load_options
return list(load_options) or None
resolved = cls._resolved_lateral
# default_load_options is cleaned of _LateralLoad markers in __init_subclass__,
# but its declared type still includes them — cast to reflect the runtime invariant.
base = cast(list[ExecutableOption], cls.default_load_options or [])
lateral = resolved.eager if resolved else []
merged = [*base, *lateral]
return merged or None
@classmethod
async def _reload_with_options(
@@ -861,6 +990,8 @@ class AsyncCrud(Generic[ModelType]):
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
if load_options is None:
q = _apply_lateral_joins(q, cls._get_lateral_joins())
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
q = q.options(*resolved)
@@ -933,6 +1064,8 @@ class AsyncCrud(Generic[ModelType]):
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
if load_options is None:
q = _apply_lateral_joins(q, cls._get_lateral_joins())
if filters:
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
@@ -978,6 +1111,8 @@ class AsyncCrud(Generic[ModelType]):
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
if load_options is None:
q = _apply_lateral_joins(q, cls._get_lateral_joins())
if filters:
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
@@ -1300,6 +1435,10 @@ class AsyncCrud(Generic[ModelType]):
# Apply explicit joins
q = _apply_joins(q, joins, outer_join)
# Apply lateral joins (Many:One relationship loading, excluded from count query)
if load_options is None:
q = _apply_lateral_joins(q, cls._get_lateral_joins())
# Apply search joins (always outer joins for search)
q = _apply_search_joins(q, search_joins)
@@ -1398,7 +1537,9 @@ class AsyncCrud(Generic[ModelType]):
tables.
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN.
load_options: SQLAlchemy loader options. Falls back to
``default_load_options`` when not provided.
``default_load_options`` (including any lateral joins) when not
provided. When explicitly supplied, the caller takes full control
and lateral joins are skipped.
order_by: Additional ordering applied after the cursor column.
items_per_page: Number of items per page (default 20).
search: Search query string or SearchConfig object.
@@ -1455,6 +1596,10 @@ class AsyncCrud(Generic[ModelType]):
# Apply explicit joins
q = _apply_joins(q, joins, outer_join)
# Apply lateral joins (Many:One relationship loading)
if load_options is None:
q = _apply_lateral_joins(q, cls._get_lateral_joins())
# Apply search joins (always outer joins)
q = _apply_search_joins(q, search_joins)

View File

@@ -265,7 +265,15 @@ async def build_facets(
else:
q = q.order_by(column)
result = await session.execute(q)
values = [row[0] for row in result.all() if row[0] is not None]
col_type = column.property.columns[0].type
enum_class = getattr(col_type, "enum_class", None)
values = [
row[0].name
if (enum_class is not None and isinstance(row[0], enum_class))
else row[0]
for row in result.all()
if row[0] is not None
]
return key, values
pairs = await asyncio.gather(
@@ -347,6 +355,24 @@ def build_filter_by(
filters.append(column.overlap(value))
else:
filters.append(column.any(value))
elif isinstance(col_type, Enum):
enum_class = col_type.enum_class
if enum_class is not None:
def _coerce_enum(v: Any) -> Any:
if isinstance(v, enum_class):
return v
return enum_class[v] # lookup by name: "PENDING", "RED"
if isinstance(value, list):
filters.append(column.in_([_coerce_enum(v) for v in value]))
else:
filters.append(column == _coerce_enum(value))
else: # pragma: no cover
if isinstance(value, list):
filters.append(column.in_(value))
else:
filters.append(column == value)
elif isinstance(col_type, _EQUALITY_TYPES):
if isinstance(value, list):
filters.append(column.in_(value))

View File

@@ -1,24 +0,0 @@
"""Authentication helpers for FastAPI using Security()."""
from .abc import AuthSource
from .oauth import (
oauth_build_authorization_redirect,
oauth_decode_state,
oauth_encode_state,
oauth_fetch_userinfo,
oauth_resolve_provider_urls,
)
from .sources import APIKeyHeaderAuth, BearerTokenAuth, CookieAuth, MultiAuth
__all__ = [
"APIKeyHeaderAuth",
"AuthSource",
"BearerTokenAuth",
"CookieAuth",
"MultiAuth",
"oauth_build_authorization_redirect",
"oauth_decode_state",
"oauth_encode_state",
"oauth_fetch_userinfo",
"oauth_resolve_provider_urls",
]

View File

@@ -1,53 +0,0 @@
"""Abstract base class for authentication sources."""
import inspect
from abc import ABC, abstractmethod
from typing import Any, Callable
from fastapi import Request
from fastapi.security import SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
def _ensure_async(fn: Callable[..., Any]) -> Callable[..., Any]:
"""Wrap *fn* so it can always be awaited, caching the coroutine check at init time."""
if inspect.iscoroutinefunction(fn):
return fn
async def wrapper(*args: Any, **kwargs: Any) -> Any:
return fn(*args, **kwargs)
return wrapper
class AuthSource(ABC):
"""Abstract base class for authentication sources."""
def __init__(self) -> None:
"""Set up the default FastAPI dependency signature."""
source = self
async def _call(
request: Request,
security_scopes: SecurityScopes, # noqa: ARG001
) -> Any:
credential = await source.extract(request)
if credential is None:
raise UnauthorizedError()
return await source.authenticate(credential)
self._call_fn: Callable[..., Any] = _call
self.__signature__ = inspect.signature(_call)
@abstractmethod
async def extract(self, request: Request) -> str | None:
"""Extract the raw credential from the request without validating."""
@abstractmethod
async def authenticate(self, credential: str) -> Any:
"""Validate a credential and return the authenticated identity."""
async def __call__(self, **kwargs: Any) -> Any:
"""FastAPI dependency dispatch."""
return await self._call_fn(**kwargs)

View File

@@ -1,140 +0,0 @@
"""OAuth 2.0 / OIDC helper utilities."""
import base64
from typing import Any
from urllib.parse import urlencode
import httpx
from fastapi.responses import RedirectResponse
_discovery_cache: dict[str, dict] = {}
async def oauth_resolve_provider_urls(
discovery_url: str,
) -> tuple[str, str, str | None]:
"""Fetch the OIDC discovery document and return endpoint URLs.
Args:
discovery_url: URL of the provider's ``/.well-known/openid-configuration``.
Returns:
A ``(authorization_url, token_url, userinfo_url)`` tuple.
*userinfo_url* is ``None`` when the provider does not advertise one.
"""
if discovery_url not in _discovery_cache:
async with httpx.AsyncClient() as client:
resp = await client.get(discovery_url)
resp.raise_for_status()
_discovery_cache[discovery_url] = resp.json()
cfg = _discovery_cache[discovery_url]
return (
cfg["authorization_endpoint"],
cfg["token_endpoint"],
cfg.get("userinfo_endpoint"),
)
async def oauth_fetch_userinfo(
*,
token_url: str,
userinfo_url: str,
code: str,
client_id: str,
client_secret: str,
redirect_uri: str,
) -> dict[str, Any]:
"""Exchange an authorization code for tokens and return the userinfo payload.
Performs the two-step OAuth 2.0 / OIDC token exchange:
1. POSTs the authorization *code* to *token_url* to obtain an access token.
2. GETs *userinfo_url* using that access token as a Bearer credential.
Args:
token_url: Provider's token endpoint.
userinfo_url: Provider's userinfo endpoint.
code: Authorization code received from the provider's callback.
client_id: OAuth application client ID.
client_secret: OAuth application client secret.
redirect_uri: Redirect URI that was used in the authorization request.
Returns:
The JSON payload returned by the userinfo endpoint as a plain ``dict``.
"""
async with httpx.AsyncClient() as client:
token_resp = await client.post(
token_url,
data={
"grant_type": "authorization_code",
"code": code,
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": redirect_uri,
},
headers={"Accept": "application/json"},
)
token_resp.raise_for_status()
access_token = token_resp.json()["access_token"]
userinfo_resp = await client.get(
userinfo_url,
headers={"Authorization": f"Bearer {access_token}"},
)
userinfo_resp.raise_for_status()
return userinfo_resp.json()
def oauth_build_authorization_redirect(
authorization_url: str,
*,
client_id: str,
scopes: str,
redirect_uri: str,
destination: str,
) -> RedirectResponse:
"""Return an OAuth 2.0 authorization ``RedirectResponse``.
Args:
authorization_url: Provider's authorization endpoint.
client_id: OAuth application client ID.
scopes: Space-separated list of requested scopes.
redirect_uri: URI the provider should redirect back to after authorization.
destination: URL the user should be sent to after the full OAuth flow
completes (encoded as ``state``).
Returns:
A :class:`~fastapi.responses.RedirectResponse` to the provider's
authorization page.
"""
params = urlencode(
{
"client_id": client_id,
"response_type": "code",
"scope": scopes,
"redirect_uri": redirect_uri,
"state": oauth_encode_state(destination),
}
)
return RedirectResponse(f"{authorization_url}?{params}")
def oauth_encode_state(url: str) -> str:
"""Base64url-encode a URL to embed as an OAuth ``state`` parameter."""
return base64.urlsafe_b64encode(url.encode()).decode()
def oauth_decode_state(state: str | None, *, fallback: str) -> str:
"""Decode a base64url OAuth ``state`` parameter.
Handles missing padding (some providers strip ``=``).
Returns *fallback* if *state* is absent, the literal string ``"null"``,
or cannot be decoded.
"""
if not state or state == "null":
return fallback
try:
padded = state + "=" * (4 - len(state) % 4)
return base64.urlsafe_b64decode(padded).decode()
except Exception:
return fallback

View File

@@ -1,8 +0,0 @@
"""Built-in authentication source implementations."""
from .header import APIKeyHeaderAuth
from .bearer import BearerTokenAuth
from .cookie import CookieAuth
from .multi import MultiAuth
__all__ = ["APIKeyHeaderAuth", "BearerTokenAuth", "CookieAuth", "MultiAuth"]

View File

@@ -1,120 +0,0 @@
"""Bearer token authentication source."""
import inspect
import secrets
from typing import Annotated, Any, Callable
from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer, SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource, _ensure_async
class BearerTokenAuth(AuthSource):
"""Bearer token authentication source.
Wraps :class:`fastapi.security.HTTPBearer` for OpenAPI documentation.
The validator is called as ``await validator(credential, **kwargs)``
where ``kwargs`` are the extra keyword arguments provided at instantiation.
Args:
validator: Sync or async callable that receives the credential and any
extra keyword arguments, and returns the authenticated identity
(e.g. a ``User`` model). Should raise
:class:`~fastapi_toolsets.exceptions.UnauthorizedError` on failure.
prefix: Optional token prefix (e.g. ``"user_"``). If set, only tokens
whose value starts with this prefix are matched. The prefix is
**kept** in the value passed to the validator — store and compare
tokens with their prefix included. Use :meth:`generate_token` to
create correctly-prefixed tokens. This enables multiple
``BearerTokenAuth`` instances in the same app (e.g. ``"user_"``
for user tokens, ``"org_"`` for org tokens).
**kwargs: Extra keyword arguments forwarded to the validator on every
call (e.g. ``role=Role.ADMIN``).
"""
def __init__(
self,
validator: Callable[..., Any],
*,
prefix: str | None = None,
**kwargs: Any,
) -> None:
self._validator = _ensure_async(validator)
self._prefix = prefix
self._kwargs = kwargs
self._scheme = HTTPBearer(auto_error=False)
async def _call(
security_scopes: SecurityScopes, # noqa: ARG001
credentials: Annotated[
HTTPAuthorizationCredentials | None, Depends(self._scheme)
] = None,
) -> Any:
if credentials is None:
raise UnauthorizedError()
return await self._validate(credentials.credentials)
self._call_fn = _call
self.__signature__ = inspect.signature(_call)
async def _validate(self, token: str) -> Any:
"""Check prefix and call the validator."""
if self._prefix is not None and not token.startswith(self._prefix):
raise UnauthorizedError()
return await self._validator(token, **self._kwargs)
async def extract(self, request: Any) -> str | None:
"""Extract the raw credential from the request without validating.
Returns ``None`` if no ``Authorization: Bearer`` header is present,
the token is empty, or the token does not match the configured prefix.
The prefix is included in the returned value.
"""
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
return None
token = auth[7:]
if not token:
return None
if self._prefix is not None and not token.startswith(self._prefix):
return None
return token
async def authenticate(self, credential: str) -> Any:
"""Validate a credential and return the identity.
Calls ``await validator(credential, **kwargs)`` where ``kwargs`` are
the extra keyword arguments provided at instantiation.
"""
return await self._validate(credential)
def require(self, **kwargs: Any) -> "BearerTokenAuth":
"""Return a new instance with additional (or overriding) validator kwargs."""
return BearerTokenAuth(
self._validator,
prefix=self._prefix,
**{**self._kwargs, **kwargs},
)
def generate_token(self, nbytes: int = 32) -> str:
"""Generate a secure random token for this auth source.
Returns a URL-safe random token. If a prefix is configured it is
prepended — the returned value is what you store in your database
and return to the client as-is.
Args:
nbytes: Number of random bytes before base64 encoding. The
resulting string is ``ceil(nbytes * 4 / 3)`` characters
(43 chars for the default 32 bytes). Defaults to 32.
Returns:
A ready-to-use token string (e.g. ``"user_Xk3..."``).
"""
token = secrets.token_urlsafe(nbytes)
if self._prefix is not None:
return f"{self._prefix}{token}"
return token

View File

@@ -1,139 +0,0 @@
"""Cookie-based authentication source."""
import base64
import hashlib
import hmac
import inspect
import json
import time
from typing import Annotated, Any, Callable
from fastapi import Depends, Request, Response
from fastapi.security import APIKeyCookie, SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource, _ensure_async
class CookieAuth(AuthSource):
"""Cookie-based authentication source.
Wraps :class:`fastapi.security.APIKeyCookie` for OpenAPI documentation.
Optionally signs the cookie with HMAC-SHA256 to provide stateless, tamper-
proof sessions without any database entry.
Args:
name: Cookie name.
validator: Sync or async callable that receives the cookie value
(plain, after signature verification when ``secret_key`` is set)
and any extra keyword arguments, and returns the authenticated
identity.
secret_key: When provided, the cookie is HMAC-SHA256 signed.
:meth:`set_cookie` embeds an expiry and signs the payload;
:meth:`extract` verifies the signature and expiry before handing
the plain value to the validator. When ``None`` (default), the raw
cookie value is passed to the validator as-is.
ttl: Cookie lifetime in seconds (default 24 h). Only used when
``secret_key`` is set.
**kwargs: Extra keyword arguments forwarded to the validator on every
call (e.g. ``role=Role.ADMIN``).
"""
def __init__(
self,
name: str,
validator: Callable[..., Any],
*,
secret_key: str | None = None,
ttl: int = 86400,
**kwargs: Any,
) -> None:
self._name = name
self._validator = _ensure_async(validator)
self._secret_key = secret_key
self._ttl = ttl
self._kwargs = kwargs
self._scheme = APIKeyCookie(name=name, auto_error=False)
async def _call(
security_scopes: SecurityScopes, # noqa: ARG001
value: Annotated[str | None, Depends(self._scheme)] = None,
) -> Any:
if value is None:
raise UnauthorizedError()
plain = self._verify(value)
return await self._validator(plain, **self._kwargs)
self._call_fn = _call
self.__signature__ = inspect.signature(_call)
def _hmac(self, data: str) -> str:
if self._secret_key is None:
raise RuntimeError("_hmac called without secret_key configured")
return hmac.new(
self._secret_key.encode(), data.encode(), hashlib.sha256
).hexdigest()
def _sign(self, value: str) -> str:
data = base64.urlsafe_b64encode(
json.dumps({"v": value, "exp": int(time.time()) + self._ttl}).encode()
).decode()
return f"{data}.{self._hmac(data)}"
def _verify(self, cookie_value: str) -> str:
"""Return the plain value, verifying HMAC + expiry when signed."""
if not self._secret_key:
return cookie_value
try:
data, sig = cookie_value.rsplit(".", 1)
except ValueError:
raise UnauthorizedError()
if not hmac.compare_digest(self._hmac(data), sig):
raise UnauthorizedError()
try:
payload = json.loads(base64.urlsafe_b64decode(data))
value: str = payload["v"]
exp: int = payload["exp"]
except Exception:
raise UnauthorizedError()
if exp < int(time.time()):
raise UnauthorizedError()
return value
async def extract(self, request: Request) -> str | None:
return request.cookies.get(self._name)
async def authenticate(self, credential: str) -> Any:
plain = self._verify(credential)
return await self._validator(plain, **self._kwargs)
def require(self, **kwargs: Any) -> "CookieAuth":
"""Return a new instance with additional (or overriding) validator kwargs."""
return CookieAuth(
self._name,
self._validator,
secret_key=self._secret_key,
ttl=self._ttl,
**{**self._kwargs, **kwargs},
)
def set_cookie(self, response: Response, value: str) -> None:
"""Attach the cookie to *response*, signing it when ``secret_key`` is set."""
cookie_value = self._sign(value) if self._secret_key else value
response.set_cookie(
self._name,
cookie_value,
httponly=True,
samesite="lax",
max_age=self._ttl,
)
def delete_cookie(self, response: Response) -> None:
"""Clear the session cookie (logout)."""
response.delete_cookie(self._name, httponly=True, samesite="lax")

View File

@@ -1,67 +0,0 @@
"""API key header authentication source."""
import inspect
from typing import Annotated, Any, Callable
from fastapi import Depends, Request
from fastapi.security import APIKeyHeader, SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource, _ensure_async
class APIKeyHeaderAuth(AuthSource):
"""API key header authentication source.
Wraps :class:`fastapi.security.APIKeyHeader` for OpenAPI documentation.
The validator is called as ``await validator(api_key, **kwargs)``
where ``kwargs`` are the extra keyword arguments provided at instantiation.
Args:
name: HTTP header name that carries the API key (e.g. ``"X-API-Key"``).
validator: Sync or async callable that receives the API key and any
extra keyword arguments, and returns the authenticated identity.
Should raise :class:`~fastapi_toolsets.exceptions.UnauthorizedError`
on failure.
**kwargs: Extra keyword arguments forwarded to the validator on every
call (e.g. ``role=Role.ADMIN``).
"""
def __init__(
self,
name: str,
validator: Callable[..., Any],
**kwargs: Any,
) -> None:
self._name = name
self._validator = _ensure_async(validator)
self._kwargs = kwargs
self._scheme = APIKeyHeader(name=name, auto_error=False)
async def _call(
security_scopes: SecurityScopes, # noqa: ARG001
api_key: Annotated[str | None, Depends(self._scheme)] = None,
) -> Any:
if api_key is None:
raise UnauthorizedError()
return await self._validator(api_key, **self._kwargs)
self._call_fn = _call
self.__signature__ = inspect.signature(_call)
async def extract(self, request: Request) -> str | None:
"""Extract the API key from the configured header."""
return request.headers.get(self._name) or None
async def authenticate(self, credential: str) -> Any:
"""Validate a credential and return the identity."""
return await self._validator(credential, **self._kwargs)
def require(self, **kwargs: Any) -> "APIKeyHeaderAuth":
"""Return a new instance with additional (or overriding) validator kwargs."""
return APIKeyHeaderAuth(
self._name,
self._validator,
**{**self._kwargs, **kwargs},
)

View File

@@ -1,119 +0,0 @@
"""MultiAuth: combine multiple authentication sources into a single callable."""
import inspect
from typing import Any, cast
from fastapi import Request
from fastapi.security import SecurityScopes
from fastapi_toolsets.exceptions import UnauthorizedError
from ..abc import AuthSource
class MultiAuth:
"""Combine multiple authentication sources into a single callable.
Sources are tried in order; the first one whose
:meth:`~AuthSource.extract` returns a non-``None`` credential wins.
Its :meth:`~AuthSource.authenticate` is called and the result returned.
If a credential is found but the validator raises, the exception propagates
immediately — the remaining sources are **not** tried. This prevents
silent fallthrough on invalid credentials.
If no source provides a credential,
:class:`~fastapi_toolsets.exceptions.UnauthorizedError` is raised.
The :meth:`~AuthSource.extract` method of each source performs only
string matching (no I/O), so prefix-based dispatch is essentially free.
Any :class:`~AuthSource` subclass — including user-defined ones — can be
passed as a source.
Args:
*sources: Auth source instances to try in order.
Example::
user_bearer = BearerTokenAuth(verify_user, prefix="user_")
org_bearer = BearerTokenAuth(verify_org, prefix="org_")
cookie = CookieAuth("session", verify_session)
multi = MultiAuth(user_bearer, org_bearer, cookie)
@app.get("/data")
async def data_route(user = Security(multi)):
return user
# Apply a shared requirement to all sources at once
@app.get("/admin")
async def admin_route(user = Security(multi.require(role=Role.ADMIN))):
return user
"""
def __init__(self, *sources: AuthSource) -> None:
self._sources = sources
async def _call(
request: Request,
security_scopes: SecurityScopes, # noqa: ARG001
**kwargs: Any, # noqa: ARG001 — absorbs scheme values injected by FastAPI
) -> Any:
for source in self._sources:
credential = await source.extract(request)
if credential is not None:
return await source.authenticate(credential)
raise UnauthorizedError()
self._call_fn = _call
# Build a merged signature that includes the security-scheme Depends()
# parameters from every source so FastAPI registers them in OpenAPI docs.
seen: set[str] = {"request", "security_scopes"}
merged: list[inspect.Parameter] = [
inspect.Parameter(
"request",
inspect.Parameter.POSITIONAL_OR_KEYWORD,
annotation=Request,
),
inspect.Parameter(
"security_scopes",
inspect.Parameter.POSITIONAL_OR_KEYWORD,
annotation=SecurityScopes,
),
]
for i, source in enumerate(sources):
for name, param in inspect.signature(source).parameters.items():
if name in seen:
continue
merged.append(param.replace(name=f"_s{i}_{name}"))
seen.add(name)
self.__signature__ = inspect.Signature(merged, return_annotation=Any)
async def __call__(self, **kwargs: Any) -> Any:
return await self._call_fn(**kwargs)
def require(self, **kwargs: Any) -> "MultiAuth":
"""Return a new :class:`MultiAuth` with kwargs forwarded to each source.
Calls ``.require(**kwargs)`` on every source that supports it. Sources
that do not implement ``.require()`` (e.g. custom :class:`~AuthSource`
subclasses) are passed through unchanged.
New kwargs are merged over each source's existing kwargs — new values
win on conflict::
multi = MultiAuth(bearer, cookie)
@app.get("/admin")
async def admin(user = Security(multi.require(role=Role.ADMIN))):
return user
"""
new_sources = tuple(
cast(Any, source).require(**kwargs)
if hasattr(source, "require")
else source
for source in self._sources
)
return MultiAuth(*new_sources)

View File

@@ -16,6 +16,7 @@ SchemaType = TypeVar("SchemaType", bound=BaseModel)
# CRUD type aliases
JoinType = list[tuple[type[DeclarativeBase] | Any, Any]]
LateralJoinType = list[tuple[Any, Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]

View File

@@ -2,6 +2,7 @@
import os
import uuid
from enum import Enum
import pytest
from pydantic import BaseModel
@@ -12,6 +13,7 @@ from sqlalchemy import (
Column,
Date,
DateTime,
Enum as SAEnum,
ForeignKey,
Integer,
JSON,
@@ -139,6 +141,35 @@ class Post(Base):
tags: Mapped[list[Tag]] = relationship(secondary=post_tags)
class OrderStatus(int, Enum):
"""Integer-backed enum for order status."""
PENDING = 1
PROCESSING = 2
SHIPPED = 3
CANCELLED = 4
class Color(str, Enum):
"""String-backed enum for color."""
RED = "red"
GREEN = "green"
BLUE = "blue"
class Order(Base):
"""Test model with an IntEnum column (Enum(int, Enum)) and a raw Integer column."""
__tablename__ = "orders"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(String(100))
status: Mapped[OrderStatus] = mapped_column(SAEnum(OrderStatus))
priority: Mapped[int] = mapped_column(Integer)
color: Mapped[Color] = mapped_column(SAEnum(Color))
class Transfer(Base):
"""Test model with two FKs to the same table (users)."""
@@ -311,6 +342,26 @@ class ArticleRead(PydanticBase):
labels: list[str]
class OrderCreate(BaseModel):
"""Schema for creating an order."""
id: uuid.UUID | None = None
name: str
status: OrderStatus
priority: int = 0
color: Color = Color.RED
class OrderRead(PydanticBase):
"""Schema for reading an order."""
id: uuid.UUID
name: str
status: OrderStatus
priority: int
color: Color
class TransferCreate(BaseModel):
"""Schema for creating a transfer."""
@@ -327,6 +378,7 @@ class TransferRead(PydanticBase):
amount: str
OrderCrud = CrudFactory(Order)
TransferCrud = CrudFactory(Transfer)
ArticleCrud = CrudFactory(Article)
RoleCrud = CrudFactory(Role)

View File

@@ -6,9 +6,15 @@ import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from fastapi_toolsets.crud import CrudFactory, PaginationType
from fastapi_toolsets.crud.factory import AsyncCrud, _CursorDirection
from fastapi_toolsets.crud import CrudFactory, PaginationType, lateral_load
from fastapi_toolsets.crud.factory import (
AsyncCrud,
_CursorDirection,
_LateralLoad,
_ResolvedLateral,
)
from fastapi_toolsets.exceptions import NotFoundError
from fastapi_toolsets.schemas import PydanticBase
from .conftest import (
EventCreate,
@@ -51,6 +57,12 @@ from .conftest import (
)
class UserWithRoleRead(PydanticBase):
id: uuid.UUID
username: str
role: RoleRead | None = None
class TestCrudFactory:
"""Tests for CrudFactory."""
@@ -208,11 +220,11 @@ class TestResolveLoadOptions:
assert crud._resolve_load_options(None) is None
def test_empty_list_overrides_default(self):
"""An empty list is a valid override and disables default_load_options."""
"""An explicit empty list disables default_load_options (no options applied)."""
default = [selectinload(User.role)]
crud = CrudFactory(User, default_load_options=default)
# Empty list is not None, so it should replace default
assert crud._resolve_load_options([]) == []
# Empty list replaces default; None and [] are both falsy → no options applied
assert not crud._resolve_load_options([])
class TestResolveSearchColumns:
@@ -359,13 +371,6 @@ class TestDefaultLoadOptionsIntegration:
self, db_session: AsyncSession
):
"""default_load_options loads relationships automatically on offset_paginate()."""
from fastapi_toolsets.schemas import PydanticBase
class UserWithRoleRead(PydanticBase):
id: uuid.UUID
username: str
role: RoleRead | None = None
UserWithDefaultLoad = CrudFactory(
User, default_load_options=[selectinload(User.role)]
)
@@ -2462,12 +2467,7 @@ class TestCursorPaginateExtraOptions:
@pytest.mark.anyio
async def test_with_load_options(self, db_session: AsyncSession):
"""cursor_paginate passes load_options to the query."""
from fastapi_toolsets.schemas import CursorPagination, PydanticBase
class UserWithRoleRead(PydanticBase):
id: uuid.UUID
username: str
role: RoleRead | None = None
from fastapi_toolsets.schemas import CursorPagination
role = await RoleCrud.create(db_session, RoleCreate(name="manager"))
for i in range(3):
@@ -2833,3 +2833,445 @@ class TestPaginate:
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count is None
class TestLateralLoadValidation:
"""lateral_load() raises immediately for bad relationship types."""
def test_valid_many_to_one_returns_marker(self):
"""lateral_load() on a Many:One rel returns a _LateralLoad with rel_attr set."""
marker = lateral_load(User.role)
assert isinstance(marker, _LateralLoad)
assert marker.rel_attr is User.role
def test_raises_type_error_for_plain_column(self):
"""lateral_load() raises TypeError when passed a plain column."""
with pytest.raises(TypeError, match="relationship attribute"):
lateral_load(User.username)
def test_raises_value_error_for_many_to_many(self):
"""lateral_load() raises ValueError for Many:Many (secondary table)."""
with pytest.raises(ValueError, match="Many:Many"):
lateral_load(Post.tags)
def test_raises_value_error_for_one_to_many(self):
"""lateral_load() raises ValueError for One:Many (uselist=True)."""
with pytest.raises(ValueError, match="One:Many"):
lateral_load(Role.users)
class TestLateralLoadInSubclass:
"""lateral_load() markers in default_load_options are processed at class definition."""
def test_marker_extracted_from_default_load_options(self):
"""_LateralLoad is removed from default_load_options and stored in _resolved_lateral."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
assert UserLateralCrud.default_load_options is None
assert UserLateralCrud._resolved_lateral is not None
def test_resolved_lateral_has_one_join_and_eager(self):
"""_resolved_lateral contains exactly one join and one eager option."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
resolved = UserLateralCrud._resolved_lateral
assert isinstance(resolved, _ResolvedLateral)
assert len(resolved.joins) == 1
assert len(resolved.eager) == 1
def test_regular_options_preserved_alongside_lateral(self):
"""Non-lateral opts stay in default_load_options; lateral marker is extracted."""
regular = selectinload(User.role)
class UserMixedCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role), regular]
assert UserMixedCrud._resolved_lateral is not None
assert UserMixedCrud.default_load_options == [regular]
def test_no_lateral_leaves_default_load_options_untouched(self):
"""When no lateral marker is present, default_load_options is unchanged."""
opts = [selectinload(User.role)]
class UserNormalCrud(AsyncCrud[User]):
model = User
default_load_options = opts
assert UserNormalCrud.default_load_options is opts
assert UserNormalCrud._resolved_lateral is None
def test_no_default_load_options_leaves_resolved_lateral_none(self):
"""_resolved_lateral stays None when default_load_options is not set."""
class UserPlainCrud(AsyncCrud[User]):
model = User
assert UserPlainCrud._resolved_lateral is None
class TestResolveLoadOptionsWithLateral:
"""_resolve_load_options always appends lateral eager options."""
def test_lateral_eager_included_when_no_call_site_opts(self):
"""contains_eager from lateral_load is returned when load_options=None."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
resolved = UserLateralCrud._resolve_load_options(None)
assert resolved is not None
assert len(resolved) == 1 # the contains_eager
def test_call_site_opts_bypass_lateral_eager(self):
"""When call-site load_options are provided, lateral eager is NOT appended."""
extra = selectinload(User.role)
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
resolved = UserLateralCrud._resolve_load_options([extra])
assert resolved is not None
assert len(resolved) == 1 # only the call-site option; lateral eager skipped
def test_lateral_eager_appended_to_default_load_options(self):
"""default_load_options (regular) + lateral eager are both returned."""
regular = selectinload(User.role)
class UserMixedCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role), regular]
resolved = UserMixedCrud._resolve_load_options(None)
assert resolved is not None
assert len(resolved) == 2
class TestGetLateralJoins:
"""_get_lateral_joins merges auto-resolved and manual lateral_joins."""
def test_returns_none_when_no_lateral_configured(self):
"""Returns None when neither lateral_joins nor lateral_load is set."""
class UserPlainCrud(AsyncCrud[User]):
model = User
assert UserPlainCrud._get_lateral_joins() is None
def test_returns_resolved_lateral_joins(self):
"""Returns the join tuple built from lateral_load()."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
joins = UserLateralCrud._get_lateral_joins()
assert joins is not None
assert len(joins) == 1
def test_manual_lateral_joins_included(self):
"""Manual lateral_joins class var is included in _get_lateral_joins."""
from sqlalchemy import select, true
manual_sub = select(Role).where(Role.id == User.role_id).lateral("_manual_role")
class UserManualCrud(AsyncCrud[User]):
model = User
lateral_joins = [(manual_sub, true())]
joins = UserManualCrud._get_lateral_joins()
assert joins is not None
assert len(joins) == 1
def test_manual_and_auto_lateral_joins_merged(self):
"""Both manual lateral_joins and auto-resolved from lateral_load are combined."""
from sqlalchemy import select, true
manual_sub = select(Role).where(Role.id == User.role_id).lateral("_manual_role")
class UserBothCrud(AsyncCrud[User]):
model = User
lateral_joins = [(manual_sub, true())]
default_load_options = [lateral_load(User.role)]
joins = UserBothCrud._get_lateral_joins()
assert joins is not None
assert len(joins) == 2
class TestLateralLoadIntegration:
"""lateral_load() in real DB queries: relationship loaded, pagination correct."""
@pytest.mark.anyio
async def test_get_loads_relationship(self, db_session: AsyncSession):
"""get() populates the relationship via lateral join."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
user = await UserCrud.create(
db_session,
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
)
fetched = await UserLateralCrud.get(db_session, [User.id == user.id])
assert fetched.role is not None
assert fetched.role.name == "admin"
@pytest.mark.anyio
async def test_get_null_fk_preserved(self, db_session: AsyncSession):
"""User with null role_id still returned (LEFT JOIN behaviour)."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
user = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
fetched = await UserLateralCrud.get(db_session, [User.id == user.id])
assert fetched is not None
assert fetched.role is None
@pytest.mark.anyio
async def test_first_loads_relationship(self, db_session: AsyncSession):
"""first() populates the relationship via lateral join."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="editor"))
await UserCrud.create(
db_session,
UserCreate(username="carol", email="carol@test.com", role_id=role.id),
)
user = await UserLateralCrud.first(db_session)
assert user is not None
assert user.role is not None
assert user.role.name == "editor"
@pytest.mark.anyio
async def test_get_multi_loads_relationship(self, db_session: AsyncSession):
"""get_multi() populates the relationship via lateral join for all rows."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="member"))
for i in range(3):
await UserCrud.create(
db_session,
UserCreate(
username=f"user{i}", email=f"u{i}@test.com", role_id=role.id
),
)
users = await UserLateralCrud.get_multi(db_session)
assert len(users) == 3
assert all(u.role is not None and u.role.name == "member" for u in users)
@pytest.mark.anyio
async def test_offset_paginate_correct_count(self, db_session: AsyncSession):
"""offset_paginate total_count is not inflated by the lateral join."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
for i in range(5):
await UserCrud.create(
db_session,
UserCreate(
username=f"user{i}", email=f"u{i}@test.com", role_id=role.id
),
)
result = await UserLateralCrud.offset_paginate(
db_session, schema=UserWithRoleRead, items_per_page=10
)
assert result.pagination.total_count == 5
assert len(result.data) == 5
@pytest.mark.anyio
async def test_offset_paginate_loads_relationship(self, db_session: AsyncSession):
"""offset_paginate serializes relationship data loaded via lateral."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
await UserCrud.create(
db_session,
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
)
result = await UserLateralCrud.offset_paginate(
db_session, schema=UserWithRoleRead, items_per_page=10
)
assert result.data[0].role is not None
assert result.data[0].role.name == "admin"
@pytest.mark.anyio
async def test_offset_paginate_mixed_null_fk(self, db_session: AsyncSession):
"""offset_paginate returns all users including those with null role_id."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
await UserCrud.create(
db_session,
UserCreate(username="with_role", email="a@test.com", role_id=role.id),
)
await UserCrud.create(
db_session, UserCreate(username="no_role", email="b@test.com")
)
result = await UserLateralCrud.offset_paginate(
db_session, schema=UserWithRoleRead, items_per_page=10
)
assert result.pagination.total_count == 2
@pytest.mark.anyio
async def test_cursor_paginate_loads_relationship(self, db_session: AsyncSession):
"""cursor_paginate populates the relationship via lateral join."""
class UserLateralCursorCrud(AsyncCrud[User]):
model = User
cursor_column = User.id
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
for i in range(3):
await UserCrud.create(
db_session,
UserCreate(
username=f"user{i}", email=f"u{i}@test.com", role_id=role.id
),
)
result = await UserLateralCursorCrud.cursor_paginate(
db_session, schema=UserWithRoleRead, items_per_page=10
)
assert len(result.data) == 3
assert all(item.role is not None for item in result.data)
@pytest.mark.anyio
async def test_offset_paginate_with_search_and_lateral(
self, db_session: AsyncSession
):
"""search filter works alongside lateral join."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
searchable_fields = [User.username]
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
await UserCrud.create(
db_session,
UserCreate(username="alice", email="a@test.com", role_id=role.id),
)
await UserCrud.create(
db_session, UserCreate(username="bob", email="b@test.com", role_id=role.id)
)
result = await UserLateralCrud.offset_paginate(
db_session, schema=UserWithRoleRead, search="alice", items_per_page=10
)
assert result.pagination.total_count == 1
assert result.data[0].username == "alice"
@pytest.mark.anyio
async def test_first_call_site_load_options_bypasses_lateral(
self, db_session: AsyncSession
):
"""When load_options is provided, lateral join is skipped (no conflict)."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
user = await UserCrud.create(
db_session,
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
)
# Passing explicit load_options bypasses the lateral join — role loaded via selectinload
fetched = await UserLateralCrud.first(
db_session,
filters=[User.id == user.id],
load_options=[selectinload(User.role)],
)
assert fetched is not None
assert fetched.role is not None
assert fetched.role.name == "admin"
@pytest.mark.anyio
async def test_get_multi_call_site_load_options_bypasses_lateral(
self, db_session: AsyncSession
):
"""When load_options is provided, lateral join is skipped (no conflict)."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="viewer"))
for i in range(2):
await UserCrud.create(
db_session,
UserCreate(username=f"u{i}", email=f"u{i}@test.com", role_id=role.id),
)
# Passing explicit load_options bypasses the lateral join — role loaded via selectinload
users = await UserLateralCrud.get_multi(
db_session, load_options=[selectinload(User.role)]
)
assert len(users) == 2
assert all(u.role is not None and u.role.name == "viewer" for u in users)
@pytest.mark.anyio
async def test_offset_paginate_call_site_load_options_bypasses_lateral(
self, db_session: AsyncSession
):
"""When load_options is provided, lateral join is skipped (no conflict)."""
class UserLateralCrud(AsyncCrud[User]):
model = User
default_load_options = [lateral_load(User.role)]
role = await RoleCrud.create(db_session, RoleCreate(name="editor"))
for i in range(3):
await UserCrud.create(
db_session,
UserCreate(username=f"e{i}", email=f"e{i}@test.com", role_id=role.id),
)
# Passing explicit load_options bypasses the lateral join — role loaded via selectinload
result = await UserLateralCrud.offset_paginate(
db_session,
schema=UserWithRoleRead,
items_per_page=10,
load_options=[selectinload(User.role)],
)
assert result.pagination.total_count == 3
assert all(item.role is not None for item in result.data)

View File

@@ -23,6 +23,12 @@ from .conftest import (
ArticleCreate,
ArticleCrud,
ArticleRead,
Color,
Order,
OrderCreate,
OrderCrud,
OrderRead,
OrderStatus,
Role,
RoleCreate,
RoleCrud,
@@ -1121,6 +1127,253 @@ class TestFilterBy:
assert "JSON" in exc_info.value.col_type
class TestFilterByIntEnum:
"""Tests for filter_by on columns typed as (int, Enum) / IntEnum."""
@pytest.mark.anyio
async def test_filter_by_intenum_member(self, db_session: AsyncSession):
"""filter_by with an IntEnum member value filters correctly."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-3", status=OrderStatus.PENDING)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": OrderStatus.PENDING},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"order-1", "order-3"}
@pytest.mark.anyio
async def test_filter_by_plain_int_value_raises(self, db_session: AsyncSession):
"""filter_by with a plain int on an IntEnum column raises KeyError — use name or member."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
with pytest.raises(KeyError):
await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": 1},
schema=OrderRead,
)
@pytest.mark.anyio
async def test_filter_by_intenum_list(self, db_session: AsyncSession):
"""filter_by with a list of IntEnum members produces an IN filter."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-3", status=OrderStatus.CANCELLED)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": [OrderStatus.PENDING, OrderStatus.SHIPPED]},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"order-1", "order-2"}
@pytest.mark.anyio
async def test_filter_by_plain_int_list_raises(self, db_session: AsyncSession):
"""filter_by with a list of plain ints on an IntEnum column raises KeyError — use names or members."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
with pytest.raises(KeyError):
await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": [1, 3]},
schema=OrderRead,
)
@pytest.mark.anyio
async def test_filter_by_intenum_name_string(self, db_session: AsyncSession):
"""filter_by with the enum member name as a string filters correctly."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={
"status": "PENDING"
}, # name as string, e.g. from HTTP query param
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "order-1"
@pytest.mark.anyio
async def test_filter_by_intenum_name_string_list(self, db_session: AsyncSession):
"""filter_by with a list of enum name strings produces an IN filter."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-3", status=OrderStatus.CANCELLED)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": ["PENDING", "SHIPPED"]},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"order-1", "order-2"}
class TestFilterByStrEnum:
"""Tests for filter_by on columns typed as (str, Enum) / StrEnum (lines 364-367)."""
@pytest.mark.anyio
async def test_filter_by_strenum_member(self, db_session: AsyncSession):
"""filter_by with a StrEnum member on a string Enum column filters correctly."""
OrderColorCrud = CrudFactory(Order, facet_fields=[Order.color])
await OrderCrud.create(
db_session,
OrderCreate(name="red-order", status=OrderStatus.PENDING, color=Color.RED),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="blue-order", status=OrderStatus.PENDING, color=Color.BLUE
),
)
result = await OrderColorCrud.offset_paginate(
db_session,
filter_by={"color": Color.RED},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "red-order"
@pytest.mark.anyio
async def test_filter_by_strenum_list(self, db_session: AsyncSession):
"""filter_by with a list of StrEnum members produces an IN filter."""
OrderColorCrud = CrudFactory(Order, facet_fields=[Order.color])
await OrderCrud.create(
db_session,
OrderCreate(name="red-order", status=OrderStatus.PENDING, color=Color.RED),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="green-order", status=OrderStatus.PENDING, color=Color.GREEN
),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="blue-order", status=OrderStatus.PENDING, color=Color.BLUE
),
)
result = await OrderColorCrud.offset_paginate(
db_session,
filter_by={"color": [Color.RED, Color.BLUE]},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"red-order", "blue-order"}
class TestFilterByIntegerColumn:
"""Tests for filter_by on plain Integer columns with IntEnum values."""
@pytest.mark.anyio
async def test_filter_by_integer_column_with_intenum_member(
self, db_session: AsyncSession
):
"""filter_by with an IntEnum member on an Integer column works correctly."""
OrderPriorityCrud = CrudFactory(Order, facet_fields=[Order.priority])
await OrderCrud.create(
db_session,
OrderCreate(
name="order-1", status=OrderStatus.PENDING, priority=OrderStatus.PENDING
),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="order-2", status=OrderStatus.SHIPPED, priority=OrderStatus.SHIPPED
),
)
result = await OrderPriorityCrud.offset_paginate(
db_session,
filter_by={
"priority": OrderStatus.PENDING
}, # IntEnum member on Integer col
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "order-1"
@pytest.mark.anyio
async def test_filter_by_integer_column_with_plain_int(
self, db_session: AsyncSession
):
"""filter_by with a plain int on an Integer column works correctly."""
OrderPriorityCrud = CrudFactory(Order, facet_fields=[Order.priority])
await OrderCrud.create(
db_session,
OrderCreate(name="order-1", status=OrderStatus.PENDING, priority=1),
)
await OrderCrud.create(
db_session,
OrderCreate(name="order-2", status=OrderStatus.SHIPPED, priority=3),
)
result = await OrderPriorityCrud.offset_paginate(
db_session,
filter_by={"priority": 1},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "order-1"
class TestFilterParamsViaConsolidated:
"""Tests for filter params via consolidated offset_paginate_params()."""

File diff suppressed because it is too large Load Diff

78
uv.lock generated
View File

@@ -81,76 +81,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/3c/d7/8fb3044eaef08a310acfe23dae9a8e2e07d305edc29a53497e52bc76eca7/asyncpg-0.31.0-cp314-cp314t-win_amd64.whl", hash = "sha256:bd4107bb7cdd0e9e65fae66a62afd3a249663b844fa34d479f6d5b3bef9c04c3", size = 706062, upload-time = "2025-11-24T23:26:44.086Z" },
]
[[package]]
name = "bcrypt"
version = "5.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d4/36/3329e2518d70ad8e2e5817d5a4cac6bba05a47767ec416c7d020a965f408/bcrypt-5.0.0.tar.gz", hash = "sha256:f748f7c2d6fd375cc93d3fba7ef4a9e3a092421b8dbf34d8d4dc06be9492dfdd", size = 25386, upload-time = "2025-09-25T19:50:47.829Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/13/85/3e65e01985fddf25b64ca67275bb5bdb4040bd1a53b66d355c6c37c8a680/bcrypt-5.0.0-cp313-cp313t-macosx_10_12_universal2.whl", hash = "sha256:f3c08197f3039bec79cee59a606d62b96b16669cff3949f21e74796b6e3cd2be", size = 481806, upload-time = "2025-09-25T19:49:05.102Z" },
{ url = "https://files.pythonhosted.org/packages/44/dc/01eb79f12b177017a726cbf78330eb0eb442fae0e7b3dfd84ea2849552f3/bcrypt-5.0.0-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:200af71bc25f22006f4069060c88ed36f8aa4ff7f53e67ff04d2ab3f1e79a5b2", size = 268626, upload-time = "2025-09-25T19:49:06.723Z" },
{ url = "https://files.pythonhosted.org/packages/8c/cf/e82388ad5959c40d6afd94fb4743cc077129d45b952d46bdc3180310e2df/bcrypt-5.0.0-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:baade0a5657654c2984468efb7d6c110db87ea63ef5a4b54732e7e337253e44f", size = 271853, upload-time = "2025-09-25T19:49:08.028Z" },
{ url = "https://files.pythonhosted.org/packages/ec/86/7134b9dae7cf0efa85671651341f6afa695857fae172615e960fb6a466fa/bcrypt-5.0.0-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:c58b56cdfb03202b3bcc9fd8daee8e8e9b6d7e3163aa97c631dfcfcc24d36c86", size = 269793, upload-time = "2025-09-25T19:49:09.727Z" },
{ url = "https://files.pythonhosted.org/packages/cc/82/6296688ac1b9e503d034e7d0614d56e80c5d1a08402ff856a4549cb59207/bcrypt-5.0.0-cp313-cp313t-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:4bfd2a34de661f34d0bda43c3e4e79df586e4716ef401fe31ea39d69d581ef23", size = 289930, upload-time = "2025-09-25T19:49:11.204Z" },
{ url = "https://files.pythonhosted.org/packages/d1/18/884a44aa47f2a3b88dd09bc05a1e40b57878ecd111d17e5bba6f09f8bb77/bcrypt-5.0.0-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:ed2e1365e31fc73f1825fa830f1c8f8917ca1b3ca6185773b349c20fd606cec2", size = 272194, upload-time = "2025-09-25T19:49:12.524Z" },
{ url = "https://files.pythonhosted.org/packages/0e/8f/371a3ab33c6982070b674f1788e05b656cfbf5685894acbfef0c65483a59/bcrypt-5.0.0-cp313-cp313t-manylinux_2_34_aarch64.whl", hash = "sha256:83e787d7a84dbbfba6f250dd7a5efd689e935f03dd83b0f919d39349e1f23f83", size = 269381, upload-time = "2025-09-25T19:49:14.308Z" },
{ url = "https://files.pythonhosted.org/packages/b1/34/7e4e6abb7a8778db6422e88b1f06eb07c47682313997ee8a8f9352e5a6f1/bcrypt-5.0.0-cp313-cp313t-manylinux_2_34_x86_64.whl", hash = "sha256:137c5156524328a24b9fac1cb5db0ba618bc97d11970b39184c1d87dc4bf1746", size = 271750, upload-time = "2025-09-25T19:49:15.584Z" },
{ url = "https://files.pythonhosted.org/packages/c0/1b/54f416be2499bd72123c70d98d36c6cd61a4e33d9b89562c22481c81bb30/bcrypt-5.0.0-cp313-cp313t-musllinux_1_1_aarch64.whl", hash = "sha256:38cac74101777a6a7d3b3e3cfefa57089b5ada650dce2baf0cbdd9d65db22a9e", size = 303757, upload-time = "2025-09-25T19:49:17.244Z" },
{ url = "https://files.pythonhosted.org/packages/13/62/062c24c7bcf9d2826a1a843d0d605c65a755bc98002923d01fd61270705a/bcrypt-5.0.0-cp313-cp313t-musllinux_1_1_x86_64.whl", hash = "sha256:d8d65b564ec849643d9f7ea05c6d9f0cd7ca23bdd4ac0c2dbef1104ab504543d", size = 306740, upload-time = "2025-09-25T19:49:18.693Z" },
{ url = "https://files.pythonhosted.org/packages/d5/c8/1fdbfc8c0f20875b6b4020f3c7dc447b8de60aa0be5faaf009d24242aec9/bcrypt-5.0.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:741449132f64b3524e95cd30e5cd3343006ce146088f074f31ab26b94e6c75ba", size = 334197, upload-time = "2025-09-25T19:49:20.523Z" },
{ url = "https://files.pythonhosted.org/packages/a6/c1/8b84545382d75bef226fbc6588af0f7b7d095f7cd6a670b42a86243183cd/bcrypt-5.0.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:212139484ab3207b1f0c00633d3be92fef3c5f0af17cad155679d03ff2ee1e41", size = 352974, upload-time = "2025-09-25T19:49:22.254Z" },
{ url = "https://files.pythonhosted.org/packages/10/a6/ffb49d4254ed085e62e3e5dd05982b4393e32fe1e49bb1130186617c29cd/bcrypt-5.0.0-cp313-cp313t-win32.whl", hash = "sha256:9d52ed507c2488eddd6a95bccee4e808d3234fa78dd370e24bac65a21212b861", size = 148498, upload-time = "2025-09-25T19:49:24.134Z" },
{ url = "https://files.pythonhosted.org/packages/48/a9/259559edc85258b6d5fc5471a62a3299a6aa37a6611a169756bf4689323c/bcrypt-5.0.0-cp313-cp313t-win_amd64.whl", hash = "sha256:f6984a24db30548fd39a44360532898c33528b74aedf81c26cf29c51ee47057e", size = 145853, upload-time = "2025-09-25T19:49:25.702Z" },
{ url = "https://files.pythonhosted.org/packages/2d/df/9714173403c7e8b245acf8e4be8876aac64a209d1b392af457c79e60492e/bcrypt-5.0.0-cp313-cp313t-win_arm64.whl", hash = "sha256:9fffdb387abe6aa775af36ef16f55e318dcda4194ddbf82007a6f21da29de8f5", size = 139626, upload-time = "2025-09-25T19:49:26.928Z" },
{ url = "https://files.pythonhosted.org/packages/f8/14/c18006f91816606a4abe294ccc5d1e6f0e42304df5a33710e9e8e95416e1/bcrypt-5.0.0-cp314-cp314t-macosx_10_12_universal2.whl", hash = "sha256:4870a52610537037adb382444fefd3706d96d663ac44cbb2f37e3919dca3d7ef", size = 481862, upload-time = "2025-09-25T19:49:28.365Z" },
{ url = "https://files.pythonhosted.org/packages/67/49/dd074d831f00e589537e07a0725cf0e220d1f0d5d8e85ad5bbff251c45aa/bcrypt-5.0.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:48f753100931605686f74e27a7b49238122aa761a9aefe9373265b8b7aa43ea4", size = 268544, upload-time = "2025-09-25T19:49:30.39Z" },
{ url = "https://files.pythonhosted.org/packages/f5/91/50ccba088b8c474545b034a1424d05195d9fcbaaf802ab8bfe2be5a4e0d7/bcrypt-5.0.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:f70aadb7a809305226daedf75d90379c397b094755a710d7014b8b117df1ebbf", size = 271787, upload-time = "2025-09-25T19:49:32.144Z" },
{ url = "https://files.pythonhosted.org/packages/aa/e7/d7dba133e02abcda3b52087a7eea8c0d4f64d3e593b4fffc10c31b7061f3/bcrypt-5.0.0-cp314-cp314t-manylinux_2_28_aarch64.whl", hash = "sha256:744d3c6b164caa658adcb72cb8cc9ad9b4b75c7db507ab4bc2480474a51989da", size = 269753, upload-time = "2025-09-25T19:49:33.885Z" },
{ url = "https://files.pythonhosted.org/packages/33/fc/5b145673c4b8d01018307b5c2c1fc87a6f5a436f0ad56607aee389de8ee3/bcrypt-5.0.0-cp314-cp314t-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:a28bc05039bdf3289d757f49d616ab3efe8cf40d8e8001ccdd621cd4f98f4fc9", size = 289587, upload-time = "2025-09-25T19:49:35.144Z" },
{ url = "https://files.pythonhosted.org/packages/27/d7/1ff22703ec6d4f90e62f1a5654b8867ef96bafb8e8102c2288333e1a6ca6/bcrypt-5.0.0-cp314-cp314t-manylinux_2_28_x86_64.whl", hash = "sha256:7f277a4b3390ab4bebe597800a90da0edae882c6196d3038a73adf446c4f969f", size = 272178, upload-time = "2025-09-25T19:49:36.793Z" },
{ url = "https://files.pythonhosted.org/packages/c8/88/815b6d558a1e4d40ece04a2f84865b0fef233513bd85fd0e40c294272d62/bcrypt-5.0.0-cp314-cp314t-manylinux_2_34_aarch64.whl", hash = "sha256:79cfa161eda8d2ddf29acad370356b47f02387153b11d46042e93a0a95127493", size = 269295, upload-time = "2025-09-25T19:49:38.164Z" },
{ url = "https://files.pythonhosted.org/packages/51/8c/e0db387c79ab4931fc89827d37608c31cc57b6edc08ccd2386139028dc0d/bcrypt-5.0.0-cp314-cp314t-manylinux_2_34_x86_64.whl", hash = "sha256:a5393eae5722bcef046a990b84dff02b954904c36a194f6cfc817d7dca6c6f0b", size = 271700, upload-time = "2025-09-25T19:49:39.917Z" },
{ url = "https://files.pythonhosted.org/packages/06/83/1570edddd150f572dbe9fc00f6203a89fc7d4226821f67328a85c330f239/bcrypt-5.0.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:7f4c94dec1b5ab5d522750cb059bb9409ea8872d4494fd152b53cca99f1ddd8c", size = 334034, upload-time = "2025-09-25T19:49:41.227Z" },
{ url = "https://files.pythonhosted.org/packages/c9/f2/ea64e51a65e56ae7a8a4ec236c2bfbdd4b23008abd50ac33fbb2d1d15424/bcrypt-5.0.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:0cae4cb350934dfd74c020525eeae0a5f79257e8a201c0c176f4b84fdbf2a4b4", size = 352766, upload-time = "2025-09-25T19:49:43.08Z" },
{ url = "https://files.pythonhosted.org/packages/d7/d4/1a388d21ee66876f27d1a1f41287897d0c0f1712ef97d395d708ba93004c/bcrypt-5.0.0-cp314-cp314t-win32.whl", hash = "sha256:b17366316c654e1ad0306a6858e189fc835eca39f7eb2cafd6aaca8ce0c40a2e", size = 152449, upload-time = "2025-09-25T19:49:44.971Z" },
{ url = "https://files.pythonhosted.org/packages/3f/61/3291c2243ae0229e5bca5d19f4032cecad5dfb05a2557169d3a69dc0ba91/bcrypt-5.0.0-cp314-cp314t-win_amd64.whl", hash = "sha256:92864f54fb48b4c718fc92a32825d0e42265a627f956bc0361fe869f1adc3e7d", size = 149310, upload-time = "2025-09-25T19:49:46.162Z" },
{ url = "https://files.pythonhosted.org/packages/3e/89/4b01c52ae0c1a681d4021e5dd3e45b111a8fb47254a274fa9a378d8d834b/bcrypt-5.0.0-cp314-cp314t-win_arm64.whl", hash = "sha256:dd19cf5184a90c873009244586396a6a884d591a5323f0e8a5922560718d4993", size = 143761, upload-time = "2025-09-25T19:49:47.345Z" },
{ url = "https://files.pythonhosted.org/packages/84/29/6237f151fbfe295fe3e074ecc6d44228faa1e842a81f6d34a02937ee1736/bcrypt-5.0.0-cp38-abi3-macosx_10_12_universal2.whl", hash = "sha256:fc746432b951e92b58317af8e0ca746efe93e66555f1b40888865ef5bf56446b", size = 494553, upload-time = "2025-09-25T19:49:49.006Z" },
{ url = "https://files.pythonhosted.org/packages/45/b6/4c1205dde5e464ea3bd88e8742e19f899c16fa8916fb8510a851fae985b5/bcrypt-5.0.0-cp38-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:c2388ca94ffee269b6038d48747f4ce8df0ffbea43f31abfa18ac72f0218effb", size = 275009, upload-time = "2025-09-25T19:49:50.581Z" },
{ url = "https://files.pythonhosted.org/packages/3b/71/427945e6ead72ccffe77894b2655b695ccf14ae1866cd977e185d606dd2f/bcrypt-5.0.0-cp38-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:560ddb6ec730386e7b3b26b8b4c88197aaed924430e7b74666a586ac997249ef", size = 278029, upload-time = "2025-09-25T19:49:52.533Z" },
{ url = "https://files.pythonhosted.org/packages/17/72/c344825e3b83c5389a369c8a8e58ffe1480b8a699f46c127c34580c4666b/bcrypt-5.0.0-cp38-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:d79e5c65dcc9af213594d6f7f1fa2c98ad3fc10431e7aa53c176b441943efbdd", size = 275907, upload-time = "2025-09-25T19:49:54.709Z" },
{ url = "https://files.pythonhosted.org/packages/0b/7e/d4e47d2df1641a36d1212e5c0514f5291e1a956a7749f1e595c07a972038/bcrypt-5.0.0-cp38-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:2b732e7d388fa22d48920baa267ba5d97cca38070b69c0e2d37087b381c681fd", size = 296500, upload-time = "2025-09-25T19:49:56.013Z" },
{ url = "https://files.pythonhosted.org/packages/0f/c3/0ae57a68be2039287ec28bc463b82e4b8dc23f9d12c0be331f4782e19108/bcrypt-5.0.0-cp38-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:0c8e093ea2532601a6f686edbc2c6b2ec24131ff5c52f7610dd64fa4553b5464", size = 278412, upload-time = "2025-09-25T19:49:57.356Z" },
{ url = "https://files.pythonhosted.org/packages/45/2b/77424511adb11e6a99e3a00dcc7745034bee89036ad7d7e255a7e47be7d8/bcrypt-5.0.0-cp38-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:5b1589f4839a0899c146e8892efe320c0fa096568abd9b95593efac50a87cb75", size = 275486, upload-time = "2025-09-25T19:49:59.116Z" },
{ url = "https://files.pythonhosted.org/packages/43/0a/405c753f6158e0f3f14b00b462d8bca31296f7ecfc8fc8bc7919c0c7d73a/bcrypt-5.0.0-cp38-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:89042e61b5e808b67daf24a434d89bab164d4de1746b37a8d173b6b14f3db9ff", size = 277940, upload-time = "2025-09-25T19:50:00.869Z" },
{ url = "https://files.pythonhosted.org/packages/62/83/b3efc285d4aadc1fa83db385ec64dcfa1707e890eb42f03b127d66ac1b7b/bcrypt-5.0.0-cp38-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:e3cf5b2560c7b5a142286f69bde914494b6d8f901aaa71e453078388a50881c4", size = 310776, upload-time = "2025-09-25T19:50:02.393Z" },
{ url = "https://files.pythonhosted.org/packages/95/7d/47ee337dacecde6d234890fe929936cb03ebc4c3a7460854bbd9c97780b8/bcrypt-5.0.0-cp38-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:f632fd56fc4e61564f78b46a2269153122db34988e78b6be8b32d28507b7eaeb", size = 312922, upload-time = "2025-09-25T19:50:04.232Z" },
{ url = "https://files.pythonhosted.org/packages/d6/3a/43d494dfb728f55f4e1cf8fd435d50c16a2d75493225b54c8d06122523c6/bcrypt-5.0.0-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:801cad5ccb6b87d1b430f183269b94c24f248dddbbc5c1f78b6ed231743e001c", size = 341367, upload-time = "2025-09-25T19:50:05.559Z" },
{ url = "https://files.pythonhosted.org/packages/55/ab/a0727a4547e383e2e22a630e0f908113db37904f58719dc48d4622139b5c/bcrypt-5.0.0-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:3cf67a804fc66fc217e6914a5635000259fbbbb12e78a99488e4d5ba445a71eb", size = 359187, upload-time = "2025-09-25T19:50:06.916Z" },
{ url = "https://files.pythonhosted.org/packages/1b/bb/461f352fdca663524b4643d8b09e8435b4990f17fbf4fea6bc2a90aa0cc7/bcrypt-5.0.0-cp38-abi3-win32.whl", hash = "sha256:3abeb543874b2c0524ff40c57a4e14e5d3a66ff33fb423529c88f180fd756538", size = 153752, upload-time = "2025-09-25T19:50:08.515Z" },
{ url = "https://files.pythonhosted.org/packages/41/aa/4190e60921927b7056820291f56fc57d00d04757c8b316b2d3c0d1d6da2c/bcrypt-5.0.0-cp38-abi3-win_amd64.whl", hash = "sha256:35a77ec55b541e5e583eb3436ffbbf53b0ffa1fa16ca6782279daf95d146dcd9", size = 150881, upload-time = "2025-09-25T19:50:09.742Z" },
{ url = "https://files.pythonhosted.org/packages/54/12/cd77221719d0b39ac0b55dbd39358db1cd1246e0282e104366ebbfb8266a/bcrypt-5.0.0-cp38-abi3-win_arm64.whl", hash = "sha256:cde08734f12c6a4e28dc6755cd11d3bdfea608d93d958fffbe95a7026ebe4980", size = 144931, upload-time = "2025-09-25T19:50:11.016Z" },
{ url = "https://files.pythonhosted.org/packages/5d/ba/2af136406e1c3839aea9ecadc2f6be2bcd1eff255bd451dd39bcf302c47a/bcrypt-5.0.0-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:0c418ca99fd47e9c59a301744d63328f17798b5947b0f791e9af3c1c499c2d0a", size = 495313, upload-time = "2025-09-25T19:50:12.309Z" },
{ url = "https://files.pythonhosted.org/packages/ac/ee/2f4985dbad090ace5ad1f7dd8ff94477fe089b5fab2040bd784a3d5f187b/bcrypt-5.0.0-cp39-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddb4e1500f6efdd402218ffe34d040a1196c072e07929b9820f363a1fd1f4191", size = 275290, upload-time = "2025-09-25T19:50:13.673Z" },
{ url = "https://files.pythonhosted.org/packages/e4/6e/b77ade812672d15cf50842e167eead80ac3514f3beacac8902915417f8b7/bcrypt-5.0.0-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:7aeef54b60ceddb6f30ee3db090351ecf0d40ec6e2abf41430997407a46d2254", size = 278253, upload-time = "2025-09-25T19:50:15.089Z" },
{ url = "https://files.pythonhosted.org/packages/36/c4/ed00ed32f1040f7990dac7115f82273e3c03da1e1a1587a778d8cea496d8/bcrypt-5.0.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:f0ce778135f60799d89c9693b9b398819d15f1921ba15fe719acb3178215a7db", size = 276084, upload-time = "2025-09-25T19:50:16.699Z" },
{ url = "https://files.pythonhosted.org/packages/e7/c4/fa6e16145e145e87f1fa351bbd54b429354fd72145cd3d4e0c5157cf4c70/bcrypt-5.0.0-cp39-abi3-manylinux_2_28_armv7l.manylinux_2_31_armv7l.whl", hash = "sha256:a71f70ee269671460b37a449f5ff26982a6f2ba493b3eabdd687b4bf35f875ac", size = 297185, upload-time = "2025-09-25T19:50:18.525Z" },
{ url = "https://files.pythonhosted.org/packages/24/b4/11f8a31d8b67cca3371e046db49baa7c0594d71eb40ac8121e2fc0888db0/bcrypt-5.0.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:f8429e1c410b4073944f03bd778a9e066e7fad723564a52ff91841d278dfc822", size = 278656, upload-time = "2025-09-25T19:50:19.809Z" },
{ url = "https://files.pythonhosted.org/packages/ac/31/79f11865f8078e192847d2cb526e3fa27c200933c982c5b2869720fa5fce/bcrypt-5.0.0-cp39-abi3-manylinux_2_34_aarch64.whl", hash = "sha256:edfcdcedd0d0f05850c52ba3127b1fce70b9f89e0fe5ff16517df7e81fa3cbb8", size = 275662, upload-time = "2025-09-25T19:50:21.567Z" },
{ url = "https://files.pythonhosted.org/packages/d4/8d/5e43d9584b3b3591a6f9b68f755a4da879a59712981ef5ad2a0ac1379f7a/bcrypt-5.0.0-cp39-abi3-manylinux_2_34_x86_64.whl", hash = "sha256:611f0a17aa4a25a69362dcc299fda5c8a3d4f160e2abb3831041feb77393a14a", size = 278240, upload-time = "2025-09-25T19:50:23.305Z" },
{ url = "https://files.pythonhosted.org/packages/89/48/44590e3fc158620f680a978aafe8f87a4c4320da81ed11552f0323aa9a57/bcrypt-5.0.0-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:db99dca3b1fdc3db87d7c57eac0c82281242d1eabf19dcb8a6b10eb29a2e72d1", size = 311152, upload-time = "2025-09-25T19:50:24.597Z" },
{ url = "https://files.pythonhosted.org/packages/5f/85/e4fbfc46f14f47b0d20493669a625da5827d07e8a88ee460af6cd9768b44/bcrypt-5.0.0-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:5feebf85a9cefda32966d8171f5db7e3ba964b77fdfe31919622256f80f9cf42", size = 313284, upload-time = "2025-09-25T19:50:26.268Z" },
{ url = "https://files.pythonhosted.org/packages/25/ae/479f81d3f4594456a01ea2f05b132a519eff9ab5768a70430fa1132384b1/bcrypt-5.0.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:3ca8a166b1140436e058298a34d88032ab62f15aae1c598580333dc21d27ef10", size = 341643, upload-time = "2025-09-25T19:50:28.02Z" },
{ url = "https://files.pythonhosted.org/packages/df/d2/36a086dee1473b14276cd6ea7f61aef3b2648710b5d7f1c9e032c29b859f/bcrypt-5.0.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:61afc381250c3182d9078551e3ac3a41da14154fbff647ddf52a769f588c4172", size = 359698, upload-time = "2025-09-25T19:50:31.347Z" },
{ url = "https://files.pythonhosted.org/packages/c0/f6/688d2cd64bfd0b14d805ddb8a565e11ca1fb0fd6817175d58b10052b6d88/bcrypt-5.0.0-cp39-abi3-win32.whl", hash = "sha256:64d7ce196203e468c457c37ec22390f1a61c85c6f0b8160fd752940ccfb3a683", size = 153725, upload-time = "2025-09-25T19:50:34.384Z" },
{ url = "https://files.pythonhosted.org/packages/9f/b9/9d9a641194a730bda138b3dfe53f584d61c58cd5230e37566e83ec2ffa0d/bcrypt-5.0.0-cp39-abi3-win_amd64.whl", hash = "sha256:64ee8434b0da054d830fa8e89e1c8bf30061d539044a39524ff7dec90481e5c2", size = 150912, upload-time = "2025-09-25T19:50:35.69Z" },
{ url = "https://files.pythonhosted.org/packages/27/44/d2ef5e87509158ad2187f4dd0852df80695bb1ee0cfe0a684727b01a69e0/bcrypt-5.0.0-cp39-abi3-win_arm64.whl", hash = "sha256:f2347d3534e76bf50bca5500989d6c1d05ed64b440408057a37673282c654927", size = 144953, upload-time = "2025-09-25T19:50:37.32Z" },
{ url = "https://files.pythonhosted.org/packages/8a/75/4aa9f5a4d40d762892066ba1046000b329c7cd58e888a6db878019b282dc/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:7edda91d5ab52b15636d9c30da87d2cc84f426c72b9dba7a9b4fe142ba11f534", size = 271180, upload-time = "2025-09-25T19:50:38.575Z" },
{ url = "https://files.pythonhosted.org/packages/54/79/875f9558179573d40a9cc743038ac2bf67dfb79cecb1e8b5d70e88c94c3d/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:046ad6db88edb3c5ece4369af997938fb1c19d6a699b9c1b27b0db432faae4c4", size = 273791, upload-time = "2025-09-25T19:50:39.913Z" },
{ url = "https://files.pythonhosted.org/packages/bc/fe/975adb8c216174bf70fc17535f75e85ac06ed5252ea077be10d9cff5ce24/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_34_aarch64.whl", hash = "sha256:dcd58e2b3a908b5ecc9b9df2f0085592506ac2d5110786018ee5e160f28e0911", size = 270746, upload-time = "2025-09-25T19:50:43.306Z" },
{ url = "https://files.pythonhosted.org/packages/e4/f8/972c96f5a2b6c4b3deca57009d93e946bbdbe2241dca9806d502f29dd3ee/bcrypt-5.0.0-pp311-pypy311_pp73-manylinux_2_34_x86_64.whl", hash = "sha256:6b8f520b61e8781efee73cba14e3e8c9556ccfb375623f4f97429544734545b4", size = 273375, upload-time = "2025-09-25T19:50:45.43Z" },
]
[[package]]
name = "certifi"
version = "2026.1.4"
@@ -321,7 +251,7 @@ wheels = [
[[package]]
name = "fastapi-toolsets"
version = "3.0.1"
version = "3.0.3"
source = { editable = "." }
dependencies = [
{ name = "asyncpg" },
@@ -352,7 +282,6 @@ pytest = [
[package.dev-dependencies]
dev = [
{ name = "bcrypt" },
{ name = "coverage" },
{ name = "fastapi-toolsets", extra = ["all"] },
{ name = "httpx" },
@@ -372,9 +301,6 @@ docs = [
{ name = "mkdocstrings-python" },
{ name = "zensical" },
]
docs-src = [
{ name = "bcrypt" },
]
tests = [
{ name = "coverage" },
{ name = "httpx" },
@@ -401,7 +327,6 @@ provides-extras = ["cli", "metrics", "pytest", "all"]
[package.metadata.requires-dev]
dev = [
{ name = "bcrypt", specifier = ">=4.0.0" },
{ name = "coverage", specifier = ">=7.0.0" },
{ name = "fastapi-toolsets", extras = ["all"] },
{ name = "httpx", specifier = ">=0.25.0" },
@@ -421,7 +346,6 @@ docs = [
{ name = "mkdocstrings-python", specifier = ">=2.0.2" },
{ name = "zensical", specifier = ">=0.0.30" },
]
docs-src = [{ name = "bcrypt", specifier = ">=4.0.0" }]
tests = [
{ name = "coverage", specifier = ">=7.0.0" },
{ name = "httpx", specifier = ">=0.25.0" },