6 Commits

Author SHA1 Message Date
0fc86d3c34 refactor: simplify and deduplicate across crud, metrics, cli, and
exceptions
2026-03-01 09:47:36 -05:00
82ef96082e test: add missing tests for fixtures/utils.py 2026-03-01 08:05:20 -05:00
e0828c7e71 refactor: centralize type aliases in types.py and simplify crud layer 2026-03-01 07:46:49 -05:00
59d028d00e refactor: remove deprecated parameter and function 2026-03-01 07:21:07 -05:00
56d365d14b Version 1.3.0 2026-03-01 05:22:16 -05:00
d3vyce
a257d85d45 Add sort_params helper in CrudFactory (#103)
* feat: add sort_params helper in CrudFactory

* docs: add sorting

* fix: change sort_by to order_by
2026-03-01 11:20:43 +01:00
26 changed files with 788 additions and 275 deletions

View File

@@ -1,6 +1,6 @@
# Pagination & search # Pagination & search
This example builds an articles listing endpoint that supports **offset pagination**, **cursor pagination**, **full-text search**, and **faceted filtering** — all from a single `CrudFactory` definition. This example builds an articles listing endpoint that supports **offset pagination**, **cursor pagination**, **full-text search**, **faceted filtering**, and **sorting** — all from a single `CrudFactory` definition.
## Models ## Models
@@ -16,7 +16,7 @@ This example builds an articles listing endpoint that supports **offset paginati
## Crud ## Crud
Declare `facet_fields` and `searchable_fields` once on [`CrudFactory`](../reference/crud.md#fastapi_toolsets.crud.factory.CrudFactory). All endpoints built from this class share the same defaults and can override them per call. Declare `searchable_fields`, `facet_fields`, and `order_fields` once on [`CrudFactory`](../reference/crud.md#fastapi_toolsets.crud.factory.CrudFactory). All endpoints built from this class share the same defaults and can override them per call.
```python title="crud.py" ```python title="crud.py"
--8<-- "docs_src/examples/pagination_search/crud.py" --8<-- "docs_src/examples/pagination_search/crud.py"
@@ -46,14 +46,14 @@ Declare `facet_fields` and `searchable_fields` once on [`CrudFactory`](../refere
Best for admin panels or any UI that needs a total item count and numbered pages. Best for admin panels or any UI that needs a total item count and numbered pages.
```python title="routes.py:1:27" ```python title="routes.py:1:36"
--8<-- "docs_src/examples/pagination_search/routes.py:1:27" --8<-- "docs_src/examples/pagination_search/routes.py:1:36"
``` ```
**Example request** **Example request**
``` ```
GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published&order_by=title&order=asc
``` ```
**Example response** **Example response**
@@ -83,14 +83,14 @@ GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published
Best for feeds, infinite scroll, or any high-throughput API where offset performance degrades. Best for feeds, infinite scroll, or any high-throughput API where offset performance degrades.
```python title="routes.py:30:45" ```python title="routes.py:39:59"
--8<-- "docs_src/examples/pagination_search/routes.py:30:45" --8<-- "docs_src/examples/pagination_search/routes.py:39:59"
``` ```
**Example request** **Example request**
``` ```
GET /articles/cursor?items_per_page=10&status=published GET /articles/cursor?items_per_page=10&status=published&order_by=created_at&order=desc
``` ```
**Example response** **Example response**

View File

@@ -292,6 +292,8 @@ Use `filter_by` to pass the client's chosen filter values directly — no need t
Use [`filter_params()`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.filter_params) to generate a dict with the facet filter values from the query parameters: Use [`filter_params()`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.filter_params) to generate a dict with the facet filter values from the query parameters:
```python ```python
from typing import Annotated
from fastapi import Depends from fastapi import Depends
UserCrud = CrudFactory( UserCrud = CrudFactory(
@@ -303,7 +305,7 @@ UserCrud = CrudFactory(
async def list_users( async def list_users(
session: SessionDep, session: SessionDep,
page: int = 1, page: int = 1,
filter_by: dict[str, list[str]] = Depends(UserCrud.filter_params()), filter_by: Annotated[dict[str, list[str]], Depends(UserCrud.filter_params())],
) -> PaginatedResponse[UserRead]: ) -> PaginatedResponse[UserRead]:
return await UserCrud.offset_paginate( return await UserCrud.offset_paginate(
session=session, session=session,
@@ -320,6 +322,58 @@ GET /users?status=active&country=FR → filter_by={"status": ["active"], "coun
GET /users?role=admin&role=editor → filter_by={"role": ["admin", "editor"]} (IN clause) GET /users?role=admin&role=editor → filter_by={"role": ["admin", "editor"]} (IN clause)
``` ```
## Sorting
!!! info "Added in `v1.3`"
Declare `order_fields` on the CRUD class to expose client-driven column ordering via `order_by` and `order` query parameters.
```python
UserCrud = CrudFactory(
model=User,
order_fields=[
User.name,
User.created_at,
],
)
```
Call [`order_params()`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.order_params) to generate a FastAPI dependency that maps the query parameters to an [`OrderByClause`](../reference/crud.md#fastapi_toolsets.crud.factory.OrderByClause) expression:
```python
from typing import Annotated
from fastapi import Depends
from fastapi_toolsets.crud import OrderByClause
@router.get("")
async def list_users(
session: SessionDep,
order_by: Annotated[OrderByClause | None, Depends(UserCrud.order_params())],
) -> PaginatedResponse[UserRead]:
return await UserCrud.offset_paginate(session=session, order_by=order_by)
```
The dependency adds two query parameters to the endpoint:
| Parameter | Type |
| ---------- | --------------- |
| `order_by` | `str | null` |
| `order` | `asc` or `desc` |
```
GET /users?order_by=name&order=asc → ORDER BY users.name ASC
GET /users?order_by=name&order=desc → ORDER BY users.name DESC
```
An unknown `order_by` value raises [`InvalidOrderFieldError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError) (HTTP 422).
You can also pass `order_fields` directly to `order_params()` to override the class-level defaults without modifying them:
```python
UserOrderParams = UserCrud.order_params(order_fields=[User.name])
```
## Relationship loading ## Relationship loading
!!! info "Added in `v1.1`" !!! info "Added in `v1.1`"

View File

@@ -13,6 +13,7 @@ from fastapi_toolsets.exceptions import (
ConflictError, ConflictError,
NoSearchableFieldsError, NoSearchableFieldsError,
InvalidFacetFilterError, InvalidFacetFilterError,
InvalidOrderFieldError,
generate_error_responses, generate_error_responses,
init_exceptions_handlers, init_exceptions_handlers,
) )
@@ -32,6 +33,8 @@ from fastapi_toolsets.exceptions import (
## ::: fastapi_toolsets.exceptions.exceptions.InvalidFacetFilterError ## ::: fastapi_toolsets.exceptions.exceptions.InvalidFacetFilterError
## ::: fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError
## ::: fastapi_toolsets.exceptions.exceptions.generate_error_responses ## ::: fastapi_toolsets.exceptions.exceptions.generate_error_responses
## ::: fastapi_toolsets.exceptions.handler.init_exceptions_handlers ## ::: fastapi_toolsets.exceptions.handler.init_exceptions_handlers

View File

@@ -1,6 +1,9 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi_toolsets.exceptions import init_exceptions_handlers
from .routes import router from .routes import router
app = FastAPI() app = FastAPI()
init_exceptions_handlers(app=app)
app.include_router(router=router) app.include_router(router=router)

View File

@@ -14,6 +14,8 @@ ArticleCrud = CrudFactory(
Article.status, Article.status,
(Article.category, Category.name), (Article.category, Category.name),
], ],
order_fields=[ # fields exposed for client-driven ordering
Article.title,
Article.created_at,
],
) )
ArticleFilters = ArticleCrud.filter_params()

View File

@@ -1,9 +1,13 @@
from typing import Annotated
from fastapi import APIRouter, Depends, Query from fastapi import APIRouter, Depends, Query
from fastapi_toolsets.crud import OrderByClause
from fastapi_toolsets.schemas import PaginatedResponse from fastapi_toolsets.schemas import PaginatedResponse
from .crud import ArticleCrud from .crud import ArticleCrud
from .db import SessionDep from .db import SessionDep
from .models import Article
from .schemas import ArticleRead from .schemas import ArticleRead
router = APIRouter(prefix="/articles") router = APIRouter(prefix="/articles")
@@ -12,10 +16,14 @@ router = APIRouter(prefix="/articles")
@router.get("/offset") @router.get("/offset")
async def list_articles_offset( async def list_articles_offset(
session: SessionDep, session: SessionDep,
filter_by: Annotated[dict[str, list[str]], Depends(ArticleCrud.filter_params())],
order_by: Annotated[
OrderByClause | None,
Depends(ArticleCrud.order_params(default_field=Article.created_at)),
],
page: int = Query(1, ge=1), page: int = Query(1, ge=1),
items_per_page: int = Query(20, ge=1, le=100), items_per_page: int = Query(20, ge=1, le=100),
search: str | None = None, search: str | None = None,
filter_by: dict[str, list[str]] = Depends(ArticleCrud.filter_params()),
) -> PaginatedResponse[ArticleRead]: ) -> PaginatedResponse[ArticleRead]:
return await ArticleCrud.offset_paginate( return await ArticleCrud.offset_paginate(
session=session, session=session,
@@ -23,6 +31,7 @@ async def list_articles_offset(
items_per_page=items_per_page, items_per_page=items_per_page,
search=search, search=search,
filter_by=filter_by or None, filter_by=filter_by or None,
order_by=order_by,
schema=ArticleRead, schema=ArticleRead,
) )
@@ -30,10 +39,14 @@ async def list_articles_offset(
@router.get("/cursor") @router.get("/cursor")
async def list_articles_cursor( async def list_articles_cursor(
session: SessionDep, session: SessionDep,
filter_by: Annotated[dict[str, list[str]], Depends(ArticleCrud.filter_params())],
order_by: Annotated[
OrderByClause | None,
Depends(ArticleCrud.order_params(default_field=Article.created_at)),
],
cursor: str | None = None, cursor: str | None = None,
items_per_page: int = Query(20, ge=1, le=100), items_per_page: int = Query(20, ge=1, le=100),
search: str | None = None, search: str | None = None,
filter_by: dict[str, list[str]] = Depends(ArticleCrud.filter_params()),
) -> PaginatedResponse[ArticleRead]: ) -> PaginatedResponse[ArticleRead]:
return await ArticleCrud.cursor_paginate( return await ArticleCrud.cursor_paginate(
session=session, session=session,
@@ -41,5 +54,6 @@ async def list_articles_cursor(
items_per_page=items_per_page, items_per_page=items_per_page,
search=search, search=search,
filter_by=filter_by or None, filter_by=filter_by or None,
order_by=order_by,
schema=ArticleRead, schema=ArticleRead,
) )

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "fastapi-toolsets" name = "fastapi-toolsets"
version = "1.2.1" version = "1.3.0"
description = "Production-ready utilities for FastAPI applications" description = "Production-ready utilities for FastAPI applications"
readme = "README.md" readme = "README.md"
license = "MIT" license = "MIT"

View File

@@ -21,4 +21,4 @@ Example usage:
return Response(data={"user": user.username}, message="Success") return Response(data={"user": user.username}, message="Success")
""" """
__version__ = "1.2.1" __version__ = "1.3.0"

View File

@@ -72,7 +72,7 @@ async def load(
registry = get_fixtures_registry() registry = get_fixtures_registry()
db_context = get_db_context() db_context = get_db_context()
context_list = [c.value for c in contexts] if contexts else [Context.BASE] context_list = list(contexts) if contexts else [Context.BASE]
ordered = registry.resolve_context_dependencies(*context_list) ordered = registry.resolve_context_dependencies(*context_list)

View File

@@ -1,12 +1,9 @@
"""Generic async CRUD operations for SQLAlchemy models.""" """Generic async CRUD operations for SQLAlchemy models."""
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
from .factory import CrudFactory, JoinType, M2MFieldType from ..types import FacetFieldType, JoinType, M2MFieldType, OrderByClause
from .search import ( from .factory import CrudFactory
FacetFieldType, from .search import SearchConfig, get_searchable_fields
SearchConfig,
get_searchable_fields,
)
__all__ = [ __all__ = [
"CrudFactory", "CrudFactory",
@@ -16,5 +13,6 @@ __all__ = [
"JoinType", "JoinType",
"M2MFieldType", "M2MFieldType",
"NoSearchableFieldsError", "NoSearchableFieldsError",
"OrderByClause",
"SearchConfig", "SearchConfig",
] ]

View File

@@ -6,10 +6,10 @@ import base64
import inspect import inspect
import json import json
import uuid as uuid_module import uuid as uuid_module
from collections.abc import Awaitable, Callable, Mapping, Sequence from collections.abc import Awaitable, Callable, Sequence
from datetime import date, datetime from datetime import date, datetime
from decimal import Decimal from decimal import Decimal
from typing import Any, ClassVar, Generic, Literal, Self, TypeVar, cast, overload from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
from fastapi import Query from fastapi import Query
from pydantic import BaseModel from pydantic import BaseModel
@@ -23,23 +23,25 @@ from sqlalchemy.sql.base import ExecutableOption
from sqlalchemy.sql.roles import WhereHavingRole from sqlalchemy.sql.roles import WhereHavingRole
from ..db import get_transaction from ..db import get_transaction
from ..exceptions import NotFoundError from ..exceptions import InvalidOrderFieldError, NotFoundError
from ..schemas import CursorPagination, OffsetPagination, PaginatedResponse, Response from ..schemas import CursorPagination, OffsetPagination, PaginatedResponse, Response
from .search import ( from ..types import (
FacetFieldType, FacetFieldType,
SearchConfig, JoinType,
M2MFieldType,
ModelType,
OrderByClause,
SchemaType,
SearchFieldType, SearchFieldType,
)
from .search import (
SearchConfig,
build_facets, build_facets,
build_filter_by, build_filter_by,
build_search_filters, build_search_filters,
facet_keys, facet_keys,
) )
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
JoinType = list[tuple[type[DeclarativeBase], Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
def _encode_cursor(value: Any) -> str: def _encode_cursor(value: Any) -> str:
"""Encode cursor column value as an base64 string.""" """Encode cursor column value as an base64 string."""
@@ -51,6 +53,22 @@ def _decode_cursor(cursor: str) -> str:
return json.loads(base64.b64decode(cursor.encode()).decode()) return json.loads(base64.b64decode(cursor.encode()).decode())
def _apply_joins(q: Any, joins: JoinType | None, outer_join: bool) -> Any:
"""Apply a list of (model, condition) joins to a SQLAlchemy select query."""
if not joins:
return q
for model, condition in joins:
q = q.outerjoin(model, condition) if outer_join else q.join(model, condition)
return q
def _apply_search_joins(q: Any, search_joins: list[Any]) -> Any:
"""Apply relationship-based outer joins (from search/filter_by) to a query."""
for join_rel in search_joins:
q = q.outerjoin(join_rel)
return q
class AsyncCrud(Generic[ModelType]): class AsyncCrud(Generic[ModelType]):
"""Generic async CRUD operations for SQLAlchemy models. """Generic async CRUD operations for SQLAlchemy models.
@@ -60,6 +78,7 @@ class AsyncCrud(Generic[ModelType]):
model: ClassVar[type[DeclarativeBase]] model: ClassVar[type[DeclarativeBase]]
searchable_fields: ClassVar[Sequence[SearchFieldType] | None] = None searchable_fields: ClassVar[Sequence[SearchFieldType] | None] = None
facet_fields: ClassVar[Sequence[FacetFieldType] | None] = None facet_fields: ClassVar[Sequence[FacetFieldType] | None] = None
order_fields: ClassVar[Sequence[QueryableAttribute[Any]] | None] = None
m2m_fields: ClassVar[M2MFieldType | None] = None m2m_fields: ClassVar[M2MFieldType | None] = None
default_load_options: ClassVar[list[ExecutableOption] | None] = None default_load_options: ClassVar[list[ExecutableOption] | None] = None
cursor_column: ClassVar[Any | None] = None cursor_column: ClassVar[Any | None] = None
@@ -129,6 +148,48 @@ class AsyncCrud(Generic[ModelType]):
return set() return set()
return set(cls.m2m_fields.keys()) return set(cls.m2m_fields.keys())
@classmethod
def _resolve_facet_fields(
cls: type[Self],
facet_fields: Sequence[FacetFieldType] | None,
) -> Sequence[FacetFieldType] | None:
"""Return facet_fields if given, otherwise fall back to the class-level default."""
return facet_fields if facet_fields is not None else cls.facet_fields
@classmethod
def _prepare_filter_by(
cls: type[Self],
filter_by: dict[str, Any] | BaseModel | None,
facet_fields: Sequence[FacetFieldType] | None,
) -> tuple[list[Any], list[Any]]:
"""Normalize filter_by and return (filters, joins) to apply to the query."""
if isinstance(filter_by, BaseModel):
filter_by = filter_by.model_dump(exclude_none=True)
if not filter_by:
return [], []
resolved = cls._resolve_facet_fields(facet_fields)
return build_filter_by(filter_by, resolved or [])
@classmethod
async def _build_filter_attributes(
cls: type[Self],
session: AsyncSession,
facet_fields: Sequence[FacetFieldType] | None,
filters: list[Any],
search_joins: list[Any],
) -> dict[str, list[Any]] | None:
"""Build facet filter_attributes, or return None if no facet fields configured."""
resolved = cls._resolve_facet_fields(facet_fields)
if not resolved:
return None
return await build_facets(
session,
cls.model,
resolved,
base_filters=filters,
base_joins=search_joins,
)
@classmethod @classmethod
def filter_params( def filter_params(
cls: type[Self], cls: type[Self],
@@ -149,7 +210,7 @@ class AsyncCrud(Generic[ModelType]):
ValueError: If no facet fields are configured on this CRUD class and none are ValueError: If no facet fields are configured on this CRUD class and none are
provided via ``facet_fields``. provided via ``facet_fields``.
""" """
fields = facet_fields if facet_fields is not None else cls.facet_fields fields = cls._resolve_facet_fields(facet_fields)
if not fields: if not fields:
raise ValueError( raise ValueError(
f"{cls.__name__} has no facet_fields configured. " f"{cls.__name__} has no facet_fields configured. "
@@ -175,6 +236,63 @@ class AsyncCrud(Generic[ModelType]):
return dependency return dependency
@classmethod
def order_params(
cls: type[Self],
*,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
default_field: QueryableAttribute[Any] | None = None,
default_order: Literal["asc", "desc"] = "asc",
) -> Callable[..., Awaitable[OrderByClause | None]]:
"""Return a FastAPI dependency that resolves order query params into an order_by clause.
Args:
order_fields: Override the allowed order fields. Falls back to the class-level
``order_fields`` if not provided.
default_field: Field to order by when ``order_by`` query param is absent.
If ``None`` and no ``order_by`` is provided, no ordering is applied.
default_order: Default order direction when ``order`` is absent
(``"asc"`` or ``"desc"``).
Returns:
An async dependency function named ``{Model}OrderParams`` that resolves to an
``OrderByClause`` (or ``None``). Pass it to ``Depends()`` in your route.
Raises:
ValueError: If no order fields are configured on this CRUD class and none are
provided via ``order_fields``.
InvalidOrderFieldError: When the request provides an unknown ``order_by`` value.
"""
fields = order_fields if order_fields is not None else cls.order_fields
if not fields:
raise ValueError(
f"{cls.__name__} has no order_fields configured. "
"Pass order_fields= or set them on CrudFactory."
)
field_map: dict[str, QueryableAttribute[Any]] = {f.key: f for f in fields}
valid_keys = sorted(field_map.keys())
async def dependency(
order_by: str | None = Query(
None, description=f"Field to order by. Valid values: {valid_keys}"
),
order: Literal["asc", "desc"] = Query(
default_order, description="Sort direction"
),
) -> OrderByClause | None:
if order_by is None:
if default_field is None:
return None
field = default_field
elif order_by not in field_map:
raise InvalidOrderFieldError(order_by, valid_keys)
else:
field = field_map[order_by]
return field.asc() if order == "asc" else field.desc()
dependency.__name__ = f"{cls.model.__name__}OrderParams"
return dependency
@overload @overload
@classmethod @classmethod
async def create( # pragma: no cover async def create( # pragma: no cover
@@ -230,8 +348,7 @@ class AsyncCrud(Generic[ModelType]):
await session.refresh(db_model) await session.refresh(db_model)
result = cast(ModelType, db_model) result = cast(ModelType, db_model)
if schema: if schema:
data_out = schema.model_validate(result) if schema else result return Response(data=schema.model_validate(result))
return Response(data=data_out)
return result return result
@overload @overload
@@ -294,13 +411,7 @@ class AsyncCrud(Generic[ModelType]):
MultipleResultsFound: If more than one record found MultipleResultsFound: If more than one record found
""" """
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
q = q.where(and_(*filters)) q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options): if resolved := cls._resolve_load_options(load_options):
q = q.options(*resolved) q = q.options(*resolved)
@@ -312,8 +423,7 @@ class AsyncCrud(Generic[ModelType]):
raise NotFoundError() raise NotFoundError()
result = cast(ModelType, item) result = cast(ModelType, item)
if schema: if schema:
data_out = schema.model_validate(result) if schema else result return Response(data=schema.model_validate(result))
return Response(data=data_out)
return result return result
@classmethod @classmethod
@@ -339,13 +449,7 @@ class AsyncCrud(Generic[ModelType]):
Model instance or None Model instance or None
""" """
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options): if resolved := cls._resolve_load_options(load_options):
@@ -362,7 +466,7 @@ class AsyncCrud(Generic[ModelType]):
joins: JoinType | None = None, joins: JoinType | None = None,
outer_join: bool = False, outer_join: bool = False,
load_options: list[ExecutableOption] | None = None, load_options: list[ExecutableOption] | None = None,
order_by: Any | None = None, order_by: OrderByClause | None = None,
limit: int | None = None, limit: int | None = None,
offset: int | None = None, offset: int | None = None,
) -> Sequence[ModelType]: ) -> Sequence[ModelType]:
@@ -382,13 +486,7 @@ class AsyncCrud(Generic[ModelType]):
List of model instances List of model instances
""" """
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options): if resolved := cls._resolve_load_options(load_options):
@@ -486,8 +584,7 @@ class AsyncCrud(Generic[ModelType]):
setattr(db_model, rel_attr, related_instances) setattr(db_model, rel_attr, related_instances)
await session.refresh(db_model) await session.refresh(db_model)
if schema: if schema:
data_out = schema.model_validate(db_model) if schema else db_model return Response(data=schema.model_validate(db_model))
return Response(data=data_out)
return db_model return db_model
@classmethod @classmethod
@@ -604,13 +701,7 @@ class AsyncCrud(Generic[ModelType]):
Number of matching records Number of matching records
""" """
q = select(func.count()).select_from(cls.model) q = select(func.count()).select_from(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
result = await session.execute(q) result = await session.execute(q)
@@ -637,13 +728,7 @@ class AsyncCrud(Generic[ModelType]):
True if at least one record matches True if at least one record matches
""" """
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
q = q.where(and_(*filters)).exists().select() q = q.where(and_(*filters)).exists().select()
result = await session.execute(q) result = await session.execute(q)
return bool(result.scalar()) return bool(result.scalar())
@@ -657,7 +742,7 @@ class AsyncCrud(Generic[ModelType]):
joins: JoinType | None = None, joins: JoinType | None = None,
outer_join: bool = False, outer_join: bool = False,
load_options: list[ExecutableOption] | None = None, load_options: list[ExecutableOption] | None = None,
order_by: Any | None = None, order_by: OrderByClause | None = None,
page: int = 1, page: int = 1,
items_per_page: int = 20, items_per_page: int = 20,
search: str | SearchConfig | None = None, search: str | SearchConfig | None = None,
@@ -690,47 +775,29 @@ class AsyncCrud(Generic[ModelType]):
""" """
filters = list(filters) if filters else [] filters = list(filters) if filters else []
offset = (page - 1) * items_per_page offset = (page - 1) * items_per_page
search_joins: list[Any] = []
if isinstance(filter_by, BaseModel): fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
filter_by = filter_by.model_dump(exclude_none=True) or None filters.extend(fb_filters)
# Build filter_by conditions from declared facet fields
if filter_by:
resolved_facets_for_filter = (
facet_fields if facet_fields is not None else cls.facet_fields
)
fb_filters, fb_joins = build_filter_by(
filter_by, resolved_facets_for_filter or []
)
filters.extend(fb_filters)
search_joins.extend(fb_joins)
# Build search filters # Build search filters
if search: if search:
search_filters, search_joins = build_search_filters( search_filters, new_search_joins = build_search_filters(
cls.model, cls.model,
search, search,
search_fields=search_fields, search_fields=search_fields,
default_fields=cls.searchable_fields, default_fields=cls.searchable_fields,
) )
filters.extend(search_filters) filters.extend(search_filters)
search_joins.extend(new_search_joins)
# Build query with joins # Build query with joins
q = select(cls.model) q = select(cls.model)
# Apply explicit joins # Apply explicit joins
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
# Apply search joins (always outer joins for search) # Apply search joins (always outer joins for search)
for join_rel in search_joins: q = _apply_search_joins(q, search_joins)
q = q.outerjoin(join_rel)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
@@ -750,17 +817,10 @@ class AsyncCrud(Generic[ModelType]):
count_q = count_q.select_from(cls.model) count_q = count_q.select_from(cls.model)
# Apply explicit joins to count query # Apply explicit joins to count query
if joins: count_q = _apply_joins(count_q, joins, outer_join)
for model, condition in joins:
count_q = (
count_q.outerjoin(model, condition)
if outer_join
else count_q.join(model, condition)
)
# Apply search joins to count query # Apply search joins to count query
for join_rel in search_joins: count_q = _apply_search_joins(count_q, search_joins)
count_q = count_q.outerjoin(join_rel)
if filters: if filters:
count_q = count_q.where(and_(*filters)) count_q = count_q.where(and_(*filters))
@@ -768,19 +828,9 @@ class AsyncCrud(Generic[ModelType]):
count_result = await session.execute(count_q) count_result = await session.execute(count_q)
total_count = count_result.scalar_one() total_count = count_result.scalar_one()
# Build facets filter_attributes = await cls._build_filter_attributes(
resolved_facet_fields = ( session, facet_fields, filters, search_joins
facet_fields if facet_fields is not None else cls.facet_fields
) )
filter_attributes: dict[str, list[Any]] | None = None
if resolved_facet_fields:
filter_attributes = await build_facets(
session,
cls.model,
resolved_facet_fields,
base_filters=filters or None,
base_joins=search_joins or None,
)
return PaginatedResponse( return PaginatedResponse(
data=items, data=items,
@@ -803,7 +853,7 @@ class AsyncCrud(Generic[ModelType]):
joins: JoinType | None = None, joins: JoinType | None = None,
outer_join: bool = False, outer_join: bool = False,
load_options: list[ExecutableOption] | None = None, load_options: list[ExecutableOption] | None = None,
order_by: Any | None = None, order_by: OrderByClause | None = None,
items_per_page: int = 20, items_per_page: int = 20,
search: str | SearchConfig | None = None, search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None, search_fields: Sequence[SearchFieldType] | None = None,
@@ -837,21 +887,9 @@ class AsyncCrud(Generic[ModelType]):
PaginatedResponse with CursorPagination metadata PaginatedResponse with CursorPagination metadata
""" """
filters = list(filters) if filters else [] filters = list(filters) if filters else []
search_joins: list[Any] = []
if isinstance(filter_by, BaseModel): fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
filter_by = filter_by.model_dump(exclude_none=True) or None filters.extend(fb_filters)
# Build filter_by conditions from declared facet fields
if filter_by:
resolved_facets_for_filter = (
facet_fields if facet_fields is not None else cls.facet_fields
)
fb_filters, fb_joins = build_filter_by(
filter_by, resolved_facets_for_filter or []
)
filters.extend(fb_filters)
search_joins.extend(fb_joins)
if cls.cursor_column is None: if cls.cursor_column is None:
raise ValueError( raise ValueError(
@@ -884,29 +922,23 @@ class AsyncCrud(Generic[ModelType]):
# Build search filters # Build search filters
if search: if search:
search_filters, search_joins = build_search_filters( search_filters, new_search_joins = build_search_filters(
cls.model, cls.model,
search, search,
search_fields=search_fields, search_fields=search_fields,
default_fields=cls.searchable_fields, default_fields=cls.searchable_fields,
) )
filters.extend(search_filters) filters.extend(search_filters)
search_joins.extend(new_search_joins)
# Build query # Build query
q = select(cls.model) q = select(cls.model)
# Apply explicit joins # Apply explicit joins
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
# Apply search joins (always outer joins) # Apply search joins (always outer joins)
for join_rel in search_joins: q = _apply_search_joins(q, search_joins)
q = q.outerjoin(join_rel)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
@@ -938,19 +970,9 @@ class AsyncCrud(Generic[ModelType]):
items: list[Any] = [schema.model_validate(item) for item in items_page] items: list[Any] = [schema.model_validate(item) for item in items_page]
# Build facets filter_attributes = await cls._build_filter_attributes(
resolved_facet_fields = ( session, facet_fields, filters, search_joins
facet_fields if facet_fields is not None else cls.facet_fields
) )
filter_attributes: dict[str, list[Any]] | None = None
if resolved_facet_fields:
filter_attributes = await build_facets(
session,
cls.model,
resolved_facet_fields,
base_filters=filters or None,
base_joins=search_joins or None,
)
return PaginatedResponse( return PaginatedResponse(
data=items, data=items,
@@ -969,6 +991,7 @@ def CrudFactory(
*, *,
searchable_fields: Sequence[SearchFieldType] | None = None, searchable_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None, facet_fields: Sequence[FacetFieldType] | None = None,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
m2m_fields: M2MFieldType | None = None, m2m_fields: M2MFieldType | None = None,
default_load_options: list[ExecutableOption] | None = None, default_load_options: list[ExecutableOption] | None = None,
cursor_column: Any | None = None, cursor_column: Any | None = None,
@@ -981,6 +1004,8 @@ def CrudFactory(
facet_fields: Optional list of columns to compute distinct values for in paginated facet_fields: Optional list of columns to compute distinct values for in paginated
responses. Supports direct columns (``User.status``) and relationship tuples responses. Supports direct columns (``User.status``) and relationship tuples
(``(User.role, Role.name)``). Can be overridden per call. (``(User.role, Role.name)``). Can be overridden per call.
order_fields: Optional list of model attributes that callers are allowed to order by
via ``order_params()``. Can be overridden per call.
m2m_fields: Optional mapping for many-to-many relationships. m2m_fields: Optional mapping for many-to-many relationships.
Maps schema field names (containing lists of IDs) to Maps schema field names (containing lists of IDs) to
SQLAlchemy relationship attributes. SQLAlchemy relationship attributes.
@@ -1074,6 +1099,7 @@ def CrudFactory(
"model": model, "model": model,
"searchable_fields": searchable_fields, "searchable_fields": searchable_fields,
"facet_fields": facet_fields, "facet_fields": facet_fields,
"order_fields": order_fields,
"m2m_fields": m2m_fields, "m2m_fields": m2m_fields,
"default_load_options": default_load_options, "default_load_options": default_load_options,
"cursor_column": cursor_column, "cursor_column": cursor_column,

View File

@@ -1,24 +1,23 @@
"""Search utilities for AsyncCrud.""" """Search utilities for AsyncCrud."""
import asyncio import asyncio
import functools
from collections import Counter from collections import Counter
from collections.abc import Sequence from collections.abc import Sequence
from dataclasses import dataclass from dataclasses import dataclass, replace
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
from sqlalchemy import String, or_, select from sqlalchemy import String, and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.attributes import InstrumentedAttribute from sqlalchemy.orm.attributes import InstrumentedAttribute
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
from ..types import FacetFieldType, SearchFieldType
if TYPE_CHECKING: if TYPE_CHECKING:
from sqlalchemy.sql.elements import ColumnElement from sqlalchemy.sql.elements import ColumnElement
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
FacetFieldType = SearchFieldType
@dataclass @dataclass
class SearchConfig: class SearchConfig:
@@ -37,6 +36,7 @@ class SearchConfig:
match_mode: Literal["any", "all"] = "any" match_mode: Literal["any", "all"] = "any"
@functools.lru_cache(maxsize=128)
def get_searchable_fields( def get_searchable_fields(
model: type[DeclarativeBase], model: type[DeclarativeBase],
*, *,
@@ -101,14 +101,11 @@ def build_search_filters(
if isinstance(search, str): if isinstance(search, str):
config = SearchConfig(query=search, fields=search_fields) config = SearchConfig(query=search, fields=search_fields)
else: else:
config = search config = (
if search_fields is not None: replace(search, fields=search_fields)
config = SearchConfig( if search_fields is not None
query=config.query, else search
fields=search_fields, )
case_sensitive=config.case_sensitive,
match_mode=config.match_mode,
)
if not config.query or not config.query.strip(): if not config.query or not config.query.strip():
return [], [] return [], []
@@ -227,8 +224,6 @@ async def build_facets(
q = q.outerjoin(rel) q = q.outerjoin(rel)
if base_filters: if base_filters:
from sqlalchemy import and_
q = q.where(and_(*base_filters)) q = q.where(and_(*base_filters))
q = q.order_by(column) q = q.order_by(column)

View File

@@ -1,20 +1,17 @@
"""Dependency factories for FastAPI routes.""" """Dependency factories for FastAPI routes."""
import inspect import inspect
from collections.abc import AsyncGenerator, Callable from collections.abc import Callable
from typing import Any, TypeVar, cast from typing import Any, cast
from fastapi import Depends from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
from .crud import CrudFactory from .crud import CrudFactory
from .types import ModelType, SessionDependency
__all__ = ["BodyDependency", "PathDependency"] __all__ = ["BodyDependency", "PathDependency"]
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]
def PathDependency( def PathDependency(
model: type[ModelType], model: type[ModelType],

View File

@@ -6,6 +6,7 @@ from .exceptions import (
ConflictError, ConflictError,
ForbiddenError, ForbiddenError,
InvalidFacetFilterError, InvalidFacetFilterError,
InvalidOrderFieldError,
NoSearchableFieldsError, NoSearchableFieldsError,
NotFoundError, NotFoundError,
UnauthorizedError, UnauthorizedError,
@@ -21,6 +22,7 @@ __all__ = [
"generate_error_responses", "generate_error_responses",
"init_exceptions_handlers", "init_exceptions_handlers",
"InvalidFacetFilterError", "InvalidFacetFilterError",
"InvalidOrderFieldError",
"NoSearchableFieldsError", "NoSearchableFieldsError",
"NotFoundError", "NotFoundError",
"UnauthorizedError", "UnauthorizedError",

View File

@@ -128,6 +128,31 @@ class InvalidFacetFilterError(ApiException):
super().__init__(detail) super().__init__(detail)
class InvalidOrderFieldError(ApiException):
"""Raised when order_by contains a field not in the allowed order fields."""
api_error = ApiError(
code=422,
msg="Invalid Order Field",
desc="The requested order field is not allowed for this resource.",
err_code="SORT-422",
)
def __init__(self, field: str, valid_fields: list[str]) -> None:
"""Initialize the exception.
Args:
field: The unknown order field provided by the caller
valid_fields: List of valid field names
"""
self.field = field
self.valid_fields = valid_fields
detail = (
f"'{field}' is not an allowed order field. Valid fields: {valid_fields}."
)
super().__init__(detail)
def generate_error_responses( def generate_error_responses(
*errors: type[ApiException], *errors: type[ApiException],
) -> dict[int | str, dict[str, Any]]: ) -> dict[int | str, dict[str, Any]]:

View File

@@ -10,6 +10,10 @@ from fastapi.responses import JSONResponse
from ..schemas import ErrorResponse, ResponseStatus from ..schemas import ErrorResponse, ResponseStatus
from .exceptions import ApiException from .exceptions import ApiException
_VALIDATION_LOCATION_PARAMS: frozenset[str] = frozenset(
{"body", "query", "path", "header", "cookie"}
)
def init_exceptions_handlers(app: FastAPI) -> FastAPI: def init_exceptions_handlers(app: FastAPI) -> FastAPI:
"""Register exception handlers and custom OpenAPI schema on a FastAPI app. """Register exception handlers and custom OpenAPI schema on a FastAPI app.
@@ -106,9 +110,7 @@ def _format_validation_error(
for error in errors: for error in errors:
field_path = ".".join( field_path = ".".join(
str(loc) str(loc) for loc in error["loc"] if loc not in _VALIDATION_LOCATION_PARAMS
for loc in error["loc"]
if loc not in ("body", "query", "path", "header", "cookie")
) )
formatted_errors.append( formatted_errors.append(
{ {

View File

@@ -1,24 +1,84 @@
"""Fixture loading utilities for database seeding.""" """Fixture loading utilities for database seeding."""
from collections.abc import Callable, Sequence from collections.abc import Callable, Sequence
from typing import Any, TypeVar from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import DeclarativeBase
from ..db import get_transaction from ..db import get_transaction
from ..logger import get_logger from ..logger import get_logger
from ..types import ModelType
from .enum import LoadStrategy from .enum import LoadStrategy
from .registry import Context, FixtureRegistry from .registry import Context, FixtureRegistry
logger = get_logger() logger = get_logger()
T = TypeVar("T", bound=DeclarativeBase)
async def _load_ordered(
session: AsyncSession,
registry: FixtureRegistry,
ordered_names: list[str],
strategy: LoadStrategy,
) -> dict[str, list[DeclarativeBase]]:
"""Load fixtures in order."""
results: dict[str, list[DeclarativeBase]] = {}
for name in ordered_names:
fixture = registry.get(name)
instances = list(fixture.func())
if not instances:
results[name] = []
continue
model_name = type(instances[0]).__name__
loaded: list[DeclarativeBase] = []
async with get_transaction(session):
for instance in instances:
if strategy == LoadStrategy.INSERT:
session.add(instance)
loaded.append(instance)
elif strategy == LoadStrategy.MERGE:
merged = await session.merge(instance)
loaded.append(merged)
else: # LoadStrategy.SKIP_EXISTING
pk = _get_primary_key(instance)
if pk is not None:
existing = await session.get(type(instance), pk)
if existing is None:
session.add(instance)
loaded.append(instance)
else:
session.add(instance)
loaded.append(instance)
results[name] = loaded
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
return results
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
"""Get the primary key value of a model instance."""
mapper = instance.__class__.__mapper__
pk_cols = mapper.primary_key
if len(pk_cols) == 1:
return getattr(instance, pk_cols[0].name, None)
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
if all(v is not None for v in pk_values):
return pk_values
return None
def get_obj_by_attr( def get_obj_by_attr(
fixtures: Callable[[], Sequence[T]], attr_name: str, value: Any fixtures: Callable[[], Sequence[ModelType]], attr_name: str, value: Any
) -> T: ) -> ModelType:
"""Get a SQLAlchemy model instance by matching an attribute value. """Get a SQLAlchemy model instance by matching an attribute value.
Args: Args:
@@ -57,13 +117,6 @@ async def load_fixtures(
Returns: Returns:
Dict mapping fixture names to loaded instances Dict mapping fixture names to loaded instances
Example:
```python
# Loads 'roles' first (dependency), then 'users'
result = await load_fixtures(session, fixtures, "users")
print(result["users"]) # [User(...), ...]
```
""" """
ordered = registry.resolve_dependencies(*names) ordered = registry.resolve_dependencies(*names)
return await _load_ordered(session, registry, ordered, strategy) return await _load_ordered(session, registry, ordered, strategy)
@@ -85,76 +138,6 @@ async def load_fixtures_by_context(
Returns: Returns:
Dict mapping fixture names to loaded instances Dict mapping fixture names to loaded instances
Example:
```python
# Load base + testing fixtures
await load_fixtures_by_context(
session, fixtures,
Context.BASE, Context.TESTING
)
```
""" """
ordered = registry.resolve_context_dependencies(*contexts) ordered = registry.resolve_context_dependencies(*contexts)
return await _load_ordered(session, registry, ordered, strategy) return await _load_ordered(session, registry, ordered, strategy)
async def _load_ordered(
session: AsyncSession,
registry: FixtureRegistry,
ordered_names: list[str],
strategy: LoadStrategy,
) -> dict[str, list[DeclarativeBase]]:
"""Load fixtures in order."""
results: dict[str, list[DeclarativeBase]] = {}
for name in ordered_names:
fixture = registry.get(name)
instances = list(fixture.func())
if not instances:
results[name] = []
continue
model_name = type(instances[0]).__name__
loaded: list[DeclarativeBase] = []
async with get_transaction(session):
for instance in instances:
if strategy == LoadStrategy.INSERT:
session.add(instance)
loaded.append(instance)
elif strategy == LoadStrategy.MERGE:
merged = await session.merge(instance)
loaded.append(merged)
elif strategy == LoadStrategy.SKIP_EXISTING:
pk = _get_primary_key(instance)
if pk is not None:
existing = await session.get(type(instance), pk)
if existing is None:
session.add(instance)
loaded.append(instance)
else:
session.add(instance)
loaded.append(instance)
results[name] = loaded
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
return results
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
"""Get the primary key value of a model instance."""
mapper = instance.__class__.__mapper__
pk_cols = mapper.primary_key
if len(pk_cols) == 1:
return getattr(instance, pk_cols[0].name, None)
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
if all(v is not None for v in pk_values):
return pk_values
return None

View File

@@ -53,17 +53,23 @@ def init_metrics(
logger.debug("Initialising metric provider '%s'", provider.name) logger.debug("Initialising metric provider '%s'", provider.name)
provider.func() provider.func()
collectors = registry.get_collectors() # Partition collectors and cache env check at startup — both are stable for the app lifetime.
async_collectors = [
c for c in registry.get_collectors() if asyncio.iscoroutinefunction(c.func)
]
sync_collectors = [
c for c in registry.get_collectors() if not asyncio.iscoroutinefunction(c.func)
]
multiprocess_mode = _is_multiprocess()
@app.get(path, include_in_schema=False) @app.get(path, include_in_schema=False)
async def metrics_endpoint() -> Response: async def metrics_endpoint() -> Response:
for collector in collectors: for collector in sync_collectors:
if asyncio.iscoroutinefunction(collector.func): collector.func()
await collector.func() for collector in async_collectors:
else: await collector.func()
collector.func()
if _is_multiprocess(): if multiprocess_mode:
prom_registry = CollectorRegistry() prom_registry = CollectorRegistry()
multiprocess.MultiProcessCollector(prom_registry) multiprocess.MultiProcessCollector(prom_registry)
output = generate_latest(prom_registry) output = generate_latest(prom_registry)

View File

@@ -1,10 +1,12 @@
"""Base Pydantic schemas for API responses.""" """Base Pydantic schemas for API responses."""
from enum import Enum from enum import Enum
from typing import Any, ClassVar, Generic, TypeVar from typing import Any, ClassVar, Generic
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
from .types import DataT
__all__ = [ __all__ = [
"ApiError", "ApiError",
"CursorPagination", "CursorPagination",
@@ -16,8 +18,6 @@ __all__ = [
"ResponseStatus", "ResponseStatus",
] ]
DataT = TypeVar("DataT")
class PydanticBase(BaseModel): class PydanticBase(BaseModel):
"""Base class for all Pydantic models with common configuration.""" """Base class for all Pydantic models with common configuration."""

View File

@@ -0,0 +1,27 @@
"""Shared type aliases for the fastapi-toolsets package."""
from collections.abc import AsyncGenerator, Callable, Mapping
from typing import Any, TypeVar
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, QueryableAttribute
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql.elements import ColumnElement
# Generic TypeVars
DataT = TypeVar("DataT")
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
# CRUD type aliases
JoinType = list[tuple[type[DeclarativeBase], Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]
# Search / facet type aliases
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
FacetFieldType = SearchFieldType
# Dependency type aliases
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]

View File

@@ -92,6 +92,15 @@ class IntRole(Base):
name: Mapped[str] = mapped_column(String(50), unique=True) name: Mapped[str] = mapped_column(String(50), unique=True)
class Permission(Base):
"""Test model with composite primary key."""
__tablename__ = "permissions"
subject: Mapped[str] = mapped_column(String(50), primary_key=True)
action: Mapped[str] = mapped_column(String(50), primary_key=True)
class Event(Base): class Event(Base):
"""Test model with DateTime and Date cursor columns.""" """Test model with DateTime and Date cursor columns."""

View File

@@ -1,9 +1,11 @@
"""Tests for CRUD search functionality.""" """Tests for CRUD search functionality."""
import inspect
import uuid import uuid
import pytest import pytest
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql.elements import ColumnElement, UnaryExpression
from fastapi_toolsets.crud import ( from fastapi_toolsets.crud import (
CrudFactory, CrudFactory,
@@ -11,6 +13,7 @@ from fastapi_toolsets.crud import (
SearchConfig, SearchConfig,
get_searchable_fields, get_searchable_fields,
) )
from fastapi_toolsets.exceptions import InvalidOrderFieldError
from fastapi_toolsets.schemas import OffsetPagination from fastapi_toolsets.schemas import OffsetPagination
from .conftest import ( from .conftest import (
@@ -1046,3 +1049,144 @@ class TestFilterParamsSchema:
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2 assert result.pagination.total_count == 2
class TestOrderParamsSchema:
"""Tests for AsyncCrud.order_params()."""
def test_generates_order_by_and_order_params(self):
"""Returned dependency has order_by and order query params."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params()
param_names = set(inspect.signature(dep).parameters)
assert param_names == {"order_by", "order"}
def test_dependency_name_includes_model_name(self):
"""Dependency function is named after the model."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
assert getattr(dep, "__name__") == "UserOrderParams"
def test_raises_when_no_order_fields(self):
"""ValueError raised when no order_fields are configured or provided."""
with pytest.raises(ValueError, match="no order_fields"):
UserCrud.order_params()
def test_order_fields_override(self):
"""order_fields= parameter overrides the class-level default."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params(order_fields=[User.email])
param_names = set(inspect.signature(dep).parameters)
assert "order_by" in param_names
# description should only mention email, not username
sig = inspect.signature(dep)
description = sig.parameters["order_by"].default.description
assert "email" in description
assert "username" not in description
def test_order_by_description_lists_valid_fields(self):
"""order_by query param description mentions each allowed field."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params()
sig = inspect.signature(dep)
description = sig.parameters["order_by"].default.description
assert "username" in description
assert "email" in description
def test_default_order_reflected_in_order_default(self):
"""default_order is used as the default value for order."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep_asc = UserOrderCrud.order_params(default_order="asc")
dep_desc = UserOrderCrud.order_params(default_order="desc")
sig_asc = inspect.signature(dep_asc)
sig_desc = inspect.signature(dep_desc)
assert sig_asc.parameters["order"].default.default == "asc"
assert sig_desc.parameters["order"].default.default == "desc"
@pytest.mark.anyio
async def test_no_order_by_no_default_returns_none(self):
"""Returns None when order_by is absent and no default_field is set."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
result = await dep(order_by=None, order="asc")
assert result is None
@pytest.mark.anyio
async def test_no_order_by_with_default_field_returns_asc_expression(self):
"""Returns default_field.asc() when order_by absent and order=asc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params(default_field=User.username)
result = await dep(order_by=None, order="asc")
assert isinstance(result, UnaryExpression)
assert "ASC" in str(result)
@pytest.mark.anyio
async def test_no_order_by_with_default_field_returns_desc_expression(self):
"""Returns default_field.desc() when order_by absent and order=desc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params(default_field=User.username)
result = await dep(order_by=None, order="desc")
assert isinstance(result, UnaryExpression)
assert "DESC" in str(result)
@pytest.mark.anyio
async def test_valid_order_by_asc(self):
"""Returns field.asc() for a valid order_by with order=asc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
result = await dep(order_by="username", order="asc")
assert isinstance(result, UnaryExpression)
assert "ASC" in str(result)
@pytest.mark.anyio
async def test_valid_order_by_desc(self):
"""Returns field.desc() for a valid order_by with order=desc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
result = await dep(order_by="username", order="desc")
assert isinstance(result, UnaryExpression)
assert "DESC" in str(result)
@pytest.mark.anyio
async def test_invalid_order_by_raises_invalid_order_field_error(self):
"""Raises InvalidOrderFieldError for an unknown order_by value."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
with pytest.raises(InvalidOrderFieldError) as exc_info:
await dep(order_by="nonexistent", order="asc")
assert exc_info.value.field == "nonexistent"
assert "username" in exc_info.value.valid_fields
@pytest.mark.anyio
async def test_multiple_fields_all_resolve(self):
"""All configured fields resolve correctly via order_by."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params()
result_username = await dep(order_by="username", order="asc")
result_email = await dep(order_by="email", order="desc")
assert isinstance(result_username, ColumnElement)
assert isinstance(result_email, ColumnElement)
@pytest.mark.anyio
async def test_order_params_integrates_with_get_multi(
self, db_session: AsyncSession
):
"""order_params output is accepted by get_multi(order_by=...)."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
await UserCrud.create(
db_session, UserCreate(username="charlie", email="c@test.com")
)
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com")
)
dep = UserOrderCrud.order_params()
order_by = await dep(order_by="username", order="asc")
results = await UserOrderCrud.get_multi(db_session, order_by=order_by)
assert results[0].username == "alice"
assert results[1].username == "charlie"

View File

@@ -15,12 +15,14 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_asyn
from docs_src.examples.pagination_search.db import get_db from docs_src.examples.pagination_search.db import get_db
from docs_src.examples.pagination_search.models import Article, Base, Category from docs_src.examples.pagination_search.models import Article, Base, Category
from docs_src.examples.pagination_search.routes import router from docs_src.examples.pagination_search.routes import router
from fastapi_toolsets.exceptions import init_exceptions_handlers
from .conftest import DATABASE_URL from .conftest import DATABASE_URL
def build_app(session: AsyncSession) -> FastAPI: def build_app(session: AsyncSession) -> FastAPI:
app = FastAPI() app = FastAPI()
init_exceptions_handlers(app)
async def override_get_db(): async def override_get_db():
yield session yield session
@@ -269,3 +271,125 @@ class TestCursorPagination:
body = resp.json() body = resp.json()
assert len(body["data"]) == 1 assert len(body["data"]) == 1
assert body["data"][0]["title"] == "SQLAlchemy async" assert body["data"][0]["title"] == "SQLAlchemy async"
class TestOffsetSorting:
"""Tests for order_by / order query parameters on the offset endpoint."""
@pytest.mark.anyio
async def test_default_order_uses_created_at_asc(
self, client: AsyncClient, ex_db_session
):
"""No order_by → default field (created_at) ASC."""
await seed(ex_db_session)
resp = await client.get("/articles/offset")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["FastAPI tips", "SQLAlchemy async", "Draft notes"]
@pytest.mark.anyio
async def test_order_by_title_asc(self, client: AsyncClient, ex_db_session):
"""order_by=title&order=asc returns alphabetical order."""
await seed(ex_db_session)
resp = await client.get("/articles/offset?order_by=title&order=asc")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["Draft notes", "FastAPI tips", "SQLAlchemy async"]
@pytest.mark.anyio
async def test_order_by_title_desc(self, client: AsyncClient, ex_db_session):
"""order_by=title&order=desc returns reverse alphabetical order."""
await seed(ex_db_session)
resp = await client.get("/articles/offset?order_by=title&order=desc")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["SQLAlchemy async", "FastAPI tips", "Draft notes"]
@pytest.mark.anyio
async def test_order_by_created_at_desc(self, client: AsyncClient, ex_db_session):
"""order_by=created_at&order=desc returns newest-first."""
await seed(ex_db_session)
resp = await client.get("/articles/offset?order_by=created_at&order=desc")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["Draft notes", "SQLAlchemy async", "FastAPI tips"]
@pytest.mark.anyio
async def test_invalid_order_by_returns_422(
self, client: AsyncClient, ex_db_session
):
"""Unknown order_by field returns 422 with SORT-422 error code."""
resp = await client.get("/articles/offset?order_by=nonexistent_field")
assert resp.status_code == 422
body = resp.json()
assert body["error_code"] == "SORT-422"
assert body["status"] == "FAIL"
class TestCursorSorting:
"""Tests for order_by / order query parameters on the cursor endpoint.
In cursor_paginate the cursor_column is always the primary sort; order_by
acts as a secondary tiebreaker. With the seeded articles (all having unique
created_at values) the overall ordering is always created_at ASC regardless
of the order_by value — only the valid/invalid field check and the response
shape are meaningful here.
"""
@pytest.mark.anyio
async def test_default_order_uses_created_at_asc(
self, client: AsyncClient, ex_db_session
):
"""No order_by → default field (created_at) ASC."""
await seed(ex_db_session)
resp = await client.get("/articles/cursor")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["FastAPI tips", "SQLAlchemy async", "Draft notes"]
@pytest.mark.anyio
async def test_order_by_title_asc_accepted(
self, client: AsyncClient, ex_db_session
):
"""order_by=title is a valid field — request succeeds and returns all articles."""
await seed(ex_db_session)
resp = await client.get("/articles/cursor?order_by=title&order=asc")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 3
@pytest.mark.anyio
async def test_order_by_title_desc_accepted(
self, client: AsyncClient, ex_db_session
):
"""order_by=title&order=desc is valid — request succeeds and returns all articles."""
await seed(ex_db_session)
resp = await client.get("/articles/cursor?order_by=title&order=desc")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 3
@pytest.mark.anyio
async def test_invalid_order_by_returns_422(
self, client: AsyncClient, ex_db_session
):
"""Unknown order_by field returns 422 with SORT-422 error code."""
resp = await client.get("/articles/cursor?order_by=nonexistent_field")
assert resp.status_code == 422
body = resp.json()
assert body["error_code"] == "SORT-422"
assert body["status"] == "FAIL"

View File

@@ -8,6 +8,7 @@ from fastapi_toolsets.exceptions import (
ApiException, ApiException,
ConflictError, ConflictError,
ForbiddenError, ForbiddenError,
InvalidOrderFieldError,
NotFoundError, NotFoundError,
UnauthorizedError, UnauthorizedError,
generate_error_responses, generate_error_responses,
@@ -334,3 +335,43 @@ class TestExceptionIntegration:
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == {"id": 1} assert response.json() == {"id": 1}
class TestInvalidOrderFieldError:
"""Tests for InvalidOrderFieldError exception."""
def test_api_error_attributes(self):
"""InvalidOrderFieldError has correct api_error metadata."""
assert InvalidOrderFieldError.api_error.code == 422
assert InvalidOrderFieldError.api_error.err_code == "SORT-422"
assert InvalidOrderFieldError.api_error.msg == "Invalid Order Field"
def test_stores_field_and_valid_fields(self):
"""InvalidOrderFieldError stores field and valid_fields on the instance."""
error = InvalidOrderFieldError("unknown", ["name", "created_at"])
assert error.field == "unknown"
assert error.valid_fields == ["name", "created_at"]
def test_message_contains_field_and_valid_fields(self):
"""Exception message mentions the bad field and valid options."""
error = InvalidOrderFieldError("bad_field", ["name", "email"])
assert "bad_field" in str(error)
assert "name" in str(error)
assert "email" in str(error)
def test_handled_as_422_by_exception_handler(self):
"""init_exceptions_handlers turns InvalidOrderFieldError into a 422 response."""
app = FastAPI()
init_exceptions_handlers(app)
@app.get("/items")
async def list_items():
raise InvalidOrderFieldError("bad", ["name"])
client = TestClient(app)
response = client.get("/items")
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "SORT-422"
assert data["status"] == "FAIL"

View File

@@ -14,7 +14,9 @@ from fastapi_toolsets.fixtures import (
load_fixtures_by_context, load_fixtures_by_context,
) )
from .conftest import Role, User from fastapi_toolsets.fixtures.utils import _get_primary_key
from .conftest import IntRole, Permission, Role, User
class TestContext: class TestContext:
@@ -597,6 +599,46 @@ class TestLoadFixtures:
count = await RoleCrud.count(db_session) count = await RoleCrud.count(db_session)
assert count == 2 assert count == 2
@pytest.mark.anyio
async def test_skip_existing_skips_if_record_exists(self, db_session: AsyncSession):
"""SKIP_EXISTING returns empty loaded list when the record already exists."""
registry = FixtureRegistry()
role_id = uuid.uuid4()
@registry.register
def roles():
return [Role(id=role_id, name="admin")]
# First load — inserts the record.
result1 = await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert len(result1["roles"]) == 1
# Remove from identity map so session.get() queries the DB in the second load.
db_session.expunge_all()
# Second load — record exists in DB, nothing should be added.
result2 = await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert result2["roles"] == []
@pytest.mark.anyio
async def test_skip_existing_null_pk_inserts(self, db_session: AsyncSession):
"""SKIP_EXISTING inserts when the instance has no PK set (auto-increment)."""
registry = FixtureRegistry()
@registry.register
def int_roles():
# No id provided — PK is None before INSERT (autoincrement).
return [IntRole(name="member")]
result = await load_fixtures(
db_session, registry, "int_roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert len(result["int_roles"]) == 1
class TestLoadFixturesByContext: class TestLoadFixturesByContext:
"""Tests for load_fixtures_by_context function.""" """Tests for load_fixtures_by_context function."""
@@ -755,3 +797,19 @@ class TestGetObjByAttr:
"""Raises StopIteration when value type doesn't match.""" """Raises StopIteration when value type doesn't match."""
with pytest.raises(StopIteration): with pytest.raises(StopIteration):
get_obj_by_attr(self.roles, "id", "not-a-uuid") get_obj_by_attr(self.roles, "id", "not-a-uuid")
class TestGetPrimaryKey:
"""Unit tests for the _get_primary_key helper (composite PK paths)."""
def test_composite_pk_all_set(self):
"""Returns a tuple when all composite PK values are set."""
instance = Permission(subject="post", action="read")
pk = _get_primary_key(instance)
assert pk == ("post", "read")
def test_composite_pk_partial_none(self):
"""Returns None when any composite PK value is None."""
instance = Permission(subject="post") # action is None
pk = _get_primary_key(instance)
assert pk is None

2
uv.lock generated
View File

@@ -251,7 +251,7 @@ wheels = [
[[package]] [[package]]
name = "fastapi-toolsets" name = "fastapi-toolsets"
version = "1.2.1" version = "1.3.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "asyncpg" }, { name = "asyncpg" },