mirror of
https://github.com/d3vyce/fastapi-toolsets.git
synced 2026-03-01 17:00:48 +01:00
Compare commits
1 Commits
0fc86d3c34
...
44921e5966
| Author | SHA1 | Date | |
|---|---|---|---|
|
44921e5966
|
@@ -1,6 +1,6 @@
|
||||
# Pagination & search
|
||||
|
||||
This example builds an articles listing endpoint that supports **offset pagination**, **cursor pagination**, **full-text search**, **faceted filtering**, and **sorting** — all from a single `CrudFactory` definition.
|
||||
This example builds an articles listing endpoint that supports **offset pagination**, **cursor pagination**, **full-text search**, and **faceted filtering** — all from a single `CrudFactory` definition.
|
||||
|
||||
## Models
|
||||
|
||||
@@ -16,7 +16,7 @@ This example builds an articles listing endpoint that supports **offset paginati
|
||||
|
||||
## Crud
|
||||
|
||||
Declare `searchable_fields`, `facet_fields`, and `order_fields` once on [`CrudFactory`](../reference/crud.md#fastapi_toolsets.crud.factory.CrudFactory). All endpoints built from this class share the same defaults and can override them per call.
|
||||
Declare `facet_fields` and `searchable_fields` once on [`CrudFactory`](../reference/crud.md#fastapi_toolsets.crud.factory.CrudFactory). All endpoints built from this class share the same defaults and can override them per call.
|
||||
|
||||
```python title="crud.py"
|
||||
--8<-- "docs_src/examples/pagination_search/crud.py"
|
||||
@@ -46,14 +46,14 @@ Declare `searchable_fields`, `facet_fields`, and `order_fields` once on [`CrudFa
|
||||
|
||||
Best for admin panels or any UI that needs a total item count and numbered pages.
|
||||
|
||||
```python title="routes.py:1:36"
|
||||
--8<-- "docs_src/examples/pagination_search/routes.py:1:36"
|
||||
```python title="routes.py:1:27"
|
||||
--8<-- "docs_src/examples/pagination_search/routes.py:1:27"
|
||||
```
|
||||
|
||||
**Example request**
|
||||
|
||||
```
|
||||
GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published&order_by=title&order=asc
|
||||
GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published
|
||||
```
|
||||
|
||||
**Example response**
|
||||
@@ -83,14 +83,14 @@ GET /articles/offset?page=2&items_per_page=10&search=fastapi&status=published&or
|
||||
|
||||
Best for feeds, infinite scroll, or any high-throughput API where offset performance degrades.
|
||||
|
||||
```python title="routes.py:39:59"
|
||||
--8<-- "docs_src/examples/pagination_search/routes.py:39:59"
|
||||
```python title="routes.py:30:45"
|
||||
--8<-- "docs_src/examples/pagination_search/routes.py:30:45"
|
||||
```
|
||||
|
||||
**Example request**
|
||||
|
||||
```
|
||||
GET /articles/cursor?items_per_page=10&status=published&order_by=created_at&order=desc
|
||||
GET /articles/cursor?items_per_page=10&status=published
|
||||
```
|
||||
|
||||
**Example response**
|
||||
|
||||
@@ -292,8 +292,6 @@ Use `filter_by` to pass the client's chosen filter values directly — no need t
|
||||
Use [`filter_params()`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.filter_params) to generate a dict with the facet filter values from the query parameters:
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends
|
||||
|
||||
UserCrud = CrudFactory(
|
||||
@@ -305,7 +303,7 @@ UserCrud = CrudFactory(
|
||||
async def list_users(
|
||||
session: SessionDep,
|
||||
page: int = 1,
|
||||
filter_by: Annotated[dict[str, list[str]], Depends(UserCrud.filter_params())],
|
||||
filter_by: dict[str, list[str]] = Depends(UserCrud.filter_params()),
|
||||
) -> PaginatedResponse[UserRead]:
|
||||
return await UserCrud.offset_paginate(
|
||||
session=session,
|
||||
@@ -322,58 +320,6 @@ GET /users?status=active&country=FR → filter_by={"status": ["active"], "coun
|
||||
GET /users?role=admin&role=editor → filter_by={"role": ["admin", "editor"]} (IN clause)
|
||||
```
|
||||
|
||||
## Sorting
|
||||
|
||||
!!! info "Added in `v1.3`"
|
||||
|
||||
Declare `order_fields` on the CRUD class to expose client-driven column ordering via `order_by` and `order` query parameters.
|
||||
|
||||
```python
|
||||
UserCrud = CrudFactory(
|
||||
model=User,
|
||||
order_fields=[
|
||||
User.name,
|
||||
User.created_at,
|
||||
],
|
||||
)
|
||||
```
|
||||
|
||||
Call [`order_params()`](../reference/crud.md#fastapi_toolsets.crud.factory.AsyncCrud.order_params) to generate a FastAPI dependency that maps the query parameters to an [`OrderByClause`](../reference/crud.md#fastapi_toolsets.crud.factory.OrderByClause) expression:
|
||||
|
||||
```python
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import Depends
|
||||
from fastapi_toolsets.crud import OrderByClause
|
||||
|
||||
@router.get("")
|
||||
async def list_users(
|
||||
session: SessionDep,
|
||||
order_by: Annotated[OrderByClause | None, Depends(UserCrud.order_params())],
|
||||
) -> PaginatedResponse[UserRead]:
|
||||
return await UserCrud.offset_paginate(session=session, order_by=order_by)
|
||||
```
|
||||
|
||||
The dependency adds two query parameters to the endpoint:
|
||||
|
||||
| Parameter | Type |
|
||||
| ---------- | --------------- |
|
||||
| `order_by` | `str | null` |
|
||||
| `order` | `asc` or `desc` |
|
||||
|
||||
```
|
||||
GET /users?order_by=name&order=asc → ORDER BY users.name ASC
|
||||
GET /users?order_by=name&order=desc → ORDER BY users.name DESC
|
||||
```
|
||||
|
||||
An unknown `order_by` value raises [`InvalidOrderFieldError`](../reference/exceptions.md#fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError) (HTTP 422).
|
||||
|
||||
You can also pass `order_fields` directly to `order_params()` to override the class-level defaults without modifying them:
|
||||
|
||||
```python
|
||||
UserOrderParams = UserCrud.order_params(order_fields=[User.name])
|
||||
```
|
||||
|
||||
## Relationship loading
|
||||
|
||||
!!! info "Added in `v1.1`"
|
||||
|
||||
@@ -13,7 +13,6 @@ from fastapi_toolsets.exceptions import (
|
||||
ConflictError,
|
||||
NoSearchableFieldsError,
|
||||
InvalidFacetFilterError,
|
||||
InvalidOrderFieldError,
|
||||
generate_error_responses,
|
||||
init_exceptions_handlers,
|
||||
)
|
||||
@@ -33,8 +32,6 @@ from fastapi_toolsets.exceptions import (
|
||||
|
||||
## ::: fastapi_toolsets.exceptions.exceptions.InvalidFacetFilterError
|
||||
|
||||
## ::: fastapi_toolsets.exceptions.exceptions.InvalidOrderFieldError
|
||||
|
||||
## ::: fastapi_toolsets.exceptions.exceptions.generate_error_responses
|
||||
|
||||
## ::: fastapi_toolsets.exceptions.handler.init_exceptions_handlers
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
from fastapi import FastAPI
|
||||
|
||||
from fastapi_toolsets.exceptions import init_exceptions_handlers
|
||||
|
||||
from .routes import router
|
||||
|
||||
app = FastAPI()
|
||||
init_exceptions_handlers(app=app)
|
||||
app.include_router(router=router)
|
||||
|
||||
@@ -14,8 +14,6 @@ ArticleCrud = CrudFactory(
|
||||
Article.status,
|
||||
(Article.category, Category.name),
|
||||
],
|
||||
order_fields=[ # fields exposed for client-driven ordering
|
||||
Article.title,
|
||||
Article.created_at,
|
||||
],
|
||||
)
|
||||
|
||||
ArticleFilters = ArticleCrud.filter_params()
|
||||
|
||||
@@ -1,13 +1,9 @@
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends, Query
|
||||
|
||||
from fastapi_toolsets.crud import OrderByClause
|
||||
from fastapi_toolsets.schemas import PaginatedResponse
|
||||
|
||||
from .crud import ArticleCrud
|
||||
from .db import SessionDep
|
||||
from .models import Article
|
||||
from .schemas import ArticleRead
|
||||
|
||||
router = APIRouter(prefix="/articles")
|
||||
@@ -16,14 +12,10 @@ router = APIRouter(prefix="/articles")
|
||||
@router.get("/offset")
|
||||
async def list_articles_offset(
|
||||
session: SessionDep,
|
||||
filter_by: Annotated[dict[str, list[str]], Depends(ArticleCrud.filter_params())],
|
||||
order_by: Annotated[
|
||||
OrderByClause | None,
|
||||
Depends(ArticleCrud.order_params(default_field=Article.created_at)),
|
||||
],
|
||||
page: int = Query(1, ge=1),
|
||||
items_per_page: int = Query(20, ge=1, le=100),
|
||||
search: str | None = None,
|
||||
filter_by: dict[str, list[str]] = Depends(ArticleCrud.filter_params()),
|
||||
) -> PaginatedResponse[ArticleRead]:
|
||||
return await ArticleCrud.offset_paginate(
|
||||
session=session,
|
||||
@@ -31,7 +23,6 @@ async def list_articles_offset(
|
||||
items_per_page=items_per_page,
|
||||
search=search,
|
||||
filter_by=filter_by or None,
|
||||
order_by=order_by,
|
||||
schema=ArticleRead,
|
||||
)
|
||||
|
||||
@@ -39,14 +30,10 @@ async def list_articles_offset(
|
||||
@router.get("/cursor")
|
||||
async def list_articles_cursor(
|
||||
session: SessionDep,
|
||||
filter_by: Annotated[dict[str, list[str]], Depends(ArticleCrud.filter_params())],
|
||||
order_by: Annotated[
|
||||
OrderByClause | None,
|
||||
Depends(ArticleCrud.order_params(default_field=Article.created_at)),
|
||||
],
|
||||
cursor: str | None = None,
|
||||
items_per_page: int = Query(20, ge=1, le=100),
|
||||
search: str | None = None,
|
||||
filter_by: dict[str, list[str]] = Depends(ArticleCrud.filter_params()),
|
||||
) -> PaginatedResponse[ArticleRead]:
|
||||
return await ArticleCrud.cursor_paginate(
|
||||
session=session,
|
||||
@@ -54,6 +41,5 @@ async def list_articles_cursor(
|
||||
items_per_page=items_per_page,
|
||||
search=search,
|
||||
filter_by=filter_by or None,
|
||||
order_by=order_by,
|
||||
schema=ArticleRead,
|
||||
)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "fastapi-toolsets"
|
||||
version = "1.3.0"
|
||||
version = "1.2.1"
|
||||
description = "Production-ready utilities for FastAPI applications"
|
||||
readme = "README.md"
|
||||
license = "MIT"
|
||||
|
||||
@@ -21,4 +21,4 @@ Example usage:
|
||||
return Response(data={"user": user.username}, message="Success")
|
||||
"""
|
||||
|
||||
__version__ = "1.3.0"
|
||||
__version__ = "1.2.1"
|
||||
|
||||
@@ -72,7 +72,7 @@ async def load(
|
||||
registry = get_fixtures_registry()
|
||||
db_context = get_db_context()
|
||||
|
||||
context_list = list(contexts) if contexts else [Context.BASE]
|
||||
context_list = [c.value for c in contexts] if contexts else [Context.BASE]
|
||||
|
||||
ordered = registry.resolve_context_dependencies(*context_list)
|
||||
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
"""Generic async CRUD operations for SQLAlchemy models."""
|
||||
|
||||
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
|
||||
from ..types import FacetFieldType, JoinType, M2MFieldType, OrderByClause
|
||||
from .factory import CrudFactory
|
||||
from .search import SearchConfig, get_searchable_fields
|
||||
from .factory import CrudFactory, JoinType, M2MFieldType
|
||||
from .search import (
|
||||
FacetFieldType,
|
||||
SearchConfig,
|
||||
get_searchable_fields,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"CrudFactory",
|
||||
@@ -13,6 +16,5 @@ __all__ = [
|
||||
"JoinType",
|
||||
"M2MFieldType",
|
||||
"NoSearchableFieldsError",
|
||||
"OrderByClause",
|
||||
"SearchConfig",
|
||||
]
|
||||
|
||||
@@ -6,10 +6,10 @@ import base64
|
||||
import inspect
|
||||
import json
|
||||
import uuid as uuid_module
|
||||
from collections.abc import Awaitable, Callable, Sequence
|
||||
from collections.abc import Awaitable, Callable, Mapping, Sequence
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
|
||||
from typing import Any, ClassVar, Generic, Literal, Self, TypeVar, cast, overload
|
||||
|
||||
from fastapi import Query
|
||||
from pydantic import BaseModel
|
||||
@@ -23,25 +23,23 @@ from sqlalchemy.sql.base import ExecutableOption
|
||||
from sqlalchemy.sql.roles import WhereHavingRole
|
||||
|
||||
from ..db import get_transaction
|
||||
from ..exceptions import InvalidOrderFieldError, NotFoundError
|
||||
from ..exceptions import NotFoundError
|
||||
from ..schemas import CursorPagination, OffsetPagination, PaginatedResponse, Response
|
||||
from ..types import (
|
||||
FacetFieldType,
|
||||
JoinType,
|
||||
M2MFieldType,
|
||||
ModelType,
|
||||
OrderByClause,
|
||||
SchemaType,
|
||||
SearchFieldType,
|
||||
)
|
||||
from .search import (
|
||||
FacetFieldType,
|
||||
SearchConfig,
|
||||
SearchFieldType,
|
||||
build_facets,
|
||||
build_filter_by,
|
||||
build_search_filters,
|
||||
facet_keys,
|
||||
)
|
||||
|
||||
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
|
||||
SchemaType = TypeVar("SchemaType", bound=BaseModel)
|
||||
JoinType = list[tuple[type[DeclarativeBase], Any]]
|
||||
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
|
||||
|
||||
|
||||
def _encode_cursor(value: Any) -> str:
|
||||
"""Encode cursor column value as an base64 string."""
|
||||
@@ -53,22 +51,6 @@ def _decode_cursor(cursor: str) -> str:
|
||||
return json.loads(base64.b64decode(cursor.encode()).decode())
|
||||
|
||||
|
||||
def _apply_joins(q: Any, joins: JoinType | None, outer_join: bool) -> Any:
|
||||
"""Apply a list of (model, condition) joins to a SQLAlchemy select query."""
|
||||
if not joins:
|
||||
return q
|
||||
for model, condition in joins:
|
||||
q = q.outerjoin(model, condition) if outer_join else q.join(model, condition)
|
||||
return q
|
||||
|
||||
|
||||
def _apply_search_joins(q: Any, search_joins: list[Any]) -> Any:
|
||||
"""Apply relationship-based outer joins (from search/filter_by) to a query."""
|
||||
for join_rel in search_joins:
|
||||
q = q.outerjoin(join_rel)
|
||||
return q
|
||||
|
||||
|
||||
class AsyncCrud(Generic[ModelType]):
|
||||
"""Generic async CRUD operations for SQLAlchemy models.
|
||||
|
||||
@@ -78,7 +60,6 @@ class AsyncCrud(Generic[ModelType]):
|
||||
model: ClassVar[type[DeclarativeBase]]
|
||||
searchable_fields: ClassVar[Sequence[SearchFieldType] | None] = None
|
||||
facet_fields: ClassVar[Sequence[FacetFieldType] | None] = None
|
||||
order_fields: ClassVar[Sequence[QueryableAttribute[Any]] | None] = None
|
||||
m2m_fields: ClassVar[M2MFieldType | None] = None
|
||||
default_load_options: ClassVar[list[ExecutableOption] | None] = None
|
||||
cursor_column: ClassVar[Any | None] = None
|
||||
@@ -148,48 +129,6 @@ class AsyncCrud(Generic[ModelType]):
|
||||
return set()
|
||||
return set(cls.m2m_fields.keys())
|
||||
|
||||
@classmethod
|
||||
def _resolve_facet_fields(
|
||||
cls: type[Self],
|
||||
facet_fields: Sequence[FacetFieldType] | None,
|
||||
) -> Sequence[FacetFieldType] | None:
|
||||
"""Return facet_fields if given, otherwise fall back to the class-level default."""
|
||||
return facet_fields if facet_fields is not None else cls.facet_fields
|
||||
|
||||
@classmethod
|
||||
def _prepare_filter_by(
|
||||
cls: type[Self],
|
||||
filter_by: dict[str, Any] | BaseModel | None,
|
||||
facet_fields: Sequence[FacetFieldType] | None,
|
||||
) -> tuple[list[Any], list[Any]]:
|
||||
"""Normalize filter_by and return (filters, joins) to apply to the query."""
|
||||
if isinstance(filter_by, BaseModel):
|
||||
filter_by = filter_by.model_dump(exclude_none=True)
|
||||
if not filter_by:
|
||||
return [], []
|
||||
resolved = cls._resolve_facet_fields(facet_fields)
|
||||
return build_filter_by(filter_by, resolved or [])
|
||||
|
||||
@classmethod
|
||||
async def _build_filter_attributes(
|
||||
cls: type[Self],
|
||||
session: AsyncSession,
|
||||
facet_fields: Sequence[FacetFieldType] | None,
|
||||
filters: list[Any],
|
||||
search_joins: list[Any],
|
||||
) -> dict[str, list[Any]] | None:
|
||||
"""Build facet filter_attributes, or return None if no facet fields configured."""
|
||||
resolved = cls._resolve_facet_fields(facet_fields)
|
||||
if not resolved:
|
||||
return None
|
||||
return await build_facets(
|
||||
session,
|
||||
cls.model,
|
||||
resolved,
|
||||
base_filters=filters,
|
||||
base_joins=search_joins,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def filter_params(
|
||||
cls: type[Self],
|
||||
@@ -210,7 +149,7 @@ class AsyncCrud(Generic[ModelType]):
|
||||
ValueError: If no facet fields are configured on this CRUD class and none are
|
||||
provided via ``facet_fields``.
|
||||
"""
|
||||
fields = cls._resolve_facet_fields(facet_fields)
|
||||
fields = facet_fields if facet_fields is not None else cls.facet_fields
|
||||
if not fields:
|
||||
raise ValueError(
|
||||
f"{cls.__name__} has no facet_fields configured. "
|
||||
@@ -236,63 +175,6 @@ class AsyncCrud(Generic[ModelType]):
|
||||
|
||||
return dependency
|
||||
|
||||
@classmethod
|
||||
def order_params(
|
||||
cls: type[Self],
|
||||
*,
|
||||
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
|
||||
default_field: QueryableAttribute[Any] | None = None,
|
||||
default_order: Literal["asc", "desc"] = "asc",
|
||||
) -> Callable[..., Awaitable[OrderByClause | None]]:
|
||||
"""Return a FastAPI dependency that resolves order query params into an order_by clause.
|
||||
|
||||
Args:
|
||||
order_fields: Override the allowed order fields. Falls back to the class-level
|
||||
``order_fields`` if not provided.
|
||||
default_field: Field to order by when ``order_by`` query param is absent.
|
||||
If ``None`` and no ``order_by`` is provided, no ordering is applied.
|
||||
default_order: Default order direction when ``order`` is absent
|
||||
(``"asc"`` or ``"desc"``).
|
||||
|
||||
Returns:
|
||||
An async dependency function named ``{Model}OrderParams`` that resolves to an
|
||||
``OrderByClause`` (or ``None``). Pass it to ``Depends()`` in your route.
|
||||
|
||||
Raises:
|
||||
ValueError: If no order fields are configured on this CRUD class and none are
|
||||
provided via ``order_fields``.
|
||||
InvalidOrderFieldError: When the request provides an unknown ``order_by`` value.
|
||||
"""
|
||||
fields = order_fields if order_fields is not None else cls.order_fields
|
||||
if not fields:
|
||||
raise ValueError(
|
||||
f"{cls.__name__} has no order_fields configured. "
|
||||
"Pass order_fields= or set them on CrudFactory."
|
||||
)
|
||||
field_map: dict[str, QueryableAttribute[Any]] = {f.key: f for f in fields}
|
||||
valid_keys = sorted(field_map.keys())
|
||||
|
||||
async def dependency(
|
||||
order_by: str | None = Query(
|
||||
None, description=f"Field to order by. Valid values: {valid_keys}"
|
||||
),
|
||||
order: Literal["asc", "desc"] = Query(
|
||||
default_order, description="Sort direction"
|
||||
),
|
||||
) -> OrderByClause | None:
|
||||
if order_by is None:
|
||||
if default_field is None:
|
||||
return None
|
||||
field = default_field
|
||||
elif order_by not in field_map:
|
||||
raise InvalidOrderFieldError(order_by, valid_keys)
|
||||
else:
|
||||
field = field_map[order_by]
|
||||
return field.asc() if order == "asc" else field.desc()
|
||||
|
||||
dependency.__name__ = f"{cls.model.__name__}OrderParams"
|
||||
return dependency
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
async def create( # pragma: no cover
|
||||
@@ -348,7 +230,8 @@ class AsyncCrud(Generic[ModelType]):
|
||||
await session.refresh(db_model)
|
||||
result = cast(ModelType, db_model)
|
||||
if schema:
|
||||
return Response(data=schema.model_validate(result))
|
||||
data_out = schema.model_validate(result) if schema else result
|
||||
return Response(data=data_out)
|
||||
return result
|
||||
|
||||
@overload
|
||||
@@ -411,7 +294,13 @@ class AsyncCrud(Generic[ModelType]):
|
||||
MultipleResultsFound: If more than one record found
|
||||
"""
|
||||
q = select(cls.model)
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
q = (
|
||||
q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else q.join(model, condition)
|
||||
)
|
||||
q = q.where(and_(*filters))
|
||||
if resolved := cls._resolve_load_options(load_options):
|
||||
q = q.options(*resolved)
|
||||
@@ -423,7 +312,8 @@ class AsyncCrud(Generic[ModelType]):
|
||||
raise NotFoundError()
|
||||
result = cast(ModelType, item)
|
||||
if schema:
|
||||
return Response(data=schema.model_validate(result))
|
||||
data_out = schema.model_validate(result) if schema else result
|
||||
return Response(data=data_out)
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
@@ -449,7 +339,13 @@ class AsyncCrud(Generic[ModelType]):
|
||||
Model instance or None
|
||||
"""
|
||||
q = select(cls.model)
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
q = (
|
||||
q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else q.join(model, condition)
|
||||
)
|
||||
if filters:
|
||||
q = q.where(and_(*filters))
|
||||
if resolved := cls._resolve_load_options(load_options):
|
||||
@@ -466,7 +362,7 @@ class AsyncCrud(Generic[ModelType]):
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
order_by: OrderByClause | None = None,
|
||||
order_by: Any | None = None,
|
||||
limit: int | None = None,
|
||||
offset: int | None = None,
|
||||
) -> Sequence[ModelType]:
|
||||
@@ -486,7 +382,13 @@ class AsyncCrud(Generic[ModelType]):
|
||||
List of model instances
|
||||
"""
|
||||
q = select(cls.model)
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
q = (
|
||||
q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else q.join(model, condition)
|
||||
)
|
||||
if filters:
|
||||
q = q.where(and_(*filters))
|
||||
if resolved := cls._resolve_load_options(load_options):
|
||||
@@ -584,7 +486,8 @@ class AsyncCrud(Generic[ModelType]):
|
||||
setattr(db_model, rel_attr, related_instances)
|
||||
await session.refresh(db_model)
|
||||
if schema:
|
||||
return Response(data=schema.model_validate(db_model))
|
||||
data_out = schema.model_validate(db_model) if schema else db_model
|
||||
return Response(data=data_out)
|
||||
return db_model
|
||||
|
||||
@classmethod
|
||||
@@ -701,7 +604,13 @@ class AsyncCrud(Generic[ModelType]):
|
||||
Number of matching records
|
||||
"""
|
||||
q = select(func.count()).select_from(cls.model)
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
q = (
|
||||
q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else q.join(model, condition)
|
||||
)
|
||||
if filters:
|
||||
q = q.where(and_(*filters))
|
||||
result = await session.execute(q)
|
||||
@@ -728,7 +637,13 @@ class AsyncCrud(Generic[ModelType]):
|
||||
True if at least one record matches
|
||||
"""
|
||||
q = select(cls.model)
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
q = (
|
||||
q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else q.join(model, condition)
|
||||
)
|
||||
q = q.where(and_(*filters)).exists().select()
|
||||
result = await session.execute(q)
|
||||
return bool(result.scalar())
|
||||
@@ -742,7 +657,7 @@ class AsyncCrud(Generic[ModelType]):
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
order_by: OrderByClause | None = None,
|
||||
order_by: Any | None = None,
|
||||
page: int = 1,
|
||||
items_per_page: int = 20,
|
||||
search: str | SearchConfig | None = None,
|
||||
@@ -775,29 +690,47 @@ class AsyncCrud(Generic[ModelType]):
|
||||
"""
|
||||
filters = list(filters) if filters else []
|
||||
offset = (page - 1) * items_per_page
|
||||
search_joins: list[Any] = []
|
||||
|
||||
fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
|
||||
filters.extend(fb_filters)
|
||||
if isinstance(filter_by, BaseModel):
|
||||
filter_by = filter_by.model_dump(exclude_none=True) or None
|
||||
|
||||
# Build filter_by conditions from declared facet fields
|
||||
if filter_by:
|
||||
resolved_facets_for_filter = (
|
||||
facet_fields if facet_fields is not None else cls.facet_fields
|
||||
)
|
||||
fb_filters, fb_joins = build_filter_by(
|
||||
filter_by, resolved_facets_for_filter or []
|
||||
)
|
||||
filters.extend(fb_filters)
|
||||
search_joins.extend(fb_joins)
|
||||
|
||||
# Build search filters
|
||||
if search:
|
||||
search_filters, new_search_joins = build_search_filters(
|
||||
search_filters, search_joins = build_search_filters(
|
||||
cls.model,
|
||||
search,
|
||||
search_fields=search_fields,
|
||||
default_fields=cls.searchable_fields,
|
||||
)
|
||||
filters.extend(search_filters)
|
||||
search_joins.extend(new_search_joins)
|
||||
|
||||
# Build query with joins
|
||||
q = select(cls.model)
|
||||
|
||||
# Apply explicit joins
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
q = (
|
||||
q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else q.join(model, condition)
|
||||
)
|
||||
|
||||
# Apply search joins (always outer joins for search)
|
||||
q = _apply_search_joins(q, search_joins)
|
||||
for join_rel in search_joins:
|
||||
q = q.outerjoin(join_rel)
|
||||
|
||||
if filters:
|
||||
q = q.where(and_(*filters))
|
||||
@@ -817,10 +750,17 @@ class AsyncCrud(Generic[ModelType]):
|
||||
count_q = count_q.select_from(cls.model)
|
||||
|
||||
# Apply explicit joins to count query
|
||||
count_q = _apply_joins(count_q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
count_q = (
|
||||
count_q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else count_q.join(model, condition)
|
||||
)
|
||||
|
||||
# Apply search joins to count query
|
||||
count_q = _apply_search_joins(count_q, search_joins)
|
||||
for join_rel in search_joins:
|
||||
count_q = count_q.outerjoin(join_rel)
|
||||
|
||||
if filters:
|
||||
count_q = count_q.where(and_(*filters))
|
||||
@@ -828,9 +768,19 @@ class AsyncCrud(Generic[ModelType]):
|
||||
count_result = await session.execute(count_q)
|
||||
total_count = count_result.scalar_one()
|
||||
|
||||
filter_attributes = await cls._build_filter_attributes(
|
||||
session, facet_fields, filters, search_joins
|
||||
# Build facets
|
||||
resolved_facet_fields = (
|
||||
facet_fields if facet_fields is not None else cls.facet_fields
|
||||
)
|
||||
filter_attributes: dict[str, list[Any]] | None = None
|
||||
if resolved_facet_fields:
|
||||
filter_attributes = await build_facets(
|
||||
session,
|
||||
cls.model,
|
||||
resolved_facet_fields,
|
||||
base_filters=filters or None,
|
||||
base_joins=search_joins or None,
|
||||
)
|
||||
|
||||
return PaginatedResponse(
|
||||
data=items,
|
||||
@@ -853,7 +803,7 @@ class AsyncCrud(Generic[ModelType]):
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
order_by: OrderByClause | None = None,
|
||||
order_by: Any | None = None,
|
||||
items_per_page: int = 20,
|
||||
search: str | SearchConfig | None = None,
|
||||
search_fields: Sequence[SearchFieldType] | None = None,
|
||||
@@ -887,9 +837,21 @@ class AsyncCrud(Generic[ModelType]):
|
||||
PaginatedResponse with CursorPagination metadata
|
||||
"""
|
||||
filters = list(filters) if filters else []
|
||||
search_joins: list[Any] = []
|
||||
|
||||
fb_filters, search_joins = cls._prepare_filter_by(filter_by, facet_fields)
|
||||
filters.extend(fb_filters)
|
||||
if isinstance(filter_by, BaseModel):
|
||||
filter_by = filter_by.model_dump(exclude_none=True) or None
|
||||
|
||||
# Build filter_by conditions from declared facet fields
|
||||
if filter_by:
|
||||
resolved_facets_for_filter = (
|
||||
facet_fields if facet_fields is not None else cls.facet_fields
|
||||
)
|
||||
fb_filters, fb_joins = build_filter_by(
|
||||
filter_by, resolved_facets_for_filter or []
|
||||
)
|
||||
filters.extend(fb_filters)
|
||||
search_joins.extend(fb_joins)
|
||||
|
||||
if cls.cursor_column is None:
|
||||
raise ValueError(
|
||||
@@ -922,23 +884,29 @@ class AsyncCrud(Generic[ModelType]):
|
||||
|
||||
# Build search filters
|
||||
if search:
|
||||
search_filters, new_search_joins = build_search_filters(
|
||||
search_filters, search_joins = build_search_filters(
|
||||
cls.model,
|
||||
search,
|
||||
search_fields=search_fields,
|
||||
default_fields=cls.searchable_fields,
|
||||
)
|
||||
filters.extend(search_filters)
|
||||
search_joins.extend(new_search_joins)
|
||||
|
||||
# Build query
|
||||
q = select(cls.model)
|
||||
|
||||
# Apply explicit joins
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
if joins:
|
||||
for model, condition in joins:
|
||||
q = (
|
||||
q.outerjoin(model, condition)
|
||||
if outer_join
|
||||
else q.join(model, condition)
|
||||
)
|
||||
|
||||
# Apply search joins (always outer joins)
|
||||
q = _apply_search_joins(q, search_joins)
|
||||
for join_rel in search_joins:
|
||||
q = q.outerjoin(join_rel)
|
||||
|
||||
if filters:
|
||||
q = q.where(and_(*filters))
|
||||
@@ -970,9 +938,19 @@ class AsyncCrud(Generic[ModelType]):
|
||||
|
||||
items: list[Any] = [schema.model_validate(item) for item in items_page]
|
||||
|
||||
filter_attributes = await cls._build_filter_attributes(
|
||||
session, facet_fields, filters, search_joins
|
||||
# Build facets
|
||||
resolved_facet_fields = (
|
||||
facet_fields if facet_fields is not None else cls.facet_fields
|
||||
)
|
||||
filter_attributes: dict[str, list[Any]] | None = None
|
||||
if resolved_facet_fields:
|
||||
filter_attributes = await build_facets(
|
||||
session,
|
||||
cls.model,
|
||||
resolved_facet_fields,
|
||||
base_filters=filters or None,
|
||||
base_joins=search_joins or None,
|
||||
)
|
||||
|
||||
return PaginatedResponse(
|
||||
data=items,
|
||||
@@ -991,7 +969,6 @@ def CrudFactory(
|
||||
*,
|
||||
searchable_fields: Sequence[SearchFieldType] | None = None,
|
||||
facet_fields: Sequence[FacetFieldType] | None = None,
|
||||
order_fields: Sequence[QueryableAttribute[Any]] | None = None,
|
||||
m2m_fields: M2MFieldType | None = None,
|
||||
default_load_options: list[ExecutableOption] | None = None,
|
||||
cursor_column: Any | None = None,
|
||||
@@ -1004,8 +981,6 @@ def CrudFactory(
|
||||
facet_fields: Optional list of columns to compute distinct values for in paginated
|
||||
responses. Supports direct columns (``User.status``) and relationship tuples
|
||||
(``(User.role, Role.name)``). Can be overridden per call.
|
||||
order_fields: Optional list of model attributes that callers are allowed to order by
|
||||
via ``order_params()``. Can be overridden per call.
|
||||
m2m_fields: Optional mapping for many-to-many relationships.
|
||||
Maps schema field names (containing lists of IDs) to
|
||||
SQLAlchemy relationship attributes.
|
||||
@@ -1099,7 +1074,6 @@ def CrudFactory(
|
||||
"model": model,
|
||||
"searchable_fields": searchable_fields,
|
||||
"facet_fields": facet_fields,
|
||||
"order_fields": order_fields,
|
||||
"m2m_fields": m2m_fields,
|
||||
"default_load_options": default_load_options,
|
||||
"cursor_column": cursor_column,
|
||||
|
||||
@@ -1,23 +1,24 @@
|
||||
"""Search utilities for AsyncCrud."""
|
||||
|
||||
import asyncio
|
||||
import functools
|
||||
from collections import Counter
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass, replace
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, Literal
|
||||
|
||||
from sqlalchemy import String, and_, or_, select
|
||||
from sqlalchemy import String, or_, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm.attributes import InstrumentedAttribute
|
||||
|
||||
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
|
||||
from ..types import FacetFieldType, SearchFieldType
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sqlalchemy.sql.elements import ColumnElement
|
||||
|
||||
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
|
||||
FacetFieldType = SearchFieldType
|
||||
|
||||
|
||||
@dataclass
|
||||
class SearchConfig:
|
||||
@@ -36,7 +37,6 @@ class SearchConfig:
|
||||
match_mode: Literal["any", "all"] = "any"
|
||||
|
||||
|
||||
@functools.lru_cache(maxsize=128)
|
||||
def get_searchable_fields(
|
||||
model: type[DeclarativeBase],
|
||||
*,
|
||||
@@ -101,11 +101,14 @@ def build_search_filters(
|
||||
if isinstance(search, str):
|
||||
config = SearchConfig(query=search, fields=search_fields)
|
||||
else:
|
||||
config = (
|
||||
replace(search, fields=search_fields)
|
||||
if search_fields is not None
|
||||
else search
|
||||
)
|
||||
config = search
|
||||
if search_fields is not None:
|
||||
config = SearchConfig(
|
||||
query=config.query,
|
||||
fields=search_fields,
|
||||
case_sensitive=config.case_sensitive,
|
||||
match_mode=config.match_mode,
|
||||
)
|
||||
|
||||
if not config.query or not config.query.strip():
|
||||
return [], []
|
||||
@@ -224,6 +227,8 @@ async def build_facets(
|
||||
q = q.outerjoin(rel)
|
||||
|
||||
if base_filters:
|
||||
from sqlalchemy import and_
|
||||
|
||||
q = q.where(and_(*base_filters))
|
||||
|
||||
q = q.order_by(column)
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
"""Dependency factories for FastAPI routes."""
|
||||
|
||||
import inspect
|
||||
from collections.abc import Callable
|
||||
from typing import Any, cast
|
||||
from collections.abc import AsyncGenerator, Callable
|
||||
from typing import Any, TypeVar, cast
|
||||
|
||||
from fastapi import Depends
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
|
||||
from .crud import CrudFactory
|
||||
from .types import ModelType, SessionDependency
|
||||
|
||||
__all__ = ["BodyDependency", "PathDependency"]
|
||||
|
||||
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
|
||||
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]
|
||||
|
||||
|
||||
def PathDependency(
|
||||
model: type[ModelType],
|
||||
|
||||
@@ -6,7 +6,6 @@ from .exceptions import (
|
||||
ConflictError,
|
||||
ForbiddenError,
|
||||
InvalidFacetFilterError,
|
||||
InvalidOrderFieldError,
|
||||
NoSearchableFieldsError,
|
||||
NotFoundError,
|
||||
UnauthorizedError,
|
||||
@@ -22,7 +21,6 @@ __all__ = [
|
||||
"generate_error_responses",
|
||||
"init_exceptions_handlers",
|
||||
"InvalidFacetFilterError",
|
||||
"InvalidOrderFieldError",
|
||||
"NoSearchableFieldsError",
|
||||
"NotFoundError",
|
||||
"UnauthorizedError",
|
||||
|
||||
@@ -128,31 +128,6 @@ class InvalidFacetFilterError(ApiException):
|
||||
super().__init__(detail)
|
||||
|
||||
|
||||
class InvalidOrderFieldError(ApiException):
|
||||
"""Raised when order_by contains a field not in the allowed order fields."""
|
||||
|
||||
api_error = ApiError(
|
||||
code=422,
|
||||
msg="Invalid Order Field",
|
||||
desc="The requested order field is not allowed for this resource.",
|
||||
err_code="SORT-422",
|
||||
)
|
||||
|
||||
def __init__(self, field: str, valid_fields: list[str]) -> None:
|
||||
"""Initialize the exception.
|
||||
|
||||
Args:
|
||||
field: The unknown order field provided by the caller
|
||||
valid_fields: List of valid field names
|
||||
"""
|
||||
self.field = field
|
||||
self.valid_fields = valid_fields
|
||||
detail = (
|
||||
f"'{field}' is not an allowed order field. Valid fields: {valid_fields}."
|
||||
)
|
||||
super().__init__(detail)
|
||||
|
||||
|
||||
def generate_error_responses(
|
||||
*errors: type[ApiException],
|
||||
) -> dict[int | str, dict[str, Any]]:
|
||||
|
||||
@@ -10,10 +10,6 @@ from fastapi.responses import JSONResponse
|
||||
from ..schemas import ErrorResponse, ResponseStatus
|
||||
from .exceptions import ApiException
|
||||
|
||||
_VALIDATION_LOCATION_PARAMS: frozenset[str] = frozenset(
|
||||
{"body", "query", "path", "header", "cookie"}
|
||||
)
|
||||
|
||||
|
||||
def init_exceptions_handlers(app: FastAPI) -> FastAPI:
|
||||
"""Register exception handlers and custom OpenAPI schema on a FastAPI app.
|
||||
@@ -110,7 +106,9 @@ def _format_validation_error(
|
||||
|
||||
for error in errors:
|
||||
field_path = ".".join(
|
||||
str(loc) for loc in error["loc"] if loc not in _VALIDATION_LOCATION_PARAMS
|
||||
str(loc)
|
||||
for loc in error["loc"]
|
||||
if loc not in ("body", "query", "path", "header", "cookie")
|
||||
)
|
||||
formatted_errors.append(
|
||||
{
|
||||
|
||||
@@ -1,84 +1,24 @@
|
||||
"""Fixture loading utilities for database seeding."""
|
||||
|
||||
from collections.abc import Callable, Sequence
|
||||
from typing import Any
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
|
||||
from ..db import get_transaction
|
||||
from ..logger import get_logger
|
||||
from ..types import ModelType
|
||||
from .enum import LoadStrategy
|
||||
from .registry import Context, FixtureRegistry
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def _load_ordered(
|
||||
session: AsyncSession,
|
||||
registry: FixtureRegistry,
|
||||
ordered_names: list[str],
|
||||
strategy: LoadStrategy,
|
||||
) -> dict[str, list[DeclarativeBase]]:
|
||||
"""Load fixtures in order."""
|
||||
results: dict[str, list[DeclarativeBase]] = {}
|
||||
|
||||
for name in ordered_names:
|
||||
fixture = registry.get(name)
|
||||
instances = list(fixture.func())
|
||||
|
||||
if not instances:
|
||||
results[name] = []
|
||||
continue
|
||||
|
||||
model_name = type(instances[0]).__name__
|
||||
loaded: list[DeclarativeBase] = []
|
||||
|
||||
async with get_transaction(session):
|
||||
for instance in instances:
|
||||
if strategy == LoadStrategy.INSERT:
|
||||
session.add(instance)
|
||||
loaded.append(instance)
|
||||
|
||||
elif strategy == LoadStrategy.MERGE:
|
||||
merged = await session.merge(instance)
|
||||
loaded.append(merged)
|
||||
|
||||
else: # LoadStrategy.SKIP_EXISTING
|
||||
pk = _get_primary_key(instance)
|
||||
if pk is not None:
|
||||
existing = await session.get(type(instance), pk)
|
||||
if existing is None:
|
||||
session.add(instance)
|
||||
loaded.append(instance)
|
||||
else:
|
||||
session.add(instance)
|
||||
loaded.append(instance)
|
||||
|
||||
results[name] = loaded
|
||||
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
|
||||
"""Get the primary key value of a model instance."""
|
||||
mapper = instance.__class__.__mapper__
|
||||
pk_cols = mapper.primary_key
|
||||
|
||||
if len(pk_cols) == 1:
|
||||
return getattr(instance, pk_cols[0].name, None)
|
||||
|
||||
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
|
||||
if all(v is not None for v in pk_values):
|
||||
return pk_values
|
||||
return None
|
||||
T = TypeVar("T", bound=DeclarativeBase)
|
||||
|
||||
|
||||
def get_obj_by_attr(
|
||||
fixtures: Callable[[], Sequence[ModelType]], attr_name: str, value: Any
|
||||
) -> ModelType:
|
||||
fixtures: Callable[[], Sequence[T]], attr_name: str, value: Any
|
||||
) -> T:
|
||||
"""Get a SQLAlchemy model instance by matching an attribute value.
|
||||
|
||||
Args:
|
||||
@@ -117,6 +57,13 @@ async def load_fixtures(
|
||||
|
||||
Returns:
|
||||
Dict mapping fixture names to loaded instances
|
||||
|
||||
Example:
|
||||
```python
|
||||
# Loads 'roles' first (dependency), then 'users'
|
||||
result = await load_fixtures(session, fixtures, "users")
|
||||
print(result["users"]) # [User(...), ...]
|
||||
```
|
||||
"""
|
||||
ordered = registry.resolve_dependencies(*names)
|
||||
return await _load_ordered(session, registry, ordered, strategy)
|
||||
@@ -138,6 +85,76 @@ async def load_fixtures_by_context(
|
||||
|
||||
Returns:
|
||||
Dict mapping fixture names to loaded instances
|
||||
|
||||
Example:
|
||||
```python
|
||||
# Load base + testing fixtures
|
||||
await load_fixtures_by_context(
|
||||
session, fixtures,
|
||||
Context.BASE, Context.TESTING
|
||||
)
|
||||
```
|
||||
"""
|
||||
ordered = registry.resolve_context_dependencies(*contexts)
|
||||
return await _load_ordered(session, registry, ordered, strategy)
|
||||
|
||||
|
||||
async def _load_ordered(
|
||||
session: AsyncSession,
|
||||
registry: FixtureRegistry,
|
||||
ordered_names: list[str],
|
||||
strategy: LoadStrategy,
|
||||
) -> dict[str, list[DeclarativeBase]]:
|
||||
"""Load fixtures in order."""
|
||||
results: dict[str, list[DeclarativeBase]] = {}
|
||||
|
||||
for name in ordered_names:
|
||||
fixture = registry.get(name)
|
||||
instances = list(fixture.func())
|
||||
|
||||
if not instances:
|
||||
results[name] = []
|
||||
continue
|
||||
|
||||
model_name = type(instances[0]).__name__
|
||||
loaded: list[DeclarativeBase] = []
|
||||
|
||||
async with get_transaction(session):
|
||||
for instance in instances:
|
||||
if strategy == LoadStrategy.INSERT:
|
||||
session.add(instance)
|
||||
loaded.append(instance)
|
||||
|
||||
elif strategy == LoadStrategy.MERGE:
|
||||
merged = await session.merge(instance)
|
||||
loaded.append(merged)
|
||||
|
||||
elif strategy == LoadStrategy.SKIP_EXISTING:
|
||||
pk = _get_primary_key(instance)
|
||||
if pk is not None:
|
||||
existing = await session.get(type(instance), pk)
|
||||
if existing is None:
|
||||
session.add(instance)
|
||||
loaded.append(instance)
|
||||
else:
|
||||
session.add(instance)
|
||||
loaded.append(instance)
|
||||
|
||||
results[name] = loaded
|
||||
logger.info(f"Loaded fixture '{name}': {len(loaded)} {model_name}(s)")
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
|
||||
"""Get the primary key value of a model instance."""
|
||||
mapper = instance.__class__.__mapper__
|
||||
pk_cols = mapper.primary_key
|
||||
|
||||
if len(pk_cols) == 1:
|
||||
return getattr(instance, pk_cols[0].name, None)
|
||||
|
||||
pk_values = tuple(getattr(instance, col.name, None) for col in pk_cols)
|
||||
if all(v is not None for v in pk_values):
|
||||
return pk_values
|
||||
return None
|
||||
|
||||
@@ -53,23 +53,17 @@ def init_metrics(
|
||||
logger.debug("Initialising metric provider '%s'", provider.name)
|
||||
provider.func()
|
||||
|
||||
# Partition collectors and cache env check at startup — both are stable for the app lifetime.
|
||||
async_collectors = [
|
||||
c for c in registry.get_collectors() if asyncio.iscoroutinefunction(c.func)
|
||||
]
|
||||
sync_collectors = [
|
||||
c for c in registry.get_collectors() if not asyncio.iscoroutinefunction(c.func)
|
||||
]
|
||||
multiprocess_mode = _is_multiprocess()
|
||||
collectors = registry.get_collectors()
|
||||
|
||||
@app.get(path, include_in_schema=False)
|
||||
async def metrics_endpoint() -> Response:
|
||||
for collector in sync_collectors:
|
||||
collector.func()
|
||||
for collector in async_collectors:
|
||||
await collector.func()
|
||||
for collector in collectors:
|
||||
if asyncio.iscoroutinefunction(collector.func):
|
||||
await collector.func()
|
||||
else:
|
||||
collector.func()
|
||||
|
||||
if multiprocess_mode:
|
||||
if _is_multiprocess():
|
||||
prom_registry = CollectorRegistry()
|
||||
multiprocess.MultiProcessCollector(prom_registry)
|
||||
output = generate_latest(prom_registry)
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
"""Base Pydantic schemas for API responses."""
|
||||
|
||||
from enum import Enum
|
||||
from typing import Any, ClassVar, Generic
|
||||
from typing import Any, ClassVar, Generic, TypeVar
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from .types import DataT
|
||||
|
||||
__all__ = [
|
||||
"ApiError",
|
||||
"CursorPagination",
|
||||
@@ -18,6 +16,8 @@ __all__ = [
|
||||
"ResponseStatus",
|
||||
]
|
||||
|
||||
DataT = TypeVar("DataT")
|
||||
|
||||
|
||||
class PydanticBase(BaseModel):
|
||||
"""Base class for all Pydantic models with common configuration."""
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
"""Shared type aliases for the fastapi-toolsets package."""
|
||||
|
||||
from collections.abc import AsyncGenerator, Callable, Mapping
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import DeclarativeBase, QueryableAttribute
|
||||
from sqlalchemy.orm.attributes import InstrumentedAttribute
|
||||
from sqlalchemy.sql.elements import ColumnElement
|
||||
|
||||
# Generic TypeVars
|
||||
DataT = TypeVar("DataT")
|
||||
ModelType = TypeVar("ModelType", bound=DeclarativeBase)
|
||||
SchemaType = TypeVar("SchemaType", bound=BaseModel)
|
||||
|
||||
# CRUD type aliases
|
||||
JoinType = list[tuple[type[DeclarativeBase], Any]]
|
||||
M2MFieldType = Mapping[str, QueryableAttribute[Any]]
|
||||
OrderByClause = ColumnElement[Any] | QueryableAttribute[Any]
|
||||
|
||||
# Search / facet type aliases
|
||||
SearchFieldType = InstrumentedAttribute[Any] | tuple[InstrumentedAttribute[Any], ...]
|
||||
FacetFieldType = SearchFieldType
|
||||
|
||||
# Dependency type aliases
|
||||
SessionDependency = Callable[[], AsyncGenerator[AsyncSession, None]]
|
||||
@@ -92,15 +92,6 @@ class IntRole(Base):
|
||||
name: Mapped[str] = mapped_column(String(50), unique=True)
|
||||
|
||||
|
||||
class Permission(Base):
|
||||
"""Test model with composite primary key."""
|
||||
|
||||
__tablename__ = "permissions"
|
||||
|
||||
subject: Mapped[str] = mapped_column(String(50), primary_key=True)
|
||||
action: Mapped[str] = mapped_column(String(50), primary_key=True)
|
||||
|
||||
|
||||
class Event(Base):
|
||||
"""Test model with DateTime and Date cursor columns."""
|
||||
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
"""Tests for CRUD search functionality."""
|
||||
|
||||
import inspect
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.sql.elements import ColumnElement, UnaryExpression
|
||||
|
||||
from fastapi_toolsets.crud import (
|
||||
CrudFactory,
|
||||
@@ -13,7 +11,6 @@ from fastapi_toolsets.crud import (
|
||||
SearchConfig,
|
||||
get_searchable_fields,
|
||||
)
|
||||
from fastapi_toolsets.exceptions import InvalidOrderFieldError
|
||||
from fastapi_toolsets.schemas import OffsetPagination
|
||||
|
||||
from .conftest import (
|
||||
@@ -1049,144 +1046,3 @@ class TestFilterParamsSchema:
|
||||
|
||||
assert isinstance(result.pagination, OffsetPagination)
|
||||
assert result.pagination.total_count == 2
|
||||
|
||||
|
||||
class TestOrderParamsSchema:
|
||||
"""Tests for AsyncCrud.order_params()."""
|
||||
|
||||
def test_generates_order_by_and_order_params(self):
|
||||
"""Returned dependency has order_by and order query params."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
|
||||
dep = UserOrderCrud.order_params()
|
||||
|
||||
param_names = set(inspect.signature(dep).parameters)
|
||||
assert param_names == {"order_by", "order"}
|
||||
|
||||
def test_dependency_name_includes_model_name(self):
|
||||
"""Dependency function is named after the model."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep = UserOrderCrud.order_params()
|
||||
assert getattr(dep, "__name__") == "UserOrderParams"
|
||||
|
||||
def test_raises_when_no_order_fields(self):
|
||||
"""ValueError raised when no order_fields are configured or provided."""
|
||||
with pytest.raises(ValueError, match="no order_fields"):
|
||||
UserCrud.order_params()
|
||||
|
||||
def test_order_fields_override(self):
|
||||
"""order_fields= parameter overrides the class-level default."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
|
||||
dep = UserOrderCrud.order_params(order_fields=[User.email])
|
||||
|
||||
param_names = set(inspect.signature(dep).parameters)
|
||||
assert "order_by" in param_names
|
||||
# description should only mention email, not username
|
||||
sig = inspect.signature(dep)
|
||||
description = sig.parameters["order_by"].default.description
|
||||
assert "email" in description
|
||||
assert "username" not in description
|
||||
|
||||
def test_order_by_description_lists_valid_fields(self):
|
||||
"""order_by query param description mentions each allowed field."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
|
||||
dep = UserOrderCrud.order_params()
|
||||
|
||||
sig = inspect.signature(dep)
|
||||
description = sig.parameters["order_by"].default.description
|
||||
assert "username" in description
|
||||
assert "email" in description
|
||||
|
||||
def test_default_order_reflected_in_order_default(self):
|
||||
"""default_order is used as the default value for order."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep_asc = UserOrderCrud.order_params(default_order="asc")
|
||||
dep_desc = UserOrderCrud.order_params(default_order="desc")
|
||||
|
||||
sig_asc = inspect.signature(dep_asc)
|
||||
sig_desc = inspect.signature(dep_desc)
|
||||
assert sig_asc.parameters["order"].default.default == "asc"
|
||||
assert sig_desc.parameters["order"].default.default == "desc"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_no_order_by_no_default_returns_none(self):
|
||||
"""Returns None when order_by is absent and no default_field is set."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep = UserOrderCrud.order_params()
|
||||
result = await dep(order_by=None, order="asc")
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_no_order_by_with_default_field_returns_asc_expression(self):
|
||||
"""Returns default_field.asc() when order_by absent and order=asc."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep = UserOrderCrud.order_params(default_field=User.username)
|
||||
result = await dep(order_by=None, order="asc")
|
||||
assert isinstance(result, UnaryExpression)
|
||||
assert "ASC" in str(result)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_no_order_by_with_default_field_returns_desc_expression(self):
|
||||
"""Returns default_field.desc() when order_by absent and order=desc."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep = UserOrderCrud.order_params(default_field=User.username)
|
||||
result = await dep(order_by=None, order="desc")
|
||||
assert isinstance(result, UnaryExpression)
|
||||
assert "DESC" in str(result)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_valid_order_by_asc(self):
|
||||
"""Returns field.asc() for a valid order_by with order=asc."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep = UserOrderCrud.order_params()
|
||||
result = await dep(order_by="username", order="asc")
|
||||
assert isinstance(result, UnaryExpression)
|
||||
assert "ASC" in str(result)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_valid_order_by_desc(self):
|
||||
"""Returns field.desc() for a valid order_by with order=desc."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep = UserOrderCrud.order_params()
|
||||
result = await dep(order_by="username", order="desc")
|
||||
assert isinstance(result, UnaryExpression)
|
||||
assert "DESC" in str(result)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_invalid_order_by_raises_invalid_order_field_error(self):
|
||||
"""Raises InvalidOrderFieldError for an unknown order_by value."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
dep = UserOrderCrud.order_params()
|
||||
with pytest.raises(InvalidOrderFieldError) as exc_info:
|
||||
await dep(order_by="nonexistent", order="asc")
|
||||
assert exc_info.value.field == "nonexistent"
|
||||
assert "username" in exc_info.value.valid_fields
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_multiple_fields_all_resolve(self):
|
||||
"""All configured fields resolve correctly via order_by."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username, User.email])
|
||||
dep = UserOrderCrud.order_params()
|
||||
result_username = await dep(order_by="username", order="asc")
|
||||
result_email = await dep(order_by="email", order="desc")
|
||||
assert isinstance(result_username, ColumnElement)
|
||||
assert isinstance(result_email, ColumnElement)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_order_params_integrates_with_get_multi(
|
||||
self, db_session: AsyncSession
|
||||
):
|
||||
"""order_params output is accepted by get_multi(order_by=...)."""
|
||||
UserOrderCrud = CrudFactory(User, order_fields=[User.username])
|
||||
await UserCrud.create(
|
||||
db_session, UserCreate(username="charlie", email="c@test.com")
|
||||
)
|
||||
await UserCrud.create(
|
||||
db_session, UserCreate(username="alice", email="a@test.com")
|
||||
)
|
||||
|
||||
dep = UserOrderCrud.order_params()
|
||||
order_by = await dep(order_by="username", order="asc")
|
||||
results = await UserOrderCrud.get_multi(db_session, order_by=order_by)
|
||||
|
||||
assert results[0].username == "alice"
|
||||
assert results[1].username == "charlie"
|
||||
|
||||
@@ -15,14 +15,12 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_asyn
|
||||
from docs_src.examples.pagination_search.db import get_db
|
||||
from docs_src.examples.pagination_search.models import Article, Base, Category
|
||||
from docs_src.examples.pagination_search.routes import router
|
||||
from fastapi_toolsets.exceptions import init_exceptions_handlers
|
||||
|
||||
from .conftest import DATABASE_URL
|
||||
|
||||
|
||||
def build_app(session: AsyncSession) -> FastAPI:
|
||||
app = FastAPI()
|
||||
init_exceptions_handlers(app)
|
||||
|
||||
async def override_get_db():
|
||||
yield session
|
||||
@@ -271,125 +269,3 @@ class TestCursorPagination:
|
||||
body = resp.json()
|
||||
assert len(body["data"]) == 1
|
||||
assert body["data"][0]["title"] == "SQLAlchemy async"
|
||||
|
||||
|
||||
class TestOffsetSorting:
|
||||
"""Tests for order_by / order query parameters on the offset endpoint."""
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_default_order_uses_created_at_asc(
|
||||
self, client: AsyncClient, ex_db_session
|
||||
):
|
||||
"""No order_by → default field (created_at) ASC."""
|
||||
await seed(ex_db_session)
|
||||
|
||||
resp = await client.get("/articles/offset")
|
||||
|
||||
assert resp.status_code == 200
|
||||
titles = [a["title"] for a in resp.json()["data"]]
|
||||
assert titles == ["FastAPI tips", "SQLAlchemy async", "Draft notes"]
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_order_by_title_asc(self, client: AsyncClient, ex_db_session):
|
||||
"""order_by=title&order=asc returns alphabetical order."""
|
||||
await seed(ex_db_session)
|
||||
|
||||
resp = await client.get("/articles/offset?order_by=title&order=asc")
|
||||
|
||||
assert resp.status_code == 200
|
||||
titles = [a["title"] for a in resp.json()["data"]]
|
||||
assert titles == ["Draft notes", "FastAPI tips", "SQLAlchemy async"]
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_order_by_title_desc(self, client: AsyncClient, ex_db_session):
|
||||
"""order_by=title&order=desc returns reverse alphabetical order."""
|
||||
await seed(ex_db_session)
|
||||
|
||||
resp = await client.get("/articles/offset?order_by=title&order=desc")
|
||||
|
||||
assert resp.status_code == 200
|
||||
titles = [a["title"] for a in resp.json()["data"]]
|
||||
assert titles == ["SQLAlchemy async", "FastAPI tips", "Draft notes"]
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_order_by_created_at_desc(self, client: AsyncClient, ex_db_session):
|
||||
"""order_by=created_at&order=desc returns newest-first."""
|
||||
await seed(ex_db_session)
|
||||
|
||||
resp = await client.get("/articles/offset?order_by=created_at&order=desc")
|
||||
|
||||
assert resp.status_code == 200
|
||||
titles = [a["title"] for a in resp.json()["data"]]
|
||||
assert titles == ["Draft notes", "SQLAlchemy async", "FastAPI tips"]
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_invalid_order_by_returns_422(
|
||||
self, client: AsyncClient, ex_db_session
|
||||
):
|
||||
"""Unknown order_by field returns 422 with SORT-422 error code."""
|
||||
resp = await client.get("/articles/offset?order_by=nonexistent_field")
|
||||
|
||||
assert resp.status_code == 422
|
||||
body = resp.json()
|
||||
assert body["error_code"] == "SORT-422"
|
||||
assert body["status"] == "FAIL"
|
||||
|
||||
|
||||
class TestCursorSorting:
|
||||
"""Tests for order_by / order query parameters on the cursor endpoint.
|
||||
|
||||
In cursor_paginate the cursor_column is always the primary sort; order_by
|
||||
acts as a secondary tiebreaker. With the seeded articles (all having unique
|
||||
created_at values) the overall ordering is always created_at ASC regardless
|
||||
of the order_by value — only the valid/invalid field check and the response
|
||||
shape are meaningful here.
|
||||
"""
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_default_order_uses_created_at_asc(
|
||||
self, client: AsyncClient, ex_db_session
|
||||
):
|
||||
"""No order_by → default field (created_at) ASC."""
|
||||
await seed(ex_db_session)
|
||||
|
||||
resp = await client.get("/articles/cursor")
|
||||
|
||||
assert resp.status_code == 200
|
||||
titles = [a["title"] for a in resp.json()["data"]]
|
||||
assert titles == ["FastAPI tips", "SQLAlchemy async", "Draft notes"]
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_order_by_title_asc_accepted(
|
||||
self, client: AsyncClient, ex_db_session
|
||||
):
|
||||
"""order_by=title is a valid field — request succeeds and returns all articles."""
|
||||
await seed(ex_db_session)
|
||||
|
||||
resp = await client.get("/articles/cursor?order_by=title&order=asc")
|
||||
|
||||
assert resp.status_code == 200
|
||||
assert len(resp.json()["data"]) == 3
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_order_by_title_desc_accepted(
|
||||
self, client: AsyncClient, ex_db_session
|
||||
):
|
||||
"""order_by=title&order=desc is valid — request succeeds and returns all articles."""
|
||||
await seed(ex_db_session)
|
||||
|
||||
resp = await client.get("/articles/cursor?order_by=title&order=desc")
|
||||
|
||||
assert resp.status_code == 200
|
||||
assert len(resp.json()["data"]) == 3
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_invalid_order_by_returns_422(
|
||||
self, client: AsyncClient, ex_db_session
|
||||
):
|
||||
"""Unknown order_by field returns 422 with SORT-422 error code."""
|
||||
resp = await client.get("/articles/cursor?order_by=nonexistent_field")
|
||||
|
||||
assert resp.status_code == 422
|
||||
body = resp.json()
|
||||
assert body["error_code"] == "SORT-422"
|
||||
assert body["status"] == "FAIL"
|
||||
|
||||
@@ -8,7 +8,6 @@ from fastapi_toolsets.exceptions import (
|
||||
ApiException,
|
||||
ConflictError,
|
||||
ForbiddenError,
|
||||
InvalidOrderFieldError,
|
||||
NotFoundError,
|
||||
UnauthorizedError,
|
||||
generate_error_responses,
|
||||
@@ -335,43 +334,3 @@ class TestExceptionIntegration:
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"id": 1}
|
||||
|
||||
|
||||
class TestInvalidOrderFieldError:
|
||||
"""Tests for InvalidOrderFieldError exception."""
|
||||
|
||||
def test_api_error_attributes(self):
|
||||
"""InvalidOrderFieldError has correct api_error metadata."""
|
||||
assert InvalidOrderFieldError.api_error.code == 422
|
||||
assert InvalidOrderFieldError.api_error.err_code == "SORT-422"
|
||||
assert InvalidOrderFieldError.api_error.msg == "Invalid Order Field"
|
||||
|
||||
def test_stores_field_and_valid_fields(self):
|
||||
"""InvalidOrderFieldError stores field and valid_fields on the instance."""
|
||||
error = InvalidOrderFieldError("unknown", ["name", "created_at"])
|
||||
assert error.field == "unknown"
|
||||
assert error.valid_fields == ["name", "created_at"]
|
||||
|
||||
def test_message_contains_field_and_valid_fields(self):
|
||||
"""Exception message mentions the bad field and valid options."""
|
||||
error = InvalidOrderFieldError("bad_field", ["name", "email"])
|
||||
assert "bad_field" in str(error)
|
||||
assert "name" in str(error)
|
||||
assert "email" in str(error)
|
||||
|
||||
def test_handled_as_422_by_exception_handler(self):
|
||||
"""init_exceptions_handlers turns InvalidOrderFieldError into a 422 response."""
|
||||
app = FastAPI()
|
||||
init_exceptions_handlers(app)
|
||||
|
||||
@app.get("/items")
|
||||
async def list_items():
|
||||
raise InvalidOrderFieldError("bad", ["name"])
|
||||
|
||||
client = TestClient(app)
|
||||
response = client.get("/items")
|
||||
|
||||
assert response.status_code == 422
|
||||
data = response.json()
|
||||
assert data["error_code"] == "SORT-422"
|
||||
assert data["status"] == "FAIL"
|
||||
|
||||
@@ -14,9 +14,7 @@ from fastapi_toolsets.fixtures import (
|
||||
load_fixtures_by_context,
|
||||
)
|
||||
|
||||
from fastapi_toolsets.fixtures.utils import _get_primary_key
|
||||
|
||||
from .conftest import IntRole, Permission, Role, User
|
||||
from .conftest import Role, User
|
||||
|
||||
|
||||
class TestContext:
|
||||
@@ -599,46 +597,6 @@ class TestLoadFixtures:
|
||||
count = await RoleCrud.count(db_session)
|
||||
assert count == 2
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_skip_existing_skips_if_record_exists(self, db_session: AsyncSession):
|
||||
"""SKIP_EXISTING returns empty loaded list when the record already exists."""
|
||||
registry = FixtureRegistry()
|
||||
role_id = uuid.uuid4()
|
||||
|
||||
@registry.register
|
||||
def roles():
|
||||
return [Role(id=role_id, name="admin")]
|
||||
|
||||
# First load — inserts the record.
|
||||
result1 = await load_fixtures(
|
||||
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
|
||||
)
|
||||
assert len(result1["roles"]) == 1
|
||||
|
||||
# Remove from identity map so session.get() queries the DB in the second load.
|
||||
db_session.expunge_all()
|
||||
|
||||
# Second load — record exists in DB, nothing should be added.
|
||||
result2 = await load_fixtures(
|
||||
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
|
||||
)
|
||||
assert result2["roles"] == []
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_skip_existing_null_pk_inserts(self, db_session: AsyncSession):
|
||||
"""SKIP_EXISTING inserts when the instance has no PK set (auto-increment)."""
|
||||
registry = FixtureRegistry()
|
||||
|
||||
@registry.register
|
||||
def int_roles():
|
||||
# No id provided — PK is None before INSERT (autoincrement).
|
||||
return [IntRole(name="member")]
|
||||
|
||||
result = await load_fixtures(
|
||||
db_session, registry, "int_roles", strategy=LoadStrategy.SKIP_EXISTING
|
||||
)
|
||||
assert len(result["int_roles"]) == 1
|
||||
|
||||
|
||||
class TestLoadFixturesByContext:
|
||||
"""Tests for load_fixtures_by_context function."""
|
||||
@@ -797,19 +755,3 @@ class TestGetObjByAttr:
|
||||
"""Raises StopIteration when value type doesn't match."""
|
||||
with pytest.raises(StopIteration):
|
||||
get_obj_by_attr(self.roles, "id", "not-a-uuid")
|
||||
|
||||
|
||||
class TestGetPrimaryKey:
|
||||
"""Unit tests for the _get_primary_key helper (composite PK paths)."""
|
||||
|
||||
def test_composite_pk_all_set(self):
|
||||
"""Returns a tuple when all composite PK values are set."""
|
||||
instance = Permission(subject="post", action="read")
|
||||
pk = _get_primary_key(instance)
|
||||
assert pk == ("post", "read")
|
||||
|
||||
def test_composite_pk_partial_none(self):
|
||||
"""Returns None when any composite PK value is None."""
|
||||
instance = Permission(subject="post") # action is None
|
||||
pk = _get_primary_key(instance)
|
||||
assert pk is None
|
||||
|
||||
Reference in New Issue
Block a user