4 Commits

16 changed files with 528 additions and 695 deletions

View File

@@ -95,9 +95,6 @@ The [`offset_paginate`](../reference/crud.md#fastapi_toolsets.crud.factory.Async
} }
``` ```
!!! warning "Deprecated: `paginate`"
The `paginate` function is a backward-compatible alias for `offset_paginate`. This function is **deprecated** and will be removed in **v2.0**.
### Cursor pagination ### Cursor pagination
```python ```python
@@ -471,9 +468,6 @@ async def list_users(session: SessionDep, page: int = 1) -> PaginatedResponse[Us
The schema must have `from_attributes=True` (or inherit from [`PydanticBase`](../reference/schemas.md#fastapi_toolsets.schemas.PydanticBase)) so it can be built from SQLAlchemy model instances. The schema must have `from_attributes=True` (or inherit from [`PydanticBase`](../reference/schemas.md#fastapi_toolsets.schemas.PydanticBase)) so it can be built from SQLAlchemy model instances.
!!! warning "Deprecated: `as_response`"
The `as_response=True` parameter is **deprecated** and will be removed in **v2.0**. Replace it with `schema=YourSchema`.
--- ---
[:material-api: API Reference](../reference/crud.md) [:material-api: API Reference](../reference/crud.md)

View File

@@ -72,7 +72,7 @@ async def load(
registry = get_fixtures_registry() registry = get_fixtures_registry()
db_context = get_db_context() db_context = get_db_context()
context_list = [c.value for c in contexts] if contexts else [Context.BASE] context_list = list(contexts) if contexts else [Context.BASE]
ordered = registry.resolve_context_dependencies(*context_list) ordered = registry.resolve_context_dependencies(*context_list)

View File

@@ -1,12 +1,9 @@
"""Generic async CRUD operations for SQLAlchemy models.""" """Generic async CRUD operations for SQLAlchemy models."""
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
from .factory import CrudFactory, JoinType, M2MFieldType, OrderByClause from ..types import FacetFieldType, JoinType, M2MFieldType, OrderByClause
from .search import ( from .factory import CrudFactory
FacetFieldType, from .search import SearchConfig, get_searchable_fields
SearchConfig,
get_searchable_fields,
)
__all__ = [ __all__ = [
"CrudFactory", "CrudFactory",

View File

@@ -6,11 +6,10 @@ import base64
import inspect import inspect
import json import json
import uuid as uuid_module import uuid as uuid_module
import warnings from collections.abc import Awaitable, Callable, Sequence
from collections.abc import Awaitable, Callable, Mapping, Sequence
from datetime import date, datetime from datetime import date, datetime
from decimal import Decimal from decimal import Decimal
from typing import Any, ClassVar, Generic, Literal, Self, TypeVar, cast, overload from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
from fastapi import Query from fastapi import Query
from pydantic import BaseModel from pydantic import BaseModel
@@ -21,28 +20,28 @@ from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, QueryableAttribute, selectinload from sqlalchemy.orm import DeclarativeBase, QueryableAttribute, selectinload
from sqlalchemy.sql.base import ExecutableOption from sqlalchemy.sql.base import ExecutableOption
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.sql.roles import WhereHavingRole from sqlalchemy.sql.roles import WhereHavingRole
from ..db import get_transaction from ..db import get_transaction
from ..exceptions import InvalidOrderFieldError, NotFoundError from ..exceptions import InvalidOrderFieldError, NotFoundError
from ..schemas import CursorPagination, OffsetPagination, PaginatedResponse, Response from ..schemas import CursorPagination, OffsetPagination, PaginatedResponse, Response
from .search import ( from ..types import (
FacetFieldType, FacetFieldType,
SearchConfig, JoinType,
M2MFieldType,
ModelType,
OrderByClause,
SchemaType,
SearchFieldType, SearchFieldType,
)
from .search import (
SearchConfig,
build_facets, build_facets,
build_filter_by, build_filter_by,
build_search_filters, build_search_filters,
facet_keys, facet_keys,
) )
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
JoinType = list[tuple[type[DeclarativeBase], Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]
def _encode_cursor(value: Any) -> str: def _encode_cursor(value: Any) -> str:
"""Encode cursor column value as an base64 string.""" """Encode cursor column value as an base64 string."""
@@ -54,6 +53,22 @@ def _decode_cursor(cursor: str) -> str:
return json.loads(base64.b64decode(cursor.encode()).decode()) return json.loads(base64.b64decode(cursor.encode()).decode())
def _apply_joins(q: Any, joins: JoinType | None, outer_join: bool) -> Any:
"""Apply a list of (model, condition) joins to a SQLAlchemy select query."""
if not joins:
return q
for model, condition in joins:
q = q.outerjoin(model, condition) if outer_join else q.join(model, condition)
return q
def _apply_search_joins(q: Any, search_joins: list[Any]) -> Any:
"""Apply relationship-based outer joins (from search/filter_by) to a query."""
for join_rel in search_joins:
q = q.outerjoin(join_rel)
return q
class AsyncCrud(Generic[ModelType]): class AsyncCrud(Generic[ModelType]):
"""Generic async CRUD operations for SQLAlchemy models. """Generic async CRUD operations for SQLAlchemy models.
@@ -133,6 +148,48 @@ class AsyncCrud(Generic[ModelType]):
return set() return set()
return set(cls.m2m_fields.keys()) return set(cls.m2m_fields.keys())
@classmethod
def _resolve_facet_fields(
cls: type[Self],
facet_fields: Sequence[FacetFieldType] | None,
) -> Sequence[FacetFieldType] | None:
"""Return facet_fields if given, otherwise fall back to the class-level default."""
return facet_fields if facet_fields is not None else cls.facet_fields
@classmethod
def _prepare_filter_by(
cls: type[Self],
filter_by: dict[str, Any] | BaseModel | None,
facet_fields: Sequence[FacetFieldType] | None,
) -> tuple[list[Any], list[Any]]:
"""Normalize filter_by and return (filters, joins) to apply to the query."""
if isinstance(filter_by, BaseModel):
filter_by = filter_by.model_dump(exclude_none=True)
if not filter_by:
return [], []
resolved = cls._resolve_facet_fields(facet_fields)
return build_filter_by(filter_by, resolved or [])
@classmethod
async def _build_filter_attributes(
cls: type[Self],
session: AsyncSession,
facet_fields: Sequence[FacetFieldType] | None,
filters: list[Any],
search_joins: list[Any],
) -> dict[str, list[Any]] | None:
"""Build facet filter_attributes, or return None if no facet fields configured."""
resolved = cls._resolve_facet_fields(facet_fields)
if not resolved:
return None
return await build_facets(
session,
cls.model,
resolved,
base_filters=filters,
base_joins=search_joins,
)
@classmethod @classmethod
def filter_params( def filter_params(
cls: type[Self], cls: type[Self],
@@ -153,7 +210,7 @@ class AsyncCrud(Generic[ModelType]):
ValueError: If no facet fields are configured on this CRUD class and none are ValueError: If no facet fields are configured on this CRUD class and none are
provided via ``facet_fields``. provided via ``facet_fields``.
""" """
fields = facet_fields if facet_fields is not None else cls.facet_fields fields = cls._resolve_facet_fields(facet_fields)
if not fields: if not fields:
raise ValueError( raise ValueError(
f"{cls.__name__} has no facet_fields configured. " f"{cls.__name__} has no facet_fields configured. "
@@ -244,10 +301,8 @@ class AsyncCrud(Generic[ModelType]):
obj: BaseModel, obj: BaseModel,
*, *,
schema: type[SchemaType], schema: type[SchemaType],
as_response: bool = ...,
) -> Response[SchemaType]: ... ) -> Response[SchemaType]: ...
# Backward-compatible - will be removed in v2.0
@overload @overload
@classmethod @classmethod
async def create( # pragma: no cover async def create( # pragma: no cover
@@ -255,18 +310,6 @@ class AsyncCrud(Generic[ModelType]):
session: AsyncSession, session: AsyncSession,
obj: BaseModel, obj: BaseModel,
*, *,
as_response: Literal[True],
schema: None = ...,
) -> Response[ModelType]: ...
@overload
@classmethod
async def create( # pragma: no cover
cls: type[Self],
session: AsyncSession,
obj: BaseModel,
*,
as_response: Literal[False] = ...,
schema: None = ..., schema: None = ...,
) -> ModelType: ... ) -> ModelType: ...
@@ -276,29 +319,19 @@ class AsyncCrud(Generic[ModelType]):
session: AsyncSession, session: AsyncSession,
obj: BaseModel, obj: BaseModel,
*, *,
as_response: bool = False,
schema: type[BaseModel] | None = None, schema: type[BaseModel] | None = None,
) -> ModelType | Response[ModelType] | Response[Any]: ) -> ModelType | Response[Any]:
"""Create a new record in the database. """Create a new record in the database.
Args: Args:
session: DB async session session: DB async session
obj: Pydantic model with data to create obj: Pydantic model with data to create
as_response: Deprecated. Use ``schema`` instead. Will be removed in v2.0.
schema: Pydantic schema to serialize the result into. When provided, schema: Pydantic schema to serialize the result into. When provided,
the result is automatically wrapped in a ``Response[schema]``. the result is automatically wrapped in a ``Response[schema]``.
Returns: Returns:
Created model instance, or ``Response[schema]`` when ``schema`` is given, Created model instance, or ``Response[schema]`` when ``schema`` is given.
or ``Response[ModelType]`` when ``as_response=True`` (deprecated).
""" """
if as_response and schema is None:
warnings.warn(
"as_response is deprecated and will be removed in v2.0. "
"Use schema=YourSchema instead.",
DeprecationWarning,
stacklevel=2,
)
async with get_transaction(session): async with get_transaction(session):
m2m_exclude = cls._m2m_schema_fields() m2m_exclude = cls._m2m_schema_fields()
data = ( data = (
@@ -314,9 +347,8 @@ class AsyncCrud(Generic[ModelType]):
session.add(db_model) session.add(db_model)
await session.refresh(db_model) await session.refresh(db_model)
result = cast(ModelType, db_model) result = cast(ModelType, db_model)
if as_response or schema: if schema:
data_out = schema.model_validate(result) if schema else result return Response(data=schema.model_validate(result))
return Response(data=data_out)
return result return result
@overload @overload
@@ -331,10 +363,8 @@ class AsyncCrud(Generic[ModelType]):
with_for_update: bool = False, with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None, load_options: list[ExecutableOption] | None = None,
schema: type[SchemaType], schema: type[SchemaType],
as_response: bool = ...,
) -> Response[SchemaType]: ... ) -> Response[SchemaType]: ...
# Backward-compatible - will be removed in v2.0
@overload @overload
@classmethod @classmethod
async def get( # pragma: no cover async def get( # pragma: no cover
@@ -346,22 +376,6 @@ class AsyncCrud(Generic[ModelType]):
outer_join: bool = False, outer_join: bool = False,
with_for_update: bool = False, with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None, load_options: list[ExecutableOption] | None = None,
as_response: Literal[True],
schema: None = ...,
) -> Response[ModelType]: ...
@overload
@classmethod
async def get( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any],
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
as_response: Literal[False] = ...,
schema: None = ..., schema: None = ...,
) -> ModelType: ... ) -> ModelType: ...
@@ -375,9 +389,8 @@ class AsyncCrud(Generic[ModelType]):
outer_join: bool = False, outer_join: bool = False,
with_for_update: bool = False, with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None, load_options: list[ExecutableOption] | None = None,
as_response: bool = False,
schema: type[BaseModel] | None = None, schema: type[BaseModel] | None = None,
) -> ModelType | Response[ModelType] | Response[Any]: ) -> ModelType | Response[Any]:
"""Get exactly one record. Raises NotFoundError if not found. """Get exactly one record. Raises NotFoundError if not found.
Args: Args:
@@ -387,33 +400,18 @@ class AsyncCrud(Generic[ModelType]):
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
with_for_update: Lock the row for update with_for_update: Lock the row for update
load_options: SQLAlchemy loader options (e.g., selectinload) load_options: SQLAlchemy loader options (e.g., selectinload)
as_response: Deprecated. Use ``schema`` instead. Will be removed in v2.0.
schema: Pydantic schema to serialize the result into. When provided, schema: Pydantic schema to serialize the result into. When provided,
the result is automatically wrapped in a ``Response[schema]``. the result is automatically wrapped in a ``Response[schema]``.
Returns: Returns:
Model instance, or ``Response[schema]`` when ``schema`` is given, Model instance, or ``Response[schema]`` when ``schema`` is given.
or ``Response[ModelType]`` when ``as_response=True`` (deprecated).
Raises: Raises:
NotFoundError: If no record found NotFoundError: If no record found
MultipleResultsFound: If more than one record found MultipleResultsFound: If more than one record found
""" """
if as_response and schema is None:
warnings.warn(
"as_response is deprecated and will be removed in v2.0. "
"Use schema=YourSchema instead.",
DeprecationWarning,
stacklevel=2,
)
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
q = q.where(and_(*filters)) q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options): if resolved := cls._resolve_load_options(load_options):
q = q.options(*resolved) q = q.options(*resolved)
@@ -424,9 +422,8 @@ class AsyncCrud(Generic[ModelType]):
if not item: if not item:
raise NotFoundError() raise NotFoundError()
result = cast(ModelType, item) result = cast(ModelType, item)
if as_response or schema: if schema:
data_out = schema.model_validate(result) if schema else result return Response(data=schema.model_validate(result))
return Response(data=data_out)
return result return result
@classmethod @classmethod
@@ -452,13 +449,7 @@ class AsyncCrud(Generic[ModelType]):
Model instance or None Model instance or None
""" """
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options): if resolved := cls._resolve_load_options(load_options):
@@ -495,13 +486,7 @@ class AsyncCrud(Generic[ModelType]):
List of model instances List of model instances
""" """
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options): if resolved := cls._resolve_load_options(load_options):
@@ -526,10 +511,8 @@ class AsyncCrud(Generic[ModelType]):
exclude_unset: bool = True, exclude_unset: bool = True,
exclude_none: bool = False, exclude_none: bool = False,
schema: type[SchemaType], schema: type[SchemaType],
as_response: bool = ...,
) -> Response[SchemaType]: ... ) -> Response[SchemaType]: ...
# Backward-compatible - will be removed in v2.0
@overload @overload
@classmethod @classmethod
async def update( # pragma: no cover async def update( # pragma: no cover
@@ -540,21 +523,6 @@ class AsyncCrud(Generic[ModelType]):
*, *,
exclude_unset: bool = True, exclude_unset: bool = True,
exclude_none: bool = False, exclude_none: bool = False,
as_response: Literal[True],
schema: None = ...,
) -> Response[ModelType]: ...
@overload
@classmethod
async def update( # pragma: no cover
cls: type[Self],
session: AsyncSession,
obj: BaseModel,
filters: list[Any],
*,
exclude_unset: bool = True,
exclude_none: bool = False,
as_response: Literal[False] = ...,
schema: None = ..., schema: None = ...,
) -> ModelType: ... ) -> ModelType: ...
@@ -567,9 +535,8 @@ class AsyncCrud(Generic[ModelType]):
*, *,
exclude_unset: bool = True, exclude_unset: bool = True,
exclude_none: bool = False, exclude_none: bool = False,
as_response: bool = False,
schema: type[BaseModel] | None = None, schema: type[BaseModel] | None = None,
) -> ModelType | Response[ModelType] | Response[Any]: ) -> ModelType | Response[Any]:
"""Update a record in the database. """Update a record in the database.
Args: Args:
@@ -578,24 +545,15 @@ class AsyncCrud(Generic[ModelType]):
filters: List of SQLAlchemy filter conditions filters: List of SQLAlchemy filter conditions
exclude_unset: Exclude fields not explicitly set in the schema exclude_unset: Exclude fields not explicitly set in the schema
exclude_none: Exclude fields with None value exclude_none: Exclude fields with None value
as_response: Deprecated. Use ``schema`` instead. Will be removed in v2.0.
schema: Pydantic schema to serialize the result into. When provided, schema: Pydantic schema to serialize the result into. When provided,
the result is automatically wrapped in a ``Response[schema]``. the result is automatically wrapped in a ``Response[schema]``.
Returns: Returns:
Updated model instance, or ``Response[schema]`` when ``schema`` is given, Updated model instance, or ``Response[schema]`` when ``schema`` is given.
or ``Response[ModelType]`` when ``as_response=True`` (deprecated).
Raises: Raises:
NotFoundError: If no record found NotFoundError: If no record found
""" """
if as_response and schema is None:
warnings.warn(
"as_response is deprecated and will be removed in v2.0. "
"Use schema=YourSchema instead.",
DeprecationWarning,
stacklevel=2,
)
async with get_transaction(session): async with get_transaction(session):
m2m_exclude = cls._m2m_schema_fields() m2m_exclude = cls._m2m_schema_fields()
@@ -625,9 +583,8 @@ class AsyncCrud(Generic[ModelType]):
for rel_attr, related_instances in m2m_resolved.items(): for rel_attr, related_instances in m2m_resolved.items():
setattr(db_model, rel_attr, related_instances) setattr(db_model, rel_attr, related_instances)
await session.refresh(db_model) await session.refresh(db_model)
if as_response or schema: if schema:
data_out = schema.model_validate(db_model) if schema else db_model return Response(data=schema.model_validate(db_model))
return Response(data=data_out)
return db_model return db_model
@classmethod @classmethod
@@ -683,7 +640,7 @@ class AsyncCrud(Generic[ModelType]):
session: AsyncSession, session: AsyncSession,
filters: list[Any], filters: list[Any],
*, *,
as_response: Literal[True], return_response: Literal[True],
) -> Response[None]: ... ) -> Response[None]: ...
@overload @overload
@@ -693,8 +650,8 @@ class AsyncCrud(Generic[ModelType]):
session: AsyncSession, session: AsyncSession,
filters: list[Any], filters: list[Any],
*, *,
as_response: Literal[False] = ..., return_response: Literal[False] = ...,
) -> bool: ... ) -> None: ...
@classmethod @classmethod
async def delete( async def delete(
@@ -702,33 +659,26 @@ class AsyncCrud(Generic[ModelType]):
session: AsyncSession, session: AsyncSession,
filters: list[Any], filters: list[Any],
*, *,
as_response: bool = False, return_response: bool = False,
) -> bool | Response[None]: ) -> None | Response[None]:
"""Delete records from the database. """Delete records from the database.
Args: Args:
session: DB async session session: DB async session
filters: List of SQLAlchemy filter conditions filters: List of SQLAlchemy filter conditions
as_response: Deprecated. Will be removed in v2.0. When ``True``, return_response: When ``True``, returns ``Response[None]`` instead
returns ``Response[None]`` instead of ``bool``. of ``None``. Useful for API endpoints that expect a consistent
response envelope.
Returns: Returns:
``True`` if deletion was executed, or ``Response[None]`` when ``None``, or ``Response[None]`` when ``return_response=True``.
``as_response=True`` (deprecated).
""" """
if as_response:
warnings.warn(
"as_response is deprecated and will be removed in v2.0. "
"Use schema=YourSchema instead.",
DeprecationWarning,
stacklevel=2,
)
async with get_transaction(session): async with get_transaction(session):
q = sql_delete(cls.model).where(and_(*filters)) q = sql_delete(cls.model).where(and_(*filters))
await session.execute(q) await session.execute(q)
if as_response: if return_response:
return Response(data=None) return Response(data=None)
return True return None
@classmethod @classmethod
async def count( async def count(
@@ -751,13 +701,7 @@ class AsyncCrud(Generic[ModelType]):
Number of matching records Number of matching records
""" """
q = select(func.count()).select_from(cls.model) q = select(func.count()).select_from(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
result = await session.execute(q) result = await session.execute(q)
@@ -784,58 +728,11 @@ class AsyncCrud(Generic[ModelType]):
True if at least one record matches True if at least one record matches
""" """
q = select(cls.model) q = select(cls.model)
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
q = q.where(and_(*filters)).exists().select() q = q.where(and_(*filters)).exists().select()
result = await session.execute(q) result = await session.execute(q)
return bool(result.scalar()) return bool(result.scalar())
@overload
@classmethod
async def offset_paginate( # pragma: no cover
cls: type[Self],
session: AsyncSession,
*,
filters: list[Any] | None = None,
joins: JoinType | None = None,
outer_join: bool = False,
load_options: list[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
page: int = 1,
items_per_page: int = 20,
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None,
schema: type[SchemaType],
) -> PaginatedResponse[SchemaType]: ...
# Backward-compatible - will be removed in v2.0
@overload
@classmethod
async def offset_paginate( # pragma: no cover
cls: type[Self],
session: AsyncSession,
*,
filters: list[Any] | None = None,
joins: JoinType | None = None,
outer_join: bool = False,
load_options: list[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
page: int = 1,
items_per_page: int = 20,
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None,
schema: None = ...,
) -> PaginatedResponse[ModelType]: ...
@classmethod @classmethod
async def offset_paginate( async def offset_paginate(
cls: type[Self], cls: type[Self],
@@ -852,8 +749,8 @@ class AsyncCrud(Generic[ModelType]):
search_fields: Sequence[SearchFieldType] | None = None, search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None, facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None, filter_by: dict[str, Any] | BaseModel | None = None,
schema: type[BaseModel] | None = None, schema: type[BaseModel],
) -> PaginatedResponse[ModelType] | PaginatedResponse[Any]: ) -> PaginatedResponse[Any]:
"""Get paginated results using offset-based pagination. """Get paginated results using offset-based pagination.
Args: Args:
@@ -871,54 +768,36 @@ class AsyncCrud(Generic[ModelType]):
filter_by: Dict of {column_key: value} to filter by declared facet fields. filter_by: Dict of {column_key: value} to filter by declared facet fields.
Keys must match the column.key of a facet field. Scalar → equality, Keys must match the column.key of a facet field. Scalar → equality,
list → IN clause. Raises InvalidFacetFilterError for unknown keys. list → IN clause. Raises InvalidFacetFilterError for unknown keys.
schema: Optional Pydantic schema to serialize each item into. schema: Pydantic schema to serialize each item into.
Returns: Returns:
PaginatedResponse with OffsetPagination metadata PaginatedResponse with OffsetPagination metadata
""" """
filters = list(filters) if filters else [] filters = list(filters) if filters else []
offset = (page - 1) * items_per_page offset = (page - 1) * items_per_page
search_joins: list[Any] = []
if isinstance(filter_by, BaseModel): fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
filter_by = filter_by.model_dump(exclude_none=True) or None filters.extend(fb_filters)
# Build filter_by conditions from declared facet fields
if filter_by:
resolved_facets_for_filter = (
facet_fields if facet_fields is not None else cls.facet_fields
)
fb_filters, fb_joins = build_filter_by(
filter_by, resolved_facets_for_filter or []
)
filters.extend(fb_filters)
search_joins.extend(fb_joins)
# Build search filters # Build search filters
if search: if search:
search_filters, search_joins = build_search_filters( search_filters, new_search_joins = build_search_filters(
cls.model, cls.model,
search, search,
search_fields=search_fields, search_fields=search_fields,
default_fields=cls.searchable_fields, default_fields=cls.searchable_fields,
) )
filters.extend(search_filters) filters.extend(search_filters)
search_joins.extend(new_search_joins)
# Build query with joins # Build query with joins
q = select(cls.model) q = select(cls.model)
# Apply explicit joins # Apply explicit joins
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
# Apply search joins (always outer joins for search) # Apply search joins (always outer joins for search)
for join_rel in search_joins: q = _apply_search_joins(q, search_joins)
q = q.outerjoin(join_rel)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
@@ -930,9 +809,7 @@ class AsyncCrud(Generic[ModelType]):
q = q.offset(offset).limit(items_per_page) q = q.offset(offset).limit(items_per_page)
result = await session.execute(q) result = await session.execute(q)
raw_items = cast(list[ModelType], result.unique().scalars().all()) raw_items = cast(list[ModelType], result.unique().scalars().all())
items: list[Any] = ( items: list[Any] = [schema.model_validate(item) for item in raw_items]
[schema.model_validate(item) for item in raw_items] if schema else raw_items
)
# Count query (with same joins and filters) # Count query (with same joins and filters)
pk_col = cls.model.__mapper__.primary_key[0] pk_col = cls.model.__mapper__.primary_key[0]
@@ -940,17 +817,10 @@ class AsyncCrud(Generic[ModelType]):
count_q = count_q.select_from(cls.model) count_q = count_q.select_from(cls.model)
# Apply explicit joins to count query # Apply explicit joins to count query
if joins: count_q = _apply_joins(count_q, joins, outer_join)
for model, condition in joins:
count_q = (
count_q.outerjoin(model, condition)
if outer_join
else count_q.join(model, condition)
)
# Apply search joins to count query # Apply search joins to count query
for join_rel in search_joins: count_q = _apply_search_joins(count_q, search_joins)
count_q = count_q.outerjoin(join_rel)
if filters: if filters:
count_q = count_q.where(and_(*filters)) count_q = count_q.where(and_(*filters))
@@ -958,19 +828,9 @@ class AsyncCrud(Generic[ModelType]):
count_result = await session.execute(count_q) count_result = await session.execute(count_q)
total_count = count_result.scalar_one() total_count = count_result.scalar_one()
# Build facets filter_attributes = await cls._build_filter_attributes(
resolved_facet_fields = ( session, facet_fields, filters, search_joins
facet_fields if facet_fields is not None else cls.facet_fields
) )
filter_attributes: dict[str, list[Any]] | None = None
if resolved_facet_fields:
filter_attributes = await build_facets(
session,
cls.model,
resolved_facet_fields,
base_filters=filters or None,
base_joins=search_joins or None,
)
return PaginatedResponse( return PaginatedResponse(
data=items, data=items,
@@ -983,50 +843,6 @@ class AsyncCrud(Generic[ModelType]):
filter_attributes=filter_attributes, filter_attributes=filter_attributes,
) )
# Backward-compatible - will be removed in v2.0
paginate = offset_paginate
@overload
@classmethod
async def cursor_paginate( # pragma: no cover
cls: type[Self],
session: AsyncSession,
*,
cursor: str | None = None,
filters: list[Any] | None = None,
joins: JoinType | None = None,
outer_join: bool = False,
load_options: list[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
items_per_page: int = 20,
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None,
schema: type[SchemaType],
) -> PaginatedResponse[SchemaType]: ...
# Backward-compatible - will be removed in v2.0
@overload
@classmethod
async def cursor_paginate( # pragma: no cover
cls: type[Self],
session: AsyncSession,
*,
cursor: str | None = None,
filters: list[Any] | None = None,
joins: JoinType | None = None,
outer_join: bool = False,
load_options: list[ExecutableOption] | None = None,
order_by: OrderByClause | None = None,
items_per_page: int = 20,
search: str | SearchConfig | None = None,
search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None,
schema: None = ...,
) -> PaginatedResponse[ModelType]: ...
@classmethod @classmethod
async def cursor_paginate( async def cursor_paginate(
cls: type[Self], cls: type[Self],
@@ -1043,8 +859,8 @@ class AsyncCrud(Generic[ModelType]):
search_fields: Sequence[SearchFieldType] | None = None, search_fields: Sequence[SearchFieldType] | None = None,
facet_fields: Sequence[FacetFieldType] | None = None, facet_fields: Sequence[FacetFieldType] | None = None,
filter_by: dict[str, Any] | BaseModel | None = None, filter_by: dict[str, Any] | BaseModel | None = None,
schema: type[BaseModel] | None = None, schema: type[BaseModel],
) -> PaginatedResponse[ModelType] | PaginatedResponse[Any]: ) -> PaginatedResponse[Any]:
"""Get paginated results using cursor-based pagination. """Get paginated results using cursor-based pagination.
Args: Args:
@@ -1071,21 +887,9 @@ class AsyncCrud(Generic[ModelType]):
PaginatedResponse with CursorPagination metadata PaginatedResponse with CursorPagination metadata
""" """
filters = list(filters) if filters else [] filters = list(filters) if filters else []
search_joins: list[Any] = []
if isinstance(filter_by, BaseModel): fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
filter_by = filter_by.model_dump(exclude_none=True) or None filters.extend(fb_filters)
# Build filter_by conditions from declared facet fields
if filter_by:
resolved_facets_for_filter = (
facet_fields if facet_fields is not None else cls.facet_fields
)
fb_filters, fb_joins = build_filter_by(
filter_by, resolved_facets_for_filter or []
)
filters.extend(fb_filters)
search_joins.extend(fb_joins)
if cls.cursor_column is None: if cls.cursor_column is None:
raise ValueError( raise ValueError(
@@ -1118,29 +922,23 @@ class AsyncCrud(Generic[ModelType]):
# Build search filters # Build search filters
if search: if search:
search_filters, search_joins = build_search_filters( search_filters, new_search_joins = build_search_filters(
cls.model, cls.model,
search, search,
search_fields=search_fields, search_fields=search_fields,
default_fields=cls.searchable_fields, default_fields=cls.searchable_fields,
) )
filters.extend(search_filters) filters.extend(search_filters)
search_joins.extend(new_search_joins)
# Build query # Build query
q = select(cls.model) q = select(cls.model)
# Apply explicit joins # Apply explicit joins
if joins: q = _apply_joins(q, joins, outer_join)
for model, condition in joins:
q = (
q.outerjoin(model, condition)
if outer_join
else q.join(model, condition)
)
# Apply search joins (always outer joins) # Apply search joins (always outer joins)
for join_rel in search_joins: q = _apply_search_joins(q, search_joins)
q = q.outerjoin(join_rel)
if filters: if filters:
q = q.where(and_(*filters)) q = q.where(and_(*filters))
@@ -1170,25 +968,11 @@ class AsyncCrud(Generic[ModelType]):
if cursor is not None and items_page: if cursor is not None and items_page:
prev_cursor = _encode_cursor(getattr(items_page[0], cursor_col_name)) prev_cursor = _encode_cursor(getattr(items_page[0], cursor_col_name))
items: list[Any] = ( items: list[Any] = [schema.model_validate(item) for item in items_page]
[schema.model_validate(item) for item in items_page]
if schema
else items_page
)
# Build facets filter_attributes = await cls._build_filter_attributes(
resolved_facet_fields = ( session, facet_fields, filters, search_joins
facet_fields if facet_fields is not None else cls.facet_fields
) )
filter_attributes: dict[str, list[Any]] | None = None
if resolved_facet_fields:
filter_attributes = await build_facets(
session,
cls.model,
resolved_facet_fields,
base_filters=filters or None,
base_joins=search_joins or None,
)
return PaginatedResponse( return PaginatedResponse(
data=items, data=items,

View File

@@ -1,24 +1,23 @@
"""Search utilities for AsyncCrud.""" """Search utilities for AsyncCrud."""
import asyncio import asyncio
import functools
from collections import Counter from collections import Counter
from collections.abc import Sequence from collections.abc import Sequence
from dataclasses import dataclass from dataclasses import dataclass, replace
from typing import TYPE_CHECKING, Any, Literal from typing import TYPE_CHECKING, Any, Literal
from sqlalchemy import String, or_, select from sqlalchemy import String, and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm.attributes import InstrumentedAttribute from sqlalchemy.orm.attributes import InstrumentedAttribute
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
from ..types import FacetFieldType, SearchFieldType
if TYPE_CHECKING: if TYPE_CHECKING:
from sqlalchemy.sql.elements import ColumnElement from sqlalchemy.sql.elements import ColumnElement
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
FacetFieldType = SearchFieldType
@dataclass @dataclass
class SearchConfig: class SearchConfig:
@@ -37,6 +36,7 @@ class SearchConfig:
match_mode: Literal["any", "all"] = "any" match_mode: Literal["any", "all"] = "any"
@functools.lru_cache(maxsize=128)
def get_searchable_fields( def get_searchable_fields(
model: type[DeclarativeBase], model: type[DeclarativeBase],
*, *,
@@ -101,14 +101,11 @@ def build_search_filters(
if isinstance(search, str): if isinstance(search, str):
config = SearchConfig(query=search, fields=search_fields) config = SearchConfig(query=search, fields=search_fields)
else: else:
config = search config = (
if search_fields is not None: replace(search, fields=search_fields)
config = SearchConfig( if search_fields is not None
query=config.query, else search
fields=search_fields, )
case_sensitive=config.case_sensitive,
match_mode=config.match_mode,
)
if not config.query or not config.query.strip(): if not config.query or not config.query.strip():
return [], [] return [], []
@@ -227,8 +224,6 @@ async def build_facets(
q = q.outerjoin(rel) q = q.outerjoin(rel)
if base_filters: if base_filters:
from sqlalchemy import and_
q = q.where(and_(*base_filters)) q = q.where(and_(*base_filters))
q = q.order_by(column) q = q.order_by(column)

View File

@@ -1,20 +1,17 @@
"""Dependency factories for FastAPI routes.""" """Dependency factories for FastAPI routes."""
import inspect import inspect
from collections.abc import AsyncGenerator, Callable from collections.abc import Callable
from typing import Any, TypeVar, cast from typing import Any, cast
from fastapi import Depends from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase
from .crud import CrudFactory from .crud import CrudFactory
from .types import ModelType, SessionDependency
__all__ = ["BodyDependency", "PathDependency"] __all__ = ["BodyDependency", "PathDependency"]
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]
def PathDependency( def PathDependency(
model: type[ModelType], model: type[ModelType],

View File

@@ -10,6 +10,10 @@ from fastapi.responses import JSONResponse
from ..schemas import ErrorResponse, ResponseStatus from ..schemas import ErrorResponse, ResponseStatus
from .exceptions import ApiException from .exceptions import ApiException
_VALIDATION_LOCATION_PARAMS: frozenset[str] = frozenset(
{"body", "query", "path", "header", "cookie"}
)
def init_exceptions_handlers(app: FastAPI) -> FastAPI: def init_exceptions_handlers(app: FastAPI) -> FastAPI:
"""Register exception handlers and custom OpenAPI schema on a FastAPI app. """Register exception handlers and custom OpenAPI schema on a FastAPI app.
@@ -106,9 +110,7 @@ def _format_validation_error(
for error in errors: for error in errors:
field_path = ".".join( field_path = ".".join(
str(loc) str(loc) for loc in error["loc"] if loc not in _VALIDATION_LOCATION_PARAMS
for loc in error["loc"]
if loc not in ("body", "query", "path", "header", "cookie")
) )
formatted_errors.append( formatted_errors.append(
{ {

View File

@@ -1,24 +1,84 @@
"""Fixture loading utilities for database seeding.""" """Fixture loading utilities for database seeding."""
from collections.abc import Callable, Sequence from collections.abc import Callable, Sequence
from typing import Any, TypeVar from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import DeclarativeBase
from ..db import get_transaction from ..db import get_transaction
from ..logger import get_logger from ..logger import get_logger
from ..types import ModelType
from .enum import LoadStrategy from .enum import LoadStrategy
from .registry import Context, FixtureRegistry from .registry import Context, FixtureRegistry
logger = get_logger() logger = get_logger()
T = TypeVar("T", bound=DeclarativeBase)
async def _load_ordered(
session: AsyncSession,
registry: FixtureRegistry,
ordered_names: list[str],
strategy: LoadStrategy,
) -> dict[str, list[DeclarativeBase]]:
"""Load fixtures in order."""
results: dict[str, list[DeclarativeBase]] = {}
for name in ordered_names:
fixture = registry.get(name)
instances = list(fixture.func())
if not instances:
results[name] = []
continue
model_name = type(instances[0]).__name__
loaded: list[DeclarativeBase] = []
async with get_transaction(session):
for instance in instances:
if strategy == LoadStrategy.INSERT:
session.add(instance)
loaded.append(instance)
elif strategy == LoadStrategy.MERGE:
merged = await session.merge(instance)
loaded.append(merged)
else: # LoadStrategy.SKIP_EXISTING
pk = _get_primary_key(instance)
if pk is not None:
existing = await session.get(type(instance), pk)
if existing is None:
session.add(instance)
loaded.append(instance)
else:
session.add(instance)
loaded.append(instance)
results[name] = loaded
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
return results
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
"""Get the primary key value of a model instance."""
mapper = instance.__class__.__mapper__
pk_cols = mapper.primary_key
if len(pk_cols) == 1:
return getattr(instance, pk_cols[0].name, None)
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
if all(v is not None for v in pk_values):
return pk_values
return None
def get_obj_by_attr( def get_obj_by_attr(
fixtures: Callable[[], Sequence[T]], attr_name: str, value: Any fixtures: Callable[[], Sequence[ModelType]], attr_name: str, value: Any
) -> T: ) -> ModelType:
"""Get a SQLAlchemy model instance by matching an attribute value. """Get a SQLAlchemy model instance by matching an attribute value.
Args: Args:
@@ -57,13 +117,6 @@ async def load_fixtures(
Returns: Returns:
Dict mapping fixture names to loaded instances Dict mapping fixture names to loaded instances
Example:
```python
# Loads 'roles' first (dependency), then 'users'
result = await load_fixtures(session, fixtures, "users")
print(result["users"]) # [User(...), ...]
```
""" """
ordered = registry.resolve_dependencies(*names) ordered = registry.resolve_dependencies(*names)
return await _load_ordered(session, registry, ordered, strategy) return await _load_ordered(session, registry, ordered, strategy)
@@ -85,76 +138,6 @@ async def load_fixtures_by_context(
Returns: Returns:
Dict mapping fixture names to loaded instances Dict mapping fixture names to loaded instances
Example:
```python
# Load base + testing fixtures
await load_fixtures_by_context(
session, fixtures,
Context.BASE, Context.TESTING
)
```
""" """
ordered = registry.resolve_context_dependencies(*contexts) ordered = registry.resolve_context_dependencies(*contexts)
return await _load_ordered(session, registry, ordered, strategy) return await _load_ordered(session, registry, ordered, strategy)
async def _load_ordered(
session: AsyncSession,
registry: FixtureRegistry,
ordered_names: list[str],
strategy: LoadStrategy,
) -> dict[str, list[DeclarativeBase]]:
"""Load fixtures in order."""
results: dict[str, list[DeclarativeBase]] = {}
for name in ordered_names:
fixture = registry.get(name)
instances = list(fixture.func())
if not instances:
results[name] = []
continue
model_name = type(instances[0]).__name__
loaded: list[DeclarativeBase] = []
async with get_transaction(session):
for instance in instances:
if strategy == LoadStrategy.INSERT:
session.add(instance)
loaded.append(instance)
elif strategy == LoadStrategy.MERGE:
merged = await session.merge(instance)
loaded.append(merged)
elif strategy == LoadStrategy.SKIP_EXISTING:
pk = _get_primary_key(instance)
if pk is not None:
existing = await session.get(type(instance), pk)
if existing is None:
session.add(instance)
loaded.append(instance)
else:
session.add(instance)
loaded.append(instance)
results[name] = loaded
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
return results
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
"""Get the primary key value of a model instance."""
mapper = instance.__class__.__mapper__
pk_cols = mapper.primary_key
if len(pk_cols) == 1:
return getattr(instance, pk_cols[0].name, None)
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
if all(v is not None for v in pk_values):
return pk_values
return None

View File

@@ -53,17 +53,23 @@ def init_metrics(
logger.debug("Initialising metric provider '%s'", provider.name) logger.debug("Initialising metric provider '%s'", provider.name)
provider.func() provider.func()
collectors = registry.get_collectors() # Partition collectors and cache env check at startup — both are stable for the app lifetime.
async_collectors = [
c for c in registry.get_collectors() if asyncio.iscoroutinefunction(c.func)
]
sync_collectors = [
c for c in registry.get_collectors() if not asyncio.iscoroutinefunction(c.func)
]
multiprocess_mode = _is_multiprocess()
@app.get(path, include_in_schema=False) @app.get(path, include_in_schema=False)
async def metrics_endpoint() -> Response: async def metrics_endpoint() -> Response:
for collector in collectors: for collector in sync_collectors:
if asyncio.iscoroutinefunction(collector.func): collector.func()
await collector.func() for collector in async_collectors:
else: await collector.func()
collector.func()
if _is_multiprocess(): if multiprocess_mode:
prom_registry = CollectorRegistry() prom_registry = CollectorRegistry()
multiprocess.MultiProcessCollector(prom_registry) multiprocess.MultiProcessCollector(prom_registry)
output = generate_latest(prom_registry) output = generate_latest(prom_registry)

View File

@@ -1,24 +1,23 @@
"""Base Pydantic schemas for API responses.""" """Base Pydantic schemas for API responses."""
from enum import Enum from enum import Enum
from typing import Any, ClassVar, Generic, TypeVar from typing import Any, ClassVar, Generic
from pydantic import BaseModel, ConfigDict from pydantic import BaseModel, ConfigDict
from .types import DataT
__all__ = [ __all__ = [
"ApiError", "ApiError",
"CursorPagination", "CursorPagination",
"ErrorResponse", "ErrorResponse",
"OffsetPagination", "OffsetPagination",
"Pagination",
"PaginatedResponse", "PaginatedResponse",
"PydanticBase", "PydanticBase",
"Response", "Response",
"ResponseStatus", "ResponseStatus",
] ]
DataT = TypeVar("DataT")
class PydanticBase(BaseModel): class PydanticBase(BaseModel):
"""Base class for all Pydantic models with common configuration.""" """Base class for all Pydantic models with common configuration."""
@@ -108,10 +107,6 @@ class OffsetPagination(PydanticBase):
has_more: bool has_more: bool
# Backward-compatible - will be removed in v2.0
Pagination = OffsetPagination
class CursorPagination(PydanticBase): class CursorPagination(PydanticBase):
"""Pagination metadata for cursor-based list responses. """Pagination metadata for cursor-based list responses.

View File

@@ -0,0 +1,27 @@
"""Shared type aliases for the fastapi-toolsets package."""
from collections.abc import AsyncGenerator, Callable, Mapping
from typing import Any, TypeVar
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, QueryableAttribute
from sqlalchemy.orm.attributes import InstrumentedAttribute
from sqlalchemy.sql.elements import ColumnElement
# Generic TypeVars
DataT = TypeVar("DataT")
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
SchemaType = TypeVar("SchemaType", bound=BaseModel)
# CRUD type aliases
JoinType = list[tuple[type[DeclarativeBase], Any]]
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]
# Search / facet type aliases
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
FacetFieldType = SearchFieldType
# Dependency type aliases
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]

View File

@@ -92,6 +92,15 @@ class IntRole(Base):
name: Mapped[str] = mapped_column(String(50), unique=True) name: Mapped[str] = mapped_column(String(50), unique=True)
class Permission(Base):
"""Test model with composite primary key."""
__tablename__ = "permissions"
subject: Mapped[str] = mapped_column(String(50), primary_key=True)
action: Mapped[str] = mapped_column(String(50), primary_key=True)
class Event(Base): class Event(Base):
"""Test model with DateTime and Date cursor columns.""" """Test model with DateTime and Date cursor columns."""
@@ -162,6 +171,7 @@ class UserRead(PydanticBase):
id: uuid.UUID id: uuid.UUID
username: str username: str
is_active: bool = True
class UserUpdate(BaseModel): class UserUpdate(BaseModel):
@@ -218,12 +228,26 @@ class PostM2MUpdate(BaseModel):
tag_ids: list[uuid.UUID] | None = None tag_ids: list[uuid.UUID] | None = None
class IntRoleRead(PydanticBase):
"""Schema for reading an IntRole."""
id: int
name: str
class IntRoleCreate(BaseModel): class IntRoleCreate(BaseModel):
"""Schema for creating an IntRole.""" """Schema for creating an IntRole."""
name: str name: str
class EventRead(PydanticBase):
"""Schema for reading an Event."""
id: uuid.UUID
name: str
class EventCreate(BaseModel): class EventCreate(BaseModel):
"""Schema for creating an Event.""" """Schema for creating an Event."""
@@ -232,6 +256,13 @@ class EventCreate(BaseModel):
scheduled_date: datetime.date scheduled_date: datetime.date
class ProductRead(PydanticBase):
"""Schema for reading a Product."""
id: uuid.UUID
name: str
class ProductCreate(BaseModel): class ProductCreate(BaseModel):
"""Schema for creating a Product.""" """Schema for creating a Product."""

View File

@@ -15,8 +15,10 @@ from .conftest import (
EventCrud, EventCrud,
EventDateCursorCrud, EventDateCursorCrud,
EventDateTimeCursorCrud, EventDateTimeCursorCrud,
EventRead,
IntRoleCreate, IntRoleCreate,
IntRoleCursorCrud, IntRoleCursorCrud,
IntRoleRead,
Post, Post,
PostCreate, PostCreate,
PostCrud, PostCrud,
@@ -26,6 +28,7 @@ from .conftest import (
ProductCreate, ProductCreate,
ProductCrud, ProductCrud,
ProductNumericCursorCrud, ProductNumericCursorCrud,
ProductRead,
Role, Role,
RoleCreate, RoleCreate,
RoleCrud, RoleCrud,
@@ -169,7 +172,14 @@ class TestDefaultLoadOptionsIntegration:
async def test_default_load_options_applied_to_paginate( async def test_default_load_options_applied_to_paginate(
self, db_session: AsyncSession self, db_session: AsyncSession
): ):
"""default_load_options loads relationships automatically on paginate().""" """default_load_options loads relationships automatically on offset_paginate()."""
from fastapi_toolsets.schemas import PydanticBase
class UserWithRoleRead(PydanticBase):
id: uuid.UUID
username: str
role: RoleRead | None = None
UserWithDefaultLoad = CrudFactory( UserWithDefaultLoad = CrudFactory(
User, default_load_options=[selectinload(User.role)] User, default_load_options=[selectinload(User.role)]
) )
@@ -178,7 +188,9 @@ class TestDefaultLoadOptionsIntegration:
db_session, db_session,
UserCreate(username="alice", email="alice@test.com", role_id=role.id), UserCreate(username="alice", email="alice@test.com", role_id=role.id),
) )
result = await UserWithDefaultLoad.paginate(db_session) result = await UserWithDefaultLoad.offset_paginate(
db_session, schema=UserWithRoleRead
)
assert result.data[0].role is not None assert result.data[0].role is not None
assert result.data[0].role.name == "admin" assert result.data[0].role.name == "admin"
@@ -430,7 +442,7 @@ class TestCrudDelete:
role = await RoleCrud.create(db_session, RoleCreate(name="to_delete")) role = await RoleCrud.create(db_session, RoleCreate(name="to_delete"))
result = await RoleCrud.delete(db_session, [Role.id == role.id]) result = await RoleCrud.delete(db_session, [Role.id == role.id])
assert result is True assert result is None
assert await RoleCrud.first(db_session, [Role.id == role.id]) is None assert await RoleCrud.first(db_session, [Role.id == role.id]) is None
@pytest.mark.anyio @pytest.mark.anyio
@@ -454,6 +466,20 @@ class TestCrudDelete:
assert len(remaining) == 1 assert len(remaining) == 1
assert remaining[0].username == "u3" assert remaining[0].username == "u3"
@pytest.mark.anyio
async def test_delete_return_response(self, db_session: AsyncSession):
"""Delete with return_response=True returns Response[None]."""
from fastapi_toolsets.schemas import Response
role = await RoleCrud.create(db_session, RoleCreate(name="to_delete_resp"))
result = await RoleCrud.delete(
db_session, [Role.id == role.id], return_response=True
)
assert isinstance(result, Response)
assert result.data is None
assert await RoleCrud.first(db_session, [Role.id == role.id]) is None
class TestCrudExists: class TestCrudExists:
"""Tests for CRUD exists operations.""" """Tests for CRUD exists operations."""
@@ -594,7 +620,9 @@ class TestCrudPaginate:
from fastapi_toolsets.schemas import OffsetPagination from fastapi_toolsets.schemas import OffsetPagination
result = await RoleCrud.paginate(db_session, page=1, items_per_page=10) result = await RoleCrud.offset_paginate(
db_session, page=1, items_per_page=10, schema=RoleRead
)
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert len(result.data) == 10 assert len(result.data) == 10
@@ -609,7 +637,9 @@ class TestCrudPaginate:
for i in range(25): for i in range(25):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}")) await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
result = await RoleCrud.paginate(db_session, page=3, items_per_page=10) result = await RoleCrud.offset_paginate(
db_session, page=3, items_per_page=10, schema=RoleRead
)
assert len(result.data) == 5 assert len(result.data) == 5
assert result.pagination.has_more is False assert result.pagination.has_more is False
@@ -629,11 +659,12 @@ class TestCrudPaginate:
from fastapi_toolsets.schemas import OffsetPagination from fastapi_toolsets.schemas import OffsetPagination
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
filters=[User.is_active == True], # noqa: E712 filters=[User.is_active == True], # noqa: E712
page=1, page=1,
items_per_page=10, items_per_page=10,
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -646,11 +677,12 @@ class TestCrudPaginate:
await RoleCrud.create(db_session, RoleCreate(name="alpha")) await RoleCrud.create(db_session, RoleCreate(name="alpha"))
await RoleCrud.create(db_session, RoleCreate(name="bravo")) await RoleCrud.create(db_session, RoleCreate(name="bravo"))
result = await RoleCrud.paginate( result = await RoleCrud.offset_paginate(
db_session, db_session,
order_by=Role.name, order_by=Role.name,
page=1, page=1,
items_per_page=10, items_per_page=10,
schema=RoleRead,
) )
names = [r.name for r in result.data] names = [r.name for r in result.data]
@@ -855,12 +887,13 @@ class TestCrudJoins:
from fastapi_toolsets.schemas import OffsetPagination from fastapi_toolsets.schemas import OffsetPagination
# Paginate users with published posts # Paginate users with published posts
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
joins=[(Post, Post.author_id == User.id)], joins=[(Post, Post.author_id == User.id)],
filters=[Post.is_published == True], # noqa: E712 filters=[Post.is_published == True], # noqa: E712
page=1, page=1,
items_per_page=10, items_per_page=10,
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -889,12 +922,13 @@ class TestCrudJoins:
from fastapi_toolsets.schemas import OffsetPagination from fastapi_toolsets.schemas import OffsetPagination
# Paginate with outer join # Paginate with outer join
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
joins=[(Post, Post.author_id == User.id)], joins=[(Post, Post.author_id == User.id)],
outer_join=True, outer_join=True,
page=1, page=1,
items_per_page=10, items_per_page=10,
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -931,70 +965,6 @@ class TestCrudJoins:
assert users[0].username == "multi_join" assert users[0].username == "multi_join"
class TestAsResponse:
"""Tests for as_response parameter (deprecated, kept for backward compat)."""
@pytest.mark.anyio
async def test_create_as_response(self, db_session: AsyncSession):
"""Create with as_response=True returns Response and emits DeprecationWarning."""
from fastapi_toolsets.schemas import Response
data = RoleCreate(name="response_role")
with pytest.warns(DeprecationWarning, match="as_response is deprecated"):
result = await RoleCrud.create(db_session, data, as_response=True)
assert isinstance(result, Response)
assert result.data is not None
assert result.data.name == "response_role"
@pytest.mark.anyio
async def test_get_as_response(self, db_session: AsyncSession):
"""Get with as_response=True returns Response and emits DeprecationWarning."""
from fastapi_toolsets.schemas import Response
created = await RoleCrud.create(db_session, RoleCreate(name="get_response"))
with pytest.warns(DeprecationWarning, match="as_response is deprecated"):
result = await RoleCrud.get(
db_session, [Role.id == created.id], as_response=True
)
assert isinstance(result, Response)
assert result.data is not None
assert result.data.id == created.id
@pytest.mark.anyio
async def test_update_as_response(self, db_session: AsyncSession):
"""Update with as_response=True returns Response and emits DeprecationWarning."""
from fastapi_toolsets.schemas import Response
created = await RoleCrud.create(db_session, RoleCreate(name="old_name"))
with pytest.warns(DeprecationWarning, match="as_response is deprecated"):
result = await RoleCrud.update(
db_session,
RoleUpdate(name="new_name"),
[Role.id == created.id],
as_response=True,
)
assert isinstance(result, Response)
assert result.data is not None
assert result.data.name == "new_name"
@pytest.mark.anyio
async def test_delete_as_response(self, db_session: AsyncSession):
"""Delete with as_response=True returns Response and emits DeprecationWarning."""
from fastapi_toolsets.schemas import Response
created = await RoleCrud.create(db_session, RoleCreate(name="to_delete"))
with pytest.warns(DeprecationWarning, match="as_response is deprecated"):
result = await RoleCrud.delete(
db_session, [Role.id == created.id], as_response=True
)
assert isinstance(result, Response)
assert result.data is None
class TestCrudFactoryM2M: class TestCrudFactoryM2M:
"""Tests for CrudFactory with m2m_fields parameter.""" """Tests for CrudFactory with m2m_fields parameter."""
@@ -1475,92 +1445,35 @@ class TestSchemaResponse:
assert isinstance(result, Response) assert isinstance(result, Response)
@pytest.mark.anyio @pytest.mark.anyio
async def test_paginate_with_schema(self, db_session: AsyncSession): async def test_offset_paginate_with_schema(self, db_session: AsyncSession):
"""paginate with schema returns PaginatedResponse[SchemaType].""" """offset_paginate with schema returns PaginatedResponse[SchemaType]."""
from fastapi_toolsets.schemas import PaginatedResponse from fastapi_toolsets.schemas import PaginatedResponse
await RoleCrud.create(db_session, RoleCreate(name="p_role1")) await RoleCrud.create(db_session, RoleCreate(name="p_role1"))
await RoleCrud.create(db_session, RoleCreate(name="p_role2")) await RoleCrud.create(db_session, RoleCreate(name="p_role2"))
result = await RoleCrud.paginate(db_session, schema=RoleRead) result = await RoleCrud.offset_paginate(db_session, schema=RoleRead)
assert isinstance(result, PaginatedResponse) assert isinstance(result, PaginatedResponse)
assert len(result.data) == 2 assert len(result.data) == 2
assert all(isinstance(item, RoleRead) for item in result.data) assert all(isinstance(item, RoleRead) for item in result.data)
@pytest.mark.anyio @pytest.mark.anyio
async def test_paginate_schema_filters_fields(self, db_session: AsyncSession): async def test_offset_paginate_schema_filters_fields(
"""paginate with schema only exposes schema fields per item.""" self, db_session: AsyncSession
):
"""offset_paginate with schema only exposes schema fields per item."""
await UserCrud.create( await UserCrud.create(
db_session, db_session,
UserCreate(username="pg_user", email="pg@test.com"), UserCreate(username="pg_user", email="pg@test.com"),
) )
result = await UserCrud.paginate(db_session, schema=UserRead) result = await UserCrud.offset_paginate(db_session, schema=UserRead)
assert isinstance(result.data[0], UserRead) assert isinstance(result.data[0], UserRead)
assert result.data[0].username == "pg_user" assert result.data[0].username == "pg_user"
assert not hasattr(result.data[0], "email") assert not hasattr(result.data[0], "email")
@pytest.mark.anyio
async def test_as_response_true_without_schema_unchanged(
self, db_session: AsyncSession
):
"""as_response=True without schema still returns Response[ModelType] with a warning."""
from fastapi_toolsets.schemas import Response
created = await RoleCrud.create(db_session, RoleCreate(name="compat"))
with pytest.warns(DeprecationWarning, match="as_response is deprecated"):
result = await RoleCrud.get(
db_session, [Role.id == created.id], as_response=True
)
assert isinstance(result, Response)
assert isinstance(result.data, Role)
@pytest.mark.anyio
async def test_schema_with_explicit_as_response_true(
self, db_session: AsyncSession
):
"""schema combined with explicit as_response=True works correctly."""
from fastapi_toolsets.schemas import Response
created = await RoleCrud.create(db_session, RoleCreate(name="combined"))
result = await RoleCrud.get(
db_session,
[Role.id == created.id],
as_response=True,
schema=RoleRead,
)
assert isinstance(result, Response)
assert isinstance(result.data, RoleRead)
class TestPaginateAlias:
"""Tests that paginate is a backward-compatible alias for offset_paginate."""
def test_paginate_is_alias_of_offset_paginate(self):
"""paginate and offset_paginate are the same underlying function."""
assert RoleCrud.paginate.__func__ is RoleCrud.offset_paginate.__func__
@pytest.mark.anyio
async def test_paginate_alias_returns_offset_pagination(
self, db_session: AsyncSession
):
"""paginate() still works and returns PaginatedResponse with OffsetPagination."""
from fastapi_toolsets.schemas import OffsetPagination, PaginatedResponse
for i in range(3):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
result = await RoleCrud.paginate(db_session, page=1, items_per_page=10)
assert isinstance(result, PaginatedResponse)
assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 3
assert result.pagination.page == 1
class TestCursorPaginate: class TestCursorPaginate:
"""Tests for cursor-based pagination via cursor_paginate().""" """Tests for cursor-based pagination via cursor_paginate()."""
@@ -1573,7 +1486,9 @@ class TestCursorPaginate:
for i in range(25): for i in range(25):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}")) await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
result = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=10) result = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=10, schema=RoleRead
)
assert isinstance(result, PaginatedResponse) assert isinstance(result, PaginatedResponse)
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
@@ -1591,7 +1506,9 @@ class TestCursorPaginate:
for i in range(5): for i in range(5):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}")) await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
result = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=10) result = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=10, schema=RoleRead
)
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
assert len(result.data) == 5 assert len(result.data) == 5
@@ -1606,14 +1523,16 @@ class TestCursorPaginate:
from fastapi_toolsets.schemas import CursorPagination from fastapi_toolsets.schemas import CursorPagination
page1 = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=10) page1 = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=10, schema=RoleRead
)
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
assert len(page1.data) == 10 assert len(page1.data) == 10
assert page1.pagination.has_more is True assert page1.pagination.has_more is True
cursor = page1.pagination.next_cursor cursor = page1.pagination.next_cursor
page2 = await RoleCursorCrud.cursor_paginate( page2 = await RoleCursorCrud.cursor_paginate(
db_session, cursor=cursor, items_per_page=10 db_session, cursor=cursor, items_per_page=10, schema=RoleRead
) )
assert isinstance(page2.pagination, CursorPagination) assert isinstance(page2.pagination, CursorPagination)
assert len(page2.data) == 5 assert len(page2.data) == 5
@@ -1628,12 +1547,15 @@ class TestCursorPaginate:
from fastapi_toolsets.schemas import CursorPagination from fastapi_toolsets.schemas import CursorPagination
page1 = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=4) page1 = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=4, schema=RoleRead
)
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
page2 = await RoleCursorCrud.cursor_paginate( page2 = await RoleCursorCrud.cursor_paginate(
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=4, items_per_page=4,
schema=RoleRead,
) )
ids_page1 = {r.id for r in page1.data} ids_page1 = {r.id for r in page1.data}
@@ -1646,7 +1568,9 @@ class TestCursorPaginate:
"""cursor_paginate on an empty table returns empty data with no cursor.""" """cursor_paginate on an empty table returns empty data with no cursor."""
from fastapi_toolsets.schemas import CursorPagination from fastapi_toolsets.schemas import CursorPagination
result = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=10) result = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=10, schema=RoleRead
)
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
assert result.data == [] assert result.data == []
@@ -1671,6 +1595,7 @@ class TestCursorPaginate:
db_session, db_session,
filters=[User.is_active == True], # noqa: E712 filters=[User.is_active == True], # noqa: E712
items_per_page=20, items_per_page=20,
schema=UserRead,
) )
assert len(result.data) == 5 assert len(result.data) == 5
@@ -1703,7 +1628,9 @@ class TestCursorPaginate:
for i in range(5): for i in range(5):
await RoleNameCrud.create(db_session, RoleCreate(name=f"role{i:02d}")) await RoleNameCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
result = await RoleNameCrud.cursor_paginate(db_session, items_per_page=3) result = await RoleNameCrud.cursor_paginate(
db_session, items_per_page=3, schema=RoleRead
)
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
assert len(result.data) == 3 assert len(result.data) == 3
@@ -1714,7 +1641,7 @@ class TestCursorPaginate:
async def test_raises_without_cursor_column(self, db_session: AsyncSession): async def test_raises_without_cursor_column(self, db_session: AsyncSession):
"""cursor_paginate raises ValueError when cursor_column is not configured.""" """cursor_paginate raises ValueError when cursor_column is not configured."""
with pytest.raises(ValueError, match="cursor_column is not set"): with pytest.raises(ValueError, match="cursor_column is not set"):
await RoleCrud.cursor_paginate(db_session) await RoleCrud.cursor_paginate(db_session, schema=RoleRead)
class TestCursorPaginatePrevCursor: class TestCursorPaginatePrevCursor:
@@ -1728,7 +1655,9 @@ class TestCursorPaginatePrevCursor:
from fastapi_toolsets.schemas import CursorPagination from fastapi_toolsets.schemas import CursorPagination
result = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=3) result = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=3, schema=RoleRead
)
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
assert result.pagination.prev_cursor is None assert result.pagination.prev_cursor is None
@@ -1741,12 +1670,15 @@ class TestCursorPaginatePrevCursor:
from fastapi_toolsets.schemas import CursorPagination from fastapi_toolsets.schemas import CursorPagination
page1 = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=5) page1 = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=5, schema=RoleRead
)
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
page2 = await RoleCursorCrud.cursor_paginate( page2 = await RoleCursorCrud.cursor_paginate(
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=5, items_per_page=5,
schema=RoleRead,
) )
assert isinstance(page2.pagination, CursorPagination) assert isinstance(page2.pagination, CursorPagination)
assert page2.pagination.prev_cursor is not None assert page2.pagination.prev_cursor is not None
@@ -1762,12 +1694,15 @@ class TestCursorPaginatePrevCursor:
from fastapi_toolsets.schemas import CursorPagination from fastapi_toolsets.schemas import CursorPagination
page1 = await RoleCursorCrud.cursor_paginate(db_session, items_per_page=5) page1 = await RoleCursorCrud.cursor_paginate(
db_session, items_per_page=5, schema=RoleRead
)
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
page2 = await RoleCursorCrud.cursor_paginate( page2 = await RoleCursorCrud.cursor_paginate(
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=5, items_per_page=5,
schema=RoleRead,
) )
assert isinstance(page2.pagination, CursorPagination) assert isinstance(page2.pagination, CursorPagination)
assert page2.pagination.prev_cursor is not None assert page2.pagination.prev_cursor is not None
@@ -1802,6 +1737,7 @@ class TestCursorPaginateWithSearch:
db_session, db_session,
search="admin", search="admin",
items_per_page=20, items_per_page=20,
schema=RoleRead,
) )
assert len(result.data) == 5 assert len(result.data) == 5
@@ -1836,6 +1772,7 @@ class TestCursorPaginateExtraOptions:
db_session, db_session,
joins=[(Role, User.role_id == Role.id)], joins=[(Role, User.role_id == Role.id)],
items_per_page=20, items_per_page=20,
schema=UserRead,
) )
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
@@ -1867,6 +1804,7 @@ class TestCursorPaginateExtraOptions:
joins=[(Role, User.role_id == Role.id)], joins=[(Role, User.role_id == Role.id)],
outer_join=True, outer_join=True,
items_per_page=20, items_per_page=20,
schema=UserRead,
) )
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
@@ -1876,7 +1814,12 @@ class TestCursorPaginateExtraOptions:
@pytest.mark.anyio @pytest.mark.anyio
async def test_with_load_options(self, db_session: AsyncSession): async def test_with_load_options(self, db_session: AsyncSession):
"""cursor_paginate passes load_options to the query.""" """cursor_paginate passes load_options to the query."""
from fastapi_toolsets.schemas import CursorPagination from fastapi_toolsets.schemas import CursorPagination, PydanticBase
class UserWithRoleRead(PydanticBase):
id: uuid.UUID
username: str
role: RoleRead | None = None
role = await RoleCrud.create(db_session, RoleCreate(name="manager")) role = await RoleCrud.create(db_session, RoleCreate(name="manager"))
for i in range(3): for i in range(3):
@@ -1893,6 +1836,7 @@ class TestCursorPaginateExtraOptions:
db_session, db_session,
load_options=[selectinload(User.role)], load_options=[selectinload(User.role)],
items_per_page=20, items_per_page=20,
schema=UserWithRoleRead,
) )
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
@@ -1912,6 +1856,7 @@ class TestCursorPaginateExtraOptions:
db_session, db_session,
order_by=Role.name.desc(), order_by=Role.name.desc(),
items_per_page=3, items_per_page=3,
schema=RoleRead,
) )
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
@@ -1925,7 +1870,9 @@ class TestCursorPaginateExtraOptions:
for i in range(5): for i in range(5):
await IntRoleCursorCrud.create(db_session, IntRoleCreate(name=f"role{i}")) await IntRoleCursorCrud.create(db_session, IntRoleCreate(name=f"role{i}"))
page1 = await IntRoleCursorCrud.cursor_paginate(db_session, items_per_page=3) page1 = await IntRoleCursorCrud.cursor_paginate(
db_session, items_per_page=3, schema=IntRoleRead
)
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
assert len(page1.data) == 3 assert len(page1.data) == 3
@@ -1935,6 +1882,7 @@ class TestCursorPaginateExtraOptions:
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=3, items_per_page=3,
schema=IntRoleRead,
) )
assert isinstance(page2.pagination, CursorPagination) assert isinstance(page2.pagination, CursorPagination)
@@ -1955,7 +1903,9 @@ class TestCursorPaginateExtraOptions:
await RoleCrud.create(db_session, RoleCreate(name="role01")) await RoleCrud.create(db_session, RoleCreate(name="role01"))
# First page succeeds (no cursor to decode) # First page succeeds (no cursor to decode)
page1 = await RoleNameCursorCrud.cursor_paginate(db_session, items_per_page=1) page1 = await RoleNameCursorCrud.cursor_paginate(
db_session, items_per_page=1, schema=RoleRead
)
assert page1.pagination.has_more is True assert page1.pagination.has_more is True
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
@@ -1965,6 +1915,7 @@ class TestCursorPaginateExtraOptions:
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=1, items_per_page=1,
schema=RoleRead,
) )
@@ -2003,6 +1954,7 @@ class TestCursorPaginateSearchJoins:
search="administrator", search="administrator",
search_fields=[(User.role, Role.name)], search_fields=[(User.role, Role.name)],
items_per_page=20, items_per_page=20,
schema=UserRead,
) )
assert isinstance(result.pagination, CursorPagination) assert isinstance(result.pagination, CursorPagination)
@@ -2049,7 +2001,7 @@ class TestCursorPaginateColumnTypes:
) )
page1 = await EventDateTimeCursorCrud.cursor_paginate( page1 = await EventDateTimeCursorCrud.cursor_paginate(
db_session, items_per_page=3 db_session, items_per_page=3, schema=EventRead
) )
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
@@ -2060,6 +2012,7 @@ class TestCursorPaginateColumnTypes:
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=3, items_per_page=3,
schema=EventRead,
) )
assert isinstance(page2.pagination, CursorPagination) assert isinstance(page2.pagination, CursorPagination)
@@ -2087,7 +2040,9 @@ class TestCursorPaginateColumnTypes:
), ),
) )
page1 = await EventDateCursorCrud.cursor_paginate(db_session, items_per_page=3) page1 = await EventDateCursorCrud.cursor_paginate(
db_session, items_per_page=3, schema=EventRead
)
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
assert len(page1.data) == 3 assert len(page1.data) == 3
@@ -2097,6 +2052,7 @@ class TestCursorPaginateColumnTypes:
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=3, items_per_page=3,
schema=EventRead,
) )
assert isinstance(page2.pagination, CursorPagination) assert isinstance(page2.pagination, CursorPagination)
@@ -2123,7 +2079,7 @@ class TestCursorPaginateColumnTypes:
) )
page1 = await ProductNumericCursorCrud.cursor_paginate( page1 = await ProductNumericCursorCrud.cursor_paginate(
db_session, items_per_page=3 db_session, items_per_page=3, schema=ProductRead
) )
assert isinstance(page1.pagination, CursorPagination) assert isinstance(page1.pagination, CursorPagination)
@@ -2134,6 +2090,7 @@ class TestCursorPaginateColumnTypes:
db_session, db_session,
cursor=page1.pagination.next_cursor, cursor=page1.pagination.next_cursor,
items_per_page=3, items_per_page=3,
schema=ProductRead,
) )
assert isinstance(page2.pagination, CursorPagination) assert isinstance(page2.pagination, CursorPagination)

View File

@@ -23,6 +23,7 @@ from .conftest import (
User, User,
UserCreate, UserCreate,
UserCrud, UserCrud,
UserRead,
) )
@@ -42,10 +43,11 @@ class TestPaginateSearch:
db_session, UserCreate(username="bob_smith", email="bob@test.com") db_session, UserCreate(username="bob_smith", email="bob@test.com")
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="doe", search="doe",
search_fields=[User.username], search_fields=[User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -61,10 +63,11 @@ class TestPaginateSearch:
db_session, UserCreate(username="company_bob", email="bob@other.com") db_session, UserCreate(username="company_bob", email="bob@other.com")
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="company", search="company",
search_fields=[User.username, User.email], search_fields=[User.username, User.email],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -89,10 +92,11 @@ class TestPaginateSearch:
UserCreate(username="user1", email="u1@test.com", role_id=user_role.id), UserCreate(username="user1", email="u1@test.com", role_id=user_role.id),
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="admin", search="admin",
search_fields=[(User.role, Role.name)], search_fields=[(User.role, Role.name)],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -108,10 +112,11 @@ class TestPaginateSearch:
) )
# Search "admin" in username OR role.name # Search "admin" in username OR role.name
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="admin", search="admin",
search_fields=[User.username, (User.role, Role.name)], search_fields=[User.username, (User.role, Role.name)],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -124,10 +129,11 @@ class TestPaginateSearch:
db_session, UserCreate(username="JohnDoe", email="j@test.com") db_session, UserCreate(username="JohnDoe", email="j@test.com")
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="johndoe", search="johndoe",
search_fields=[User.username], search_fields=[User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -141,19 +147,21 @@ class TestPaginateSearch:
) )
# Should not find (case mismatch) # Should not find (case mismatch)
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search=SearchConfig(query="johndoe", case_sensitive=True), search=SearchConfig(query="johndoe", case_sensitive=True),
search_fields=[User.username], search_fields=[User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 0 assert result.pagination.total_count == 0
# Should find (case match) # Should find (case match)
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search=SearchConfig(query="JohnDoe", case_sensitive=True), search=SearchConfig(query="JohnDoe", case_sensitive=True),
search_fields=[User.username], search_fields=[User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1 assert result.pagination.total_count == 1
@@ -168,11 +176,13 @@ class TestPaginateSearch:
db_session, UserCreate(username="user2", email="u2@test.com") db_session, UserCreate(username="user2", email="u2@test.com")
) )
result = await UserCrud.paginate(db_session, search="") result = await UserCrud.offset_paginate(db_session, search="", schema=UserRead)
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2 assert result.pagination.total_count == 2
result = await UserCrud.paginate(db_session, search=None) result = await UserCrud.offset_paginate(
db_session, search=None, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2 assert result.pagination.total_count == 2
@@ -188,11 +198,12 @@ class TestPaginateSearch:
UserCreate(username="inactive_john", email="ij@test.com", is_active=False), UserCreate(username="inactive_john", email="ij@test.com", is_active=False),
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
filters=[User.is_active == True], # noqa: E712 filters=[User.is_active == True], # noqa: E712
search="john", search="john",
search_fields=[User.username], search_fields=[User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -206,7 +217,9 @@ class TestPaginateSearch:
db_session, UserCreate(username="findme", email="other@test.com") db_session, UserCreate(username="findme", email="other@test.com")
) )
result = await UserCrud.paginate(db_session, search="findme") result = await UserCrud.offset_paginate(
db_session, search="findme", schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1 assert result.pagination.total_count == 1
@@ -218,10 +231,11 @@ class TestPaginateSearch:
db_session, UserCreate(username="john", email="j@test.com") db_session, UserCreate(username="john", email="j@test.com")
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="nonexistent", search="nonexistent",
search_fields=[User.username], search_fields=[User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -237,12 +251,13 @@ class TestPaginateSearch:
UserCreate(username=f"user_{i}", email=f"user{i}@test.com"), UserCreate(username=f"user_{i}", email=f"user{i}@test.com"),
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="user_", search="user_",
search_fields=[User.username], search_fields=[User.username],
page=1, page=1,
items_per_page=5, items_per_page=5,
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -264,10 +279,11 @@ class TestPaginateSearch:
) )
# Search in username, not in role # Search in username, not in role
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="role", search="role",
search_fields=[User.username], search_fields=[User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -286,11 +302,12 @@ class TestPaginateSearch:
db_session, UserCreate(username="bob", email="b@test.com") db_session, UserCreate(username="bob", email="b@test.com")
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="@test.com", search="@test.com",
search_fields=[User.email], search_fields=[User.email],
order_by=User.username, order_by=User.username,
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -310,10 +327,11 @@ class TestPaginateSearch:
) )
# Search by UUID (partial match) # Search by UUID (partial match)
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search="12345678", search="12345678",
search_fields=[User.id, User.username], search_fields=[User.id, User.username],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -363,10 +381,11 @@ class TestSearchConfig:
) )
# 'john' must be in username AND email # 'john' must be in username AND email
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search=SearchConfig(query="john", match_mode="all"), search=SearchConfig(query="john", match_mode="all"),
search_fields=[User.username, User.email], search_fields=[User.username, User.email],
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -380,9 +399,10 @@ class TestSearchConfig:
db_session, UserCreate(username="test", email="findme@test.com") db_session, UserCreate(username="test", email="findme@test.com")
) )
result = await UserCrud.paginate( result = await UserCrud.offset_paginate(
db_session, db_session,
search=SearchConfig(query="findme", fields=[User.email]), search=SearchConfig(query="findme", fields=[User.email]),
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -478,7 +498,7 @@ class TestFacetsNotSet:
db_session, UserCreate(username="alice", email="a@test.com") db_session, UserCreate(username="alice", email="a@test.com")
) )
result = await UserCrud.offset_paginate(db_session) result = await UserCrud.offset_paginate(db_session, schema=UserRead)
assert result.filter_attributes is None assert result.filter_attributes is None
@@ -490,7 +510,7 @@ class TestFacetsNotSet:
db_session, UserCreate(username="alice", email="a@test.com") db_session, UserCreate(username="alice", email="a@test.com")
) )
result = await UserCursorCrud.cursor_paginate(db_session) result = await UserCursorCrud.cursor_paginate(db_session, schema=UserRead)
assert result.filter_attributes is None assert result.filter_attributes is None
@@ -509,7 +529,7 @@ class TestFacetsDirectColumn:
db_session, UserCreate(username="bob", email="b@test.com") db_session, UserCreate(username="bob", email="b@test.com")
) )
result = await UserFacetCrud.offset_paginate(db_session) result = await UserFacetCrud.offset_paginate(db_session, schema=UserRead)
assert result.filter_attributes is not None assert result.filter_attributes is not None
# Distinct usernames, sorted # Distinct usernames, sorted
@@ -528,7 +548,7 @@ class TestFacetsDirectColumn:
db_session, UserCreate(username="bob", email="b@test.com") db_session, UserCreate(username="bob", email="b@test.com")
) )
result = await UserFacetCursorCrud.cursor_paginate(db_session) result = await UserFacetCursorCrud.cursor_paginate(db_session, schema=UserRead)
assert result.filter_attributes is not None assert result.filter_attributes is not None
assert set(result.filter_attributes["email"]) == {"a@test.com", "b@test.com"} assert set(result.filter_attributes["email"]) == {"a@test.com", "b@test.com"}
@@ -544,7 +564,7 @@ class TestFacetsDirectColumn:
db_session, UserCreate(username="bob", email="b@test.com") db_session, UserCreate(username="bob", email="b@test.com")
) )
result = await UserFacetCrud.offset_paginate(db_session) result = await UserFacetCrud.offset_paginate(db_session, schema=UserRead)
assert result.filter_attributes is not None assert result.filter_attributes is not None
assert "username" in result.filter_attributes assert "username" in result.filter_attributes
@@ -561,7 +581,7 @@ class TestFacetsDirectColumn:
# Override: ask for email instead of username # Override: ask for email instead of username
result = await UserFacetCrud.offset_paginate( result = await UserFacetCrud.offset_paginate(
db_session, facet_fields=[User.email] db_session, facet_fields=[User.email], schema=UserRead
) )
assert result.filter_attributes is not None assert result.filter_attributes is not None
@@ -587,6 +607,7 @@ class TestFacetsRespectFilters:
result = await UserFacetCrud.offset_paginate( result = await UserFacetCrud.offset_paginate(
db_session, db_session,
filters=[User.is_active == True], # noqa: E712 filters=[User.is_active == True], # noqa: E712
schema=UserRead,
) )
assert result.filter_attributes is not None assert result.filter_attributes is not None
@@ -617,7 +638,7 @@ class TestFacetsRelationship:
db_session, UserCreate(username="charlie", email="c@test.com") db_session, UserCreate(username="charlie", email="c@test.com")
) )
result = await UserRelFacetCrud.offset_paginate(db_session) result = await UserRelFacetCrud.offset_paginate(db_session, schema=UserRead)
assert result.filter_attributes is not None assert result.filter_attributes is not None
assert set(result.filter_attributes["name"]) == {"admin", "editor"} assert set(result.filter_attributes["name"]) == {"admin", "editor"}
@@ -632,7 +653,7 @@ class TestFacetsRelationship:
db_session, UserCreate(username="norole", email="n@test.com") db_session, UserCreate(username="norole", email="n@test.com")
) )
result = await UserRelFacetCrud.offset_paginate(db_session) result = await UserRelFacetCrud.offset_paginate(db_session, schema=UserRead)
assert result.filter_attributes is not None assert result.filter_attributes is not None
assert result.filter_attributes["name"] == [] assert result.filter_attributes["name"] == []
@@ -656,7 +677,10 @@ class TestFacetsRelationship:
) )
result = await UserSearchFacetCrud.offset_paginate( result = await UserSearchFacetCrud.offset_paginate(
db_session, search="admin", search_fields=[(User.role, Role.name)] db_session,
search="admin",
search_fields=[(User.role, Role.name)],
schema=UserRead,
) )
assert result.filter_attributes is not None assert result.filter_attributes is not None
@@ -678,7 +702,7 @@ class TestFilterBy:
) )
result = await UserFacetCrud.offset_paginate( result = await UserFacetCrud.offset_paginate(
db_session, filter_by={"username": "alice"} db_session, filter_by={"username": "alice"}, schema=UserRead
) )
assert len(result.data) == 1 assert len(result.data) == 1
@@ -701,7 +725,7 @@ class TestFilterBy:
) )
result = await UserFacetCrud.offset_paginate( result = await UserFacetCrud.offset_paginate(
db_session, filter_by={"username": ["alice", "bob"]} db_session, filter_by={"username": ["alice", "bob"]}, schema=UserRead
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -726,7 +750,7 @@ class TestFilterBy:
) )
result = await UserRelFacetCrud.offset_paginate( result = await UserRelFacetCrud.offset_paginate(
db_session, filter_by={"name": "admin"} db_session, filter_by={"name": "admin"}, schema=UserRead
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -749,6 +773,7 @@ class TestFilterBy:
db_session, db_session,
filters=[User.is_active == True], # noqa: E712 filters=[User.is_active == True], # noqa: E712
filter_by={"username": ["alice", "alice2"]}, filter_by={"username": ["alice", "alice2"]},
schema=UserRead,
) )
# Only alice passes both: is_active=True AND username IN [alice, alice2] # Only alice passes both: is_active=True AND username IN [alice, alice2]
@@ -763,7 +788,7 @@ class TestFilterBy:
with pytest.raises(InvalidFacetFilterError) as exc_info: with pytest.raises(InvalidFacetFilterError) as exc_info:
await UserFacetCrud.offset_paginate( await UserFacetCrud.offset_paginate(
db_session, filter_by={"nonexistent": "value"} db_session, filter_by={"nonexistent": "value"}, schema=UserRead
) )
assert exc_info.value.key == "nonexistent" assert exc_info.value.key == "nonexistent"
@@ -795,6 +820,7 @@ class TestFilterBy:
result = await UserRoleFacetCrud.offset_paginate( result = await UserRoleFacetCrud.offset_paginate(
db_session, db_session,
filter_by={"name": "admin", "id": str(admin.id)}, filter_by={"name": "admin", "id": str(admin.id)},
schema=UserRead,
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -815,7 +841,7 @@ class TestFilterBy:
) )
result = await UserFacetCursorCrud.cursor_paginate( result = await UserFacetCursorCrud.cursor_paginate(
db_session, filter_by={"username": "alice"} db_session, filter_by={"username": "alice"}, schema=UserRead
) )
assert len(result.data) == 1 assert len(result.data) == 1
@@ -839,7 +865,7 @@ class TestFilterBy:
) )
result = await UserFacetCrud.offset_paginate( result = await UserFacetCrud.offset_paginate(
db_session, filter_by=UserFilter(username="alice") db_session, filter_by=UserFilter(username="alice"), schema=UserRead
) )
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
@@ -865,7 +891,7 @@ class TestFilterBy:
) )
result = await UserFacetCursorCrud.cursor_paginate( result = await UserFacetCursorCrud.cursor_paginate(
db_session, filter_by=UserFilter(username="alice") db_session, filter_by=UserFilter(username="alice"), schema=UserRead
) )
assert len(result.data) == 1 assert len(result.data) == 1
@@ -974,7 +1000,9 @@ class TestFilterParamsSchema:
dep = UserFacetCrud.filter_params() dep = UserFacetCrud.filter_params()
f = await dep(username=["alice"]) f = await dep(username=["alice"])
result = await UserFacetCrud.offset_paginate(db_session, filter_by=f) result = await UserFacetCrud.offset_paginate(
db_session, filter_by=f, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 1 assert result.pagination.total_count == 1
@@ -995,7 +1023,9 @@ class TestFilterParamsSchema:
dep = UserFacetCursorCrud.filter_params() dep = UserFacetCursorCrud.filter_params()
f = await dep(username=["alice"]) f = await dep(username=["alice"])
result = await UserFacetCursorCrud.cursor_paginate(db_session, filter_by=f) result = await UserFacetCursorCrud.cursor_paginate(
db_session, filter_by=f, schema=UserRead
)
assert len(result.data) == 1 assert len(result.data) == 1
assert result.data[0].username == "alice" assert result.data[0].username == "alice"
@@ -1013,7 +1043,9 @@ class TestFilterParamsSchema:
dep = UserFacetCrud.filter_params() dep = UserFacetCrud.filter_params()
f = await dep() # all fields None f = await dep() # all fields None
result = await UserFacetCrud.offset_paginate(db_session, filter_by=f) result = await UserFacetCrud.offset_paginate(
db_session, filter_by=f, schema=UserRead
)
assert isinstance(result.pagination, OffsetPagination) assert isinstance(result.pagination, OffsetPagination)
assert result.pagination.total_count == 2 assert result.pagination.total_count == 2

View File

@@ -14,7 +14,9 @@ from fastapi_toolsets.fixtures import (
load_fixtures_by_context, load_fixtures_by_context,
) )
from .conftest import Role, User from fastapi_toolsets.fixtures.utils import _get_primary_key
from .conftest import IntRole, Permission, Role, User
class TestContext: class TestContext:
@@ -597,6 +599,46 @@ class TestLoadFixtures:
count = await RoleCrud.count(db_session) count = await RoleCrud.count(db_session)
assert count == 2 assert count == 2
@pytest.mark.anyio
async def test_skip_existing_skips_if_record_exists(self, db_session: AsyncSession):
"""SKIP_EXISTING returns empty loaded list when the record already exists."""
registry = FixtureRegistry()
role_id = uuid.uuid4()
@registry.register
def roles():
return [Role(id=role_id, name="admin")]
# First load — inserts the record.
result1 = await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert len(result1["roles"]) == 1
# Remove from identity map so session.get() queries the DB in the second load.
db_session.expunge_all()
# Second load — record exists in DB, nothing should be added.
result2 = await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert result2["roles"] == []
@pytest.mark.anyio
async def test_skip_existing_null_pk_inserts(self, db_session: AsyncSession):
"""SKIP_EXISTING inserts when the instance has no PK set (auto-increment)."""
registry = FixtureRegistry()
@registry.register
def int_roles():
# No id provided — PK is None before INSERT (autoincrement).
return [IntRole(name="member")]
result = await load_fixtures(
db_session, registry, "int_roles", strategy=LoadStrategy.SKIP_EXISTING
)
assert len(result["int_roles"]) == 1
class TestLoadFixturesByContext: class TestLoadFixturesByContext:
"""Tests for load_fixtures_by_context function.""" """Tests for load_fixtures_by_context function."""
@@ -755,3 +797,19 @@ class TestGetObjByAttr:
"""Raises StopIteration when value type doesn't match.""" """Raises StopIteration when value type doesn't match."""
with pytest.raises(StopIteration): with pytest.raises(StopIteration):
get_obj_by_attr(self.roles, "id", "not-a-uuid") get_obj_by_attr(self.roles, "id", "not-a-uuid")
class TestGetPrimaryKey:
"""Unit tests for the _get_primary_key helper (composite PK paths)."""
def test_composite_pk_all_set(self):
"""Returns a tuple when all composite PK values are set."""
instance = Permission(subject="post", action="read")
pk = _get_primary_key(instance)
assert pk == ("post", "read")
def test_composite_pk_partial_none(self):
"""Returns None when any composite PK value is None."""
instance = Permission(subject="post") # action is None
pk = _get_primary_key(instance)
assert pk is None

View File

@@ -9,7 +9,6 @@ from fastapi_toolsets.schemas import (
ErrorResponse, ErrorResponse,
OffsetPagination, OffsetPagination,
PaginatedResponse, PaginatedResponse,
Pagination,
Response, Response,
ResponseStatus, ResponseStatus,
) )
@@ -199,20 +198,6 @@ class TestOffsetPagination:
assert data["page"] == 2 assert data["page"] == 2
assert data["has_more"] is True assert data["has_more"] is True
def test_pagination_alias_is_offset_pagination(self):
"""Pagination is a backward-compatible alias for OffsetPagination."""
assert Pagination is OffsetPagination
def test_pagination_alias_constructs_offset_pagination(self):
"""Code using Pagination(...) still works unchanged."""
pagination = Pagination(
total_count=10,
items_per_page=5,
page=2,
has_more=False,
)
assert isinstance(pagination, OffsetPagination)
class TestCursorPagination: class TestCursorPagination:
"""Tests for CursorPagination schema.""" """Tests for CursorPagination schema."""
@@ -276,7 +261,7 @@ class TestPaginatedResponse:
def test_create_paginated_response(self): def test_create_paginated_response(self):
"""Create PaginatedResponse with data and pagination.""" """Create PaginatedResponse with data and pagination."""
pagination = Pagination( pagination = OffsetPagination(
total_count=30, total_count=30,
items_per_page=10, items_per_page=10,
page=1, page=1,
@@ -294,7 +279,7 @@ class TestPaginatedResponse:
def test_with_custom_message(self): def test_with_custom_message(self):
"""PaginatedResponse with custom message.""" """PaginatedResponse with custom message."""
pagination = Pagination( pagination = OffsetPagination(
total_count=5, total_count=5,
items_per_page=10, items_per_page=10,
page=1, page=1,
@@ -310,7 +295,7 @@ class TestPaginatedResponse:
def test_empty_data(self): def test_empty_data(self):
"""PaginatedResponse with empty data.""" """PaginatedResponse with empty data."""
pagination = Pagination( pagination = OffsetPagination(
total_count=0, total_count=0,
items_per_page=10, items_per_page=10,
page=1, page=1,
@@ -332,7 +317,7 @@ class TestPaginatedResponse:
id: int id: int
name: str name: str
pagination = Pagination( pagination = OffsetPagination(
total_count=1, total_count=1,
items_per_page=10, items_per_page=10,
page=1, page=1,
@@ -347,7 +332,7 @@ class TestPaginatedResponse:
def test_serialization(self): def test_serialization(self):
"""PaginatedResponse serializes correctly.""" """PaginatedResponse serializes correctly."""
pagination = Pagination( pagination = OffsetPagination(
total_count=100, total_count=100,
items_per_page=10, items_per_page=10,
page=5, page=5,
@@ -385,16 +370,6 @@ class TestPaginatedResponse:
) )
assert isinstance(response.pagination, CursorPagination) assert isinstance(response.pagination, CursorPagination)
def test_pagination_alias_accepted(self):
"""Constructing PaginatedResponse with Pagination (alias) still works."""
response = PaginatedResponse(
data=[],
pagination=Pagination(
total_count=0, items_per_page=10, page=1, has_more=False
),
)
assert isinstance(response.pagination, OffsetPagination)
class TestFromAttributes: class TestFromAttributes:
"""Tests for from_attributes config (ORM mode).""" """Tests for from_attributes config (ORM mode)."""