Compare commits

..

12 Commits

15 changed files with 1043 additions and 99 deletions

View File

@@ -6,6 +6,9 @@ on:
pull_request:
branches: [main]
permissions:
contents: read
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

View File

@@ -34,26 +34,17 @@ jobs:
MAJOR=$(echo "$VERSION" | cut -d. -f1)
DEPLOY_VERSION="v$(echo "$VERSION" | cut -d. -f1-2)"
# On new major: consolidate previous major's feature versions into vX
# On new major: keep only the latest feature version of the previous major
PREV_MAJOR=$((MAJOR - 1))
OLD_FEATURE_VERSIONS=$(uv run mike list 2>/dev/null | grep -oE "^v${PREV_MAJOR}\.[0-9]+" || true)
if [ -n "$OLD_FEATURE_VERSIONS" ]; then
LATEST_PREV_TAG=$(git tag -l "v${PREV_MAJOR}.*" | sort -V | tail -1)
if [ -n "$LATEST_PREV_TAG" ]; then
git checkout "$LATEST_PREV_TAG" -- docs/ docs_src/ src/ zensical.toml
if ! grep -q '\[project\.extra\.version\]' zensical.toml; then
printf '\n[project.extra.version]\nprovider = "mike"\ndefault = "stable"\nalias = true\n' >> zensical.toml
fi
uv run mike deploy "v${PREV_MAJOR}"
git checkout HEAD -- docs/ docs_src/ src/ zensical.toml
fi
# Delete old feature versions
LATEST_PREV=$(echo "$OLD_FEATURE_VERSIONS" | sort -t. -k2 -n | tail -1)
echo "$OLD_FEATURE_VERSIONS" | while read -r OLD_V; do
echo "Deleting $OLD_V"
uv run mike delete "$OLD_V"
if [ "$OLD_V" != "$LATEST_PREV" ]; then
echo "Deleting $OLD_V"
uv run mike delete "$OLD_V"
fi
done
fi

View File

@@ -324,6 +324,12 @@ result = await UserCrud.offset_paginate(
)
```
Or via the dependency to narrow which fields are exposed as query parameters:
```python
params = UserCrud.offset_paginate_params(search_fields=[Post.title])
```
This allows searching with both [`offset_paginate`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.offset_paginate) and [`cursor_paginate`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.cursor_paginate):
```python
@@ -344,13 +350,37 @@ async def get_users(
return await UserCrud.cursor_paginate(session=session, **params, schema=UserRead)
```
The dependency adds two query parameters to the endpoint:
| Parameter | Type |
| --------------- | ------------- |
| `search` | `str \| null` |
| `search_column` | `str \| null` |
```
GET /posts?search=hello → search all configured columns
GET /posts?search=hello&search_column=title → search only Post.title
```
The available search column keys are returned in the `search_columns` field of [`PaginatedResponse`](../reference/schemas.md#fastapi_toolsets.schemas.PaginatedResponse). Use them to populate a column picker in the UI, or to validate `search_column` values on the client side:
```json
{
"status": "SUCCESS",
"data": ["..."],
"pagination": { "..." },
"search_columns": ["content", "author__username", "title"]
}
```
!!! info "Key format uses `__` as a separator for relationship chains."
A direct column `Post.title` produces `"title"`. A relationship tuple `(Post.author, User.username)` produces `"author__username"`. An unknown `search_column` value raises [`InvalidSearchColumnError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidSearchColumnError) (HTTP 422).
### Faceted search
!!! info "Added in `v1.2`"
Declare `facet_fields` on the CRUD class to return distinct column values alongside paginated results. This is useful for populating filter dropdowns or building faceted search UIs.
Facet fields use the same syntax as `searchable_fields` — direct columns or relationship tuples:
Declare `facet_fields` on the CRUD class to return distinct column values alongside paginated results. This is useful for populating filter dropdowns or building faceted search UIs. Relationship traversal is supported via tuples, using the same syntax as `searchable_fields`:
```python
UserCrud = CrudFactory(
@@ -372,7 +402,47 @@ result = await UserCrud.offset_paginate(
)
```
The distinct values are returned in the `filter_attributes` field of [`PaginatedResponse`](../reference/schemas.md#fastapi_toolsets.schemas.PaginatedResponse):
Or via the dependency to narrow which fields are exposed as query parameters:
```python
params = UserCrud.offset_paginate_params(facet_fields=[User.country])
```
Facet filtering is built into the consolidated params dependencies. When `filter=True` (the default), each facet field is exposed as a query parameter and values are collected into `filter_by` automatically:
```python
from typing import Annotated
from fastapi import Depends
@router.get("", response_model_exclude_none=True)
async def list_users(
session: SessionDep,
params: Annotated[dict, Depends(UserCrud.offset_paginate_params())],
) -> OffsetPaginatedResponse[UserRead]:
return await UserCrud.offset_paginate(session=session, **params, schema=UserRead)
```
```python
@router.get("", response_model_exclude_none=True)
async def list_users(
session: SessionDep,
params: Annotated[dict, Depends(UserCrud.cursor_paginate_params())],
) -> CursorPaginatedResponse[UserRead]:
return await UserCrud.cursor_paginate(session=session, **params, schema=UserRead)
```
Both single-value and multi-value query parameters work:
```
GET /users?status=active → filter_by={"status": ["active"]}
GET /users?status=active&country=FR → filter_by={"status": ["active"], "country": ["FR"]}
GET /users?role__name=admin&role__name=editor → filter_by={"role__name": ["admin", "editor"]} (IN clause)
```
`filter_by` and `filters` can be combined — both are applied with AND logic.
The distinct values for each facet field are returned in the `filter_attributes` field of [`PaginatedResponse`](../reference/schemas.md#fastapi_toolsets.schemas.PaginatedResponse). Use them to populate filter dropdowns in the UI, or to validate `filter_by` keys on the client side:
```json
{
@@ -387,50 +457,14 @@ The distinct values are returned in the `filter_attributes` field of [`Paginated
}
```
Use `filter_by` to pass the client's chosen filter values directly — no need to build SQLAlchemy conditions by hand. Any unknown key raises [`InvalidFacetFilterError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidFacetFilterError).
!!! info "The keys in `filter_by` are the same keys the client received in `filter_attributes`."
Keys use `__` as a separator for the full relationship chain. A direct column `User.status` produces `"status"`. A relationship tuple `(User.role, Role.name)` produces `"role__name"`. A deeper chain `(User.role, Role.permission, Permission.name)` produces `"role__permission__name"`.
`filter_by` and `filters` can be combined — both are applied with AND logic.
Facet filtering is built into the consolidated params dependencies. When `filter=True` (the default), facet fields are exposed as query parameters and collected into `filter_by` automatically:
```python
from typing import Annotated
from fastapi import Depends
UserCrud = CrudFactory(
model=User,
facet_fields=[User.status, User.country, (User.role, Role.name)],
)
@router.get("", response_model_exclude_none=True)
async def list_users(
session: SessionDep,
params: Annotated[dict, Depends(UserCrud.offset_paginate_params())],
) -> OffsetPaginatedResponse[UserRead]:
return await UserCrud.offset_paginate(
session=session,
**params,
schema=UserRead,
)
```
Both single-value and multi-value query parameters work:
```
GET /users?status=active → filter_by={"status": ["active"]}
GET /users?status=active&country=FR → filter_by={"status": ["active"], "country": ["FR"]}
GET /users?role__name=admin&role__name=editor → filter_by={"role__name": ["admin", "editor"]} (IN clause)
```
!!! info "Key format uses `__` as a separator for relationship chains."
A direct column `User.status` produces `"status"`. A relationship tuple `(User.role, Role.name)` produces `"role__name"`. A deeper chain `(User.role, Role.permission, Permission.name)` produces `"role__permission__name"`. An unknown `filter_by` key raises [`InvalidFacetFilterError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidFacetFilterError) (HTTP 422).
## Sorting
!!! info "Added in `v1.3`"
Declare `order_fields` on the CRUD class to expose client-driven column ordering via `order_by` and `order` query parameters.
Declare `order_fields` on the CRUD class. Relationship traversal is supported via tuples, using the same syntax as `searchable_fields` and `facet_fields`:
```python
UserCrud = CrudFactory(
@@ -438,11 +472,27 @@ UserCrud = CrudFactory(
order_fields=[
User.name,
User.created_at,
(User.role, Role.name), # sort by a related model column
],
)
```
Ordering is built into the consolidated params dependencies. When `order=True` (the default), `order_by` and `order` query parameters are exposed and resolved into an `OrderByClause` automatically:
You can override `order_fields` per call:
```python
result = await UserCrud.offset_paginate(
session=session,
order_fields=[User.name],
)
```
Or via the dependency to narrow which fields are exposed as query parameters:
```python
params = UserCrud.offset_paginate_params(order_fields=[User.name])
```
Sorting is built into the consolidated params dependencies. When `order=True` (the default), `order_by` and `order` query parameters are exposed and resolved into an `OrderByClause` automatically:
```python
from typing import Annotated
@@ -452,33 +502,50 @@ from fastapi import Depends
@router.get("")
async def list_users(
session: SessionDep,
params: Annotated[dict, Depends(UserCrud.offset_paginate_params(
default_order_field=User.created_at,
))],
params: Annotated[dict, Depends(UserCrud.offset_paginate_params())],
) -> OffsetPaginatedResponse[UserRead]:
return await UserCrud.offset_paginate(session=session, **params, schema=UserRead)
```
```python
@router.get("")
async def list_users(
session: SessionDep,
params: Annotated[dict, Depends(UserCrud.cursor_paginate_params())],
) -> CursorPaginatedResponse[UserRead]:
return await UserCrud.cursor_paginate(session=session, **params, schema=UserRead)
```
The dependency adds two query parameters to the endpoint:
| Parameter | Type |
| ---------- | --------------- |
| `order_by` | `str | null` |
| `order_by` | `str \| null` |
| `order` | `asc` or `desc` |
```
GET /users?order_by=name&order=asc → ORDER BY users.name ASC
GET /users?order_by=name&order=desc → ORDER BY users.name DESC
GET /users?order_by=name&order=asc → ORDER BY users.name ASC
GET /users?order_by=role__name&order=desc → LEFT JOIN roles ON ... ORDER BY roles.name DESC
```
An unknown `order_by` value raises [`InvalidOrderFieldError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError) (HTTP 422).
!!! info "Relationship tuples are joined automatically."
When a relation field is selected, the related table is LEFT OUTER JOINed automatically. An unknown `order_by` value raises [`InvalidOrderFieldError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError) (HTTP 422).
You can also pass `order_fields` directly to override the class-level defaults:
```python
params = UserCrud.offset_paginate_params(order_fields=[User.name])
The available sort keys are returned in the `order_columns` field of [`PaginatedResponse`](../reference/schemas.md#fastapi_toolsets.schemas.PaginatedResponse). Use them to populate a sort picker in the UI, or to validate `order_by` values on the client side:
```json
{
"status": "SUCCESS",
"data": ["..."],
"pagination": { "..." },
"order_columns": ["created_at", "name", "role__name"]
}
```
!!! info "Key format uses `__` as a separator for relationship chains."
A direct column `User.name` produces `"name"`. A relationship tuple `(User.role, Role.name)` produces `"role__name"`.
## Relationship loading
!!! info "Added in `v1.1`"

View File

@@ -1,6 +1,6 @@
[project]
name = "fastapi-toolsets"
version = "3.0.0"
version = "3.0.3"
description = "Production-ready utilities for FastAPI applications"
readme = "README.md"
license = "MIT"

View File

@@ -21,4 +21,4 @@ Example usage:
return Response(data={"user": user.username}, message="Success")
"""
__version__ = "3.0.0"
__version__ = "3.0.3"

View File

@@ -12,6 +12,7 @@ from ..types import (
JoinType,
M2MFieldType,
OrderByClause,
OrderFieldType,
SearchFieldType,
)
from .factory import AsyncCrud, CrudFactory
@@ -28,6 +29,7 @@ __all__ = [
"M2MFieldType",
"NoSearchableFieldsError",
"OrderByClause",
"OrderFieldType",
"PaginationType",
"SearchConfig",
"SearchFieldType",

View File

@@ -38,6 +38,7 @@ from ..types import (
M2MFieldType,
ModelType,
OrderByClause,
OrderFieldType,
SchemaType,
SearchFieldType,
)
@@ -134,7 +135,7 @@ class AsyncCrud(Generic[ModelType]):
model: ClassVar[type[DeclarativeBase]]
searchable_fields: ClassVar[Sequence[SearchFieldType] | None] = None
facet_fields: ClassVar[Sequence[FacetFieldType] | None] = None
order_fields: ClassVar[Sequence[QueryableAttribute[Any]] | None] = None
order_fields: ClassVar[Sequence[OrderFieldType] | None] = None
m2m_fields: ClassVar[M2MFieldType | None] = None
default_load_options: ClassVar[Sequence[ExecutableOption] | None] = None
cursor_column: ClassVar[Any | None] = None
@@ -169,6 +170,18 @@ class AsyncCrud(Generic[ModelType]):
return load_options
return cls.default_load_options
@classmethod
async def _reload_with_options(
cls: type[Self], session: AsyncSession, instance: ModelType
) -> ModelType:
"""Re-query instance by PK with default_load_options applied."""
mapper = cls.model.__mapper__
pk_filters = [
getattr(cls.model, col.key) == getattr(instance, col.key)
for col in mapper.primary_key
]
return await cls.get(session, filters=pk_filters)
@classmethod
async def _resolve_m2m(
cls: type[Self],
@@ -278,6 +291,17 @@ class AsyncCrud(Generic[ModelType]):
return None
return search_field_keys(fields)
@classmethod
def _resolve_order_columns(
cls: type[Self],
order_fields: Sequence[OrderFieldType] | None,
) -> list[str] | None:
"""Return sort column keys, or None if no order fields configured."""
fields = order_fields if order_fields is not None else cls.order_fields
if not fields:
return None
return sorted(facet_keys(fields))
@classmethod
def _build_paginate_params(
cls: type[Self],
@@ -290,7 +314,7 @@ class AsyncCrud(Generic[ModelType]):
order: bool,
search_fields: Sequence[SearchFieldType] | None,
facet_fields: Sequence[FacetFieldType] | None,
order_fields: Sequence[QueryableAttribute[Any]] | None,
order_fields: Sequence[OrderFieldType] | None,
default_order_field: QueryableAttribute[Any] | None,
default_order: Literal["asc", "desc"],
) -> Callable[..., Awaitable[dict[str, Any]]]:
@@ -349,14 +373,15 @@ class AsyncCrud(Generic[ModelType]):
)
reserved_names.update(filter_keys)
order_field_map: dict[str, QueryableAttribute[Any]] | None = None
order_field_map: dict[str, OrderFieldType] | None = None
order_valid_keys: list[str] | None = None
if order:
resolved_order = (
order_fields if order_fields is not None else cls.order_fields
)
if resolved_order:
order_field_map = {f.key: f for f in resolved_order}
keys = facet_keys(resolved_order)
order_field_map = dict(zip(keys, resolved_order))
order_valid_keys = sorted(order_field_map.keys())
all_params.extend(
[
@@ -408,9 +433,16 @@ class AsyncCrud(Generic[ModelType]):
else:
field = order_field_map[order_by_val]
if field is not None:
result["order_by"] = (
field.asc() if order_dir == "asc" else field.desc()
)
if isinstance(field, tuple):
col = field[-1]
result["order_by"] = (
col.asc() if order_dir == "asc" else col.desc()
)
result["order_joins"] = list(field[:-1])
else:
result["order_by"] = (
field.asc() if order_dir == "asc" else field.desc()
)
else:
result["order_by"] = None
@@ -434,7 +466,7 @@ class AsyncCrud(Generic[ModelType]):
order: bool = True,
search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
order_fields: Sequence[OrderFieldType] | None = None,
default_order_field: QueryableAttribute[Any] | None = None,
default_order: Literal["asc", "desc"] = "asc",
) -> Callable[..., Awaitable[dict[str, Any]]]:
@@ -496,7 +528,7 @@ class AsyncCrud(Generic[ModelType]):
order: bool = True,
search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
order_fields: Sequence[OrderFieldType] | None = None,
default_order_field: QueryableAttribute[Any] | None = None,
default_order: Literal["asc", "desc"] = "asc",
) -> Callable[..., Awaitable[dict[str, Any]]]:
@@ -561,7 +593,7 @@ class AsyncCrud(Generic[ModelType]):
order: bool = True,
search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
order_fields: Sequence[OrderFieldType] | None = None,
default_order_field: QueryableAttribute[Any] | None = None,
default_order: Literal["asc", "desc"] = "asc",
) -> Callable[..., Awaitable[dict[str, Any]]]:
@@ -685,6 +717,8 @@ class AsyncCrud(Generic[ModelType]):
session.add(db_model)
await session.refresh(db_model)
if cls.default_load_options:
db_model = await cls._reload_with_options(session, db_model)
result = cast(ModelType, db_model)
if schema:
return Response(data=schema.model_validate(result))
@@ -1040,6 +1074,8 @@ class AsyncCrud(Generic[ModelType]):
for rel_attr, related_instances in m2m_resolved.items():
setattr(db_model, rel_attr, related_instances)
await session.refresh(db_model)
if cls.default_load_options:
db_model = await cls._reload_with_options(session, db_model)
if schema:
return Response(data=schema.model_validate(db_model))
return db_model
@@ -1202,12 +1238,14 @@ class AsyncCrud(Generic[ModelType]):
outer_join: bool = False,
load_options: Sequence[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
order_joins: list[Any] | None = None,
page: int = 1,
items_per_page: int = 20,
include_total: bool = True,
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
search_column: str | None = None,
order_fields: Sequence[OrderFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None,
schema: type[BaseModel],
@@ -1228,6 +1266,7 @@ class AsyncCrud(Generic[ModelType]):
search: Search query string or SearchConfig object
search_fields: Fields to search in (overrides class default)
search_column: Restrict search to a single column key.
order_fields: Fields allowed for sorting (overrides class default).
facet_fields: Columns to compute distinct values for (overrides class default)
filter_by: Dict of {column_key: value} to filter by declared facet fields.
Keys must match the column.key of a facet field. Scalar → equality,
@@ -1264,6 +1303,10 @@ class AsyncCrud(Generic[ModelType]):
# Apply search joins (always outer joins for search)
q = _apply_search_joins(q, search_joins)
# Apply order joins (relation joins required for order_by field)
if order_joins:
q = _apply_search_joins(q, order_joins)
if filters:
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
@@ -1308,6 +1351,7 @@ class AsyncCrud(Generic[ModelType]):
session, facet_fields, filters, search_joins
)
search_columns = cls._resolve_search_columns(search_fields)
order_columns = cls._resolve_order_columns(order_fields)
return OffsetPaginatedResponse(
data=items,
@@ -1319,6 +1363,7 @@ class AsyncCrud(Generic[ModelType]):
),
filter_attributes=filter_attributes,
search_columns=search_columns,
order_columns=order_columns,
)
@classmethod
@@ -1332,10 +1377,12 @@ class AsyncCrud(Generic[ModelType]):
outer_join: bool = False,
load_options: Sequence[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
order_joins: list[Any] | None = None,
items_per_page: int = 20,
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
search_column: str | None = None,
order_fields: Sequence[OrderFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None,
schema: type[BaseModel],
@@ -1357,6 +1404,7 @@ class AsyncCrud(Generic[ModelType]):
search: Search query string or SearchConfig object.
search_fields: Fields to search in (overrides class default).
search_column: Restrict search to a single column key.
order_fields: Fields allowed for sorting (overrides class default).
facet_fields: Columns to compute distinct values for (overrides class default).
filter_by: Dict of {column_key: value} to filter by declared facet fields.
Keys must match the column.key of a facet field. Scalar → equality,
@@ -1410,6 +1458,10 @@ class AsyncCrud(Generic[ModelType]):
# Apply search joins (always outer joins)
q = _apply_search_joins(q, search_joins)
# Apply order joins (relation joins required for order_by field)
if order_joins:
q = _apply_search_joins(q, order_joins)
if filters:
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
@@ -1468,6 +1520,7 @@ class AsyncCrud(Generic[ModelType]):
session, facet_fields, filters, search_joins
)
search_columns = cls._resolve_search_columns(search_fields)
order_columns = cls._resolve_order_columns(order_fields)
return CursorPaginatedResponse(
data=items,
@@ -1479,6 +1532,7 @@ class AsyncCrud(Generic[ModelType]):
),
filter_attributes=filter_attributes,
search_columns=search_columns,
order_columns=order_columns,
)
@overload
@@ -1493,6 +1547,7 @@ class AsyncCrud(Generic[ModelType]):
outer_join: bool = ...,
load_options: Sequence[ExecutableOption] | None = ...,
order_by: OrderByClause | None = ...,
order_joins: list[Any] | None = ...,
page: int = ...,
cursor: str | None = ...,
items_per_page: int = ...,
@@ -1500,6 +1555,7 @@ class AsyncCrud(Generic[ModelType]):
search: str | SearchConfig | None = ...,
search_fields: Sequence[SearchFieldType] | None = ...,
search_column: str | None = ...,
order_fields: Sequence[OrderFieldType] | None = ...,
facet_fields: Sequence[FacetFieldType] | None = ...,
filter_by: dict[str, Any] | BaseModel | None = ...,
schema: type[BaseModel],
@@ -1517,6 +1573,7 @@ class AsyncCrud(Generic[ModelType]):
outer_join: bool = ...,
load_options: Sequence[ExecutableOption] | None = ...,
order_by: OrderByClause | None = ...,
order_joins: list[Any] | None = ...,
page: int = ...,
cursor: str | None = ...,
items_per_page: int = ...,
@@ -1524,6 +1581,7 @@ class AsyncCrud(Generic[ModelType]):
search: str | SearchConfig | None = ...,
search_fields: Sequence[SearchFieldType] | None = ...,
search_column: str | None = ...,
order_fields: Sequence[OrderFieldType] | None = ...,
facet_fields: Sequence[FacetFieldType] | None = ...,
filter_by: dict[str, Any] | BaseModel | None = ...,
schema: type[BaseModel],
@@ -1540,6 +1598,7 @@ class AsyncCrud(Generic[ModelType]):
outer_join: bool = False,
load_options: Sequence[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
order_joins: list[Any] | None = None,
page: int = 1,
cursor: str | None = None,
items_per_page: int = 20,
@@ -1547,6 +1606,7 @@ class AsyncCrud(Generic[ModelType]):
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
search_column: str | None = None,
order_fields: Sequence[OrderFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None,
schema: type[BaseModel],
@@ -1575,6 +1635,7 @@ class AsyncCrud(Generic[ModelType]):
search: Search query string or :class:`.SearchConfig` object.
search_fields: Fields to search in (overrides class default).
search_column: Restrict search to a single column key.
order_fields: Fields allowed for sorting (overrides class default).
facet_fields: Columns to compute distinct values for (overrides
class default).
filter_by: Dict of ``{column_key: value}`` to filter by declared
@@ -1600,10 +1661,12 @@ class AsyncCrud(Generic[ModelType]):
outer_join=outer_join,
load_options=load_options,
order_by=order_by,
order_joins=order_joins,
items_per_page=items_per_page,
search=search,
search_fields=search_fields,
search_column=search_column,
order_fields=order_fields,
facet_fields=facet_fields,
filter_by=filter_by,
schema=schema,
@@ -1618,12 +1681,14 @@ class AsyncCrud(Generic[ModelType]):
outer_join=outer_join,
load_options=load_options,
order_by=order_by,
order_joins=order_joins,
page=page,
items_per_page=items_per_page,
include_total=include_total,
search=search,
search_fields=search_fields,
search_column=search_column,
order_fields=order_fields,
facet_fields=facet_fields,
filter_by=filter_by,
schema=schema,
@@ -1638,7 +1703,7 @@ def CrudFactory(
base_class: type[AsyncCrud[Any]] = AsyncCrud,
searchable_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
order_fields: Sequence[OrderFieldType] | None = None,
m2m_fields: M2MFieldType | None = None,
default_load_options: Sequence[ExecutableOption] | None = None,
cursor_column: Any | None = None,

View File

@@ -265,7 +265,15 @@ async def build_facets(
else:
q = q.order_by(column)
result = await session.execute(q)
values = [row[0] for row in result.all() if row[0] is not None]
col_type = column.property.columns[0].type
enum_class = getattr(col_type, "enum_class", None)
values = [
row[0].name
if (enum_class is not None and isinstance(row[0], enum_class))
else row[0]
for row in result.all()
if row[0] is not None
]
return key, values
pairs = await asyncio.gather(
@@ -278,6 +286,18 @@ _EQUALITY_TYPES = (String, Integer, Numeric, Date, DateTime, Time, Enum, Uuid)
"""Column types that support equality / IN filtering in build_filter_by."""
def _coerce_bool(value: Any) -> bool:
"""Coerce a string value to a Python bool for Boolean column filtering."""
if isinstance(value, bool):
return value
if isinstance(value, str):
if value.lower() == "true":
return True
if value.lower() == "false":
return False
raise ValueError(f"Cannot coerce {value!r} to bool")
def build_filter_by(
filter_by: dict[str, Any],
facet_fields: Sequence[FacetFieldType],
@@ -324,16 +344,35 @@ def build_filter_by(
added_join_keys.add(rel_key)
col_type = column.property.columns[0].type
if isinstance(col_type, ARRAY):
if isinstance(col_type, Boolean):
coerce = _coerce_bool
if isinstance(value, list):
filters.append(column.in_([coerce(v) for v in value]))
else:
filters.append(column == coerce(value))
elif isinstance(col_type, ARRAY):
if isinstance(value, list):
filters.append(column.overlap(value))
else:
filters.append(column.any(value))
elif isinstance(col_type, Boolean):
if isinstance(value, list):
filters.append(column.in_(value))
else:
filters.append(column.is_(value))
elif isinstance(col_type, Enum):
enum_class = col_type.enum_class
if enum_class is not None:
def _coerce_enum(v: Any) -> Any:
if isinstance(v, enum_class):
return v
return enum_class[v] # lookup by name: "PENDING", "RED"
if isinstance(value, list):
filters.append(column.in_([_coerce_enum(v) for v in value]))
else:
filters.append(column == _coerce_enum(value))
else: # pragma: no cover
if isinstance(value, list):
filters.append(column.in_(value))
else:
filters.append(column == value)
elif isinstance(col_type, _EQUALITY_TYPES):
if isinstance(value, list):
filters.append(column.in_(value))

View File

@@ -163,6 +163,7 @@ class PaginatedResponse(BaseResponse, Generic[DataT]):
pagination_type: PaginationType | None = None
filter_attributes: dict[str, list[Any]] | None = None
search_columns: list[str] | None = None
order_columns: list[str] | None = None
_discriminated_union_cache: ClassVar[dict[Any, Any]] = {}

View File

@@ -15,13 +15,14 @@ ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
# CRUD type aliases
JoinType = list[tuple[type[DeclarativeBase], Any]]
JoinType = list[tuple[type[DeclarativeBase] | Any, Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]
# Search / facet type aliases
# Search / facet / order type aliases
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
FacetFieldType = SearchFieldType
OrderFieldType = SearchFieldType
# Dependency type aliases
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]] | Any

View File

@@ -2,6 +2,7 @@
import os
import uuid
from enum import Enum
import pytest
from pydantic import BaseModel
@@ -12,6 +13,7 @@ from sqlalchemy import (
Column,
Date,
DateTime,
Enum as SAEnum,
ForeignKey,
Integer,
JSON,
@@ -139,6 +141,46 @@ class Post(Base):
tags: Mapped[list[Tag]] = relationship(secondary=post_tags)
class OrderStatus(int, Enum):
"""Integer-backed enum for order status."""
PENDING = 1
PROCESSING = 2
SHIPPED = 3
CANCELLED = 4
class Color(str, Enum):
"""String-backed enum for color."""
RED = "red"
GREEN = "green"
BLUE = "blue"
class Order(Base):
"""Test model with an IntEnum column (Enum(int, Enum)) and a raw Integer column."""
__tablename__ = "orders"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(String(100))
status: Mapped[OrderStatus] = mapped_column(SAEnum(OrderStatus))
priority: Mapped[int] = mapped_column(Integer)
color: Mapped[Color] = mapped_column(SAEnum(Color))
class Transfer(Base):
"""Test model with two FKs to the same table (users)."""
__tablename__ = "transfers"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
amount: Mapped[str] = mapped_column(String(50))
sender_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
receiver_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
class Article(Base):
"""Test article model with ARRAY and JSON columns."""
@@ -300,6 +342,44 @@ class ArticleRead(PydanticBase):
labels: list[str]
class OrderCreate(BaseModel):
"""Schema for creating an order."""
id: uuid.UUID | None = None
name: str
status: OrderStatus
priority: int = 0
color: Color = Color.RED
class OrderRead(PydanticBase):
"""Schema for reading an order."""
id: uuid.UUID
name: str
status: OrderStatus
priority: int
color: Color
class TransferCreate(BaseModel):
"""Schema for creating a transfer."""
id: uuid.UUID | None = None
amount: str
sender_id: uuid.UUID
receiver_id: uuid.UUID
class TransferRead(PydanticBase):
"""Schema for reading a transfer."""
id: uuid.UUID
amount: str
OrderCrud = CrudFactory(Order)
TransferCrud = CrudFactory(Transfer)
ArticleCrud = CrudFactory(Article)
RoleCrud = CrudFactory(Role)
RoleCursorCrud = CrudFactory(Role, cursor_column=Role.id)

View File

@@ -38,6 +38,10 @@ from .conftest import (
Tag,
TagCreate,
TagCrud,
Transfer,
TransferCreate,
TransferCrud,
TransferRead,
User,
UserCreate,
UserCrud,
@@ -243,6 +247,60 @@ class TestResolveSearchColumns:
assert "username" not in result
class TestResolveOrderColumns:
"""Tests for _resolve_order_columns logic."""
def test_returns_none_when_no_order_fields(self):
"""Returns None when cls.order_fields is None and no order_fields passed."""
class AbstractCrud(AsyncCrud[User]):
pass
assert AbstractCrud._resolve_order_columns(None) is None
def test_returns_none_when_empty_order_fields_passed(self):
"""Returns None when an empty list is passed explicitly."""
crud = CrudFactory(User)
assert crud._resolve_order_columns([]) is None
def test_returns_keys_from_class_order_fields(self):
"""Returns sorted column keys from cls.order_fields when no override passed."""
crud = CrudFactory(User, order_fields=[User.username])
result = crud._resolve_order_columns(None)
assert result is not None
assert "username" in result
def test_order_fields_override_takes_priority(self):
"""Explicit order_fields override cls.order_fields."""
crud = CrudFactory(User, order_fields=[User.username])
result = crud._resolve_order_columns([User.email])
assert result is not None
assert "email" in result
assert "username" not in result
def test_returns_sorted_keys(self):
"""Keys are returned in sorted order."""
crud = CrudFactory(User, order_fields=[User.email, User.username])
result = crud._resolve_order_columns(None)
assert result is not None
assert result == sorted(result)
def test_relation_tuple_produces_dunder_key(self):
"""A (rel, column) tuple produces a 'rel__column' key."""
crud = CrudFactory(User, order_fields=[(User.role, Role.name)])
result = crud._resolve_order_columns(None)
assert result == ["role__name"]
def test_mixed_flat_and_relation_fields(self):
"""Flat and relation fields can be mixed; keys are sorted."""
crud = CrudFactory(User, order_fields=[User.username, (User.role, Role.name)])
result = crud._resolve_order_columns(None)
assert result is not None
assert "username" in result
assert "role__name" in result
assert result == sorted(result)
class TestDefaultLoadOptionsIntegration:
"""Integration tests for default_load_options with real DB queries."""
@@ -322,6 +380,43 @@ class TestDefaultLoadOptionsIntegration:
assert result.data[0].role is not None
assert result.data[0].role.name == "admin"
@pytest.mark.anyio
async def test_default_load_options_applied_to_create(
self, db_session: AsyncSession
):
"""default_load_options loads relationships after create()."""
UserWithDefaultLoad = CrudFactory(
User, default_load_options=[selectinload(User.role)]
)
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
user = await UserWithDefaultLoad.create(
db_session,
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
)
assert user.role is not None
assert user.role.name == "admin"
@pytest.mark.anyio
async def test_default_load_options_applied_to_update(
self, db_session: AsyncSession
):
"""default_load_options loads relationships after update()."""
UserWithDefaultLoad = CrudFactory(
User, default_load_options=[selectinload(User.role)]
)
role = await RoleCrud.create(db_session, RoleCreate(name="admin"))
user = await UserCrud.create(
db_session,
UserCreate(username="alice", email="alice@test.com"),
)
updated = await UserWithDefaultLoad.update(
db_session,
UserUpdate(role_id=role.id),
filters=[User.id == user.id],
)
assert updated.role is not None
assert updated.role.name == "admin"
@pytest.mark.anyio
async def test_load_options_overrides_default_load_options(
self, db_session: AsyncSession
@@ -1282,6 +1377,128 @@ class TestCrudJoins:
assert users[0].username == "multi_join"
class TestCrudAliasedJoins:
"""Tests for CRUD operations with aliased joins (same table joined twice)."""
@pytest.mark.anyio
async def test_get_multi_with_aliased_joins(self, db_session: AsyncSession):
"""Aliased joins allow joining the same table twice."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="100", sender_id=alice.id, receiver_id=bob.id),
)
Sender = aliased(User)
Receiver = aliased(User)
results = await TransferCrud.get_multi(
db_session,
joins=[
(Sender, Transfer.sender_id == Sender.id),
(Receiver, Transfer.receiver_id == Receiver.id),
],
filters=[Sender.username == "alice", Receiver.username == "bob"],
)
assert len(results) == 1
assert results[0].amount == "100"
@pytest.mark.anyio
async def test_get_multi_aliased_no_match(self, db_session: AsyncSession):
"""Aliased joins correctly filter out non-matching rows."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="100", sender_id=alice.id, receiver_id=bob.id),
)
Sender = aliased(User)
Receiver = aliased(User)
# bob is receiver, not sender — should return nothing
results = await TransferCrud.get_multi(
db_session,
joins=[
(Sender, Transfer.sender_id == Sender.id),
(Receiver, Transfer.receiver_id == Receiver.id),
],
filters=[Sender.username == "bob", Receiver.username == "alice"],
)
assert len(results) == 0
@pytest.mark.anyio
async def test_paginate_with_aliased_joins(self, db_session: AsyncSession):
"""Aliased joins work with offset_paginate."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="50", sender_id=alice.id, receiver_id=bob.id),
)
await TransferCrud.create(
db_session,
TransferCreate(amount="75", sender_id=bob.id, receiver_id=alice.id),
)
Sender = aliased(User)
result = await TransferCrud.offset_paginate(
db_session,
joins=[(Sender, Transfer.sender_id == Sender.id)],
filters=[Sender.username == "alice"],
schema=TransferRead,
)
assert result.pagination.total_count == 1
assert result.data[0].amount == "50"
@pytest.mark.anyio
async def test_count_with_aliased_join(self, db_session: AsyncSession):
"""Aliased joins work with count."""
from sqlalchemy.orm import aliased
alice = await UserCrud.create(
db_session, UserCreate(username="alice", email="alice@test.com")
)
bob = await UserCrud.create(
db_session, UserCreate(username="bob", email="bob@test.com")
)
await TransferCrud.create(
db_session,
TransferCreate(amount="10", sender_id=alice.id, receiver_id=bob.id),
)
await TransferCrud.create(
db_session,
TransferCreate(amount="20", sender_id=alice.id, receiver_id=bob.id),
)
Sender = aliased(User)
count = await TransferCrud.count(
db_session,
joins=[(Sender, Transfer.sender_id == Sender.id)],
filters=[Sender.username == "alice"],
)
assert count == 2
class TestCrudFactoryM2M:
"""Tests for CrudFactory with m2m_fields parameter."""

View File

@@ -23,6 +23,12 @@ from .conftest import (
ArticleCreate,
ArticleCrud,
ArticleRead,
Color,
Order,
OrderCreate,
OrderCrud,
OrderRead,
OrderStatus,
Role,
RoleCreate,
RoleCrud,
@@ -971,7 +977,7 @@ class TestFilterBy:
@pytest.mark.anyio
async def test_bool_filter_false(self, db_session: AsyncSession):
"""filter_by with a boolean False value correctly filters rows."""
"""filter_by with a string 'false' value correctly filters rows."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
@@ -982,7 +988,7 @@ class TestFilterBy:
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": False}, schema=UserRead
db_session, filter_by={"is_active": "false"}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
@@ -991,7 +997,7 @@ class TestFilterBy:
@pytest.mark.anyio
async def test_bool_filter_true(self, db_session: AsyncSession):
"""filter_by with a boolean True value correctly filters rows."""
"""filter_by with a string 'true' value correctly filters rows."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
@@ -1002,7 +1008,7 @@ class TestFilterBy:
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": True}, schema=UserRead
db_session, filter_by={"is_active": "true"}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
@@ -1011,7 +1017,7 @@ class TestFilterBy:
@pytest.mark.anyio
async def test_bool_filter_list(self, db_session: AsyncSession):
"""filter_by with a list of booleans produces an IN clause."""
"""filter_by with a list of string booleans produces an IN clause."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
@@ -1022,12 +1028,41 @@ class TestFilterBy:
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": [True, False]}, schema=UserRead
db_session, filter_by={"is_active": ["true", "false"]}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
@pytest.mark.anyio
async def test_bool_filter_native_bool(self, db_session: AsyncSession):
"""filter_by with a native Python bool passes through coercion."""
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
)
result = await UserBoolCrud.offset_paginate(
db_session, filter_by={"is_active": True}, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
def test_bool_coerce_invalid_value(self):
"""_coerce_bool raises ValueError for non-bool, non-string values."""
from fastapi_toolsets.crud.search import _coerce_bool
with pytest.raises(ValueError, match="Cannot coerce"):
_coerce_bool(42)
def test_bool_coerce_invalid_string(self):
"""_coerce_bool raises ValueError for unrecognized string values."""
from fastapi_toolsets.crud.search import _coerce_bool
with pytest.raises(ValueError, match="Cannot coerce"):
_coerce_bool("maybe")
@pytest.mark.anyio
async def test_array_contains_single_value(self, db_session: AsyncSession):
"""filter_by on an ARRAY column with a scalar checks containment."""
@@ -1092,6 +1127,253 @@ class TestFilterBy:
assert "JSON" in exc_info.value.col_type
class TestFilterByIntEnum:
"""Tests for filter_by on columns typed as (int, Enum) / IntEnum."""
@pytest.mark.anyio
async def test_filter_by_intenum_member(self, db_session: AsyncSession):
"""filter_by with an IntEnum member value filters correctly."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-3", status=OrderStatus.PENDING)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": OrderStatus.PENDING},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"order-1", "order-3"}
@pytest.mark.anyio
async def test_filter_by_plain_int_value_raises(self, db_session: AsyncSession):
"""filter_by with a plain int on an IntEnum column raises KeyError — use name or member."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
with pytest.raises(KeyError):
await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": 1},
schema=OrderRead,
)
@pytest.mark.anyio
async def test_filter_by_intenum_list(self, db_session: AsyncSession):
"""filter_by with a list of IntEnum members produces an IN filter."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-3", status=OrderStatus.CANCELLED)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": [OrderStatus.PENDING, OrderStatus.SHIPPED]},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"order-1", "order-2"}
@pytest.mark.anyio
async def test_filter_by_plain_int_list_raises(self, db_session: AsyncSession):
"""filter_by with a list of plain ints on an IntEnum column raises KeyError — use names or members."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
with pytest.raises(KeyError):
await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": [1, 3]},
schema=OrderRead,
)
@pytest.mark.anyio
async def test_filter_by_intenum_name_string(self, db_session: AsyncSession):
"""filter_by with the enum member name as a string filters correctly."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={
"status": "PENDING"
}, # name as string, e.g. from HTTP query param
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "order-1"
@pytest.mark.anyio
async def test_filter_by_intenum_name_string_list(self, db_session: AsyncSession):
"""filter_by with a list of enum name strings produces an IN filter."""
OrderFacetCrud = CrudFactory(Order, facet_fields=[Order.status])
await OrderCrud.create(
db_session, OrderCreate(name="order-1", status=OrderStatus.PENDING)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-2", status=OrderStatus.SHIPPED)
)
await OrderCrud.create(
db_session, OrderCreate(name="order-3", status=OrderStatus.CANCELLED)
)
result = await OrderFacetCrud.offset_paginate(
db_session,
filter_by={"status": ["PENDING", "SHIPPED"]},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"order-1", "order-2"}
class TestFilterByStrEnum:
"""Tests for filter_by on columns typed as (str, Enum) / StrEnum (lines 364-367)."""
@pytest.mark.anyio
async def test_filter_by_strenum_member(self, db_session: AsyncSession):
"""filter_by with a StrEnum member on a string Enum column filters correctly."""
OrderColorCrud = CrudFactory(Order, facet_fields=[Order.color])
await OrderCrud.create(
db_session,
OrderCreate(name="red-order", status=OrderStatus.PENDING, color=Color.RED),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="blue-order", status=OrderStatus.PENDING, color=Color.BLUE
),
)
result = await OrderColorCrud.offset_paginate(
db_session,
filter_by={"color": Color.RED},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "red-order"
@pytest.mark.anyio
async def test_filter_by_strenum_list(self, db_session: AsyncSession):
"""filter_by with a list of StrEnum members produces an IN filter."""
OrderColorCrud = CrudFactory(Order, facet_fields=[Order.color])
await OrderCrud.create(
db_session,
OrderCreate(name="red-order", status=OrderStatus.PENDING, color=Color.RED),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="green-order", status=OrderStatus.PENDING, color=Color.GREEN
),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="blue-order", status=OrderStatus.PENDING, color=Color.BLUE
),
)
result = await OrderColorCrud.offset_paginate(
db_session,
filter_by={"color": [Color.RED, Color.BLUE]},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2
names = {o.name for o in result.data}
assert names == {"red-order", "blue-order"}
class TestFilterByIntegerColumn:
"""Tests for filter_by on plain Integer columns with IntEnum values."""
@pytest.mark.anyio
async def test_filter_by_integer_column_with_intenum_member(
self, db_session: AsyncSession
):
"""filter_by with an IntEnum member on an Integer column works correctly."""
OrderPriorityCrud = CrudFactory(Order, facet_fields=[Order.priority])
await OrderCrud.create(
db_session,
OrderCreate(
name="order-1", status=OrderStatus.PENDING, priority=OrderStatus.PENDING
),
)
await OrderCrud.create(
db_session,
OrderCreate(
name="order-2", status=OrderStatus.SHIPPED, priority=OrderStatus.SHIPPED
),
)
result = await OrderPriorityCrud.offset_paginate(
db_session,
filter_by={
"priority": OrderStatus.PENDING
}, # IntEnum member on Integer col
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "order-1"
@pytest.mark.anyio
async def test_filter_by_integer_column_with_plain_int(
self, db_session: AsyncSession
):
"""filter_by with a plain int on an Integer column works correctly."""
OrderPriorityCrud = CrudFactory(Order, facet_fields=[Order.priority])
await OrderCrud.create(
db_session,
OrderCreate(name="order-1", status=OrderStatus.PENDING, priority=1),
)
await OrderCrud.create(
db_session,
OrderCreate(name="order-2", status=OrderStatus.SHIPPED, priority=3),
)
result = await OrderPriorityCrud.offset_paginate(
db_session,
filter_by={"priority": 1},
schema=OrderRead,
)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1
assert result.data[0].name == "order-1"
class TestFilterParamsViaConsolidated:
"""Tests for filter params via consolidated offset_paginate_params()."""
@@ -1487,6 +1769,111 @@ class TestSearchColumns:
assert result.data[0].username == "bob"
class TestOrderColumns:
"""Tests for order_columns in paginated responses."""
@pytest.mark.anyio
async def test_order_columns_returned_in_offset_paginate(
self, db_session: AsyncSession
):
"""offset_paginate response includes order_columns."""
UserSortCrud = CrudFactory(User, order_fields=[User.username, User.email])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com")
)
result = await UserSortCrud.offset_paginate(db_session, schema=UserRead)
assert result.order_columns is not None
assert "username" in result.order_columns
assert "email" in result.order_columns
@pytest.mark.anyio
async def test_order_columns_returned_in_cursor_paginate(
self, db_session: AsyncSession
):
"""cursor_paginate response includes order_columns."""
UserSortCursorCrud = CrudFactory(
User,
cursor_column=User.id,
order_fields=[User.username, User.email],
)
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com")
)
result = await UserSortCursorCrud.cursor_paginate(db_session, schema=UserRead)
assert result.order_columns is not None
assert "username" in result.order_columns
assert "email" in result.order_columns
@pytest.mark.anyio
async def test_order_columns_none_when_no_order_fields(
self, db_session: AsyncSession
):
"""order_columns is None when no order_fields are configured."""
result = await UserCrud.offset_paginate(db_session, schema=UserRead)
assert result.order_columns is None
@pytest.mark.anyio
async def test_order_columns_override_in_offset_paginate(
self, db_session: AsyncSession
):
"""order_fields override in offset_paginate is reflected in order_columns."""
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com")
)
result = await UserCrud.offset_paginate(
db_session, order_fields=[User.email], schema=UserRead
)
assert result.order_columns == ["email"]
@pytest.mark.anyio
async def test_order_columns_override_in_cursor_paginate(
self, db_session: AsyncSession
):
"""order_fields override in cursor_paginate is reflected in order_columns."""
UserCursorCrud = CrudFactory(User, cursor_column=User.id)
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com")
)
result = await UserCursorCrud.cursor_paginate(
db_session, order_fields=[User.username], schema=UserRead
)
assert result.order_columns == ["username"]
@pytest.mark.anyio
async def test_order_columns_are_sorted_alphabetically(
self, db_session: AsyncSession
):
"""order_columns keys are returned in alphabetical order."""
UserSortCrud = CrudFactory(User, order_fields=[User.email, User.username])
await UserCrud.create(
db_session, UserCreate(username="alice", email="a@test.com")
)
result = await UserSortCrud.offset_paginate(db_session, schema=UserRead)
assert result.order_columns is not None
assert result.order_columns == sorted(result.order_columns)
@pytest.mark.anyio
async def test_relation_order_field_in_order_columns(
self, db_session: AsyncSession
):
"""A relation tuple order field produces 'rel__column' key in order_columns."""
UserSortCrud = CrudFactory(User, order_fields=[(User.role, Role.name)])
result = await UserSortCrud.offset_paginate(db_session, schema=UserRead)
assert result.order_columns == ["role__name"]
class TestOrderParamsViaConsolidated:
"""Tests for order params via consolidated offset_paginate_params()."""
@@ -1641,6 +2028,92 @@ class TestOrderParamsViaConsolidated:
assert result.data[0].username == "alice"
assert result.data[1].username == "charlie"
def test_relation_order_field_key_in_enum(self):
"""A relation tuple field produces a 'rel__column' key in the order_by enum."""
UserOrderCrud = CrudFactory(User, order_fields=[(User.role, Role.name)])
dep = UserOrderCrud.offset_paginate_params(search=False, filter=False)
sig = inspect.signature(dep)
description = sig.parameters["order_by"].default.description
assert "role__name" in description
@pytest.mark.anyio
async def test_relation_order_field_produces_order_joins(self):
"""Selecting a relation order field emits order_by and order_joins."""
UserOrderCrud = CrudFactory(User, order_fields=[(User.role, Role.name)])
dep = UserOrderCrud.offset_paginate_params(search=False, filter=False)
result = await dep(
page=1, items_per_page=20, order_by="role__name", order="asc"
)
assert "order_by" in result
assert "order_joins" in result
assert result["order_joins"] == [User.role]
@pytest.mark.anyio
async def test_relation_order_integrates_with_offset_paginate(
self, db_session: AsyncSession
):
"""Relation order field joins the related table and sorts correctly."""
UserOrderCrud = CrudFactory(User, order_fields=[(User.role, Role.name)])
role_b = await RoleCrud.create(db_session, RoleCreate(name="beta"))
role_a = await RoleCrud.create(db_session, RoleCreate(name="alpha"))
await UserCrud.create(
db_session,
UserCreate(username="u1", email="u1@test.com", role_id=role_b.id),
)
await UserCrud.create(
db_session,
UserCreate(username="u2", email="u2@test.com", role_id=role_a.id),
)
await UserCrud.create(
db_session, UserCreate(username="u3", email="u3@test.com")
)
dep = UserOrderCrud.offset_paginate_params(search=False, filter=False)
params = await dep(
page=1, items_per_page=20, order_by="role__name", order="asc"
)
result = await UserOrderCrud.offset_paginate(
db_session, **params, schema=UserRead
)
usernames = [u.username for u in result.data]
# u2 (alpha) before u1 (beta); u3 (no role, NULL) comes last or first depending on DB
assert usernames.index("u2") < usernames.index("u1")
@pytest.mark.anyio
async def test_relation_order_integrates_with_cursor_paginate(
self, db_session: AsyncSession
):
"""Relation order field works with cursor_paginate (order_joins applied)."""
UserOrderCrud = CrudFactory(
User,
order_fields=[(User.role, Role.name)],
cursor_column=User.id,
)
role_b = await RoleCrud.create(db_session, RoleCreate(name="zeta"))
role_a = await RoleCrud.create(db_session, RoleCreate(name="alpha"))
await UserCrud.create(
db_session,
UserCreate(username="cx1", email="cx1@test.com", role_id=role_b.id),
)
await UserCrud.create(
db_session,
UserCreate(username="cx2", email="cx2@test.com", role_id=role_a.id),
)
dep = UserOrderCrud.cursor_paginate_params(search=False, filter=False)
params = await dep(
cursor=None, items_per_page=20, order_by="role__name", order="asc"
)
result = await UserOrderCrud.cursor_paginate(
db_session, **params, schema=UserRead
)
assert result.data is not None
assert len(result.data) == 2
class TestOffsetPaginateParamsSchema:
"""Tests for AsyncCrud.offset_paginate_params()."""

2
uv.lock generated
View File

@@ -251,7 +251,7 @@ wheels = [
[[package]]
name = "fastapi-toolsets"
version = "3.0.0"
version = "3.0.3"
source = { editable = "." }
dependencies = [
{ name = "asyncpg" },

View File

@@ -2,10 +2,15 @@
site_name = "FastAPI Toolsets"
site_description = "Production-ready utilities for FastAPI applications."
site_author = "d3vyce"
site_url = "https://fastapi-toolsets.d3vyce.fr"
site_url = "https://fastapi-toolsets.d3vyce.fr/"
copyright = "Copyright &copy; 2026 d3vyce"
repo_url = "https://github.com/d3vyce/fastapi-toolsets"
[project.extra.version]
provider = "mike"
default = "stable"
alias = true
[project.theme]
custom_dir = "docs/overrides"
language = "en"