1 Commits

Author SHA1 Message Date
44921e5966 refactor: remove deprecated parameter and function 2026-02-27 15:32:07 -05:00
26 changed files with 275 additions and 788 deletions

View File

@@ -1,6 +1,6 @@
# Pagination & search
This example builds an articles listing endpoint that supports **offset pagination**, **cursor pagination**, **full-text search**, **faceted filtering**, and **sorting** — all from a single `CrudFactory` definition.
This example builds an articles listing endpoint that supports **offset pagination**, **cursor pagination**, **full-text search**, and **faceted filtering** — all from a single `CrudFactory` definition.
## Models
@@ -16,7 +16,7 @@ This example builds an articles listing endpoint that supports **offset paginati
## Crud
Declare `searchable_fields`, `facet_fields`, and `order_fields` once on [`CrudFactory`](../reference/crud.md#fastapi_toolsets.crud.factory.CrudFactory). All endpoints built from this class share the same defaults and can override them per call.
Declare `facet_fields` and `searchable_fields` once on [`CrudFactory`](../reference/crud.md#fastapi_toolsets.crud.factory.CrudFactory). All endpoints built from this class share the same defaults and can override them per call.
```python title="crud.py"
--8<-- "docs_src/examples/pagination_search/crud.py"
@@ -46,14 +46,14 @@ Declare `searchable_fields`, `facet_fields`, and `order_fields` once on [`CrudFa
Best for admin panels or any UI that needs a total item count and numbered pages.
```python title="routes.py:1:36"
--8<-- "docs_src/examples/pagination_search/routes.py:1:36"
```python title="routes.py:1:27"
--8<-- "docs_src/examples/pagination_search/routes.py:1:27"
```
**Example request**
```
GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published&order_by=title&order=asc
GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published
```
**Example response**
@@ -83,14 +83,14 @@ GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published&or
Best for feeds, infinite scroll, or any high-throughput API where offset performance degrades.
```python title="routes.py:39:59"
--8<-- "docs_src/examples/pagination_search/routes.py:39:59"
```python title="routes.py:30:45"
--8<-- "docs_src/examples/pagination_search/routes.py:30:45"
```
**Example request**
```
GET /articles/cursor?items_per_page=10&status=published&order_by=created_at&order=desc
GET /articles/cursor?items_per_page=10&status=published
```
**Example response**

View File

@@ -292,8 +292,6 @@ Use `filter_by` to pass the client's chosen filter values directly — no need t
Use [`filter_params()`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.filter_params) to generate a dict with the facet filter values from the query parameters:
```python
from typing import Annotated
from fastapi import Depends
UserCrud = CrudFactory(
@@ -305,7 +303,7 @@ UserCrud = CrudFactory(
async def list_users(
session: SessionDep,
page: int = 1,
filter_by: Annotated[dict[str, list[str]], Depends(UserCrud.filter_params())],
filter_by: dict[str, list[str]] = Depends(UserCrud.filter_params()),
) -> PaginatedResponse[UserRead]:
return await UserCrud.offset_paginate(
session=session,
@@ -322,58 +320,6 @@ GET /users?status=active&country=FR → filter_by={"status": ["active"], "coun
GET /users?role=admin&role=editor → filter_by={"role": ["admin", "editor"]} (IN clause)
```
## Sorting
!!! info "Added in `v1.3`"
Declare `order_fields` on the CRUD class to expose client-driven column ordering via `order_by` and `order` query parameters.
```python
UserCrud = CrudFactory(
model=User,
order_fields=[
User.name,
User.created_at,
],
)
```
Call [`order_params()`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.order_params) to generate a FastAPI dependency that maps the query parameters to an [`OrderByClause`](../reference/crud.md#fastapi_toolsets.crud.factory.OrderByClause) expression:
```python
from typing import Annotated
from fastapi import Depends
from fastapi_toolsets.crud import OrderByClause
@router.get("")
async def list_users(
session: SessionDep,
order_by: Annotated[OrderByClause | None, Depends(UserCrud.order_params())],
) -> PaginatedResponse[UserRead]:
return await UserCrud.offset_paginate(session=session, order_by=order_by)
```
The dependency adds two query parameters to the endpoint:
| Parameter | Type |
| ---------- | --------------- |
| `order_by` | `str | null` |
| `order` | `asc` or `desc` |
```
GET /users?order_by=name&order=asc → ORDER BY users.name ASC
GET /users?order_by=name&order=desc → ORDER BY users.name DESC
```
An unknown `order_by` value raises [`InvalidOrderFieldError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError) (HTTP 422).
You can also pass `order_fields` directly to `order_params()` to override the class-level defaults without modifying them:
```python
UserOrderParams = UserCrud.order_params(order_fields=[User.name])
```
## Relationship loading
!!! info "Added in `v1.1`"

View File

@@ -13,7 +13,6 @@ from fastapi_toolsets.exceptions import (
ConflictError,
NoSearchableFieldsError,
InvalidFacetFilterError,
InvalidOrderFieldError,
generate_error_responses,
init_exceptions_handlers,
)
@@ -33,8 +32,6 @@ from fastapi_toolsets.exceptions import (
## ::: fastapi_toolsets.exceptions.exceptions.InvalidFacetFilterError
## ::: fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError
## ::: fastapi_toolsets.exceptions.exceptions.generate_error_responses
## ::: fastapi_toolsets.exceptions.handler.init_exceptions_handlers

View File

@@ -1,9 +1,6 @@
from fastapi import FastAPI
from fastapi_toolsets.exceptions import init_exceptions_handlers
from .routes import router
app = FastAPI()
init_exceptions_handlers(app=app)
app.include_router(router=router)

View File

@@ -14,8 +14,6 @@ ArticleCrud = CrudFactory(
Article.status,
(Article.category, Category.name),
],
order_fields=[ # fields exposed for client-driven ordering
Article.title,
Article.created_at,
],
)
ArticleFilters = ArticleCrud.filter_params()

View File

@@ -1,13 +1,9 @@
from typing import Annotated
from fastapi import APIRouter, Depends, Query
from fastapi_toolsets.crud import OrderByClause
from fastapi_toolsets.schemas import PaginatedResponse
from .crud import ArticleCrud
from .db import SessionDep
from .models import Article
from .schemas import ArticleRead
router = APIRouter(prefix="/articles")
@@ -16,14 +12,10 @@ router = APIRouter(prefix="/articles")
@router.get("/offset")
async def list_articles_offset(
session: SessionDep,
filter_by: Annotated[dict[str, list[str]], Depends(ArticleCrud.filter_params())],
order_by: Annotated[
OrderByClause | None,
Depends(ArticleCrud.order_params(default_field=Article.created_at)),
],
page: int = Query(1, ge=1),
items_per_page: int = Query(20, ge=1, le=100),
search: str | None = None,
filter_by: dict[str, list[str]] = Depends(ArticleCrud.filter_params()),
) -> PaginatedResponse[ArticleRead]:
return await ArticleCrud.offset_paginate(
session=session,
@@ -31,7 +23,6 @@ async def list_articles_offset(
items_per_page=items_per_page,
search=search,
filter_by=filter_by or None,
order_by=order_by,
schema=ArticleRead,
)
@@ -39,14 +30,10 @@ async def list_articles_offset(
@router.get("/cursor")
async def list_articles_cursor(
session: SessionDep,
filter_by: Annotated[dict[str, list[str]], Depends(ArticleCrud.filter_params())],
order_by: Annotated[
OrderByClause | None,
Depends(ArticleCrud.order_params(default_field=Article.created_at)),
],
cursor: str | None = None,
items_per_page: int = Query(20, ge=1, le=100),
search: str | None = None,
filter_by: dict[str, list[str]] = Depends(ArticleCrud.filter_params()),
) -> PaginatedResponse[ArticleRead]:
return await ArticleCrud.cursor_paginate(
session=session,
@@ -54,6 +41,5 @@ async def list_articles_cursor(
items_per_page=items_per_page,
search=search,
filter_by=filter_by or None,
order_by=order_by,
schema=ArticleRead,
)

View File

@@ -1,6 +1,6 @@
[project]
name = "fastapi-toolsets"
version = "1.3.0"
version = "1.2.1"
description = "Production-ready utilities for FastAPI applications"
readme = "README.md"
license = "MIT"

View File

@@ -21,4 +21,4 @@ Example usage:
return Response(data={"user": user.username}, message="Success")
"""
__version__ = "1.3.0"
__version__ = "1.2.1"

View File

@@ -72,7 +72,7 @@ async def load(
registry = get_fixtures_registry()
db_context = get_db_context()
context_list = list(contexts) if contexts else [Context.BASE]
context_list = [c.value for c in contexts] if contexts else [Context.BASE]
ordered = registry.resolve_context_dependencies(*context_list)

View File

@@ -1,9 +1,12 @@
"""Generic async CRUD operations for SQLAlchemy models."""
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
from ..types import FacetFieldType, JoinType, M2MFieldType, OrderByClause
from .factory import CrudFactory
from .search import SearchConfig, get_searchable_fields
from .factory import CrudFactory, JoinType, M2MFieldType
from .search import (
FacetFieldType,
SearchConfig,
get_searchable_fields,
)
__all__ = [
"CrudFactory",
@@ -13,6 +16,5 @@ __all__ = [
"JoinType",
"M2MFieldType",
"NoSearchableFieldsError",
"OrderByClause",
"SearchConfig",
]

View File

@@ -6,10 +6,10 @@ import base64
import inspect
import json
import uuid as uuid_module
from collections.abc import Awaitable, Callable, Sequence
from collections.abc import Awaitable, Callable, Mapping, Sequence
from datetime import date, datetime
from decimal import Decimal
from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
from typing import Any, ClassVar, Generic, Literal, Self, TypeVar, cast, overload
from fastapi import Query
from pydantic import BaseModel
@@ -23,25 +23,23 @@ from sqlalchemy.sql.base import ExecutableOption
from sqlalchemy.sql.roles import WhereHavingRole
from ..db import get_transaction
from ..exceptions import InvalidOrderFieldError, NotFoundError
from ..exceptions import NotFoundError
from ..schemas import CursorPagination, OffsetPagination, PaginatedResponse, Response
from ..types import (
FacetFieldType,
JoinType,
M2MFieldType,
ModelType,
OrderByClause,
SchemaType,
SearchFieldType,
)
from .search import (
FacetFieldType,
SearchConfig,
SearchFieldType,
build_facets,
build_filter_by,
build_search_filters,
facet_keys,
)
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
JoinType = list[tuple[type[DeclarativeBase], Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
def _encode_cursor(value: Any) -> str:
"""Encode cursor column value as an base64 string."""
@@ -53,22 +51,6 @@ def _decode_cursor(cursor: str) -> str:
return json.loads(base64.b64decode(cursor.encode()).decode())
def _apply_joins(q: Any, joins: JoinType | None, outer_join: bool) -> Any:
"""Apply a list of (model, condition) joins to a SQLAlchemy select query."""
if not joins:
return q
for model, condition in joins:
q = q.outerjoin(model, condition) if outer_join else q.join(model, condition)
return q
def _apply_search_joins(q: Any, search_joins: list[Any]) -> Any:
"""Apply relationship-based outer joins (from search/filter_by) to a query."""
for join_rel in search_joins:
q = q.outerjoin(join_rel)
return q
class AsyncCrud(Generic[ModelType]):
"""Generic async CRUD operations for SQLAlchemy models.
@@ -78,7 +60,6 @@ class AsyncCrud(Generic[ModelType]):
model: ClassVar[type[DeclarativeBase]]
searchable_fields: ClassVar[Sequence[SearchFieldType] | None] = None
facet_fields: ClassVar[Sequence[FacetFieldType] | None] = None
order_fields: ClassVar[Sequence[QueryableAttribute[Any]] | None] = None
m2m_fields: ClassVar[M2MFieldType | None] = None
default_load_options: ClassVar[list[ExecutableOption] | None] = None
cursor_column: ClassVar[Any | None] = None
@@ -148,48 +129,6 @@ class AsyncCrud(Generic[ModelType]):
return set()
return set(cls.m2m_fields.keys())
@classmethod
def _resolve_facet_fields(
cls: type[Self],
facet_fields: Sequence[FacetFieldType] | None,
) -> Sequence[FacetFieldType] | None:
"""Return facet_fields if given, otherwise fall back to the class-level default."""
return facet_fields if facet_fields is not None else cls.facet_fields
@classmethod
def _prepare_filter_by(
cls: type[Self],
filter_by: dict[str, Any] | BaseModel | None,
facet_fields: Sequence[FacetFieldType] | None,
) -> tuple[list[Any], list[Any]]:
"""Normalize filter_by and return (filters, joins) to apply to the query."""
if isinstance(filter_by, BaseModel):
filter_by = filter_by.model_dump(exclude_none=True)
if not filter_by:
return [], []
resolved = cls._resolve_facet_fields(facet_fields)
return build_filter_by(filter_by, resolved or [])
@classmethod
async def _build_filter_attributes(
cls: type[Self],
session: AsyncSession,
facet_fields: Sequence[FacetFieldType] | None,
filters: list[Any],
search_joins: list[Any],
) -> dict[str, list[Any]] | None:
"""Build facet filter_attributes, or return None if no facet fields configured."""
resolved = cls._resolve_facet_fields(facet_fields)
if not resolved:
return None
return await build_facets(
session,
cls.model,
resolved,
base_filters=filters,
base_joins=search_joins,
)
@classmethod
def filter_params(
cls: type[Self],
@@ -210,7 +149,7 @@ class AsyncCrud(Generic[ModelType]):
ValueError: If no facet fields are configured on this CRUD class and none are
provided via ``facet_fields``.
"""
fields = cls._resolve_facet_fields(facet_fields)
fields = facet_fields if facet_fields is not None else cls.facet_fields
if not fields:
raise ValueError(
f"{cls.__name__} has no facet_fields configured. "
@@ -236,63 +175,6 @@ class AsyncCrud(Generic[ModelType]):
return dependency
@classmethod
def order_params(
cls: type[Self],
*,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
default_field: QueryableAttribute[Any] | None = None,
default_order: Literal["asc", "desc"] = "asc",
) -> Callable[..., Awaitable[OrderByClause | None]]:
"""Return a FastAPI dependency that resolves order query params into an order_by clause.
Args:
order_fields: Override the allowed order fields. Falls back to the class-level
``order_fields`` if not provided.
default_field: Field to order by when ``order_by`` query param is absent.
If ``None`` and no ``order_by`` is provided, no ordering is applied.
default_order: Default order direction when ``order`` is absent
(``"asc"`` or ``"desc"``).
Returns:
An async dependency function named ``{Model}OrderParams`` that resolves to an
``OrderByClause`` (or ``None``). Pass it to ``Depends()`` in your route.
Raises:
ValueError: If no order fields are configured on this CRUD class and none are
provided via ``order_fields``.
InvalidOrderFieldError: When the request provides an unknown ``order_by`` value.
"""
fields = order_fields if order_fields is not None else cls.order_fields
if not fields:
raise ValueError(
f"{cls.__name__} has no order_fields configured. "
"Pass order_fields= or set them on CrudFactory."
)
field_map: dict[str, QueryableAttribute[Any]] = {f.key: f for f in fields}
valid_keys = sorted(field_map.keys())
async def dependency(
order_by: str | None = Query(
None, description=f"Field to order by. Valid values: {valid_keys}"
),
order: Literal["asc", "desc"] = Query(
default_order, description="Sort direction"
),
) -> OrderByClause | None:
if order_by is None:
if default_field is None:
return None
field = default_field
elif order_by not in field_map:
raise InvalidOrderFieldError(order_by, valid_keys)
else:
field = field_map[order_by]
return field.asc() if order == "asc" else field.desc()
dependency.__name__ = f"{cls.model.__name__}OrderParams"
return dependency
@overload
@classmethod
async def create( # pragma: no cover
@@ -348,7 +230,8 @@ class AsyncCrud(Generic[ModelType]):
await session.refresh(db_model)
result = cast(ModelType, db_model)
if schema:
return Response(data=schema.model_validate(result))
data_out = schema.model_validate(result) if schema else result
return Response(data=data_out)
return result
@overload
@@ -411,7 +294,13 @@ class AsyncCrud(Generic[ModelType]):
MultipleResultsFound: If more than one record found
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
if joins:
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
q = q.options(*resolved)
@@ -423,7 +312,8 @@ class AsyncCrud(Generic[ModelType]):
raise NotFoundError()
result = cast(ModelType, item)
if schema:
return Response(data=schema.model_validate(result))
data_out = schema.model_validate(result) if schema else result
return Response(data=data_out)
return result
@classmethod
@@ -449,7 +339,13 @@ class AsyncCrud(Generic[ModelType]):
Model instance or None
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
if joins:
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters:
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
@@ -466,7 +362,7 @@ class AsyncCrud(Generic[ModelType]):
joins: JoinType | None = None,
outer_join: bool = False,
load_options: list[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
order_by: Any | None = None,
limit: int | None = None,
offset: int | None = None,
) -> Sequence[ModelType]:
@@ -486,7 +382,13 @@ class AsyncCrud(Generic[ModelType]):
List of model instances
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
if joins:
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters:
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
@@ -584,7 +486,8 @@ class AsyncCrud(Generic[ModelType]):
setattr(db_model, rel_attr, related_instances)
await session.refresh(db_model)
if schema:
return Response(data=schema.model_validate(db_model))
data_out = schema.model_validate(db_model) if schema else db_model
return Response(data=data_out)
return db_model
@classmethod
@@ -701,7 +604,13 @@ class AsyncCrud(Generic[ModelType]):
Number of matching records
"""
q = select(func.count()).select_from(cls.model)
q = _apply_joins(q, joins, outer_join)
if joins:
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters:
q = q.where(and_(*filters))
result = await session.execute(q)
@@ -728,7 +637,13 @@ class AsyncCrud(Generic[ModelType]):
True if at least one record matches
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
if joins:
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
q = q.where(and_(*filters)).exists().select()
result = await session.execute(q)
return bool(result.scalar())
@@ -742,7 +657,7 @@ class AsyncCrud(Generic[ModelType]):
joins: JoinType | None = None,
outer_join: bool = False,
load_options: list[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
order_by: Any | None = None,
page: int = 1,
items_per_page: int = 20,
search: str | SearchConfig | None = None,
@@ -775,29 +690,47 @@ class AsyncCrud(Generic[ModelType]):
"""
filters = list(filters) if filters else []
offset = (page - 1) * items_per_page
search_joins: list[Any] = []
fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
filters.extend(fb_filters)
if isinstance(filter_by, BaseModel):
filter_by = filter_by.model_dump(exclude_none=True) or None
# Build filter_by conditions from declared facet fields
if filter_by:
resolved_facets_for_filter = (
facet_fields if facet_fields is not None else cls.facet_fields
)
fb_filters, fb_joins = build_filter_by(
filter_by, resolved_facets_for_filter or []
)
filters.extend(fb_filters)
search_joins.extend(fb_joins)
# Build search filters
if search:
search_filters, new_search_joins = build_search_filters(
search_filters, search_joins = build_search_filters(
cls.model,
search,
search_fields=search_fields,
default_fields=cls.searchable_fields,
)
filters.extend(search_filters)
search_joins.extend(new_search_joins)
# Build query with joins
q = select(cls.model)
# Apply explicit joins
q = _apply_joins(q, joins, outer_join)
if joins:
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
# Apply search joins (always outer joins for search)
q = _apply_search_joins(q, search_joins)
for join_rel in search_joins:
q = q.outerjoin(join_rel)
if filters:
q = q.where(and_(*filters))
@@ -817,10 +750,17 @@ class AsyncCrud(Generic[ModelType]):
count_q = count_q.select_from(cls.model)
# Apply explicit joins to count query
count_q = _apply_joins(count_q, joins, outer_join)
if joins:
for model, condition in joins:
count_q = (
count_q.outerjoin(model, condition)
if outer_join
else count_q.join(model, condition)
)
# Apply search joins to count query
count_q = _apply_search_joins(count_q, search_joins)
for join_rel in search_joins:
count_q = count_q.outerjoin(join_rel)
if filters:
count_q = count_q.where(and_(*filters))
@@ -828,9 +768,19 @@ class AsyncCrud(Generic[ModelType]):
count_result = await session.execute(count_q)
total_count = count_result.scalar_one()
filter_attributes = await cls._build_filter_attributes(
session, facet_fields, filters, search_joins
# Build facets
resolved_facet_fields = (
facet_fields if facet_fields is not None else cls.facet_fields
)
filter_attributes: dict[str, list[Any]] | None = None
if resolved_facet_fields:
filter_attributes = await build_facets(
session,
cls.model,
resolved_facet_fields,
base_filters=filters or None,
base_joins=search_joins or None,
)
return PaginatedResponse(
data=items,
@@ -853,7 +803,7 @@ class AsyncCrud(Generic[ModelType]):
joins: JoinType | None = None,
outer_join: bool = False,
load_options: list[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
order_by: Any | None = None,
items_per_page: int = 20,
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
@@ -887,9 +837,21 @@ class AsyncCrud(Generic[ModelType]):
PaginatedResponse with CursorPagination metadata
"""
filters = list(filters) if filters else []
search_joins: list[Any] = []
fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
filters.extend(fb_filters)
if isinstance(filter_by, BaseModel):
filter_by = filter_by.model_dump(exclude_none=True) or None
# Build filter_by conditions from declared facet fields
if filter_by:
resolved_facets_for_filter = (
facet_fields if facet_fields is not None else cls.facet_fields
)
fb_filters, fb_joins = build_filter_by(
filter_by, resolved_facets_for_filter or []
)
filters.extend(fb_filters)
search_joins.extend(fb_joins)
if cls.cursor_column is None:
raise ValueError(
@@ -922,23 +884,29 @@ class AsyncCrud(Generic[ModelType]):
# Build search filters
if search:
search_filters, new_search_joins = build_search_filters(
search_filters, search_joins = build_search_filters(
cls.model,
search,
search_fields=search_fields,
default_fields=cls.searchable_fields,
)
filters.extend(search_filters)
search_joins.extend(new_search_joins)
# Build query
q = select(cls.model)
# Apply explicit joins
q = _apply_joins(q, joins, outer_join)
if joins:
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
# Apply search joins (always outer joins)
q = _apply_search_joins(q, search_joins)
for join_rel in search_joins:
q = q.outerjoin(join_rel)
if filters:
q = q.where(and_(*filters))
@@ -970,9 +938,19 @@ class AsyncCrud(Generic[ModelType]):
items: list[Any] = [schema.model_validate(item) for item in items_page]
filter_attributes = await cls._build_filter_attributes(
session, facet_fields, filters, search_joins
# Build facets
resolved_facet_fields = (
facet_fields if facet_fields is not None else cls.facet_fields
)
filter_attributes: dict[str, list[Any]] | None = None
if resolved_facet_fields:
filter_attributes = await build_facets(
session,
cls.model,
resolved_facet_fields,
base_filters=filters or None,
base_joins=search_joins or None,
)
return PaginatedResponse(
data=items,
@@ -991,7 +969,6 @@ def CrudFactory(
*,
searchable_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
m2m_fields: M2MFieldType | None = None,
default_load_options: list[ExecutableOption] | None = None,
cursor_column: Any | None = None,
@@ -1004,8 +981,6 @@ def CrudFactory(
facet_fields: Optional list of columns to compute distinct values for in paginated
responses. Supports direct columns (``User.status``) and relationship tuples
(``(User.role, Role.name)``). Can be overridden per call.
order_fields: Optional list of model attributes that callers are allowed to order by
via ``order_params()``. Can be overridden per call.
m2m_fields: Optional mapping for many-to-many relationships.
Maps schema field names (containing lists of IDs) to
SQLAlchemy relationship attributes.
@@ -1099,7 +1074,6 @@ def CrudFactory(
"model": model,
"searchable_fields": searchable_fields,
"facet_fields": facet_fields,
"order_fields": order_fields,
"m2m_fields": m2m_fields,
"default_load_options": default_load_options,
"cursor_column": cursor_column,

View File

@@ -1,23 +1,24 @@
"""Search utilities for AsyncCrud."""
import asyncio
import functools
from collections import Counter
from collections.abc import Sequence
from dataclasses import dataclass, replace
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal
from sqlalchemy import String, and_, or_, select
from sqlalchemy import String, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.attributes import InstrumentedAttribute
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
from ..types import FacetFieldType, SearchFieldType
if TYPE_CHECKING:
from sqlalchemy.sql.elements import ColumnElement
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
FacetFieldType = SearchFieldType
@dataclass
class SearchConfig:
@@ -36,7 +37,6 @@ class SearchConfig:
match_mode: Literal["any", "all"] = "any"
@functools.lru_cache(maxsize=128)
def get_searchable_fields(
model: type[DeclarativeBase],
*,
@@ -101,11 +101,14 @@ def build_search_filters(
if isinstance(search, str):
config = SearchConfig(query=search, fields=search_fields)
else:
config = (
replace(search, fields=search_fields)
if search_fields is not None
else search
)
config = search
if search_fields is not None:
config = SearchConfig(
query=config.query,
fields=search_fields,
case_sensitive=config.case_sensitive,
match_mode=config.match_mode,
)
if not config.query or not config.query.strip():
return [], []
@@ -224,6 +227,8 @@ async def build_facets(
q = q.outerjoin(rel)
if base_filters:
from sqlalchemy import and_
q = q.where(and_(*base_filters))
q = q.order_by(column)

View File

@@ -1,17 +1,20 @@
"""Dependency factories for FastAPI routes."""
import inspect
from collections.abc import Callable
from typing import Any, cast
from collections.abc import AsyncGenerator, Callable
from typing import Any, TypeVar, cast
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
from .crud import CrudFactory
from .types import ModelType, SessionDependency
__all__ = ["BodyDependency", "PathDependency"]
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]
def PathDependency(
model: type[ModelType],

View File

@@ -6,7 +6,6 @@ from .exceptions import (
ConflictError,
ForbiddenError,
InvalidFacetFilterError,
InvalidOrderFieldError,
NoSearchableFieldsError,
NotFoundError,
UnauthorizedError,
@@ -22,7 +21,6 @@ __all__ = [
"generate_error_responses",
"init_exceptions_handlers",
"InvalidFacetFilterError",
"InvalidOrderFieldError",
"NoSearchableFieldsError",
"NotFoundError",
"UnauthorizedError",

View File

@@ -128,31 +128,6 @@ class InvalidFacetFilterError(ApiException):
super().__init__(detail)
class InvalidOrderFieldError(ApiException):
"""Raised when order_by contains a field not in the allowed order fields."""
api_error = ApiError(
code=422,
msg="Invalid Order Field",
desc="The requested order field is not allowed for this resource.",
err_code="SORT-422",
)
def __init__(self, field: str, valid_fields: list[str]) -> None:
"""Initialize the exception.
Args:
field: The unknown order field provided by the caller
valid_fields: List of valid field names
"""
self.field = field
self.valid_fields = valid_fields
detail = (
f"'{field}' is not an allowed order field. Valid fields: {valid_fields}."
)
super().__init__(detail)
def generate_error_responses(
*errors: type[ApiException],
) -> dict[int | str, dict[str, Any]]:

View File

@@ -10,10 +10,6 @@ from fastapi.responses import JSONResponse
from ..schemas import ErrorResponse, ResponseStatus
from .exceptions import ApiException
_VALIDATION_LOCATION_PARAMS: frozenset[str] = frozenset(
{"body", "query", "path", "header", "cookie"}
)
def init_exceptions_handlers(app: FastAPI) -> FastAPI:
"""Register exception handlers and custom OpenAPI schema on a FastAPI app.
@@ -110,7 +106,9 @@ def _format_validation_error(
for error in errors:
field_path = ".".join(
str(loc) for loc in error["loc"] if loc not in _VALIDATION_LOCATION_PARAMS
str(loc)
for loc in error["loc"]
if loc not in ("body", "query", "path", "header", "cookie")
)
formatted_errors.append(
{

View File

@@ -1,84 +1,24 @@
"""Fixture loading utilities for database seeding."""
from collections.abc import Callable, Sequence
from typing import Any
from typing import Any, TypeVar
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
from ..db import get_transaction
from ..logger import get_logger
from ..types import ModelType
from .enum import LoadStrategy
from .registry import Context, FixtureRegistry
logger = get_logger()
async def _load_ordered(
session: AsyncSession,
registry: FixtureRegistry,
ordered_names: list[str],
strategy: LoadStrategy,
) -> dict[str, list[DeclarativeBase]]:
"""Load fixtures in order."""
results: dict[str, list[DeclarativeBase]] = {}
for name in ordered_names:
fixture = registry.get(name)
instances = list(fixture.func())
if not instances:
results[name] = []
continue
model_name = type(instances[0]).__name__
loaded: list[DeclarativeBase] = []
async with get_transaction(session):
for instance in instances:
if strategy == LoadStrategy.INSERT:
session.add(instance)
loaded.append(instance)
elif strategy == LoadStrategy.MERGE:
merged = await session.merge(instance)
loaded.append(merged)
else: # LoadStrategy.SKIP_EXISTING
pk = _get_primary_key(instance)
if pk is not None:
existing = await session.get(type(instance), pk)
if existing is None:
session.add(instance)
loaded.append(instance)
else:
session.add(instance)
loaded.append(instance)
results[name] = loaded
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
return results
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
"""Get the primary key value of a model instance."""
mapper = instance.__class__.__mapper__
pk_cols = mapper.primary_key
if len(pk_cols) == 1:
return getattr(instance, pk_cols[0].name, None)
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
if all(v is not None for v in pk_values):
return pk_values
return None
T = TypeVar("T", bound=DeclarativeBase)
def get_obj_by_attr(
fixtures: Callable[[], Sequence[ModelType]], attr_name: str, value: Any
) -> ModelType:
fixtures: Callable[[], Sequence[T]], attr_name: str, value: Any
) -> T:
"""Get a SQLAlchemy model instance by matching an attribute value.
Args:
@@ -117,6 +57,13 @@ async def load_fixtures(
Returns:
Dict mapping fixture names to loaded instances
Example:
```python
# Loads 'roles' first (dependency), then 'users'
result = await load_fixtures(session, fixtures, "users")
print(result["users"]) # [User(...), ...]
```
"""
ordered = registry.resolve_dependencies(*names)
return await _load_ordered(session, registry, ordered, strategy)
@@ -138,6 +85,76 @@ async def load_fixtures_by_context(
Returns:
Dict mapping fixture names to loaded instances
Example:
```python
# Load base + testing fixtures
await load_fixtures_by_context(
session, fixtures,
Context.BASE, Context.TESTING
)
```
"""
ordered = registry.resolve_context_dependencies(*contexts)
return await _load_ordered(session, registry, ordered, strategy)
async def _load_ordered(
session: AsyncSession,
registry: FixtureRegistry,
ordered_names: list[str],
strategy: LoadStrategy,
) -> dict[str, list[DeclarativeBase]]:
"""Load fixtures in order."""
results: dict[str, list[DeclarativeBase]] = {}
for name in ordered_names:
fixture = registry.get(name)
instances = list(fixture.func())
if not instances:
results[name] = []
continue
model_name = type(instances[0]).__name__
loaded: list[DeclarativeBase] = []
async with get_transaction(session):
for instance in instances:
if strategy == LoadStrategy.INSERT:
session.add(instance)
loaded.append(instance)
elif strategy == LoadStrategy.MERGE:
merged = await session.merge(instance)
loaded.append(merged)
elif strategy == LoadStrategy.SKIP_EXISTING:
pk = _get_primary_key(instance)
if pk is not None:
existing = await session.get(type(instance), pk)
if existing is None:
session.add(instance)
loaded.append(instance)
else:
session.add(instance)
loaded.append(instance)
results[name] = loaded
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
return results
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
"""Get the primary key value of a model instance."""
mapper = instance.__class__.__mapper__
pk_cols = mapper.primary_key
if len(pk_cols) == 1:
return getattr(instance, pk_cols[0].name, None)
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
if all(v is not None for v in pk_values):
return pk_values
return None

View File

@@ -53,23 +53,17 @@ def init_metrics(
logger.debug("Initialising metric provider '%s'", provider.name)
provider.func()
# Partition collectors and cache env check at startup — both are stable for the app lifetime.
async_collectors = [
c for c in registry.get_collectors() if asyncio.iscoroutinefunction(c.func)
]
sync_collectors = [
c for c in registry.get_collectors() if not asyncio.iscoroutinefunction(c.func)
]
multiprocess_mode = _is_multiprocess()
collectors = registry.get_collectors()
@app.get(path, include_in_schema=False)
async def metrics_endpoint() -> Response:
for collector in sync_collectors:
collector.func()
for collector in async_collectors:
await collector.func()
for collector in collectors:
if asyncio.iscoroutinefunction(collector.func):
await collector.func()
else:
collector.func()
if multiprocess_mode:
if _is_multiprocess():
prom_registry = CollectorRegistry()
multiprocess.MultiProcessCollector(prom_registry)
output = generate_latest(prom_registry)

View File

@@ -1,12 +1,10 @@
"""Base Pydantic schemas for API responses."""
from enum import Enum
from typing import Any, ClassVar, Generic
from typing import Any, ClassVar, Generic, TypeVar
from pydantic import BaseModel, ConfigDict
from .types import DataT
__all__ = [
"ApiError",
"CursorPagination",
@@ -18,6 +16,8 @@ __all__ = [
"ResponseStatus",
]
DataT = TypeVar("DataT")
class PydanticBase(BaseModel):
"""Base class for all Pydantic models with common configuration."""

View File

@@ -1,27 +0,0 @@
"""Shared type aliases for the fastapi-toolsets package."""
from collections.abc import AsyncGenerator, Callable, Mapping
from typing import Any, TypeVar
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, QueryableAttribute
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql.elements import ColumnElement
# Generic TypeVars
DataT = TypeVar("DataT")
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
# CRUD type aliases
JoinType = list[tuple[type[DeclarativeBase], Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]
# Search / facet type aliases
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
FacetFieldType = SearchFieldType
# Dependency type aliases
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]

View File

@@ -92,15 +92,6 @@ class IntRole(Base):
name: Mapped[str] = mapped_column(String(50), unique=True)
class Permission(Base):
"""Test model with composite primary key."""
__tablename__ = "permissions"
subject: Mapped[str] = mapped_column(String(50), primary_key=True)
action: Mapped[str] = mapped_column(String(50), primary_key=True)
class Event(Base):
"""Test model with DateTime and Date cursor columns."""

View File

@@ -1,11 +1,9 @@
"""Tests for CRUD search functionality."""
import inspect
import uuid
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.sql.elements import ColumnElement, UnaryExpression
from fastapi_toolsets.crud import (
CrudFactory,
@@ -13,7 +11,6 @@ from fastapi_toolsets.crud import (
SearchConfig,
get_searchable_fields,
)
from fastapi_toolsets.exceptions import InvalidOrderFieldError
from fastapi_toolsets.schemas import OffsetPagination
from .conftest import (
@@ -1049,144 +1046,3 @@ class TestFilterParamsSchema:
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
class TestOrderParamsSchema:
"""Tests for AsyncCrud.order_params()."""
def test_generates_order_by_and_order_params(self):
"""Returned dependency has order_by and order query params."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params()
param_names = set(inspect.signature(dep).parameters)
assert param_names == {"order_by", "order"}
def test_dependency_name_includes_model_name(self):
"""Dependency function is named after the model."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
assert getattr(dep, "__name__") == "UserOrderParams"
def test_raises_when_no_order_fields(self):
"""ValueError raised when no order_fields are configured or provided."""
with pytest.raises(ValueError, match="no order_fields"):
UserCrud.order_params()
def test_order_fields_override(self):
"""order_fields= parameter overrides the class-level default."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params(order_fields=[User.email])
param_names = set(inspect.signature(dep).parameters)
assert "order_by" in param_names
# description should only mention email, not username
sig = inspect.signature(dep)
description = sig.parameters["order_by"].default.description
assert "email" in description
assert "username" not in description
def test_order_by_description_lists_valid_fields(self):
"""order_by query param description mentions each allowed field."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params()
sig = inspect.signature(dep)
description = sig.parameters["order_by"].default.description
assert "username" in description
assert "email" in description
def test_default_order_reflected_in_order_default(self):
"""default_order is used as the default value for order."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep_asc = UserOrderCrud.order_params(default_order="asc")
dep_desc = UserOrderCrud.order_params(default_order="desc")
sig_asc = inspect.signature(dep_asc)
sig_desc = inspect.signature(dep_desc)
assert sig_asc.parameters["order"].default.default == "asc"
assert sig_desc.parameters["order"].default.default == "desc"
@pytest.mark.anyio
async def test_no_order_by_no_default_returns_none(self):
"""Returns None when order_by is absent and no default_field is set."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
result = await dep(order_by=None, order="asc")
assert result is None
@pytest.mark.anyio
async def test_no_order_by_with_default_field_returns_asc_expression(self):
"""Returns default_field.asc() when order_by absent and order=asc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params(default_field=User.username)
result = await dep(order_by=None, order="asc")
assert isinstance(result, UnaryExpression)
assert "ASC" in str(result)
@pytest.mark.anyio
async def test_no_order_by_with_default_field_returns_desc_expression(self):
"""Returns default_field.desc() when order_by absent and order=desc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params(default_field=User.username)
result = await dep(order_by=None, order="desc")
assert isinstance(result, UnaryExpression)
assert "DESC" in str(result)
@pytest.mark.anyio
async def test_valid_order_by_asc(self):
"""Returns field.asc() for a valid order_by with order=asc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
result = await dep(order_by="username", order="asc")
assert isinstance(result, UnaryExpression)
assert "ASC" in str(result)
@pytest.mark.anyio
async def test_valid_order_by_desc(self):
"""Returns field.desc() for a valid order_by with order=desc."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
result = await dep(order_by="username", order="desc")
assert isinstance(result, UnaryExpression)
assert "DESC" in str(result)
@pytest.mark.anyio
async def test_invalid_order_by_raises_invalid_order_field_error(self):
"""Raises InvalidOrderFieldError for an unknown order_by value."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
dep = UserOrderCrud.order_params()
with pytest.raises(InvalidOrderFieldError) as exc_info:
await dep(order_by="nonexistent", order="asc")
assert exc_info.value.field == "nonexistent"
assert "username" in exc_info.value.valid_fields
@pytest.mark.anyio
async def test_multiple_fields_all_resolve(self):
"""All configured fields resolve correctly via order_by."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
dep = UserOrderCrud.order_params()
result_username = await dep(order_by="username", order="asc")
result_email = await dep(order_by="email", order="desc")
assert isinstance(result_username, ColumnElement)
assert isinstance(result_email, ColumnElement)
@pytest.mark.anyio
async def test_order_params_integrates_with_get_multi(
self, db_session: AsyncSession
):
"""order_params output is accepted by get_multi(order_by=...)."""
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
await UserCrud.create(
db_session, UserCreate(username="charlie", email="c@test.com")
)
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com")
)
dep = UserOrderCrud.order_params()
order_by = await dep(order_by="username", order="asc")
results = await UserOrderCrud.get_multi(db_session, order_by=order_by)
assert results[0].username == "alice"
assert results[1].username == "charlie"

View File

@@ -15,14 +15,12 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_asyn
from docs_src.examples.pagination_search.db import get_db
from docs_src.examples.pagination_search.models import Article, Base, Category
from docs_src.examples.pagination_search.routes import router
from fastapi_toolsets.exceptions import init_exceptions_handlers
from .conftest import DATABASE_URL
def build_app(session: AsyncSession) -> FastAPI:
app = FastAPI()
init_exceptions_handlers(app)
async def override_get_db():
yield session
@@ -271,125 +269,3 @@ class TestCursorPagination:
body = resp.json()
assert len(body["data"]) == 1
assert body["data"][0]["title"] == "SQLAlchemy async"
class TestOffsetSorting:
"""Tests for order_by / order query parameters on the offset endpoint."""
@pytest.mark.anyio
async def test_default_order_uses_created_at_asc(
self, client: AsyncClient, ex_db_session
):
"""No order_by → default field (created_at) ASC."""
await seed(ex_db_session)
resp = await client.get("/articles/offset")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["FastAPI tips", "SQLAlchemy async", "Draft notes"]
@pytest.mark.anyio
async def test_order_by_title_asc(self, client: AsyncClient, ex_db_session):
"""order_by=title&order=asc returns alphabetical order."""
await seed(ex_db_session)
resp = await client.get("/articles/offset?order_by=title&order=asc")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["Draft notes", "FastAPI tips", "SQLAlchemy async"]
@pytest.mark.anyio
async def test_order_by_title_desc(self, client: AsyncClient, ex_db_session):
"""order_by=title&order=desc returns reverse alphabetical order."""
await seed(ex_db_session)
resp = await client.get("/articles/offset?order_by=title&order=desc")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["SQLAlchemy async", "FastAPI tips", "Draft notes"]
@pytest.mark.anyio
async def test_order_by_created_at_desc(self, client: AsyncClient, ex_db_session):
"""order_by=created_at&order=desc returns newest-first."""
await seed(ex_db_session)
resp = await client.get("/articles/offset?order_by=created_at&order=desc")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["Draft notes", "SQLAlchemy async", "FastAPI tips"]
@pytest.mark.anyio
async def test_invalid_order_by_returns_422(
self, client: AsyncClient, ex_db_session
):
"""Unknown order_by field returns 422 with SORT-422 error code."""
resp = await client.get("/articles/offset?order_by=nonexistent_field")
assert resp.status_code == 422
body = resp.json()
assert body["error_code"] == "SORT-422"
assert body["status"] == "FAIL"
class TestCursorSorting:
"""Tests for order_by / order query parameters on the cursor endpoint.
In cursor_paginate the cursor_column is always the primary sort; order_by
acts as a secondary tiebreaker. With the seeded articles (all having unique
created_at values) the overall ordering is always created_at ASC regardless
of the order_by value — only the valid/invalid field check and the response
shape are meaningful here.
"""
@pytest.mark.anyio
async def test_default_order_uses_created_at_asc(
self, client: AsyncClient, ex_db_session
):
"""No order_by → default field (created_at) ASC."""
await seed(ex_db_session)
resp = await client.get("/articles/cursor")
assert resp.status_code == 200
titles = [a["title"] for a in resp.json()["data"]]
assert titles == ["FastAPI tips", "SQLAlchemy async", "Draft notes"]
@pytest.mark.anyio
async def test_order_by_title_asc_accepted(
self, client: AsyncClient, ex_db_session
):
"""order_by=title is a valid field — request succeeds and returns all articles."""
await seed(ex_db_session)
resp = await client.get("/articles/cursor?order_by=title&order=asc")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 3
@pytest.mark.anyio
async def test_order_by_title_desc_accepted(
self, client: AsyncClient, ex_db_session
):
"""order_by=title&order=desc is valid — request succeeds and returns all articles."""
await seed(ex_db_session)
resp = await client.get("/articles/cursor?order_by=title&order=desc")
assert resp.status_code == 200
assert len(resp.json()["data"]) == 3
@pytest.mark.anyio
async def test_invalid_order_by_returns_422(
self, client: AsyncClient, ex_db_session
):
"""Unknown order_by field returns 422 with SORT-422 error code."""
resp = await client.get("/articles/cursor?order_by=nonexistent_field")
assert resp.status_code == 422
body = resp.json()
assert body["error_code"] == "SORT-422"
assert body["status"] == "FAIL"

View File

@@ -8,7 +8,6 @@ from fastapi_toolsets.exceptions import (
ApiException,
ConflictError,
ForbiddenError,
InvalidOrderFieldError,
NotFoundError,
UnauthorizedError,
generate_error_responses,
@@ -335,43 +334,3 @@ class TestExceptionIntegration:
assert response.status_code == 200
assert response.json() == {"id": 1}
class TestInvalidOrderFieldError:
"""Tests for InvalidOrderFieldError exception."""
def test_api_error_attributes(self):
"""InvalidOrderFieldError has correct api_error metadata."""
assert InvalidOrderFieldError.api_error.code == 422
assert InvalidOrderFieldError.api_error.err_code == "SORT-422"
assert InvalidOrderFieldError.api_error.msg == "Invalid Order Field"
def test_stores_field_and_valid_fields(self):
"""InvalidOrderFieldError stores field and valid_fields on the instance."""
error = InvalidOrderFieldError("unknown", ["name", "created_at"])
assert error.field == "unknown"
assert error.valid_fields == ["name", "created_at"]
def test_message_contains_field_and_valid_fields(self):
"""Exception message mentions the bad field and valid options."""
error = InvalidOrderFieldError("bad_field", ["name", "email"])
assert "bad_field" in str(error)
assert "name" in str(error)
assert "email" in str(error)
def test_handled_as_422_by_exception_handler(self):
"""init_exceptions_handlers turns InvalidOrderFieldError into a 422 response."""
app = FastAPI()
init_exceptions_handlers(app)
@app.get("/items")
async def list_items():
raise InvalidOrderFieldError("bad", ["name"])
client = TestClient(app)
response = client.get("/items")
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "SORT-422"
assert data["status"] == "FAIL"

View File

@@ -14,9 +14,7 @@ from fastapi_toolsets.fixtures import (
load_fixtures_by_context,
)
from fastapi_toolsets.fixtures.utils import _get_primary_key
from .conftest import IntRole, Permission, Role, User
from .conftest import Role, User
class TestContext:
@@ -599,46 +597,6 @@ class TestLoadFixtures:
count = await RoleCrud.count(db_session)
assert count == 2
@pytest.mark.anyio
async def test_skip_existing_skips_if_record_exists(self, db_session: AsyncSession):
"""SKIP_EXISTING returns empty loaded list when the record already exists."""
registry = FixtureRegistry()
role_id = uuid.uuid4()
@registry.register
def roles():
return [Role(id=role_id, name="admin")]
# First load — inserts the record.
result1 = await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert len(result1["roles"]) == 1
# Remove from identity map so session.get() queries the DB in the second load.
db_session.expunge_all()
# Second load — record exists in DB, nothing should be added.
result2 = await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert result2["roles"] == []
@pytest.mark.anyio
async def test_skip_existing_null_pk_inserts(self, db_session: AsyncSession):
"""SKIP_EXISTING inserts when the instance has no PK set (auto-increment)."""
registry = FixtureRegistry()
@registry.register
def int_roles():
# No id provided — PK is None before INSERT (autoincrement).
return [IntRole(name="member")]
result = await load_fixtures(
db_session, registry, "int_roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert len(result["int_roles"]) == 1
class TestLoadFixturesByContext:
"""Tests for load_fixtures_by_context function."""
@@ -797,19 +755,3 @@ class TestGetObjByAttr:
"""Raises StopIteration when value type doesn't match."""
with pytest.raises(StopIteration):
get_obj_by_attr(self.roles, "id", "not-a-uuid")
class TestGetPrimaryKey:
"""Unit tests for the _get_primary_key helper (composite PK paths)."""
def test_composite_pk_all_set(self):
"""Returns a tuple when all composite PK values are set."""
instance = Permission(subject="post", action="read")
pk = _get_primary_key(instance)
assert pk == ("post", "read")
def test_composite_pk_partial_none(self):
"""Returns None when any composite PK value is None."""
instance = Permission(subject="post") # action is None
pk = _get_primary_key(instance)
assert pk is None

2
uv.lock generated
View File

@@ -251,7 +251,7 @@ wheels = [
[[package]]
name = "fastapi-toolsets"
version = "1.3.0"
version = "1.2.1"
source = { editable = "." }
dependencies = [
{ name = "asyncpg" },