Compare commits

..

4 Commits

Author SHA1 Message Date
dependabot[bot]
4829cfba73 ⬆ Bump pygments from 2.19.2 to 2.20.0 (#199)
Bumps [pygments](https://github.com/pygments/pygments) from 2.19.2 to 2.20.0.
- [Release notes](https://github.com/pygments/pygments/releases)
- [Changelog](https://github.com/pygments/pygments/blob/master/CHANGES)
- [Commits](https://github.com/pygments/pygments/compare/2.19.2...2.20.0)

---
updated-dependencies:
- dependency-name: pygments
  dependency-version: 2.20.0
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-30 20:05:52 +02:00
d3vyce
9ca2da4213 docs: add documentation versioning (#125)
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2026-03-30 18:55:38 +02:00
d3vyce
0b3f097012 fix: batch insert normalizes away omitted nullable columns (#198) 2026-03-30 18:50:05 +02:00
d3vyce
1890d696bf feat: rework async event system (#196)
* feat: rework async event system

* docs: add v3 migration guide

* feat: add cache

* enhancements
2026-03-30 18:24:36 +02:00
18 changed files with 1669 additions and 743 deletions

View File

@@ -5,20 +5,15 @@ on:
types: [published]
permissions:
contents: read
pages: write
id-token: write
contents: write
jobs:
deploy:
environment:
name: github-pages
url: ${{ steps.deployment.outputs.page_url }}
runs-on: ubuntu-latest
steps:
- uses: actions/configure-pages@v5
- uses: actions/checkout@v6
with:
fetch-depth: 0
- name: Install uv
uses: astral-sh/setup-uv@v7
@@ -28,11 +23,40 @@ jobs:
- run: uv sync --group dev
- run: uv run zensical build --clean
- name: Configure git
run: |
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"
- uses: actions/upload-pages-artifact@v4
with:
path: site
- name: Deploy documentation
run: |
VERSION=${GITHUB_REF_NAME#v}
MAJOR=$(echo "$VERSION" | cut -d. -f1)
DEPLOY_VERSION="v$(echo "$VERSION" | cut -d. -f1-2)"
- uses: actions/deploy-pages@v5
id: deployment
# On new major: consolidate previous major's feature versions into vX
PREV_MAJOR=$((MAJOR - 1))
OLD_FEATURE_VERSIONS=$(uv run mike list 2>/dev/null | grep -oE "^v${PREV_MAJOR}\.[0-9]+" || true)
if [ -n "$OLD_FEATURE_VERSIONS" ]; then
LATEST_PREV_TAG=$(git tag -l "v${PREV_MAJOR}.*" | sort -V | tail -1)
if [ -n "$LATEST_PREV_TAG" ]; then
git checkout "$LATEST_PREV_TAG" -- docs/ src/ zensical.toml
if ! grep -q '\[project\.extra\.version\]' zensical.toml; then
printf '\n[project.extra.version]\nprovider = "mike"\ndefault = "stable"\nalias = true\n' >> zensical.toml
fi
uv run mike deploy "v${PREV_MAJOR}"
git checkout HEAD -- docs/ src/ zensical.toml
fi
# Delete old feature versions
echo "$OLD_FEATURE_VERSIONS" | while read -r OLD_V; do
echo "Deleting $OLD_V"
uv run mike delete "$OLD_V"
done
fi
uv run mike deploy --update-aliases "$DEPLOY_VERSION" stable
uv run mike set-default stable
git push origin gh-pages

View File

@@ -48,7 +48,8 @@ uv add "fastapi-toolsets[all]"
- **Database**: Session management, transaction helpers, table locking, and polling-based row change detection
- **Dependencies**: FastAPI dependency factories (`PathDependency`, `BodyDependency`) for automatic DB lookups from path or body parameters
- **Fixtures**: Fixture system with dependency management, context support, and pytest integration
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `UUIDv7Mixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`) and lifecycle callbacks (`WatchedFieldsMixin`, `@watch`) that fire after commit for insert, update, and delete events
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `UUIDv7Mixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`)
- **Lifecycle Events**: Post-commit event system (`EventSession`, `listens_for`) that dispatches async/sync callbacks for insert, update, and delete operations
- **Standardized API Responses**: Consistent response format with `Response`, `ErrorResponse`, `PaginatedResponse`, `CursorPaginatedResponse` and `OffsetPaginatedResponse`.
- **Exception Handling**: Structured error responses with automatic OpenAPI documentation
- **Logging**: Logging configuration with uvicorn integration via `configure_logging` and `get_logger`

View File

@@ -1 +0,0 @@
fastapi-toolsets.d3vyce.fr

View File

@@ -48,7 +48,8 @@ uv add "fastapi-toolsets[all]"
- **Database**: Session management, transaction helpers, table locking, and polling-based row change detection
- **Dependencies**: FastAPI dependency factories (`PathDependency`, `BodyDependency`) for automatic DB lookups from path or body parameters
- **Fixtures**: Fixture system with dependency management, context support, and pytest integration
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `UUIDv7Mixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`) and lifecycle callbacks (`WatchedFieldsMixin`) that fire after commit for insert, update, and delete events.
- **Model Mixins**: SQLAlchemy mixins for common column patterns (`UUIDMixin`, `UUIDv7Mixin`, `CreatedAtMixin`, `UpdatedAtMixin`, `TimestampMixin`).
- **Lifecycle Events**: Post-commit event system (`EventSession`, `listens_for`) that dispatches async/sync callbacks for insert, update, and delete operations.
- **Standardized API Responses**: Consistent response format with `Response`, `ErrorResponse`, `PaginatedResponse`, `CursorPaginatedResponse` and `OffsetPaginatedResponse`.
- **Exception Handling**: Structured error responses with automatic OpenAPI documentation
- **Logging**: Logging configuration with uvicorn integration via `configure_logging` and `get_logger`

93
docs/migration/v3.md Normal file
View File

@@ -0,0 +1,93 @@
# Migrating to v3.0
This page covers every breaking change introduced in **v3.0** and the steps required to update your code.
---
## Models
The lifecycle event system has been rewritten. Callbacks are now registered with a module-level [`listens_for`](../reference/models.md#fastapi_toolsets.models.listens_for) decorator and dispatched by [`EventSession`](../reference/models.md#fastapi_toolsets.models.EventSession), replacing the mixin-based approach from `v2`.
### `WatchedFieldsMixin` and `@watch` removed
Importing `WatchedFieldsMixin` or `watch` will raise `ImportError`.
Model method callbacks (`on_create`, `on_delete`, `on_update`) and the `@watch` decorator are replaced by:
1. **`__watched_fields__`** — a plain class attribute to restrict which field changes trigger `UPDATE` events (replaces `@watch`).
2. **`@listens_for`** — a module-level decorator to register callbacks for one or more [`ModelEvent`](../reference/models.md#fastapi_toolsets.models.ModelEvent) types (replaces `on_create` / `on_delete` / `on_update` methods).
=== "Before (`v2`)"
```python
from fastapi_toolsets.models import WatchedFieldsMixin, watch
@watch("status")
class Order(Base, UUIDMixin, WatchedFieldsMixin):
__tablename__ = "orders"
status: Mapped[str]
async def on_create(self):
await notify_new_order(self.id)
async def on_update(self, changes):
if "status" in changes:
await notify_status_change(self.id, changes["status"])
async def on_delete(self):
await notify_order_cancelled(self.id)
```
=== "Now (`v3`)"
```python
from fastapi_toolsets.models import ModelEvent, UUIDMixin, listens_for
class Order(Base, UUIDMixin):
__tablename__ = "orders"
__watched_fields__ = ("status",)
status: Mapped[str]
@listens_for(Order, [ModelEvent.CREATE])
async def on_order_created(order: Order, event_type: ModelEvent, changes: None):
await notify_new_order(order.id)
@listens_for(Order, [ModelEvent.UPDATE])
async def on_order_updated(order: Order, event_type: ModelEvent, changes: dict):
if "status" in changes:
await notify_status_change(order.id, changes["status"])
@listens_for(Order, [ModelEvent.DELETE])
async def on_order_deleted(order: Order, event_type: ModelEvent, changes: None):
await notify_order_cancelled(order.id)
```
### `EventSession` now required
Without `EventSession`, lifecycle callbacks will silently stop firing.
Callbacks are now dispatched inside `EventSession.commit()` rather than via background tasks. Pass it as the session class when creating your session factory:
=== "Before (`v2`)"
```python
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
```
=== "Now (`v3`)"
```python
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from fastapi_toolsets.models import EventSession
engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=EventSession)
```
!!! note
If you use `create_db_session` from `fastapi_toolsets.pytest`, the session already uses `EventSession` — no changes needed in tests.

View File

@@ -117,139 +117,118 @@ class Article(Base, UUIDMixin, TimestampMixin):
title: Mapped[str]
```
### [`WatchedFieldsMixin`](../reference/models.md#fastapi_toolsets.models.WatchedFieldsMixin)
## Lifecycle events
!!! info "Added in `v2.4`"
The event system provides lifecycle callbacks that fire **after commit**. If the transaction rolls back, no callback fires.
`WatchedFieldsMixin` provides lifecycle callbacks that fire **after commit** — meaning the row is durably persisted when your callback runs. If the transaction rolls back, no callback fires.
### Setup
Three callbacks are available, each corresponding to a [`ModelEvent`](../reference/models.md#fastapi_toolsets.models.ModelEvent) value:
| Callback | Event | Trigger |
|---|---|---|
| `on_create()` | `ModelEvent.CREATE` | After `INSERT` |
| `on_delete()` | `ModelEvent.DELETE` | After `DELETE` |
| `on_update(changes)` | `ModelEvent.UPDATE` | After `UPDATE` on a watched field |
Server-side defaults (e.g. `id`, `created_at`) are fully populated in all callbacks. All callbacks support both `async def` and plain `def`. Use `@watch` to restrict which fields trigger `on_update`:
| Decorator | `on_update` behaviour |
|---|---|
| `@watch("status", "role")` | Only fires when `status` or `role` changes |
| *(no decorator)* | Fires when **any** mapped field changes |
`@watch` is inherited through the class hierarchy. If a subclass does not declare its own `@watch`, it uses the filter from the nearest decorated parent. Applying `@watch` on the subclass overrides the parent's filter:
Event dispatch requires [`EventSession`](../reference/models.md#fastapi_toolsets.models.EventSession). Pass it as the session class when creating your session factory:
```python
@watch("status")
class Order(Base, UUIDMixin, WatchedFieldsMixin):
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from fastapi_toolsets.models import EventSession
engine = create_async_engine("postgresql+asyncpg://...")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=EventSession)
```
!!! info "Callbacks fire on `session.commit()` only — not on savepoints."
Savepoints created by [`get_transaction`](db.md) or `begin_nested()` do **not**
trigger callbacks. All events accumulated across flushes are dispatched once
when the outermost `commit()` is called.
### Events
Three event types are available, each corresponding to a [`ModelEvent`](../reference/models.md#fastapi_toolsets.models.ModelEvent) value:
| Event | Trigger |
|---|---|
| `ModelEvent.CREATE` | After `INSERT` commit |
| `ModelEvent.DELETE` | After `DELETE` commit |
| `ModelEvent.UPDATE` | After `UPDATE` commit on a watched field |
!!! warning "Callbacks fire only for ORM-level changes. Rows updated via raw SQL (`UPDATE ... SET ...`) are not detected."
### Watched fields
Set `__watched_fields__` on the model to restrict which field changes trigger `UPDATE` events. It must be a `tuple[str, ...]` — any other type raises `TypeError`:
| Class attribute | `UPDATE` behaviour |
|---|---|
| `__watched_fields__ = ("status", "role")` | Only fires when `status` or `role` changes |
| *(not set)* | Fires when **any** mapped field changes |
`__watched_fields__` is inherited through the class hierarchy via normal Python MRO. A subclass can override it:
```python
class Order(Base, UUIDMixin):
__watched_fields__ = ("status",)
...
class UrgentOrder(Order):
# inherits @watch("status") — on_update fires only for status changes
# inherits __watched_fields__ = ("status",)
...
@watch("priority")
class PriorityOrder(Order):
# overrides parent — on_update fires only for priority changes
__watched_fields__ = ("priority",)
# overrides parent — UPDATE fires only for priority changes
...
```
#### Option 1 — catch-all with `on_event`
### Registering handlers
Override `on_event` to handle all event types in one place. The specific methods delegate here by default:
Register handlers with the [`listens_for`](../reference/models.md#fastapi_toolsets.models.listens_for) decorator. Every callback receives three arguments: the model instance, the [`ModelEvent`](../reference/models.md#fastapi_toolsets.models.ModelEvent) that triggered it, and a `changes` dict (`None` for `CREATE` and `DELETE`):
```python
from fastapi_toolsets.models import ModelEvent, UUIDMixin, WatchedFieldsMixin, watch
from fastapi_toolsets.models import ModelEvent, UUIDMixin, listens_for
@watch("status")
class Order(Base, UUIDMixin, WatchedFieldsMixin):
class Order(Base, UUIDMixin):
__tablename__ = "orders"
__watched_fields__ = ("status",)
status: Mapped[str]
async def on_event(self, event: ModelEvent, changes: dict | None = None) -> None:
if event == ModelEvent.CREATE:
await notify_new_order(self.id)
elif event == ModelEvent.DELETE:
await notify_order_cancelled(self.id)
elif event == ModelEvent.UPDATE:
await notify_status_change(self.id, changes["status"])
@listens_for(Order, [ModelEvent.CREATE])
async def on_order_created(order: Order, event_type: ModelEvent, changes: None):
await notify_new_order(order.id)
@listens_for(Order, [ModelEvent.DELETE])
async def on_order_deleted(order: Order, event_type: ModelEvent, changes: None):
await notify_order_cancelled(order.id)
@listens_for(Order, [ModelEvent.UPDATE])
async def on_order_updated(order: Order, event_type: ModelEvent, changes: dict):
if "status" in changes:
await notify_status_change(order.id, changes["status"])
```
#### Option 2 — targeted overrides
Multiple handlers can be registered for the same model and event. Handlers registered on a parent class also fire for subclass instances.
Override individual methods for more focused logic:
A single handler can listen for multiple events at once. When `event_types` is omitted, the handler fires for all events:
```python
@watch("status")
class Order(Base, UUIDMixin, WatchedFieldsMixin):
__tablename__ = "orders"
@listens_for(Order, [ModelEvent.CREATE, ModelEvent.UPDATE])
async def on_order_changed(order: Order, event_type: ModelEvent, changes: dict | None):
await invalidate_cache(order.id)
status: Mapped[str]
async def on_create(self) -> None:
await notify_new_order(self.id)
async def on_delete(self) -> None:
await notify_order_cancelled(self.id)
async def on_update(self, changes: dict) -> None:
if "status" in changes:
old = changes["status"]["old"]
new = changes["status"]["new"]
await notify_status_change(self.id, old, new)
@listens_for(Order) # all events
async def on_any_order_event(order: Order, event_type: ModelEvent, changes: dict | None):
await audit_log(order.id, event_type)
```
#### Field changes format
### Field changes format
The `changes` dict maps each watched field that changed to `{"old": ..., "new": ...}`. Only fields that actually changed are included:
The `changes` dict maps each watched field that changed to `{"old": ..., "new": ...}`. Only fields that actually changed are included. For `CREATE` and `DELETE` events, `changes` is `None`:
```python
# CREATE / DELETE → changes is None
# status changed → {"status": {"old": "pending", "new": "shipped"}}
# two fields changed → {"status": {...}, "assigned_to": {...}}
```
!!! info "Multiple flushes in one transaction are merged: the earliest `old` and latest `new` are preserved, and `on_update` fires only once per commit."
!!! warning "Callbacks fire only for ORM-level changes. Rows updated via raw SQL (`UPDATE ... SET ...`) are not detected."
!!! warning "Callbacks fire when the **outermost active context** (savepoint or transaction) commits."
If you create several related objects using `CrudFactory.create` and need
callbacks to see all of them (including associations), wrap the whole
operation in a single [`get_transaction`](db.md) or [`lock_tables`](db.md)
block. Without it, each `create` call commits its own savepoint and
`on_create` fires before the remaining objects exist.
```python
from fastapi_toolsets.db import get_transaction
async with get_transaction(session):
order = await OrderCrud.create(session, order_data)
item = await ItemCrud.create(session, item_data)
await session.refresh(order, attribute_names=["items"])
order.items.append(item)
# on_create fires here for both order and item,
# with the full association already committed.
```
## Composing mixins
All mixins can be combined in any order. The only constraint is that exactly one primary key must be defined — either via `UUIDMixin` or directly on the model.
```python
from fastapi_toolsets.models import UUIDMixin, TimestampMixin
class Event(Base, UUIDMixin, TimestampMixin):
__tablename__ = "events"
name: Mapped[str]
class Counter(Base, UpdatedAtMixin):
__tablename__ = "counters"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
value: Mapped[int]
```
---
[:material-api: API Reference](../reference/models.md)

View File

@@ -6,17 +6,19 @@ You can import them directly from `fastapi_toolsets.models`:
```python
from fastapi_toolsets.models import (
EventSession,
ModelEvent,
UUIDMixin,
UUIDv7Mixin,
CreatedAtMixin,
UpdatedAtMixin,
TimestampMixin,
WatchedFieldsMixin,
watch,
listens_for,
)
```
## ::: fastapi_toolsets.models.EventSession
## ::: fastapi_toolsets.models.ModelEvent
## ::: fastapi_toolsets.models.UUIDMixin
@@ -29,6 +31,4 @@ from fastapi_toolsets.models import (
## ::: fastapi_toolsets.models.TimestampMixin
## ::: fastapi_toolsets.models.WatchedFieldsMixin
## ::: fastapi_toolsets.models.watch
## ::: fastapi_toolsets.models.listens_for

View File

@@ -80,8 +80,9 @@ tests = [
"pytest>=8.0.0",
]
docs = [
"mike",
"mkdocstrings-python>=2.0.2",
"zensical>=0.0.23",
"zensical>=0.0.30",
]
[build-system]
@@ -104,3 +105,6 @@ exclude_lines = [
"if TYPE_CHECKING:",
"raise NotImplementedError",
]
[tool.uv.sources]
mike = { git = "https://github.com/squidfunk/mike.git", tag = "2.2.0+zensical-0.1.0" }

View File

@@ -24,9 +24,12 @@ __all__ = [
]
_SessionT = TypeVar("_SessionT", bound=AsyncSession)
def create_db_dependency(
session_maker: async_sessionmaker[AsyncSession],
) -> Callable[[], AsyncGenerator[AsyncSession, None]]:
session_maker: async_sessionmaker[_SessionT],
) -> Callable[[], AsyncGenerator[_SessionT, None]]:
"""Create a FastAPI dependency for database sessions.
Creates a dependency function that yields a session and auto-commits
@@ -54,7 +57,7 @@ def create_db_dependency(
```
"""
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async def get_db() -> AsyncGenerator[_SessionT, None]:
async with session_maker() as session:
await session.connection()
yield session
@@ -65,8 +68,8 @@ def create_db_dependency(
def create_db_context(
session_maker: async_sessionmaker[AsyncSession],
) -> Callable[[], AbstractAsyncContextManager[AsyncSession]]:
session_maker: async_sessionmaker[_SessionT],
) -> Callable[[], AbstractAsyncContextManager[_SessionT]]:
"""Create a context manager for database sessions.
Creates a context manager for use outside of FastAPI request handlers,

View File

@@ -30,20 +30,16 @@ def _instance_to_dict(instance: DeclarativeBase) -> dict[str, Any]:
if val is None:
col = prop.columns[0]
if col.server_default is not None or (
col.default is not None and col.default.is_callable
if (
col.server_default is not None
or (col.default is not None and col.default.is_callable)
or col.autoincrement is True
):
continue
result[prop.key] = val
return result
def _normalize_rows(dicts: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Ensure all row dicts share the same key set."""
all_keys: set[str] = set().union(*dicts)
return [{k: d.get(k) for k in all_keys} for d in dicts]
def _group_by_type(
instances: list[DeclarativeBase],
) -> list[tuple[type[DeclarativeBase], list[DeclarativeBase]]]:
@@ -54,14 +50,32 @@ def _group_by_type(
return list(groups.items())
def _group_by_column_set(
dicts: list[dict[str, Any]],
instances: list[DeclarativeBase],
) -> list[tuple[list[dict[str, Any]], list[DeclarativeBase]]]:
"""Group (dict, instance) pairs by their dict key sets."""
groups: dict[
frozenset[str], tuple[list[dict[str, Any]], list[DeclarativeBase]]
] = {}
for d, inst in zip(dicts, instances):
key = frozenset(d)
if key not in groups:
groups[key] = ([], [])
groups[key][0].append(d)
groups[key][1].append(inst)
return list(groups.values())
async def _batch_insert(
session: AsyncSession,
model_cls: type[DeclarativeBase],
instances: list[DeclarativeBase],
) -> None:
"""INSERT all instances — raises on conflict (no duplicate handling)."""
dicts = _normalize_rows([_instance_to_dict(i) for i in instances])
await session.execute(pg_insert(model_cls).values(dicts))
dicts = [_instance_to_dict(i) for i in instances]
for group_dicts, _ in _group_by_column_set(dicts, instances):
await session.execute(pg_insert(model_cls).values(group_dicts))
async def _batch_merge(
@@ -79,21 +93,22 @@ async def _batch_merge(
if not any(col.name in pk_names_set for col in prop.columns)
]
dicts = _normalize_rows([_instance_to_dict(i) for i in instances])
stmt = pg_insert(model_cls).values(dicts)
dicts = [_instance_to_dict(i) for i in instances]
for group_dicts, _ in _group_by_column_set(dicts, instances):
stmt = pg_insert(model_cls).values(group_dicts)
inserted_keys = set(dicts[0]) if dicts else set()
update_cols = [col for col in non_pk_cols if col in inserted_keys]
inserted_keys = set(group_dicts[0])
update_cols = [col for col in non_pk_cols if col in inserted_keys]
if update_cols:
stmt = stmt.on_conflict_do_update(
index_elements=pk_names,
set_={col: stmt.excluded[col] for col in update_cols},
)
else:
stmt = stmt.on_conflict_do_nothing(index_elements=pk_names)
if update_cols:
stmt = stmt.on_conflict_do_update(
index_elements=pk_names,
set_={col: stmt.excluded[col] for col in update_cols},
)
else:
stmt = stmt.on_conflict_do_nothing(index_elements=pk_names)
await session.execute(stmt)
await session.execute(stmt)
async def _batch_skip_existing(
@@ -116,22 +131,30 @@ async def _batch_skip_existing(
loaded: list[DeclarativeBase] = list(no_pk)
if no_pk:
await session.execute(
pg_insert(model_cls).values(
_normalize_rows([_instance_to_dict(i) for i in no_pk])
)
)
no_pk_dicts = [_instance_to_dict(i) for i in no_pk]
for group_dicts, _ in _group_by_column_set(no_pk_dicts, no_pk):
await session.execute(pg_insert(model_cls).values(group_dicts))
if with_pk_pairs:
with_pk = [i for i, _ in with_pk_pairs]
stmt = (
pg_insert(model_cls)
.values(_normalize_rows([_instance_to_dict(i) for i in with_pk]))
.on_conflict_do_nothing(index_elements=pk_names)
)
result = await session.execute(stmt.returning(*mapper.primary_key))
inserted_pks = {row[0] if len(pk_names) == 1 else tuple(row) for row in result}
loaded.extend(inst for inst, pk in with_pk_pairs if pk in inserted_pks)
with_pk_dicts = [_instance_to_dict(i) for i in with_pk]
for group_dicts, group_insts in _group_by_column_set(with_pk_dicts, with_pk):
stmt = (
pg_insert(model_cls)
.values(group_dicts)
.on_conflict_do_nothing(index_elements=pk_names)
)
result = await session.execute(stmt.returning(*mapper.primary_key))
inserted_pks = {
row[0] if len(pk_names) == 1 else tuple(row) for row in result
}
loaded.extend(
inst
for inst, pk in zip(
group_insts, [_get_primary_key(i) for i in group_insts]
)
if pk in inserted_pks
)
return loaded
@@ -143,12 +166,7 @@ async def _load_ordered(
strategy: LoadStrategy,
contexts: tuple[str, ...] | None = None,
) -> dict[str, list[DeclarativeBase]]:
"""Load fixtures in order using batch Core INSERT statements.
When *contexts* is provided only variants whose context set intersects with
*contexts* are called for each name; their instances are concatenated.
When *contexts* is ``None`` all variants of each name are loaded.
"""
"""Load fixtures in order using batch Core INSERT statements."""
results: dict[str, list[DeclarativeBase]] = {}
for name in ordered_names:
@@ -158,10 +176,6 @@ async def _load_ordered(
else registry.get_variants(name)
)
# Cross-context dependency fallback: if we're loading by context but
# no variant matches (e.g. a "base"-only fixture required by a
# "testing" fixture), load all available variants so the dependency
# is satisfied.
if contexts is not None and not variants:
variants = registry.get_variants(name)
@@ -267,10 +281,6 @@ async def load_fixtures_by_context(
) -> dict[str, list[DeclarativeBase]]:
"""Load all fixtures for specific contexts.
For each fixture name, only the variants whose context set intersects with
*contexts* are loaded. When a name has variants in multiple of the
requested contexts, their instances are merged before being inserted.
Args:
session: Database session
registry: Fixture registry

View File

@@ -7,15 +7,15 @@ from .columns import (
UUIDv7Mixin,
UpdatedAtMixin,
)
from .watched import ModelEvent, WatchedFieldsMixin, watch
from .watched import EventSession, ModelEvent, listens_for
__all__ = [
"EventSession",
"ModelEvent",
"UUIDMixin",
"UUIDv7Mixin",
"CreatedAtMixin",
"UpdatedAtMixin",
"TimestampMixin",
"WatchedFieldsMixin",
"watch",
"listens_for",
]

View File

@@ -6,14 +6,6 @@ from datetime import datetime
from sqlalchemy import DateTime, Uuid, text
from sqlalchemy.orm import Mapped, mapped_column
__all__ = [
"UUIDMixin",
"UUIDv7Mixin",
"CreatedAtMixin",
"UpdatedAtMixin",
"TimestampMixin",
]
class UUIDMixin:
"""Mixin that adds a UUID primary key auto-generated by the database."""

View File

@@ -1,11 +1,9 @@
"""Field-change monitoring via SQLAlchemy session events."""
import asyncio
import inspect
import weakref
from collections.abc import Awaitable
from collections.abc import Callable
from enum import Enum
from typing import Any, TypeVar
from typing import Any
from sqlalchemy import event
from sqlalchemy import inspect as sa_inspect
@@ -14,49 +12,81 @@ from sqlalchemy.orm.attributes import set_committed_value as _sa_set_committed_v
from ..logger import get_logger
__all__ = ["ModelEvent", "WatchedFieldsMixin", "watch"]
_logger = get_logger()
_T = TypeVar("_T")
_CALLBACK_ERROR_MSG = "WatchedFieldsMixin callback raised an unhandled exception"
_WATCHED_FIELDS: weakref.WeakKeyDictionary[type, list[str]] = (
weakref.WeakKeyDictionary()
)
_SESSION_PENDING_NEW = "_ft_pending_new"
_SESSION_CREATES = "_ft_creates"
_SESSION_DELETES = "_ft_deletes"
_SESSION_UPDATES = "_ft_updates"
_SESSION_SAVEPOINT_DEPTH = "_ft_sp_depth"
_DEFERRED_STRATEGY_KEY = (("deferred", True), ("instrument", True))
class ModelEvent(str, Enum):
"""Event types emitted by :class:`WatchedFieldsMixin`."""
"""Event types dispatched by :class:`EventSession`."""
CREATE = "create"
DELETE = "delete"
UPDATE = "update"
def watch(*fields: str) -> Any:
"""Class decorator to filter which fields trigger ``on_update``.
_CALLBACK_ERROR_MSG = "Event callback raised an unhandled exception"
_SESSION_CREATES = "_ft_creates"
_SESSION_DELETES = "_ft_deletes"
_SESSION_UPDATES = "_ft_updates"
_DEFERRED_STRATEGY_KEY = (("deferred", True), ("instrument", True))
_EVENT_HANDLERS: dict[tuple[type, ModelEvent], list[Callable[..., Any]]] = {}
_WATCHED_MODELS: set[type] = set()
_WATCHED_CACHE: dict[type, bool] = {}
_HANDLER_CACHE: dict[tuple[type, ModelEvent], list[Callable[..., Any]]] = {}
def _invalidate_caches() -> None:
"""Clear lookup caches after handler registration."""
_WATCHED_CACHE.clear()
_HANDLER_CACHE.clear()
def listens_for(
model_class: type,
event_types: list[ModelEvent] | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Register a callback for one or more model lifecycle events.
Args:
*fields: One or more field names to watch. At least one name is required.
Raises:
ValueError: If called with no field names.
model_class: The SQLAlchemy model class to listen on.
event_types: List of :class:`ModelEvent` values to listen for.
Defaults to all event types.
"""
if not fields:
raise ValueError("@watch requires at least one field name.")
evs = event_types if event_types is not None else list(ModelEvent)
def decorator(cls: type[_T]) -> type[_T]:
_WATCHED_FIELDS[cls] = list(fields)
return cls
def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
for ev in evs:
_EVENT_HANDLERS.setdefault((model_class, ev), []).append(fn)
_WATCHED_MODELS.add(model_class)
_invalidate_caches()
return fn
return decorator
def _is_watched(obj: Any) -> bool:
"""Return True if *obj*'s type (or any ancestor) has registered handlers."""
cls = type(obj)
try:
return _WATCHED_CACHE[cls]
except KeyError:
result = any(klass in _WATCHED_MODELS for klass in cls.__mro__)
_WATCHED_CACHE[cls] = result
return result
def _get_handlers(cls: type, ev: ModelEvent) -> list[Callable[..., Any]]:
"""Return registered handlers for *cls* and *ev*, walking the MRO."""
key = (cls, ev)
try:
return _HANDLER_CACHE[key]
except KeyError:
handlers: list[Callable[..., Any]] = []
for klass in cls.__mro__:
handlers.extend(_EVENT_HANDLERS.get((klass, ev), []))
_HANDLER_CACHE[key] = handlers
return handlers
def _snapshot_column_attrs(obj: Any) -> dict[str, Any]:
"""Read currently-loaded column values into a plain dict."""
state = sa_inspect(obj) # InstanceState
@@ -65,7 +95,7 @@ def _snapshot_column_attrs(obj: Any) -> dict[str, Any]:
for prop in state.mapper.column_attrs:
if prop.key in state_dict:
snapshot[prop.key] = state_dict[prop.key]
elif (
elif ( # pragma: no cover
not state.expired
and prop.strategy_key != _DEFERRED_STRATEGY_KEY
and all(
@@ -79,12 +109,17 @@ def _snapshot_column_attrs(obj: Any) -> dict[str, Any]:
return snapshot
def _get_watched_fields(cls: type) -> list[str] | None:
"""Return the watched fields for *cls*, walking the MRO to inherit from parents."""
for klass in cls.__mro__:
if klass in _WATCHED_FIELDS:
return _WATCHED_FIELDS[klass]
return None
def _get_watched_fields(cls: type) -> tuple[str, ...] | None:
"""Return the watched fields for *cls*."""
fields = getattr(cls, "__watched_fields__", None)
if fields is not None and (
not isinstance(fields, tuple) or not all(isinstance(f, str) for f in fields)
):
raise TypeError(
f"{cls.__name__}.__watched_fields__ must be a tuple[str, ...], "
f"got {type(fields).__name__}"
)
return fields
def _upsert_changes(
@@ -105,50 +140,32 @@ def _upsert_changes(
pending[key] = (obj, changes)
@event.listens_for(AsyncSession.sync_session_class, "after_transaction_create")
def _after_transaction_create(session: Any, transaction: Any) -> None:
if transaction.nested:
session.info[_SESSION_SAVEPOINT_DEPTH] = (
session.info.get(_SESSION_SAVEPOINT_DEPTH, 0) + 1
)
@event.listens_for(AsyncSession.sync_session_class, "after_transaction_end")
def _after_transaction_end(session: Any, transaction: Any) -> None:
if transaction.nested:
depth = session.info.get(_SESSION_SAVEPOINT_DEPTH, 0)
if depth > 0: # pragma: no branch
session.info[_SESSION_SAVEPOINT_DEPTH] = depth - 1
@event.listens_for(AsyncSession.sync_session_class, "after_flush")
def _after_flush(session: Any, flush_context: Any) -> None:
# New objects: capture references while session.new is still populated.
# Values are read in _after_flush_postexec once RETURNING has been processed.
# New objects: capture reference. Attributes will be refreshed after commit.
for obj in session.new:
if isinstance(obj, WatchedFieldsMixin):
session.info.setdefault(_SESSION_PENDING_NEW, []).append(obj)
if _is_watched(obj):
session.info.setdefault(_SESSION_CREATES, []).append(obj)
# Deleted objects: capture before they leave the identity map.
# Deleted objects: snapshot now while attributes are still loaded.
for obj in session.deleted:
if isinstance(obj, WatchedFieldsMixin):
session.info.setdefault(_SESSION_DELETES, []).append(obj)
if _is_watched(obj):
snapshot = _snapshot_column_attrs(obj)
session.info.setdefault(_SESSION_DELETES, []).append((obj, snapshot))
# Dirty objects: read old/new from SQLAlchemy attribute history.
for obj in session.dirty:
if not isinstance(obj, WatchedFieldsMixin):
if not _is_watched(obj):
continue
# None = not in dict = watch all fields; list = specific fields only
watched = _get_watched_fields(type(obj))
changes: dict[str, dict[str, Any]] = {}
inst_attrs = sa_inspect(obj).attrs
attrs = (
# Specific fields
((field, sa_inspect(obj).attrs[field]) for field in watched)
((field, inst_attrs[field]) for field in watched)
if watched is not None
# All mapped fields
else ((s.key, s) for s in sa_inspect(obj).attrs)
else ((s.key, s) for s in inst_attrs)
)
for field, attr_state in attrs:
history = attr_state.history
@@ -166,116 +183,101 @@ def _after_flush(session: Any, flush_context: Any) -> None:
)
@event.listens_for(AsyncSession.sync_session_class, "after_flush_postexec")
def _after_flush_postexec(session: Any, flush_context: Any) -> None:
# New objects are now persistent and RETURNING values have been applied,
# so server defaults (id, created_at, …) are available via getattr.
pending_new: list[Any] = session.info.pop(_SESSION_PENDING_NEW, [])
if not pending_new:
return
session.info.setdefault(_SESSION_CREATES, []).extend(pending_new)
@event.listens_for(AsyncSession.sync_session_class, "after_rollback")
def _after_rollback(session: Any) -> None:
session.info.pop(_SESSION_PENDING_NEW, None)
if session.in_transaction():
return
session.info.pop(_SESSION_CREATES, None)
session.info.pop(_SESSION_DELETES, None)
session.info.pop(_SESSION_UPDATES, None)
def _task_error_handler(task: asyncio.Task[Any]) -> None:
if not task.cancelled() and (exc := task.exception()):
_logger.error(_CALLBACK_ERROR_MSG, exc_info=exc)
def _schedule_with_snapshot(
loop: asyncio.AbstractEventLoop, obj: Any, fn: Any, *args: Any
async def _invoke_callback(
fn: Callable[..., Any],
obj: Any,
event_type: ModelEvent,
changes: dict[str, dict[str, Any]] | None,
) -> None:
"""Snapshot *obj*'s column attrs now (before expire_on_commit wipes them),
then schedule a coroutine that restores the snapshot and calls *fn*.
"""
snapshot = _snapshot_column_attrs(obj)
async def _run(
obj: Any = obj,
fn: Any = fn,
snapshot: dict[str, Any] = snapshot,
args: tuple = args,
) -> None:
for key, value in snapshot.items():
_sa_set_committed_value(obj, key, value)
try:
result = fn(*args)
if inspect.isawaitable(result):
await result
except Exception as exc:
_logger.error(_CALLBACK_ERROR_MSG, exc_info=exc)
task = loop.create_task(_run())
task.add_done_callback(_task_error_handler)
"""Call *fn* and await the result if it is awaitable."""
result = fn(obj, event_type, changes)
if inspect.isawaitable(result):
await result
@event.listens_for(AsyncSession.sync_session_class, "after_commit")
def _after_commit(session: Any) -> None:
if session.info.get(_SESSION_SAVEPOINT_DEPTH, 0) > 0:
return
class EventSession(AsyncSession):
"""AsyncSession subclass that dispatches lifecycle callbacks after commit."""
creates: list[Any] = session.info.pop(_SESSION_CREATES, [])
deletes: list[Any] = session.info.pop(_SESSION_DELETES, [])
field_changes: dict[int, tuple[Any, dict[str, dict[str, Any]]]] = session.info.pop(
_SESSION_UPDATES, {}
)
async def commit(self) -> None: # noqa: C901
await super().commit()
if creates and deletes:
transient_ids = {id(o) for o in creates} & {id(o) for o in deletes}
if transient_ids:
creates = [o for o in creates if id(o) not in transient_ids]
deletes = [o for o in deletes if id(o) not in transient_ids]
creates: list[Any] = self.info.pop(_SESSION_CREATES, [])
deletes: list[tuple[Any, dict[str, Any]]] = self.info.pop(_SESSION_DELETES, [])
field_changes: dict[int, tuple[Any, dict[str, dict[str, Any]]]] = self.info.pop(
_SESSION_UPDATES, {}
)
if not creates and not deletes and not field_changes:
return
# Suppress transient objects (created + deleted in same transaction).
if creates and deletes:
created_ids = {id(o) for o in creates}
deleted_ids = {id(o) for o, _ in deletes}
transient_ids = created_ids & deleted_ids
if transient_ids:
creates = [o for o in creates if id(o) not in transient_ids]
deletes = [(o, s) for o, s in deletes if id(o) not in transient_ids]
field_changes = {
k: v for k, v in field_changes.items() if k not in transient_ids
}
# Suppress updates for newly created objects (CREATE-only semantics).
if creates and field_changes:
create_ids = {id(o) for o in creates}
field_changes = {
k: v for k, v in field_changes.items() if k not in transient_ids
k: v for k, v in field_changes.items() if k not in create_ids
}
if not creates and not deletes and not field_changes:
return
# Dispatch CREATE callbacks.
for obj in creates:
try:
state = sa_inspect(obj, raiseerr=False)
if (
state is None or state.detached or state.transient
): # pragma: no cover
continue
await self.refresh(obj)
for handler in _get_handlers(type(obj), ModelEvent.CREATE):
await _invoke_callback(handler, obj, ModelEvent.CREATE, None)
except Exception as exc:
_logger.error(_CALLBACK_ERROR_MSG, exc_info=exc)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
# Dispatch DELETE callbacks (restore snapshot; row is gone).
for obj, snapshot in deletes:
try:
for key, value in snapshot.items():
_sa_set_committed_value(obj, key, value)
for handler in _get_handlers(type(obj), ModelEvent.DELETE):
await _invoke_callback(handler, obj, ModelEvent.DELETE, None)
except Exception as exc:
_logger.error(_CALLBACK_ERROR_MSG, exc_info=exc)
for obj in creates:
_schedule_with_snapshot(loop, obj, obj.on_create)
# Dispatch UPDATE callbacks.
for obj, changes in field_changes.values():
try:
state = sa_inspect(obj, raiseerr=False)
if (
state is None or state.detached or state.transient
): # pragma: no cover
continue
await self.refresh(obj)
for handler in _get_handlers(type(obj), ModelEvent.UPDATE):
await _invoke_callback(handler, obj, ModelEvent.UPDATE, changes)
except Exception as exc:
_logger.error(_CALLBACK_ERROR_MSG, exc_info=exc)
for obj in deletes:
_schedule_with_snapshot(loop, obj, obj.on_delete)
for obj, changes in field_changes.values():
_schedule_with_snapshot(loop, obj, obj.on_update, changes)
class WatchedFieldsMixin:
"""Mixin that enables lifecycle callbacks for SQLAlchemy models."""
def on_event(
self, event: ModelEvent, changes: dict[str, dict[str, Any]] | None = None
) -> Awaitable[None] | None:
"""Catch-all callback fired for every lifecycle event.
Args:
event: The event type (:attr:`ModelEvent.CREATE`, :attr:`ModelEvent.DELETE`,
or :attr:`ModelEvent.UPDATE`).
changes: Field changes for :attr:`ModelEvent.UPDATE`, ``None`` otherwise.
"""
def on_create(self) -> Awaitable[None] | None:
"""Called after INSERT commit."""
return self.on_event(ModelEvent.CREATE)
def on_delete(self) -> Awaitable[None] | None:
"""Called after DELETE commit."""
return self.on_event(ModelEvent.DELETE)
def on_update(self, changes: dict[str, dict[str, Any]]) -> Awaitable[None] | None:
"""Called after UPDATE commit when watched fields change."""
return self.on_event(ModelEvent.UPDATE, changes=changes)
async def rollback(self) -> None:
await super().rollback()
self.info.pop(_SESSION_CREATES, None)
self.info.pop(_SESSION_DELETES, None)
self.info.pop(_SESSION_UPDATES, None)

View File

@@ -18,6 +18,7 @@ from sqlalchemy.orm import DeclarativeBase
from ..db import cleanup_tables as _cleanup_tables
from ..db import create_database
from ..models.watched import EventSession
async def cleanup_tables(
@@ -265,7 +266,9 @@ async def create_db_session(
async with engine.begin() as conn:
await conn.run_sync(base.metadata.create_all)
session_maker = async_sessionmaker(engine, expire_on_commit=expire_on_commit)
session_maker = async_sessionmaker(
engine, expire_on_commit=expire_on_commit, class_=EventSession
)
async with session_maker() as session:
yield session

View File

@@ -1011,6 +1011,14 @@ class TestInstanceToDict:
assert "id" not in d
assert d["name"] == "admin"
def test_autoincrement_none_excluded(self):
"""A column whose value is None but has autoincrement=True is excluded
so the DB generates the value via its sequence."""
instance = IntRole(id=None, name="admin")
d = _instance_to_dict(instance)
assert "id" not in d
assert d["name"] == "admin"
def test_nullable_none_included(self):
"""None on a nullable column with no default is kept (explicit NULL)."""
instance = User(id=uuid.uuid4(), username="u", email="e@e.com", role_id=None)
@@ -1107,3 +1115,361 @@ class TestBatchMergeNonPkColumns:
db_session, registry, "permissions", strategy=LoadStrategy.MERGE
)
assert len(result2["permissions"]) == 2
class TestBatchNullableColumnEdgeCases:
"""Deep tests for nullable column handling during batch import."""
@pytest.mark.anyio
async def test_insert_batch_mixed_nullable_fk(self, db_session: AsyncSession):
"""INSERT batch where some rows set a nullable FK and others don't.
After normalization the omitted role_id becomes None. For INSERT this
is acceptable — both rows should insert successfully with the correct
values (one with FK, one with NULL).
"""
registry = FixtureRegistry()
admin = await RoleCrud.create(db_session, RoleCreate(name="admin"))
uid1 = uuid.uuid4()
uid2 = uuid.uuid4()
@registry.register
def users():
return [
User(
id=uid1, username="with_role", email="a@test.com", role_id=admin.id
),
User(id=uid2, username="no_role", email="b@test.com"),
]
await load_fixtures(db_session, registry, "users", strategy=LoadStrategy.INSERT)
from sqlalchemy import select
rows = {
r.username: r
for r in (await db_session.execute(select(User))).scalars().all()
}
assert rows["with_role"].role_id == admin.id
assert rows["no_role"].role_id is None
@pytest.mark.anyio
async def test_insert_batch_mixed_nullable_notes(self, db_session: AsyncSession):
"""INSERT batch where some rows have notes and others don't.
Ensures normalization doesn't break the insert and that each row gets
the intended value.
"""
registry = FixtureRegistry()
uid1 = uuid.uuid4()
uid2 = uuid.uuid4()
uid3 = uuid.uuid4()
@registry.register
def users():
return [
User(
id=uid1,
username="has_notes",
email="a@test.com",
notes="important",
),
User(id=uid2, username="no_notes", email="b@test.com"),
User(id=uid3, username="null_notes", email="c@test.com", notes=None),
]
await load_fixtures(db_session, registry, "users", strategy=LoadStrategy.INSERT)
from sqlalchemy import select
rows = {
r.username: r
for r in (await db_session.execute(select(User))).scalars().all()
}
assert rows["has_notes"].notes == "important"
assert rows["no_notes"].notes is None
assert rows["null_notes"].notes is None
@pytest.mark.anyio
async def test_merge_batch_mixed_nullable_does_not_overwrite(
self, db_session: AsyncSession
):
"""MERGE batch where one row sets a nullable column and another omits it.
If both rows already exist in DB, the row that omits the column must
NOT have its existing value overwritten with NULL.
This is the core normalization bug: _normalize_rows fills missing keys
with None, and then MERGE's SET clause includes that column for ALL rows.
"""
from sqlalchemy import select
admin = await RoleCrud.create(db_session, RoleCreate(name="admin"))
uid1 = uuid.uuid4()
uid2 = uuid.uuid4()
# Pre-populate: both users have role_id and notes
registry_initial = FixtureRegistry()
@registry_initial.register
def users():
return [
User(
id=uid1,
username="alice",
email="a@test.com",
role_id=admin.id,
notes="alice notes",
),
User(
id=uid2,
username="bob",
email="b@test.com",
role_id=admin.id,
notes="bob notes",
),
]
await load_fixtures(
db_session, registry_initial, "users", strategy=LoadStrategy.INSERT
)
# Re-merge: alice updates notes, bob omits notes entirely
registry_merge = FixtureRegistry()
@registry_merge.register
def users(): # noqa: F811
return [
User(
id=uid1,
username="alice",
email="a@test.com",
role_id=admin.id,
notes="updated",
),
User(
id=uid2,
username="bob",
email="b@test.com",
role_id=admin.id,
), # notes omitted
]
await load_fixtures(
db_session, registry_merge, "users", strategy=LoadStrategy.MERGE
)
rows = {
r.username: r
for r in (await db_session.execute(select(User))).scalars().all()
}
assert rows["alice"].notes == "updated"
# Bob's notes must be preserved, NOT overwritten with NULL
assert rows["bob"].notes == "bob notes"
@pytest.mark.anyio
async def test_merge_batch_mixed_nullable_fk_preserves_existing(
self, db_session: AsyncSession
):
"""MERGE batch where one row sets role_id and another omits it.
The row that omits role_id must keep its existing DB value.
"""
from sqlalchemy import select
admin = await RoleCrud.create(db_session, RoleCreate(name="admin"))
editor = await RoleCrud.create(db_session, RoleCreate(name="editor"))
uid1 = uuid.uuid4()
uid2 = uuid.uuid4()
# Pre-populate
registry_initial = FixtureRegistry()
@registry_initial.register
def users():
return [
User(
id=uid1,
username="alice",
email="a@test.com",
role_id=admin.id,
),
User(
id=uid2,
username="bob",
email="b@test.com",
role_id=editor.id,
),
]
await load_fixtures(
db_session, registry_initial, "users", strategy=LoadStrategy.INSERT
)
# Re-merge: alice changes role, bob omits role_id
registry_merge = FixtureRegistry()
@registry_merge.register
def users(): # noqa: F811
return [
User(
id=uid1,
username="alice",
email="a@test.com",
role_id=editor.id,
),
User(id=uid2, username="bob", email="b@test.com"), # role_id omitted
]
await load_fixtures(
db_session, registry_merge, "users", strategy=LoadStrategy.MERGE
)
rows = {
r.username: r
for r in (await db_session.execute(select(User))).scalars().all()
}
assert rows["alice"].role_id == editor.id # updated
assert rows["bob"].role_id == editor.id # must be preserved, NOT NULL
@pytest.mark.anyio
async def test_insert_batch_mixed_pk_presence(self, db_session: AsyncSession):
"""INSERT batch where some rows have explicit PK and others rely on
the callable default (uuid.uuid4).
Normalization adds the PK key with None to rows that omitted it,
which can cause NOT NULL violations on the PK column.
"""
registry = FixtureRegistry()
explicit_id = uuid.uuid4()
@registry.register
def roles():
return [
Role(id=explicit_id, name="admin"),
Role(name="user"), # PK omitted, relies on default=uuid.uuid4
]
await load_fixtures(db_session, registry, "roles", strategy=LoadStrategy.INSERT)
from sqlalchemy import select
rows = (await db_session.execute(select(Role))).scalars().all()
assert len(rows) == 2
names = {r.name for r in rows}
assert names == {"admin", "user"}
# The "admin" row must have the explicit ID
admin = next(r for r in rows if r.name == "admin")
assert admin.id == explicit_id
# The "user" row must have a generated UUID (not None)
user = next(r for r in rows if r.name == "user")
assert user.id is not None
@pytest.mark.anyio
async def test_skip_existing_batch_mixed_nullable(self, db_session: AsyncSession):
"""SKIP_EXISTING with mixed nullable columns inserts correctly.
Only new rows are inserted; existing rows are untouched regardless of
which columns the fixture provides.
"""
from sqlalchemy import select
admin = await RoleCrud.create(db_session, RoleCreate(name="admin"))
uid1 = uuid.uuid4()
uid2 = uuid.uuid4()
# Pre-populate uid1 with notes
registry_initial = FixtureRegistry()
@registry_initial.register
def users():
return [
User(
id=uid1,
username="alice",
email="a@test.com",
role_id=admin.id,
notes="keep me",
),
]
await load_fixtures(
db_session, registry_initial, "users", strategy=LoadStrategy.INSERT
)
# Load again with SKIP_EXISTING: uid1 already exists, uid2 is new
registry_skip = FixtureRegistry()
@registry_skip.register
def users(): # noqa: F811
return [
User(id=uid1, username="alice-updated", email="a@test.com"), # exists
User(
id=uid2,
username="bob",
email="b@test.com",
notes="new user",
), # new
]
result = await load_fixtures(
db_session, registry_skip, "users", strategy=LoadStrategy.SKIP_EXISTING
)
assert len(result["users"]) == 1 # only bob inserted
rows = {
r.username: r
for r in (await db_session.execute(select(User))).scalars().all()
}
# alice untouched
assert rows["alice"].role_id == admin.id
assert rows["alice"].notes == "keep me"
# bob inserted correctly
assert rows["bob"].notes == "new user"
@pytest.mark.anyio
async def test_insert_batch_every_row_different_nullable_columns(
self, db_session: AsyncSession
):
"""Each row in the batch sets a different combination of nullable columns.
Tests that normalization produces valid SQL for all rows.
"""
registry = FixtureRegistry()
admin = await RoleCrud.create(db_session, RoleCreate(name="admin"))
uid1 = uuid.uuid4()
uid2 = uuid.uuid4()
uid3 = uuid.uuid4()
@registry.register
def users():
return [
User(
id=uid1,
username="all_set",
email="a@test.com",
role_id=admin.id,
notes="full",
),
User(
id=uid2, username="only_role", email="b@test.com", role_id=admin.id
),
User(
id=uid3, username="only_notes", email="c@test.com", notes="partial"
),
]
await load_fixtures(db_session, registry, "users", strategy=LoadStrategy.INSERT)
from sqlalchemy import select
rows = {
r.username: r
for r in (await db_session.execute(select(User))).scalars().all()
}
assert rows["all_set"].role_id == admin.id
assert rows["all_set"].notes == "full"
assert rows["only_role"].role_id == admin.id
assert rows["only_role"].notes is None
assert rows["only_notes"].role_id is None
assert rows["only_notes"].notes == "partial"

File diff suppressed because it is too large Load Diff

71
uv.lock generated
View File

@@ -285,6 +285,7 @@ dev = [
{ name = "coverage" },
{ name = "fastapi-toolsets", extra = ["all"] },
{ name = "httpx" },
{ name = "mike" },
{ name = "mkdocstrings-python" },
{ name = "prek" },
{ name = "pytest" },
@@ -296,6 +297,7 @@ dev = [
{ name = "zensical" },
]
docs = [
{ name = "mike" },
{ name = "mkdocstrings-python" },
{ name = "zensical" },
]
@@ -328,6 +330,7 @@ dev = [
{ name = "coverage", specifier = ">=7.0.0" },
{ name = "fastapi-toolsets", extras = ["all"] },
{ name = "httpx", specifier = ">=0.25.0" },
{ name = "mike", git = "https://github.com/squidfunk/mike.git?tag=2.2.0%2Bzensical-0.1.0" },
{ name = "mkdocstrings-python", specifier = ">=2.0.2" },
{ name = "prek", specifier = ">=0.3.8" },
{ name = "pytest", specifier = ">=8.0.0" },
@@ -336,11 +339,12 @@ dev = [
{ name = "pytest-xdist", specifier = ">=3.0.0" },
{ name = "ruff", specifier = ">=0.1.0" },
{ name = "ty", specifier = ">=0.0.1a0" },
{ name = "zensical", specifier = ">=0.0.23" },
{ name = "zensical", specifier = ">=0.0.30" },
]
docs = [
{ name = "mike", git = "https://github.com/squidfunk/mike.git?tag=2.2.0%2Bzensical-0.1.0" },
{ name = "mkdocstrings-python", specifier = ">=2.0.2" },
{ name = "zensical", specifier = ">=0.0.23" },
{ name = "zensical", specifier = ">=0.0.30" },
]
tests = [
{ name = "coverage", specifier = ">=7.0.0" },
@@ -604,6 +608,17 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/19/04f9b178c2d8a15b076c8b5140708fa6ffc5601fb6f1e975537072df5b2a/mergedeep-1.3.4-py3-none-any.whl", hash = "sha256:70775750742b25c0d8f36c55aed03d24c3384d17c951b3175d898bd778ef0307", size = 6354, upload-time = "2021-02-05T18:55:29.583Z" },
]
[[package]]
name = "mike"
version = "2.2.0+zensical.0.1.0"
source = { git = "https://github.com/squidfunk/mike.git?tag=2.2.0%2Bzensical-0.1.0#0f62791256ebeba60d20d2f1d8fe6ec3b7d1e2b3" }
dependencies = [
{ name = "jinja2" },
{ name = "pyparsing" },
{ name = "verspec" },
{ name = "zensical" },
]
[[package]]
name = "mkdocs"
version = "1.6.1"
@@ -870,11 +885,11 @@ wheels = [
[[package]]
name = "pygments"
version = "2.19.2"
version = "2.20.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
sdist = { url = "https://files.pythonhosted.org/packages/c3/b2/bc9c9196916376152d655522fdcebac55e66de6603a76a02bca1b6414f6c/pygments-2.20.0.tar.gz", hash = "sha256:6757cd03768053ff99f3039c1a36d6c0aa0b263438fcab17520b30a303a82b5f", size = 4955991, upload-time = "2026-03-29T13:29:33.898Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
{ url = "https://files.pythonhosted.org/packages/f4/7e/a72dd26f3b0f4f2bf1dd8923c85f7ceb43172af56d63c7383eb62b332364/pygments-2.20.0-py3-none-any.whl", hash = "sha256:81a9e26dd42fd28a23a2d169d86d7ac03b46e2f8b59ed4698fb4785f946d0176", size = 1231151, upload-time = "2026-03-29T13:29:30.038Z" },
]
[[package]]
@@ -890,6 +905,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6f/2c/5b079febdc65e1c3fb2729bf958d18b45be7113828528e8a0b5850dd819a/pymdown_extensions-10.21-py3-none-any.whl", hash = "sha256:91b879f9f864d49794c2d9534372b10150e6141096c3908a455e45ca72ad9d3f", size = 268877, upload-time = "2026-02-15T20:44:05.464Z" },
]
[[package]]
name = "pyparsing"
version = "3.3.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f3/91/9c6ee907786a473bf81c5f53cf703ba0957b23ab84c264080fb5a450416f/pyparsing-3.3.2.tar.gz", hash = "sha256:c777f4d763f140633dcb6d8a3eda953bf7a214dc4eff598413c070bcdc117cbc", size = 6851574, upload-time = "2026-01-21T03:57:59.36Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/10/bd/c038d7cc38edc1aa5bf91ab8068b63d4308c66c4c8bb3cbba7dfbc049f9c/pyparsing-3.3.2-py3-none-any.whl", hash = "sha256:850ba148bd908d7e2411587e247a1e4f0327839c40e2e5e6d05a007ecc69911d", size = 122781, upload-time = "2026-01-21T03:57:55.912Z" },
]
[[package]]
name = "pytest"
version = "9.0.2"
@@ -1262,6 +1286,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/dc/9b/47798a6c91d8bdb567fe2698fe81e0c6b7cb7ef4d13da4114b41d239f65d/typing_inspection-0.4.2-py3-none-any.whl", hash = "sha256:4ed1cacbdc298c220f1bd249ed5287caa16f34d44ef4e9c3d0cbad5b521545e7", size = 14611, upload-time = "2025-10-01T02:14:40.154Z" },
]
[[package]]
name = "verspec"
version = "0.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e7/44/8126f9f0c44319b2efc65feaad589cadef4d77ece200ae3c9133d58464d0/verspec-0.1.0.tar.gz", hash = "sha256:c4504ca697b2056cdb4bfa7121461f5a0e81809255b41c03dda4ba823637c01e", size = 27123, upload-time = "2020-11-30T02:24:09.646Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/ce/3b6fee91c85626eaf769d617f1be9d2e15c1cca027bbdeb2e0d751469355/verspec-0.1.0-py3-none-any.whl", hash = "sha256:741877d5633cc9464c45a469ae2a31e801e6dbbaa85b9675d481cda100f11c31", size = 19640, upload-time = "2020-11-30T02:24:08.387Z" },
]
[[package]]
name = "watchdog"
version = "6.0.0"
@@ -1291,7 +1324,7 @@ wheels = [
[[package]]
name = "zensical"
version = "0.0.29"
version = "0.0.30"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "click" },
@@ -1301,18 +1334,18 @@ dependencies = [
{ name = "pymdown-extensions" },
{ name = "pyyaml" },
]
sdist = { url = "https://files.pythonhosted.org/packages/78/bd/5786ab618a60bd7469ab243a7fd2c9eecb0790c85c784abb8b97edb77a54/zensical-0.0.29.tar.gz", hash = "sha256:0d6282be7cb551e12d5806badf5e94c54a5e2f2cf07057a3e36d1eaf97c33ada", size = 3842641, upload-time = "2026-03-24T13:37:27.587Z" }
sdist = { url = "https://files.pythonhosted.org/packages/1d/53/5e551f8912718816733a75adcb53a0787b2d2edca5869c156325aaf82e24/zensical-0.0.30.tar.gz", hash = "sha256:408b531683f6bcb6cc5ab928146d2c68afbc16fac4eda87ae3dd20af1498180f", size = 3844287, upload-time = "2026-03-28T17:55:52.836Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4b/9c/8b681daa024abca9763017bec09ecee8008e110cae1254217c8dd22cc339/zensical-0.0.29-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:20ae0709ea14fce25ab33d0a82acdaf454a7a2e232a9ee20c019942205174476", size = 12311399, upload-time = "2026-03-24T13:36:53.809Z" },
{ url = "https://files.pythonhosted.org/packages/81/ae/4ebb4d8bb2ef0164d473698b92f11caf431fc436e1625524acd5641102ca/zensical-0.0.29-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:599af3ba66fcd0146d7019f3493ed3c316051fae6c4d5599bc59f3a8f4b8a6f0", size = 12191845, upload-time = "2026-03-24T13:36:56.909Z" },
{ url = "https://files.pythonhosted.org/packages/d5/35/67f89db06571a52283b3ecbe3bcf32fd3115ca50436b3ae177a948b83ea7/zensical-0.0.29-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eea7e48a00a71c0586e875079b5f83a070c33a147e52ad4383e4b63ab524332b", size = 12554105, upload-time = "2026-03-24T13:36:59.945Z" },
{ url = "https://files.pythonhosted.org/packages/7c/f6/ac79e5d9c18b28557c9ff1c7c23d695fbdd82645d69bfe02292f46d935e7/zensical-0.0.29-cp310-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:59a57db35542e98d2896b833de07d199320f8ada3b4e7ddccb7fe892292d8b74", size = 12498643, upload-time = "2026-03-24T13:37:02.376Z" },
{ url = "https://files.pythonhosted.org/packages/b1/70/5c22a96a69e0e91e569c26236918bb9bab1170f59b29ad04105ead64f199/zensical-0.0.29-cp310-abi3-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d42c2b2a96a80cf64c98ba7242f59ef95109914bd4c9499d7ebc12544663852c", size = 12854531, upload-time = "2026-03-24T13:37:04.962Z" },
{ url = "https://files.pythonhosted.org/packages/79/25/e32237a8fcb0ceae1ef8e192e7f8db53b38f1e48f1c7cdbacd0a7b713892/zensical-0.0.29-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6b2fca39c5f6b1782c77cf6591cf346357cabee85ebdb956c5ddc0fd5169f3d9", size = 12596828, upload-time = "2026-03-24T13:37:07.817Z" },
{ url = "https://files.pythonhosted.org/packages/ff/74/89ac909cbb258903ea53802c184e4986c17ce0ba79b1c7f77b7e78a2dce3/zensical-0.0.29-cp310-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:dfc23a74ef672aa51088c080286319da1dc0b989cd5051e9e5e6d7d4abbc2fc1", size = 12732059, upload-time = "2026-03-24T13:37:11.651Z" },
{ url = "https://files.pythonhosted.org/packages/8c/31/2429de6a9328eed4acc7e9a3789f160294a15115be15f9870a0d02649302/zensical-0.0.29-cp310-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:c9336d4e4b232e3c9a70e30258e916dd7e60c0a2a08c8690065e60350c302028", size = 12768542, upload-time = "2026-03-24T13:37:14.39Z" },
{ url = "https://files.pythonhosted.org/packages/10/8a/55588b2a1dcbe86dad0404506c9ba367a06c663b1ff47147c84d26f7510e/zensical-0.0.29-cp310-abi3-musllinux_1_2_i686.whl", hash = "sha256:30661148f0681199f3b598cbeb1d54f5cba773e54ae840bac639250d85907b84", size = 12917991, upload-time = "2026-03-24T13:37:16.795Z" },
{ url = "https://files.pythonhosted.org/packages/ec/5d/653901f0d3a3ca72daebc62746a148797f4e422cc3a2b66a4e6718e4398f/zensical-0.0.29-cp310-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:6a566ac1fd4bfac5d711a7bd1ae06666712127c2718daa5083c7bf3f107e8578", size = 12868392, upload-time = "2026-03-24T13:37:19.42Z" },
{ url = "https://files.pythonhosted.org/packages/29/58/d7449bc88a174b98daa3f2fbdfbdac3493768a557d8987e88bdaa6c78b1a/zensical-0.0.29-cp310-abi3-win32.whl", hash = "sha256:a231a3a02a3851741dc4d2de8910b5c39fe81e55bf026d8edf4d803e91a922fb", size = 11905486, upload-time = "2026-03-24T13:37:22.154Z" },
{ url = "https://files.pythonhosted.org/packages/f5/09/3fd082d016497c4d26ff20f42a8be2cc91e27191c0c5f3cd6507827f666f/zensical-0.0.29-cp310-abi3-win_amd64.whl", hash = "sha256:7145c5504380a344b8cd4586da815cdde77ef4a42319fa4f35e78250f01985af", size = 12101510, upload-time = "2026-03-24T13:37:24.77Z" },
{ url = "https://files.pythonhosted.org/packages/1b/e3/ac0eb77a8a7f793613813de68bde26776d0da68d8041fa9eb8d0b986a449/zensical-0.0.30-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:b67fca8bfcd71c94b331045a591bf6e24fe123a66fba94587aa3379faf521a16", size = 12313786, upload-time = "2026-03-28T17:55:18.839Z" },
{ url = "https://files.pythonhosted.org/packages/a5/6a/73e461dfa27d3bc415e48396f83a3287b43df2fd3361e25146bc86360aab/zensical-0.0.30-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:8ceadfece1153edc26506e8ddf68d9818afe8517cf3bcdb6bfe4cb2793ae247b", size = 12186136, upload-time = "2026-03-28T17:55:21.836Z" },
{ url = "https://files.pythonhosted.org/packages/a3/bc/9022156b4c28c1b95209acb64319b1e5cd0af2e97035bdd461e58408cb46/zensical-0.0.30-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e100b2b654337ac5306ba12818f3c5336c66d0d34c593ef05e316c124a5819cb", size = 12556115, upload-time = "2026-03-28T17:55:24.849Z" },
{ url = "https://files.pythonhosted.org/packages/0b/29/9e8f5bd6d33b35f4c368ae8b13d431dc42b2de17ea6eccbd71d48122eba6/zensical-0.0.30-cp310-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:bdf641ffddaf21c6971b91a4426b81cd76271c5b1adb7176afcce3f1508328b1", size = 12498121, upload-time = "2026-03-28T17:55:27.637Z" },
{ url = "https://files.pythonhosted.org/packages/c4/e1/b8dfa0769050e62cd731358145fdeb67af35e322197bd7e7727250596e7b/zensical-0.0.30-cp310-abi3-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fd909a0c2116e26190c7f3ec4fb55837c417b7a8d99ebf4f3deb26b07b97e49", size = 12854142, upload-time = "2026-03-28T17:55:30.54Z" },
{ url = "https://files.pythonhosted.org/packages/04/11/62a36cfb81522b6108db8f9e96d36da8cccb306b02c15ad19e1b333fa7c8/zensical-0.0.30-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:16fd2da09fe4e5cbec2ca74f31abc70f32f7330d56593b647e0a114bb329171a", size = 12598341, upload-time = "2026-03-28T17:55:32.988Z" },
{ url = "https://files.pythonhosted.org/packages/a7/a4/8c7a6725fb226aa71d19209403d974e45f39d757e725f9558c6ed8d350a5/zensical-0.0.30-cp310-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:896b36eaef7fed5f8fc6f2c8264b2751aad63c2d66d3d8650e38481b6b4f6f7b", size = 12732307, upload-time = "2026-03-28T17:55:35.618Z" },
{ url = "https://files.pythonhosted.org/packages/5e/a1/7858fb3f6ac67d7d24a8acbe834cbe26851d6bd151ece6fba3fc88b0f878/zensical-0.0.30-cp310-abi3-musllinux_1_2_armv7l.whl", hash = "sha256:a1f515ec67a0d0250e53846327bf0c69635a1f39749da3b04feb68431188d3c6", size = 12770962, upload-time = "2026-03-28T17:55:38.627Z" },
{ url = "https://files.pythonhosted.org/packages/49/b7/228298112a69d0b74e6e93041bffcf1fc96d03cf252be94a354f277d4789/zensical-0.0.30-cp310-abi3-musllinux_1_2_i686.whl", hash = "sha256:ce33d1002438838a35fa43358a1f43d74f874586596d3d116999d3756cded00e", size = 12919256, upload-time = "2026-03-28T17:55:41.413Z" },
{ url = "https://files.pythonhosted.org/packages/de/c7/5b4ea036f7f7d84abf907f7f7a3e8420b054c89279c5273ca248d3bc9f48/zensical-0.0.30-cp310-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:029dad561568f4ae3056dde16a81012efd92c426d4eb7101f960f448c1168196", size = 12869760, upload-time = "2026-03-28T17:55:44.474Z" },
{ url = "https://files.pythonhosted.org/packages/36/b4/77bef2132e43108db718ae014a5961fc511e88fc446c11f1c3483def429e/zensical-0.0.30-cp310-abi3-win32.whl", hash = "sha256:0105672850f053c326fba9fdd95adf60e9f90308f8cc1c08e3a00e15a8d5e90f", size = 11905658, upload-time = "2026-03-28T17:55:47.416Z" },
{ url = "https://files.pythonhosted.org/packages/a1/59/23b6c7ff062e2b299cc60e333095e853f9d38d1b5abe743c7b94c4ac432c/zensical-0.0.30-cp310-abi3-win_amd64.whl", hash = "sha256:b879dbf4c69d3ea41694bae33e1b948847e635dcbcd6ec8c522920833379dd48", size = 12101867, upload-time = "2026-03-28T17:55:50.083Z" },
]

View File

@@ -140,6 +140,7 @@ Examples = [
[[project.nav]]
Migration = [
{"v3.0" = "migration/v3.md"},
{"v2.0" = "migration/v2.md"},
]