mirror of
https://github.com/d3vyce/fastapi-toolsets.git
synced 2026-04-16 14:46:24 +02:00
Compare commits
3 Commits
v2.3.0
...
1a863b7032
| Author | SHA1 | Date | |
|---|---|---|---|
|
1a863b7032
|
|||
|
aca6dd298a
|
|||
|
2e7f879544
|
@@ -48,8 +48,8 @@ uv add "fastapi-toolsets[all]"
|
||||
- **Database**: Session management, transaction helpers, table locking, and polling-based row change detection
|
||||
- **Dependencies**: FastAPI dependency factories (`PathDependency`, `BodyDependency`) for automatic DB lookups from path or body parameters
|
||||
- **Fixtures**: Fixture system with dependency management, context support, and pytest integration
|
||||
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `UUIDv7Mixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`)
|
||||
- **Standardized API Responses**: Consistent response format with `Response`, `ErrorResponse`, `PaginatedResponse`, `CursorPaginatedResponse` and `OffsetPaginatedResponse`.
|
||||
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`)
|
||||
- **Standardized API Responses**: Consistent response format with `Response`, `PaginatedResponse`, and `PydanticBase`
|
||||
- **Exception Handling**: Structured error responses with automatic OpenAPI documentation
|
||||
- **Logging**: Logging configuration with uvicorn integration via `configure_logging` and `get_logger`
|
||||
|
||||
|
||||
@@ -48,8 +48,8 @@ uv add "fastapi-toolsets[all]"
|
||||
- **Database**: Session management, transaction helpers, table locking, and polling-based row change detection
|
||||
- **Dependencies**: FastAPI dependency factories (`PathDependency`, `BodyDependency`) for automatic DB lookups from path or body parameters
|
||||
- **Fixtures**: Fixture system with dependency management, context support, and pytest integration
|
||||
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `UUIDv7Mixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`)
|
||||
- **Standardized API Responses**: Consistent response format with `Response`, `ErrorResponse`, `PaginatedResponse`, `CursorPaginatedResponse` and `OffsetPaginatedResponse`.
|
||||
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`)
|
||||
- **Standardized API Responses**: Consistent response format with `Response`, `PaginatedResponse`, and `PydanticBase`
|
||||
- **Exception Handling**: Structured error responses with automatic OpenAPI documentation
|
||||
- **Logging**: Logging configuration with uvicorn integration via `configure_logging` and `get_logger`
|
||||
|
||||
|
||||
@@ -18,15 +18,13 @@ class Article(Base, UUIDMixin, TimestampMixin):
|
||||
content: Mapped[str]
|
||||
```
|
||||
|
||||
All timestamp columns are timezone-aware (`TIMESTAMPTZ`). All defaults are server-side (`clock_timestamp()`), so they are also applied when inserting rows via raw SQL outside the ORM.
|
||||
All timestamp columns are timezone-aware (`TIMESTAMPTZ`). All defaults are server-side, so they are also applied when inserting rows via raw SQL outside the ORM.
|
||||
|
||||
## Mixins
|
||||
|
||||
### [`UUIDMixin`](../reference/models.md#fastapi_toolsets.models.UUIDMixin)
|
||||
|
||||
Adds a `id: UUID` primary key generated server-side by PostgreSQL using `gen_random_uuid()`. The value is retrieved via `RETURNING` after insert, so it is available on the Python object immediately after `flush()`.
|
||||
|
||||
!!! warning "Requires PostgreSQL 13+"
|
||||
Adds a `id: UUID` primary key generated server-side by PostgreSQL using `gen_random_uuid()` (requires PostgreSQL 13+). The value is retrieved via `RETURNING` after insert, so it is available on the Python object immediately after `flush()`.
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.models import UUIDMixin
|
||||
@@ -38,37 +36,13 @@ class User(Base, UUIDMixin):
|
||||
|
||||
# id is None before flush
|
||||
user = User(username="alice")
|
||||
session.add(user)
|
||||
await session.flush()
|
||||
print(user.id) # UUID('...')
|
||||
```
|
||||
|
||||
### [`UUIDv7Mixin`](../reference/models.md#fastapi_toolsets.models.UUIDv7Mixin)
|
||||
|
||||
!!! info "Added in `v2.3`"
|
||||
|
||||
Adds a `id: UUID` primary key generated server-side by PostgreSQL using `uuidv7()`. It's a time-ordered UUID format that encodes a millisecond-precision timestamp in the most significant bits, making it naturally sortable and index-friendly.
|
||||
|
||||
!!! warning "Requires PostgreSQL 18+"
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.models import UUIDv7Mixin
|
||||
|
||||
class Event(Base, UUIDv7Mixin):
|
||||
__tablename__ = "events"
|
||||
|
||||
name: Mapped[str]
|
||||
|
||||
# id is None before flush
|
||||
event = Event(name="user.signup")
|
||||
session.add(event)
|
||||
await session.flush()
|
||||
print(event.id) # UUID('019...')
|
||||
```
|
||||
|
||||
### [`CreatedAtMixin`](../reference/models.md#fastapi_toolsets.models.CreatedAtMixin)
|
||||
|
||||
Adds a `created_at: datetime` column set to `clock_timestamp()` on insert. The column has no `onupdate` hook — it is intentionally immutable after the row is created.
|
||||
Adds a `created_at: datetime` column set to `NOW()` on insert. The column has no `onupdate` hook — it is intentionally immutable after the row is created.
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.models import UUIDMixin, CreatedAtMixin
|
||||
@@ -81,7 +55,7 @@ class Order(Base, UUIDMixin, CreatedAtMixin):
|
||||
|
||||
### [`UpdatedAtMixin`](../reference/models.md#fastapi_toolsets.models.UpdatedAtMixin)
|
||||
|
||||
Adds an `updated_at: datetime` column set to `clock_timestamp()` on insert and automatically updated to `clock_timestamp()` on every ORM-level update (via SQLAlchemy's `onupdate` hook).
|
||||
Adds an `updated_at: datetime` column set to `NOW()` on insert and automatically updated to `NOW()` on every ORM-level update (via SQLAlchemy's `onupdate` hook).
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.models import UUIDMixin, UpdatedAtMixin
|
||||
|
||||
@@ -102,9 +102,7 @@ async def list_events(
|
||||
|
||||
#### [`PaginatedResponse[T]`](../reference/schemas.md#fastapi_toolsets.schemas.PaginatedResponse)
|
||||
|
||||
Return type for endpoints that support **both** pagination strategies via a `pagination_type` query parameter (using [`paginate()`](crud.md#unified-paginate--both-strategies-on-one-endpoint)).
|
||||
|
||||
When used as a return annotation, `PaginatedResponse[T]` automatically expands to `Annotated[Union[CursorPaginatedResponse[T], OffsetPaginatedResponse[T]], Field(discriminator="pagination_type")]`, so FastAPI emits a proper `oneOf` + discriminator in the OpenAPI schema with no extra boilerplate:
|
||||
Base class and return type for endpoints that support **both** pagination strategies via a `pagination_type` query parameter (using [`paginate()`](crud.md#unified-paginate--both-strategies-on-one-endpoint))
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.crud import PaginationType
|
||||
|
||||
@@ -7,7 +7,6 @@ You can import them directly from `fastapi_toolsets.models`:
|
||||
```python
|
||||
from fastapi_toolsets.models import (
|
||||
UUIDMixin,
|
||||
UUIDv7Mixin,
|
||||
CreatedAtMixin,
|
||||
UpdatedAtMixin,
|
||||
TimestampMixin,
|
||||
@@ -16,8 +15,6 @@ from fastapi_toolsets.models import (
|
||||
|
||||
## ::: fastapi_toolsets.models.UUIDMixin
|
||||
|
||||
## ::: fastapi_toolsets.models.UUIDv7Mixin
|
||||
|
||||
## ::: fastapi_toolsets.models.CreatedAtMixin
|
||||
|
||||
## ::: fastapi_toolsets.models.UpdatedAtMixin
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from sqlalchemy import Boolean, ForeignKey, String, Text
|
||||
from sqlalchemy import Boolean, DateTime, ForeignKey, String, Text, func
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
||||
|
||||
from fastapi_toolsets.models import CreatedAtMixin
|
||||
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
pass
|
||||
@@ -19,10 +18,13 @@ class Category(Base):
|
||||
articles: Mapped[list["Article"]] = relationship(back_populates="category")
|
||||
|
||||
|
||||
class Article(Base, CreatedAtMixin):
|
||||
class Article(Base):
|
||||
__tablename__ = "articles"
|
||||
|
||||
id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
|
||||
created_at: Mapped[datetime.datetime] = mapped_column(
|
||||
DateTime(timezone=True), server_default=func.now()
|
||||
)
|
||||
title: Mapped[str] = mapped_column(String(256))
|
||||
body: Mapped[str] = mapped_column(Text)
|
||||
status: Mapped[str] = mapped_column(String(32))
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "fastapi-toolsets"
|
||||
version = "2.3.0"
|
||||
version = "2.2.1"
|
||||
description = "Production-ready utilities for FastAPI applications"
|
||||
readme = "README.md"
|
||||
license = "MIT"
|
||||
|
||||
@@ -21,4 +21,4 @@ Example usage:
|
||||
return Response(data={"user": user.username}, message="Success")
|
||||
"""
|
||||
|
||||
__version__ = "2.3.0"
|
||||
__version__ = "2.2.1"
|
||||
|
||||
@@ -9,7 +9,6 @@ import uuid as uuid_module
|
||||
from collections.abc import Awaitable, Callable, Sequence
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from enum import Enum
|
||||
from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
|
||||
|
||||
from fastapi import Query
|
||||
@@ -50,43 +49,14 @@ from .search import (
|
||||
)
|
||||
|
||||
|
||||
class _CursorDirection(str, Enum):
|
||||
NEXT = "next"
|
||||
PREV = "prev"
|
||||
def _encode_cursor(value: Any) -> str:
|
||||
"""Encode cursor column value as an base64 string."""
|
||||
return base64.b64encode(json.dumps(str(value)).encode()).decode()
|
||||
|
||||
|
||||
def _encode_cursor(
|
||||
value: Any, *, direction: _CursorDirection = _CursorDirection.NEXT
|
||||
) -> str:
|
||||
"""Encode a cursor column value and navigation direction as a base64 string."""
|
||||
return base64.b64encode(
|
||||
json.dumps({"val": str(value), "dir": direction}).encode()
|
||||
).decode()
|
||||
|
||||
|
||||
def _decode_cursor(cursor: str) -> tuple[str, _CursorDirection]:
|
||||
"""Decode a cursor base64 string into ``(raw_value, direction)``."""
|
||||
payload = json.loads(base64.b64decode(cursor.encode()).decode())
|
||||
return payload["val"], _CursorDirection(payload["dir"])
|
||||
|
||||
|
||||
def _parse_cursor_value(raw_val: str, col_type: Any) -> Any:
|
||||
"""Parse a raw cursor string value back into the appropriate Python type."""
|
||||
if isinstance(col_type, Integer):
|
||||
return int(raw_val)
|
||||
if isinstance(col_type, Uuid):
|
||||
return uuid_module.UUID(raw_val)
|
||||
if isinstance(col_type, DateTime):
|
||||
return datetime.fromisoformat(raw_val)
|
||||
if isinstance(col_type, Date):
|
||||
return date.fromisoformat(raw_val)
|
||||
if isinstance(col_type, (Float, Numeric)):
|
||||
return Decimal(raw_val)
|
||||
raise ValueError(
|
||||
f"Unsupported cursor column type: {type(col_type).__name__!r}. "
|
||||
"Supported types: Integer, BigInteger, SmallInteger, Uuid, "
|
||||
"DateTime, Date, Float, Numeric."
|
||||
)
|
||||
def _decode_cursor(cursor: str) -> str:
|
||||
"""Decode cursor base64 string."""
|
||||
return json.loads(base64.b64decode(cursor.encode()).decode())
|
||||
|
||||
|
||||
def _apply_joins(q: Any, joins: JoinType | None, outer_join: bool) -> Any:
|
||||
@@ -135,9 +105,8 @@ class AsyncCrud(Generic[ModelType]):
|
||||
if raw_fields is None:
|
||||
cls.searchable_fields = [pk_col]
|
||||
else:
|
||||
if not any(
|
||||
not isinstance(f, tuple) and f.key == pk_key for f in raw_fields
|
||||
):
|
||||
existing_keys = {f.key for f in raw_fields if not isinstance(f, tuple)}
|
||||
if pk_key not in existing_keys:
|
||||
cls.searchable_fields = [pk_col, *raw_fields]
|
||||
|
||||
@classmethod
|
||||
@@ -1076,15 +1045,26 @@ class AsyncCrud(Generic[ModelType]):
|
||||
cursor_column: Any = cls.cursor_column
|
||||
cursor_col_name: str = cursor_column.key
|
||||
|
||||
direction = _CursorDirection.NEXT
|
||||
if cursor is not None:
|
||||
raw_val, direction = _decode_cursor(cursor)
|
||||
raw_val = _decode_cursor(cursor)
|
||||
col_type = cursor_column.property.columns[0].type
|
||||
cursor_val: Any = _parse_cursor_value(raw_val, col_type)
|
||||
if direction is _CursorDirection.PREV:
|
||||
filters.append(cursor_column < cursor_val)
|
||||
if isinstance(col_type, Integer):
|
||||
cursor_val: Any = int(raw_val)
|
||||
elif isinstance(col_type, Uuid):
|
||||
cursor_val = uuid_module.UUID(raw_val)
|
||||
elif isinstance(col_type, DateTime):
|
||||
cursor_val = datetime.fromisoformat(raw_val)
|
||||
elif isinstance(col_type, Date):
|
||||
cursor_val = date.fromisoformat(raw_val)
|
||||
elif isinstance(col_type, (Float, Numeric)):
|
||||
cursor_val = Decimal(raw_val)
|
||||
else:
|
||||
filters.append(cursor_column > cursor_val)
|
||||
raise ValueError(
|
||||
f"Unsupported cursor column type: {type(col_type).__name__!r}. "
|
||||
"Supported types: Integer, BigInteger, SmallInteger, Uuid, "
|
||||
"DateTime, Date, Float, Numeric."
|
||||
)
|
||||
filters.append(cursor_column > cursor_val)
|
||||
|
||||
# Build search filters
|
||||
if search:
|
||||
@@ -1111,15 +1091,12 @@ class AsyncCrud(Generic[ModelType]):
|
||||
if resolved := cls._resolve_load_options(load_options):
|
||||
q = q.options(*resolved)
|
||||
|
||||
# Cursor column is always the primary sort; reverse direction for prev traversal
|
||||
if direction is _CursorDirection.PREV:
|
||||
q = q.order_by(cursor_column.desc())
|
||||
else:
|
||||
q = q.order_by(cursor_column)
|
||||
# Cursor column is always the primary sort
|
||||
q = q.order_by(cursor_column)
|
||||
if order_by is not None:
|
||||
q = q.order_by(order_by)
|
||||
|
||||
# Fetch one extra to detect whether another page exists in this direction
|
||||
# Fetch one extra to detect whether a next page exists
|
||||
q = q.limit(items_per_page + 1)
|
||||
result = await session.execute(q)
|
||||
raw_items = cast(list[ModelType], result.unique().scalars().all())
|
||||
@@ -1127,36 +1104,15 @@ class AsyncCrud(Generic[ModelType]):
|
||||
has_more = len(raw_items) > items_per_page
|
||||
items_page = raw_items[:items_per_page]
|
||||
|
||||
# Restore ascending order when traversing backward
|
||||
if direction is _CursorDirection.PREV:
|
||||
items_page = list(reversed(items_page))
|
||||
|
||||
# next_cursor: points past the last item in ascending order
|
||||
# next_cursor points past the last item on this page
|
||||
next_cursor: str | None = None
|
||||
if direction is _CursorDirection.NEXT:
|
||||
if has_more and items_page:
|
||||
next_cursor = _encode_cursor(
|
||||
getattr(items_page[-1], cursor_col_name),
|
||||
direction=_CursorDirection.NEXT,
|
||||
)
|
||||
else:
|
||||
# Going backward: always provide a next_cursor to allow returning forward
|
||||
if items_page:
|
||||
next_cursor = _encode_cursor(
|
||||
getattr(items_page[-1], cursor_col_name),
|
||||
direction=_CursorDirection.NEXT,
|
||||
)
|
||||
if has_more and items_page:
|
||||
next_cursor = _encode_cursor(getattr(items_page[-1], cursor_col_name))
|
||||
|
||||
# prev_cursor: points before the first item in ascending order
|
||||
# prev_cursor points to the first item on this page or None when on the first page
|
||||
prev_cursor: str | None = None
|
||||
if direction is _CursorDirection.NEXT and cursor is not None and items_page:
|
||||
prev_cursor = _encode_cursor(
|
||||
getattr(items_page[0], cursor_col_name), direction=_CursorDirection.PREV
|
||||
)
|
||||
elif direction is _CursorDirection.PREV and has_more and items_page:
|
||||
prev_cursor = _encode_cursor(
|
||||
getattr(items_page[0], cursor_col_name), direction=_CursorDirection.PREV
|
||||
)
|
||||
if cursor is not None and items_page:
|
||||
prev_cursor = _encode_cursor(getattr(items_page[0], cursor_col_name))
|
||||
|
||||
items: list[Any] = [schema.model_validate(item) for item in items_page]
|
||||
|
||||
|
||||
@@ -3,12 +3,11 @@
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import DateTime, Uuid, text
|
||||
from sqlalchemy import DateTime, Uuid, func, text
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
__all__ = [
|
||||
"UUIDMixin",
|
||||
"UUIDv7Mixin",
|
||||
"CreatedAtMixin",
|
||||
"UpdatedAtMixin",
|
||||
"TimestampMixin",
|
||||
@@ -25,22 +24,12 @@ class UUIDMixin:
|
||||
)
|
||||
|
||||
|
||||
class UUIDv7Mixin:
|
||||
"""Mixin that adds a UUIDv7 primary key auto-generated by the database."""
|
||||
|
||||
id: Mapped[uuid.UUID] = mapped_column(
|
||||
Uuid,
|
||||
primary_key=True,
|
||||
server_default=text("uuidv7()"),
|
||||
)
|
||||
|
||||
|
||||
class CreatedAtMixin:
|
||||
"""Mixin that adds a ``created_at`` timestamp column."""
|
||||
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=text("clock_timestamp()"),
|
||||
server_default=func.now(),
|
||||
)
|
||||
|
||||
|
||||
@@ -49,8 +38,8 @@ class UpdatedAtMixin:
|
||||
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=text("clock_timestamp()"),
|
||||
onupdate=text("clock_timestamp()"),
|
||||
server_default=func.now(),
|
||||
onupdate=func.now(),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""Base Pydantic schemas for API responses."""
|
||||
|
||||
from enum import Enum
|
||||
from typing import Annotated, Any, ClassVar, Generic, Literal, TypeVar, Union
|
||||
from typing import Any, ClassVar, Generic, Literal
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
from .types import DataT
|
||||
|
||||
@@ -138,11 +138,9 @@ class PaginatedResponse(BaseResponse, Generic[DataT]):
|
||||
|
||||
Base class and return type for endpoints that support both pagination
|
||||
strategies. Use :class:`OffsetPaginatedResponse` or
|
||||
:class:`CursorPaginatedResponse` when the strategy is fixed.
|
||||
|
||||
When used as ``PaginatedResponse[T]`` in a return annotation, subscripting
|
||||
returns ``Annotated[Union[CursorPaginatedResponse[T], OffsetPaginatedResponse[T]], Field(discriminator="pagination_type")]``
|
||||
so FastAPI emits a proper ``oneOf`` + discriminator in the OpenAPI schema.
|
||||
:class:`CursorPaginatedResponse` when the strategy is fixed; use
|
||||
``PaginatedResponse`` as the return annotation for unified endpoints that
|
||||
dispatch via :meth:`~fastapi_toolsets.crud.factory.AsyncCrud.paginate`.
|
||||
"""
|
||||
|
||||
data: list[DataT]
|
||||
@@ -150,22 +148,6 @@ class PaginatedResponse(BaseResponse, Generic[DataT]):
|
||||
pagination_type: PaginationType | None = None
|
||||
filter_attributes: dict[str, list[Any]] | None = None
|
||||
|
||||
_discriminated_union_cache: ClassVar[dict[Any, Any]] = {}
|
||||
|
||||
def __class_getitem__( # type: ignore[invalid-method-override]
|
||||
cls, item: type[Any] | tuple[type[Any], ...]
|
||||
) -> type[Any]:
|
||||
if cls is PaginatedResponse and not isinstance(item, TypeVar):
|
||||
cached = cls._discriminated_union_cache.get(item)
|
||||
if cached is None:
|
||||
cached = Annotated[
|
||||
Union[CursorPaginatedResponse[item], OffsetPaginatedResponse[item]], # type: ignore[invalid-type-form]
|
||||
Field(discriminator="pagination_type"),
|
||||
]
|
||||
cls._discriminated_union_cache[item] = cached
|
||||
return cached # type: ignore[invalid-return-type]
|
||||
return super().__class_getitem__(item)
|
||||
|
||||
|
||||
class OffsetPaginatedResponse(PaginatedResponse[DataT]):
|
||||
"""Paginated response with typed offset-based pagination metadata.
|
||||
|
||||
@@ -7,7 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from fastapi_toolsets.crud import CrudFactory, PaginationType
|
||||
from fastapi_toolsets.crud.factory import AsyncCrud, _CursorDirection
|
||||
from fastapi_toolsets.crud.factory import AsyncCrud
|
||||
from fastapi_toolsets.exceptions import NotFoundError
|
||||
|
||||
from .conftest import (
|
||||
@@ -1969,8 +1969,11 @@ class TestCursorPaginatePrevCursor:
|
||||
assert page2.pagination.prev_cursor is not None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_prev_cursor_navigates_back(self, db_session: AsyncSession):
|
||||
"""prev_cursor on page 2 navigates back to the same items as page 1."""
|
||||
async def test_prev_cursor_points_to_first_item(self, db_session: AsyncSession):
|
||||
"""prev_cursor encodes the value of the first item on the current page."""
|
||||
import base64
|
||||
import json
|
||||
|
||||
for i in range(10):
|
||||
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
|
||||
|
||||
@@ -1989,83 +1992,12 @@ class TestCursorPaginatePrevCursor:
|
||||
assert isinstance(page2.pagination, CursorPagination)
|
||||
assert page2.pagination.prev_cursor is not None
|
||||
|
||||
# Using prev_cursor should return the same items as page 1
|
||||
back_to_page1 = await RoleCursorCrud.cursor_paginate(
|
||||
db_session,
|
||||
cursor=page2.pagination.prev_cursor,
|
||||
items_per_page=5,
|
||||
schema=RoleRead,
|
||||
# Decode prev_cursor and compare to first item's id
|
||||
decoded = json.loads(
|
||||
base64.b64decode(page2.pagination.prev_cursor.encode()).decode()
|
||||
)
|
||||
assert isinstance(back_to_page1.pagination, CursorPagination)
|
||||
assert [r.id for r in back_to_page1.data] == [r.id for r in page1.data]
|
||||
assert back_to_page1.pagination.prev_cursor is None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_prev_cursor_empty_result_when_no_items_before(
|
||||
self, db_session: AsyncSession
|
||||
):
|
||||
"""Going backward past the first item returns an empty page."""
|
||||
from fastapi_toolsets.crud.factory import _encode_cursor
|
||||
from fastapi_toolsets.schemas import CursorPagination
|
||||
|
||||
await IntRoleCursorCrud.create(db_session, IntRoleCreate(name="role00"))
|
||||
|
||||
page1 = await IntRoleCursorCrud.cursor_paginate(
|
||||
db_session, items_per_page=5, schema=IntRoleRead
|
||||
)
|
||||
assert isinstance(page1.pagination, CursorPagination)
|
||||
|
||||
# Manually craft a backward cursor before any existing id
|
||||
before_all = _encode_cursor(0, direction=_CursorDirection.PREV)
|
||||
empty = await IntRoleCursorCrud.cursor_paginate(
|
||||
db_session, cursor=before_all, items_per_page=5, schema=IntRoleRead
|
||||
)
|
||||
|
||||
assert isinstance(empty.pagination, CursorPagination)
|
||||
assert empty.data == []
|
||||
assert empty.pagination.next_cursor is None
|
||||
assert empty.pagination.prev_cursor is None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_prev_cursor_set_when_more_pages_behind(
|
||||
self, db_session: AsyncSession
|
||||
):
|
||||
"""Going backward on page 2 (of 3) still exposes a prev_cursor for page 1."""
|
||||
for i in range(9):
|
||||
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
|
||||
|
||||
from fastapi_toolsets.schemas import CursorPagination
|
||||
|
||||
page1 = await RoleCursorCrud.cursor_paginate(
|
||||
db_session, items_per_page=3, schema=RoleRead
|
||||
)
|
||||
assert isinstance(page1.pagination, CursorPagination)
|
||||
page2 = await RoleCursorCrud.cursor_paginate(
|
||||
db_session,
|
||||
cursor=page1.pagination.next_cursor,
|
||||
items_per_page=3,
|
||||
schema=RoleRead,
|
||||
)
|
||||
assert isinstance(page2.pagination, CursorPagination)
|
||||
page3 = await RoleCursorCrud.cursor_paginate(
|
||||
db_session,
|
||||
cursor=page2.pagination.next_cursor,
|
||||
items_per_page=3,
|
||||
schema=RoleRead,
|
||||
)
|
||||
assert isinstance(page3.pagination, CursorPagination)
|
||||
assert page3.pagination.prev_cursor is not None
|
||||
|
||||
# Going back to page 2 should still have a prev_cursor pointing at page 1
|
||||
back_to_page2 = await RoleCursorCrud.cursor_paginate(
|
||||
db_session,
|
||||
cursor=page3.pagination.prev_cursor,
|
||||
items_per_page=3,
|
||||
schema=RoleRead,
|
||||
)
|
||||
assert isinstance(back_to_page2.pagination, CursorPagination)
|
||||
assert [r.id for r in back_to_page2.data] == [r.id for r in page2.data]
|
||||
assert back_to_page2.pagination.prev_cursor is not None
|
||||
first_item_id = str(page2.data[0].id)
|
||||
assert decoded == first_item_id
|
||||
|
||||
|
||||
class TestCursorPaginateWithSearch:
|
||||
|
||||
@@ -11,7 +11,6 @@ from fastapi_toolsets.models import (
|
||||
CreatedAtMixin,
|
||||
TimestampMixin,
|
||||
UUIDMixin,
|
||||
UUIDv7Mixin,
|
||||
UpdatedAtMixin,
|
||||
)
|
||||
|
||||
@@ -49,12 +48,6 @@ class TimestampModel(MixinBase, TimestampMixin):
|
||||
name: Mapped[str] = mapped_column(String(50))
|
||||
|
||||
|
||||
class UUIDv7Model(MixinBase, UUIDv7Mixin):
|
||||
__tablename__ = "mixin_uuidv7_models"
|
||||
|
||||
name: Mapped[str] = mapped_column(String(50))
|
||||
|
||||
|
||||
class FullMixinModel(MixinBase, UUIDMixin, UpdatedAtMixin):
|
||||
__tablename__ = "mixin_full_models"
|
||||
|
||||
@@ -240,50 +233,6 @@ class TestTimestampMixin:
|
||||
assert "updated_at" in col_names
|
||||
|
||||
|
||||
class TestUUIDv7Mixin:
|
||||
@pytest.mark.anyio
|
||||
async def test_uuid7_generated_by_db(self, mixin_session):
|
||||
"""UUIDv7 is generated server-side and populated after flush."""
|
||||
obj = UUIDv7Model(name="test")
|
||||
mixin_session.add(obj)
|
||||
await mixin_session.flush()
|
||||
|
||||
assert obj.id is not None
|
||||
assert isinstance(obj.id, uuid.UUID)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_uuid7_is_primary_key(self):
|
||||
"""UUIDv7Mixin adds id as primary key column."""
|
||||
pk_cols = [c.name for c in UUIDv7Model.__table__.primary_key]
|
||||
assert pk_cols == ["id"]
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_each_row_gets_unique_uuid7(self, mixin_session):
|
||||
"""Each inserted row gets a distinct UUIDv7."""
|
||||
a = UUIDv7Model(name="a")
|
||||
b = UUIDv7Model(name="b")
|
||||
mixin_session.add_all([a, b])
|
||||
await mixin_session.flush()
|
||||
|
||||
assert a.id != b.id
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_uuid7_version(self, mixin_session):
|
||||
"""Generated UUIDs have version 7."""
|
||||
obj = UUIDv7Model(name="test")
|
||||
mixin_session.add(obj)
|
||||
await mixin_session.flush()
|
||||
|
||||
assert obj.id.version == 7
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_uuid7_server_default_set(self):
|
||||
"""Column has uuidv7() as server default."""
|
||||
col = UUIDv7Model.__table__.c["id"]
|
||||
assert col.server_default is not None
|
||||
assert "uuidv7" in str(col.server_default.arg)
|
||||
|
||||
|
||||
class TestFullMixinModel:
|
||||
@pytest.mark.anyio
|
||||
async def test_combined_mixins_work_together(self, mixin_session):
|
||||
|
||||
@@ -304,7 +304,7 @@ class TestPaginatedResponse:
|
||||
page=1,
|
||||
has_more=False,
|
||||
)
|
||||
response = PaginatedResponse(
|
||||
response = PaginatedResponse[dict](
|
||||
data=[],
|
||||
pagination=pagination,
|
||||
)
|
||||
@@ -313,30 +313,6 @@ class TestPaginatedResponse:
|
||||
assert response.data == []
|
||||
assert response.pagination.total_count == 0
|
||||
|
||||
def test_class_getitem_with_concrete_type_returns_discriminated_union(self):
|
||||
"""PaginatedResponse[T] with a concrete type returns a discriminated Annotated union."""
|
||||
import typing
|
||||
|
||||
alias = PaginatedResponse[dict]
|
||||
args = typing.get_args(alias)
|
||||
# args[0] is the Union, args[1] is the FieldInfo discriminator
|
||||
union_args = typing.get_args(args[0])
|
||||
assert CursorPaginatedResponse[dict] in union_args
|
||||
assert OffsetPaginatedResponse[dict] in union_args
|
||||
|
||||
def test_class_getitem_is_cached(self):
|
||||
"""Repeated subscripting with the same type returns the identical cached object."""
|
||||
assert PaginatedResponse[dict] is PaginatedResponse[dict]
|
||||
|
||||
def test_class_getitem_with_typevar_returns_generic(self):
|
||||
"""PaginatedResponse[TypeVar] falls through to Pydantic generic parametrisation."""
|
||||
from typing import TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
alias = PaginatedResponse[T]
|
||||
# Should be a generic alias, not an Annotated union
|
||||
assert not hasattr(alias, "__metadata__")
|
||||
|
||||
def test_generic_type_hint(self):
|
||||
"""PaginatedResponse supports generic type hints."""
|
||||
pagination = OffsetPagination(
|
||||
|
||||
Reference in New Issue
Block a user