mirror of
https://github.com/d3vyce/fastapi-toolsets.git
synced 2026-04-16 14:46:24 +02:00
Compare commits
5 Commits
aca3f62a6b
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c853d13581 | ||
|
94e7d79d06
|
|||
|
|
9268b576b4 | ||
|
|
863e6ce6e9 | ||
|
|
c7397faea4 |
@@ -118,6 +118,57 @@ async def clean(db_session):
|
||||
await cleanup_tables(session=db_session, base=Base)
|
||||
```
|
||||
|
||||
## Many-to-Many helpers
|
||||
|
||||
SQLAlchemy's ORM collection API triggers lazy-loads when you append to a relationship inside a savepoint (e.g. inside `lock_tables` or a nested `get_transaction`). The three `m2m_*` helpers bypass the ORM collection entirely and issue direct SQL against the association table.
|
||||
|
||||
### `m2m_add` — insert associations
|
||||
|
||||
[`m2m_add`](../reference/db.md#fastapi_toolsets.db.m2m_add) inserts one or more rows into a secondary table without touching the ORM collection:
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.db import lock_tables, m2m_add
|
||||
|
||||
async with lock_tables(session, [Tag]):
|
||||
tag = await TagCrud.create(session, TagCreate(name="python"))
|
||||
await m2m_add(session, post, Post.tags, tag)
|
||||
```
|
||||
|
||||
Pass `ignore_conflicts=True` to silently skip associations that already exist:
|
||||
|
||||
```python
|
||||
await m2m_add(session, post, Post.tags, tag, ignore_conflicts=True)
|
||||
```
|
||||
|
||||
### `m2m_remove` — delete associations
|
||||
|
||||
[`m2m_remove`](../reference/db.md#fastapi_toolsets.db.m2m_remove) deletes specific association rows. Removing a non-existent association is a no-op:
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.db import get_transaction, m2m_remove
|
||||
|
||||
async with get_transaction(session):
|
||||
await m2m_remove(session, post, Post.tags, tag1, tag2)
|
||||
```
|
||||
|
||||
### `m2m_set` — replace the full set
|
||||
|
||||
[`m2m_set`](../reference/db.md#fastapi_toolsets.db.m2m_set) atomically replaces all associations: it deletes every existing row for the owner instance then inserts the new set. Passing no related instances clears the association entirely:
|
||||
|
||||
```python
|
||||
from fastapi_toolsets.db import get_transaction, m2m_set
|
||||
|
||||
# Replace all tags
|
||||
async with get_transaction(session):
|
||||
await m2m_set(session, post, Post.tags, tag_a, tag_b)
|
||||
|
||||
# Clear all tags
|
||||
async with get_transaction(session):
|
||||
await m2m_set(session, post, Post.tags)
|
||||
```
|
||||
|
||||
All three helpers raise `TypeError` if the relationship attribute is not a Many-to-Many (i.e. has no secondary table).
|
||||
|
||||
---
|
||||
|
||||
[:material-api: API Reference](../reference/db.md)
|
||||
|
||||
@@ -13,6 +13,9 @@ from fastapi_toolsets.db import (
|
||||
create_db_context,
|
||||
get_transaction,
|
||||
lock_tables,
|
||||
m2m_add,
|
||||
m2m_remove,
|
||||
m2m_set,
|
||||
wait_for_row_change,
|
||||
)
|
||||
```
|
||||
@@ -32,3 +35,9 @@ from fastapi_toolsets.db import (
|
||||
## ::: fastapi_toolsets.db.create_database
|
||||
|
||||
## ::: fastapi_toolsets.db.cleanup_tables
|
||||
|
||||
## ::: fastapi_toolsets.db.m2m_add
|
||||
|
||||
## ::: fastapi_toolsets.db.m2m_remove
|
||||
|
||||
## ::: fastapi_toolsets.db.m2m_set
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "fastapi-toolsets"
|
||||
version = "3.0.3"
|
||||
version = "3.1.0"
|
||||
description = "Production-ready utilities for FastAPI applications"
|
||||
readme = "README.md"
|
||||
license = "MIT"
|
||||
|
||||
@@ -21,4 +21,4 @@ Example usage:
|
||||
return Response(data={"user": user.username}, message="Success")
|
||||
"""
|
||||
|
||||
__version__ = "3.0.3"
|
||||
__version__ = "3.1.0"
|
||||
|
||||
@@ -4,11 +4,13 @@ import asyncio
|
||||
from collections.abc import AsyncGenerator, Callable
|
||||
from contextlib import AbstractAsyncContextManager, asynccontextmanager
|
||||
from enum import Enum
|
||||
from typing import Any, TypeVar
|
||||
from typing import Any, TypeVar, cast
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import Table, delete, text, tuple_
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import DeclarativeBase, QueryableAttribute
|
||||
from sqlalchemy.orm.relationships import RelationshipProperty
|
||||
|
||||
from .exceptions import NotFoundError
|
||||
|
||||
@@ -20,6 +22,9 @@ __all__ = [
|
||||
"create_db_dependency",
|
||||
"get_transaction",
|
||||
"lock_tables",
|
||||
"m2m_add",
|
||||
"m2m_remove",
|
||||
"m2m_set",
|
||||
"wait_for_row_change",
|
||||
]
|
||||
|
||||
@@ -339,3 +344,140 @@ async def wait_for_row_change(
|
||||
current = {col: getattr(instance, col) for col in watch_cols}
|
||||
if current != initial:
|
||||
return instance
|
||||
|
||||
|
||||
def _m2m_prop(rel_attr: QueryableAttribute) -> RelationshipProperty: # type: ignore[type-arg]
|
||||
"""Return the validated M2M RelationshipProperty for *rel_attr*.
|
||||
|
||||
Raises TypeError if *rel_attr* is not a Many-to-Many relationship.
|
||||
"""
|
||||
prop = rel_attr.property
|
||||
if not isinstance(prop, RelationshipProperty) or prop.secondary is None:
|
||||
raise TypeError(
|
||||
f"m2m helpers require a Many-to-Many relationship attribute, "
|
||||
f"got {rel_attr!r}. Use a relationship with a secondary table."
|
||||
)
|
||||
return prop
|
||||
|
||||
|
||||
async def m2m_add(
|
||||
session: AsyncSession,
|
||||
instance: DeclarativeBase,
|
||||
rel_attr: QueryableAttribute,
|
||||
*related: DeclarativeBase,
|
||||
ignore_conflicts: bool = False,
|
||||
) -> None:
|
||||
"""Insert rows into a Many-to-Many association table without loading the ORM collection.
|
||||
|
||||
Args:
|
||||
session: DB async session.
|
||||
instance: The "owner" side model instance (e.g. the ``A`` in ``A.b_list``).
|
||||
rel_attr: The M2M relationship attribute on the model class (e.g. ``A.b_list``).
|
||||
*related: One or more related instances to associate with ``instance``.
|
||||
ignore_conflicts: When ``True``, silently skip rows that already exist
|
||||
in the association table (``ON CONFLICT DO NOTHING``).
|
||||
|
||||
Raises:
|
||||
TypeError: If ``rel_attr`` is not a Many-to-Many relationship.
|
||||
"""
|
||||
prop = _m2m_prop(rel_attr)
|
||||
if not related:
|
||||
return
|
||||
|
||||
secondary = cast(Table, prop.secondary)
|
||||
assert secondary is not None # guaranteed by _m2m_prop
|
||||
sync_pairs = prop.secondary_synchronize_pairs
|
||||
assert sync_pairs is not None # set whenever secondary is set
|
||||
|
||||
# synchronize_pairs: [(parent_col, assoc_col), ...]
|
||||
# secondary_synchronize_pairs: [(related_col, assoc_col), ...]
|
||||
rows: list[dict[str, Any]] = []
|
||||
for rel_instance in related:
|
||||
row: dict[str, Any] = {}
|
||||
for parent_col, assoc_col in prop.synchronize_pairs:
|
||||
row[assoc_col.name] = getattr(instance, cast(str, parent_col.key))
|
||||
for related_col, assoc_col in sync_pairs:
|
||||
row[assoc_col.name] = getattr(rel_instance, cast(str, related_col.key))
|
||||
rows.append(row)
|
||||
|
||||
stmt = pg_insert(secondary).values(rows)
|
||||
if ignore_conflicts:
|
||||
stmt = stmt.on_conflict_do_nothing()
|
||||
await session.execute(stmt)
|
||||
|
||||
|
||||
async def m2m_remove(
|
||||
session: AsyncSession,
|
||||
instance: DeclarativeBase,
|
||||
rel_attr: QueryableAttribute,
|
||||
*related: DeclarativeBase,
|
||||
) -> None:
|
||||
"""Remove rows from a Many-to-Many association table without loading the ORM collection.
|
||||
|
||||
Args:
|
||||
session: DB async session.
|
||||
instance: The "owner" side model instance (e.g. the ``A`` in ``A.b_list``).
|
||||
rel_attr: The M2M relationship attribute on the model class (e.g. ``A.b_list``).
|
||||
*related: One or more related instances to disassociate from ``instance``.
|
||||
|
||||
Raises:
|
||||
TypeError: If ``rel_attr`` is not a Many-to-Many relationship.
|
||||
"""
|
||||
prop = _m2m_prop(rel_attr)
|
||||
if not related:
|
||||
return
|
||||
|
||||
secondary = cast(Table, prop.secondary)
|
||||
assert secondary is not None # guaranteed by _m2m_prop
|
||||
related_pairs = prop.secondary_synchronize_pairs
|
||||
assert related_pairs is not None # set whenever secondary is set
|
||||
|
||||
parent_where = [
|
||||
assoc_col == getattr(instance, cast(str, parent_col.key))
|
||||
for parent_col, assoc_col in prop.synchronize_pairs
|
||||
]
|
||||
|
||||
if len(related_pairs) == 1:
|
||||
related_col, assoc_col = related_pairs[0]
|
||||
related_values = [getattr(r, cast(str, related_col.key)) for r in related]
|
||||
related_where = assoc_col.in_(related_values)
|
||||
else:
|
||||
assoc_cols = [ac for _, ac in related_pairs]
|
||||
rel_cols = [rc for rc, _ in related_pairs]
|
||||
related_values_t = [
|
||||
tuple(getattr(r, cast(str, rc.key)) for rc in rel_cols) for r in related
|
||||
]
|
||||
related_where = tuple_(*assoc_cols).in_(related_values_t)
|
||||
|
||||
await session.execute(delete(secondary).where(*parent_where, related_where))
|
||||
|
||||
|
||||
async def m2m_set(
|
||||
session: AsyncSession,
|
||||
instance: DeclarativeBase,
|
||||
rel_attr: QueryableAttribute,
|
||||
*related: DeclarativeBase,
|
||||
) -> None:
|
||||
"""Replace the entire Many-to-Many association set atomically.
|
||||
|
||||
Args:
|
||||
session: DB async session.
|
||||
instance: The "owner" side model instance (e.g. the ``A`` in ``A.b_list``).
|
||||
rel_attr: The M2M relationship attribute on the model class (e.g. ``A.b_list``).
|
||||
*related: The new complete set of related instances.
|
||||
|
||||
Raises:
|
||||
TypeError: If ``rel_attr`` is not a Many-to-Many relationship.
|
||||
"""
|
||||
prop = _m2m_prop(rel_attr)
|
||||
secondary = cast(Table, prop.secondary)
|
||||
assert secondary is not None # guaranteed by _m2m_prop
|
||||
|
||||
parent_where = [
|
||||
assoc_col == getattr(instance, cast(str, parent_col.key))
|
||||
for parent_col, assoc_col in prop.synchronize_pairs
|
||||
]
|
||||
await session.execute(delete(secondary).where(*parent_where))
|
||||
|
||||
if related:
|
||||
await m2m_add(session, instance, rel_attr, *related)
|
||||
|
||||
@@ -2,12 +2,18 @@
|
||||
|
||||
from .enum import LoadStrategy
|
||||
from .registry import Context, FixtureRegistry
|
||||
from .utils import get_obj_by_attr, load_fixtures, load_fixtures_by_context
|
||||
from .utils import (
|
||||
get_field_by_attr,
|
||||
get_obj_by_attr,
|
||||
load_fixtures,
|
||||
load_fixtures_by_context,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"Context",
|
||||
"FixtureRegistry",
|
||||
"LoadStrategy",
|
||||
"get_field_by_attr",
|
||||
"get_obj_by_attr",
|
||||
"load_fixtures",
|
||||
"load_fixtures_by_context",
|
||||
|
||||
@@ -250,6 +250,31 @@ def get_obj_by_attr(
|
||||
) from None
|
||||
|
||||
|
||||
def get_field_by_attr(
|
||||
fixtures: Callable[[], Sequence[ModelType]],
|
||||
attr_name: str,
|
||||
value: Any,
|
||||
*,
|
||||
field: str = "id",
|
||||
) -> Any:
|
||||
"""Get a single field value from a fixture object matched by an attribute.
|
||||
|
||||
Args:
|
||||
fixtures: A fixture function registered via ``@registry.register``
|
||||
that returns a sequence of SQLAlchemy model instances.
|
||||
attr_name: Name of the attribute to match against.
|
||||
value: Value to match.
|
||||
field: Attribute name to return from the matched object (default: ``"id"``).
|
||||
|
||||
Returns:
|
||||
The value of ``field`` on the first matching model instance.
|
||||
|
||||
Raises:
|
||||
StopIteration: If no matching object is found in the fixture group.
|
||||
"""
|
||||
return getattr(get_obj_by_attr(fixtures, attr_name, value), field)
|
||||
|
||||
|
||||
async def load_fixtures(
|
||||
session: AsyncSession,
|
||||
registry: FixtureRegistry,
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
"""Pytest plugin for using FixtureRegistry fixtures in tests."""
|
||||
|
||||
from collections.abc import Callable, Sequence
|
||||
from typing import Any
|
||||
from typing import Any, cast
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import DeclarativeBase, selectinload
|
||||
from sqlalchemy.orm.interfaces import ExecutableOption, ORMOption
|
||||
|
||||
from ..db import get_transaction
|
||||
from ..fixtures import FixtureRegistry, LoadStrategy
|
||||
@@ -112,7 +114,7 @@ def _create_fixture_function(
|
||||
elif strategy == LoadStrategy.MERGE:
|
||||
merged = await session.merge(instance)
|
||||
loaded.append(merged)
|
||||
elif strategy == LoadStrategy.SKIP_EXISTING:
|
||||
elif strategy == LoadStrategy.SKIP_EXISTING: # pragma: no branch
|
||||
pk = _get_primary_key(instance)
|
||||
if pk is not None:
|
||||
existing = await session.get(type(instance), pk)
|
||||
@@ -125,6 +127,11 @@ def _create_fixture_function(
|
||||
session.add(instance)
|
||||
loaded.append(instance)
|
||||
|
||||
if loaded: # pragma: no branch
|
||||
load_options = _relationship_load_options(type(loaded[0]))
|
||||
if load_options:
|
||||
return await _reload_with_relationships(session, loaded, load_options)
|
||||
|
||||
return loaded
|
||||
|
||||
# Update function signature to include dependencies
|
||||
@@ -141,6 +148,54 @@ def _create_fixture_function(
|
||||
return created_func
|
||||
|
||||
|
||||
def _relationship_load_options(model: type[DeclarativeBase]) -> list[ExecutableOption]:
|
||||
"""Build selectinload options for all direct relationships on a model."""
|
||||
return [
|
||||
selectinload(getattr(model, rel.key)) for rel in model.__mapper__.relationships
|
||||
]
|
||||
|
||||
|
||||
async def _reload_with_relationships(
|
||||
session: AsyncSession,
|
||||
instances: list[DeclarativeBase],
|
||||
load_options: list[ExecutableOption],
|
||||
) -> list[DeclarativeBase]:
|
||||
"""Reload instances in a single bulk query with relationship eager-loading.
|
||||
|
||||
Uses one SELECT … WHERE pk IN (…) so selectinload can batch all relationship
|
||||
queries — 1 + N_relationships round-trips regardless of how many instances
|
||||
there are, instead of one session.get() per instance.
|
||||
|
||||
Preserves the original insertion order.
|
||||
"""
|
||||
model = type(instances[0])
|
||||
mapper = model.__mapper__
|
||||
pk_cols = mapper.primary_key
|
||||
|
||||
if len(pk_cols) == 1:
|
||||
pk_attr = getattr(model, pk_cols[0].key)
|
||||
pks = [getattr(inst, pk_cols[0].key) for inst in instances]
|
||||
result = await session.execute(
|
||||
select(model).where(pk_attr.in_(pks)).options(*load_options)
|
||||
)
|
||||
by_pk = {getattr(row, pk_cols[0].key): row for row in result.unique().scalars()}
|
||||
return [by_pk[pk] for pk in pks]
|
||||
|
||||
# Composite PK: fall back to per-instance reload
|
||||
reloaded: list[DeclarativeBase] = []
|
||||
for instance in instances:
|
||||
pk = _get_primary_key(instance)
|
||||
refreshed = await session.get(
|
||||
model,
|
||||
pk,
|
||||
options=cast(list[ORMOption], load_options),
|
||||
populate_existing=True,
|
||||
)
|
||||
if refreshed is not None: # pragma: no branch
|
||||
reloaded.append(refreshed)
|
||||
return reloaded
|
||||
|
||||
|
||||
def _get_primary_key(instance: DeclarativeBase) -> Any | None:
|
||||
"""Get the primary key value of a model instance."""
|
||||
mapper = instance.__class__.__mapper__
|
||||
|
||||
454
tests/test_db.py
454
tests/test_db.py
@@ -4,10 +4,26 @@ import asyncio
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import (
|
||||
Column,
|
||||
ForeignKey,
|
||||
ForeignKeyConstraint,
|
||||
String,
|
||||
Table,
|
||||
Uuid,
|
||||
select,
|
||||
text,
|
||||
)
|
||||
from sqlalchemy.engine import make_url
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from sqlalchemy.orm import (
|
||||
DeclarativeBase,
|
||||
Mapped,
|
||||
mapped_column,
|
||||
relationship,
|
||||
selectinload,
|
||||
)
|
||||
|
||||
from fastapi_toolsets.db import (
|
||||
LockMode,
|
||||
@@ -17,12 +33,15 @@ from fastapi_toolsets.db import (
|
||||
create_db_dependency,
|
||||
get_transaction,
|
||||
lock_tables,
|
||||
m2m_add,
|
||||
m2m_remove,
|
||||
m2m_set,
|
||||
wait_for_row_change,
|
||||
)
|
||||
from fastapi_toolsets.exceptions import NotFoundError
|
||||
from fastapi_toolsets.pytest import create_db_session
|
||||
|
||||
from .conftest import DATABASE_URL, Base, Role, RoleCrud, User, UserCrud
|
||||
from .conftest import DATABASE_URL, Base, Post, Role, RoleCrud, Tag, User, UserCrud
|
||||
|
||||
|
||||
class TestCreateDbDependency:
|
||||
@@ -81,6 +100,21 @@ class TestCreateDbDependency:
|
||||
|
||||
await engine.dispose()
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_no_commit_when_not_in_transaction(self):
|
||||
"""Dependency skips commit if the session is no longer in a transaction on exit."""
|
||||
engine = create_async_engine(DATABASE_URL, echo=False)
|
||||
session_factory = async_sessionmaker(engine, expire_on_commit=False)
|
||||
get_db = create_db_dependency(session_factory)
|
||||
|
||||
async for session in get_db():
|
||||
# Manually commit — session exits the transaction
|
||||
await session.commit()
|
||||
assert not session.in_transaction()
|
||||
# The dependency's post-yield path must not call commit again (no error)
|
||||
|
||||
await engine.dispose()
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_update_after_lock_tables_is_persisted(self):
|
||||
"""Changes made after lock_tables exits (before endpoint returns) are committed.
|
||||
@@ -480,3 +514,417 @@ class TestCleanupTables:
|
||||
async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session:
|
||||
# Should not raise
|
||||
await cleanup_tables(session, EmptyBase)
|
||||
|
||||
|
||||
class TestM2MAdd:
|
||||
"""Tests for m2m_add helper."""
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_adds_single_related(self, db_session: AsyncSession):
|
||||
"""Associates one related instance via the secondary table."""
|
||||
user = User(username="m2m_author", email="m2m@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post A", author_id=user.id)
|
||||
tag = Tag(name="python")
|
||||
db_session.add_all([post, tag])
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Post).where(Post.id == post.id).options(selectinload(Post.tags))
|
||||
)
|
||||
loaded = result.scalar_one()
|
||||
assert len(loaded.tags) == 1
|
||||
assert loaded.tags[0].id == tag.id
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_adds_multiple_related(self, db_session: AsyncSession):
|
||||
"""Associates multiple related instances in a single call."""
|
||||
user = User(username="m2m_author2", email="m2m2@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post B", author_id=user.id)
|
||||
tag1 = Tag(name="web")
|
||||
tag2 = Tag(name="api")
|
||||
tag3 = Tag(name="async")
|
||||
db_session.add_all([post, tag1, tag2, tag3])
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag1, tag2, tag3)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Post).where(Post.id == post.id).options(selectinload(Post.tags))
|
||||
)
|
||||
loaded = result.scalar_one()
|
||||
assert {t.id for t in loaded.tags} == {tag1.id, tag2.id, tag3.id}
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_noop_for_empty_related(self, db_session: AsyncSession):
|
||||
"""Calling with no related instances is a no-op."""
|
||||
user = User(username="m2m_author3", email="m2m3@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post C", author_id=user.id)
|
||||
db_session.add(post)
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags) # no related instances
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Post).where(Post.id == post.id).options(selectinload(Post.tags))
|
||||
)
|
||||
loaded = result.scalar_one()
|
||||
assert loaded.tags == []
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_ignore_conflicts_true(self, db_session: AsyncSession):
|
||||
"""Duplicate inserts are silently skipped when ignore_conflicts=True."""
|
||||
user = User(username="m2m_author4", email="m2m4@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post D", author_id=user.id)
|
||||
tag = Tag(name="duplicate_tag")
|
||||
db_session.add_all([post, tag])
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag)
|
||||
|
||||
# Second call with ignore_conflicts=True must not raise
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag, ignore_conflicts=True)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Post).where(Post.id == post.id).options(selectinload(Post.tags))
|
||||
)
|
||||
loaded = result.scalar_one()
|
||||
assert len(loaded.tags) == 1
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_ignore_conflicts_false_raises(self, db_session: AsyncSession):
|
||||
"""Duplicate inserts raise IntegrityError when ignore_conflicts=False (default)."""
|
||||
user = User(username="m2m_author5", email="m2m5@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post E", author_id=user.id)
|
||||
tag = Tag(name="conflict_tag")
|
||||
db_session.add_all([post, tag])
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag)
|
||||
|
||||
with pytest.raises(IntegrityError):
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_non_m2m_raises_type_error(self, db_session: AsyncSession):
|
||||
"""Passing a non-M2M relationship attribute raises TypeError."""
|
||||
user = User(username="m2m_author6", email="m2m6@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
role = Role(name="type_err_role")
|
||||
db_session.add(role)
|
||||
await db_session.flush()
|
||||
|
||||
with pytest.raises(TypeError, match="Many-to-Many"):
|
||||
await m2m_add(db_session, user, User.role, role)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_works_inside_lock_tables(self, db_session: AsyncSession):
|
||||
"""m2m_add works correctly inside a lock_tables nested transaction."""
|
||||
user = User(username="m2m_lock_author", email="m2m_lock@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
async with lock_tables(db_session, [Tag]):
|
||||
tag = Tag(name="locked_tag")
|
||||
db_session.add(tag)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post Lock", author_id=user.id)
|
||||
db_session.add(post)
|
||||
await db_session.flush()
|
||||
|
||||
await m2m_add(db_session, post, Post.tags, tag)
|
||||
|
||||
result = await db_session.execute(
|
||||
select(Post).where(Post.id == post.id).options(selectinload(Post.tags))
|
||||
)
|
||||
loaded = result.scalar_one()
|
||||
assert len(loaded.tags) == 1
|
||||
assert loaded.tags[0].name == "locked_tag"
|
||||
|
||||
|
||||
class _LocalBase(DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
_comp_assoc = Table(
|
||||
"_comp_assoc",
|
||||
_LocalBase.metadata,
|
||||
Column("owner_id", Uuid, ForeignKey("_comp_owners.id"), primary_key=True),
|
||||
Column("item_group", String(50), primary_key=True),
|
||||
Column("item_code", String(50), primary_key=True),
|
||||
ForeignKeyConstraint(
|
||||
["item_group", "item_code"],
|
||||
["_comp_items.group_id", "_comp_items.item_code"],
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class _CompOwner(_LocalBase):
|
||||
__tablename__ = "_comp_owners"
|
||||
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
|
||||
items: Mapped[list["_CompItem"]] = relationship(secondary=_comp_assoc)
|
||||
|
||||
|
||||
class _CompItem(_LocalBase):
|
||||
__tablename__ = "_comp_items"
|
||||
group_id: Mapped[str] = mapped_column(String(50), primary_key=True)
|
||||
item_code: Mapped[str] = mapped_column(String(50), primary_key=True)
|
||||
|
||||
|
||||
class TestM2MRemove:
|
||||
"""Tests for m2m_remove helper."""
|
||||
|
||||
async def _setup(
|
||||
self, session: AsyncSession, username: str, email: str, *tag_names: str
|
||||
):
|
||||
"""Create a user, post, and tags; associate all tags with the post."""
|
||||
user = User(username=username, email=email)
|
||||
session.add(user)
|
||||
await session.flush()
|
||||
|
||||
post = Post(title=f"Post {username}", author_id=user.id)
|
||||
tags = [Tag(name=n) for n in tag_names]
|
||||
session.add(post)
|
||||
session.add_all(tags)
|
||||
await session.flush()
|
||||
|
||||
async with get_transaction(session):
|
||||
await m2m_add(session, post, Post.tags, *tags)
|
||||
|
||||
return post, tags
|
||||
|
||||
async def _load_tags(self, session: AsyncSession, post: Post) -> list[Tag]:
|
||||
result = await session.execute(
|
||||
select(Post).where(Post.id == post.id).options(selectinload(Post.tags))
|
||||
)
|
||||
return result.scalar_one().tags
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_removes_single(self, db_session: AsyncSession):
|
||||
"""Removes one association, leaving others intact."""
|
||||
post, (tag1, tag2) = await self._setup(
|
||||
db_session, "rm_author1", "rm1@test.com", "tag_rm_a", "tag_rm_b"
|
||||
)
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_remove(db_session, post, Post.tags, tag1)
|
||||
|
||||
remaining = await self._load_tags(db_session, post)
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].id == tag2.id
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_removes_multiple(self, db_session: AsyncSession):
|
||||
"""Removes multiple associations in one call."""
|
||||
post, (tag1, tag2, tag3) = await self._setup(
|
||||
db_session, "rm_author2", "rm2@test.com", "tag_rm_c", "tag_rm_d", "tag_rm_e"
|
||||
)
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_remove(db_session, post, Post.tags, tag1, tag3)
|
||||
|
||||
remaining = await self._load_tags(db_session, post)
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].id == tag2.id
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_noop_for_empty_related(self, db_session: AsyncSession):
|
||||
"""Calling with no related instances is a no-op."""
|
||||
post, (tag,) = await self._setup(
|
||||
db_session, "rm_author3", "rm3@test.com", "tag_rm_f"
|
||||
)
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_remove(db_session, post, Post.tags)
|
||||
|
||||
remaining = await self._load_tags(db_session, post)
|
||||
assert len(remaining) == 1
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_idempotent_for_missing_association(self, db_session: AsyncSession):
|
||||
"""Removing a non-existent association does not raise."""
|
||||
post, (tag1,) = await self._setup(
|
||||
db_session, "rm_author4", "rm4@test.com", "tag_rm_g"
|
||||
)
|
||||
tag2 = Tag(name="tag_rm_h")
|
||||
db_session.add(tag2)
|
||||
await db_session.flush()
|
||||
|
||||
# tag2 was never associated — should not raise
|
||||
async with get_transaction(db_session):
|
||||
await m2m_remove(db_session, post, Post.tags, tag2)
|
||||
|
||||
remaining = await self._load_tags(db_session, post)
|
||||
assert len(remaining) == 1
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_non_m2m_raises_type_error(self, db_session: AsyncSession):
|
||||
"""Passing a non-M2M relationship attribute raises TypeError."""
|
||||
user = User(username="rm_author5", email="rm5@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
role = Role(name="rm_type_err_role")
|
||||
db_session.add(role)
|
||||
await db_session.flush()
|
||||
|
||||
with pytest.raises(TypeError, match="Many-to-Many"):
|
||||
await m2m_remove(db_session, user, User.role, role)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_removes_composite_pk_related(self):
|
||||
"""Composite-PK branch: DELETE uses tuple IN when related side has multi-col PK."""
|
||||
engine = create_async_engine(DATABASE_URL, echo=False)
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(_LocalBase.metadata.create_all)
|
||||
|
||||
session_factory = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with session_factory() as session:
|
||||
owner = _CompOwner()
|
||||
item1 = _CompItem(group_id="g1", item_code="c1")
|
||||
item2 = _CompItem(group_id="g1", item_code="c2")
|
||||
session.add_all([owner, item1, item2])
|
||||
await session.flush()
|
||||
|
||||
async with get_transaction(session):
|
||||
await m2m_add(session, owner, _CompOwner.items, item1, item2)
|
||||
|
||||
async with get_transaction(session):
|
||||
await m2m_remove(session, owner, _CompOwner.items, item1)
|
||||
|
||||
await session.commit()
|
||||
|
||||
async with session_factory() as verify:
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
result = await verify.execute(
|
||||
select(_CompOwner)
|
||||
.where(_CompOwner.id == owner.id)
|
||||
.options(selectinload(_CompOwner.items))
|
||||
)
|
||||
loaded = result.scalar_one()
|
||||
assert len(loaded.items) == 1
|
||||
assert (loaded.items[0].group_id, loaded.items[0].item_code) == (
|
||||
"g1",
|
||||
"c2",
|
||||
)
|
||||
finally:
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(_LocalBase.metadata.drop_all)
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
class TestM2MSet:
|
||||
"""Tests for m2m_set helper."""
|
||||
|
||||
async def _load_tags(self, session: AsyncSession, post: Post) -> list[Tag]:
|
||||
result = await session.execute(
|
||||
select(Post).where(Post.id == post.id).options(selectinload(Post.tags))
|
||||
)
|
||||
return result.scalar_one().tags
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_replaces_existing_set(self, db_session: AsyncSession):
|
||||
"""Replaces the full association set atomically."""
|
||||
user = User(username="set_author1", email="set1@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post Set A", author_id=user.id)
|
||||
tag1 = Tag(name="tag_set_a")
|
||||
tag2 = Tag(name="tag_set_b")
|
||||
tag3 = Tag(name="tag_set_c")
|
||||
db_session.add_all([post, tag1, tag2, tag3])
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag1, tag2)
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_set(db_session, post, Post.tags, tag3)
|
||||
|
||||
remaining = await self._load_tags(db_session, post)
|
||||
assert len(remaining) == 1
|
||||
assert remaining[0].id == tag3.id
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_clears_all_when_no_related(self, db_session: AsyncSession):
|
||||
"""Passing no related instances clears all associations."""
|
||||
user = User(username="set_author2", email="set2@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post Set B", author_id=user.id)
|
||||
tag = Tag(name="tag_set_d")
|
||||
db_session.add_all([post, tag])
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_add(db_session, post, Post.tags, tag)
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_set(db_session, post, Post.tags)
|
||||
|
||||
remaining = await self._load_tags(db_session, post)
|
||||
assert remaining == []
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_set_on_empty_then_populate(self, db_session: AsyncSession):
|
||||
"""m2m_set works on a post with no existing associations."""
|
||||
user = User(username="set_author3", email="set3@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
post = Post(title="Post Set C", author_id=user.id)
|
||||
tag1 = Tag(name="tag_set_e")
|
||||
tag2 = Tag(name="tag_set_f")
|
||||
db_session.add_all([post, tag1, tag2])
|
||||
await db_session.flush()
|
||||
|
||||
async with get_transaction(db_session):
|
||||
await m2m_set(db_session, post, Post.tags, tag1, tag2)
|
||||
|
||||
remaining = await self._load_tags(db_session, post)
|
||||
assert {t.id for t in remaining} == {tag1.id, tag2.id}
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_non_m2m_raises_type_error(self, db_session: AsyncSession):
|
||||
"""Passing a non-M2M relationship attribute raises TypeError."""
|
||||
user = User(username="set_author4", email="set4@test.com")
|
||||
db_session.add(user)
|
||||
await db_session.flush()
|
||||
|
||||
role = Role(name="set_type_err_role")
|
||||
db_session.add(role)
|
||||
await db_session.flush()
|
||||
|
||||
with pytest.raises(TypeError, match="Many-to-Many"):
|
||||
await m2m_set(db_session, user, User.role, role)
|
||||
|
||||
@@ -10,6 +10,7 @@ from fastapi_toolsets.fixtures import (
|
||||
Context,
|
||||
FixtureRegistry,
|
||||
LoadStrategy,
|
||||
get_field_by_attr,
|
||||
get_obj_by_attr,
|
||||
load_fixtures,
|
||||
load_fixtures_by_context,
|
||||
@@ -951,6 +952,41 @@ class TestGetObjByAttr:
|
||||
get_obj_by_attr(self.roles, "id", "not-a-uuid")
|
||||
|
||||
|
||||
class TestGetFieldByAttr:
|
||||
"""Tests for get_field_by_attr helper function."""
|
||||
|
||||
def setup_method(self):
|
||||
self.registry = FixtureRegistry()
|
||||
self.role_id_1 = uuid.uuid4()
|
||||
self.role_id_2 = uuid.uuid4()
|
||||
role_id_1 = self.role_id_1
|
||||
role_id_2 = self.role_id_2
|
||||
|
||||
@self.registry.register
|
||||
def roles() -> list[Role]:
|
||||
return [
|
||||
Role(id=role_id_1, name="admin"),
|
||||
Role(id=role_id_2, name="user"),
|
||||
]
|
||||
|
||||
self.roles = roles
|
||||
|
||||
def test_returns_id_by_default(self):
|
||||
"""Returns the id field when no field is specified."""
|
||||
result = get_field_by_attr(self.roles, "name", "admin")
|
||||
assert result == self.role_id_1
|
||||
|
||||
def test_returns_specified_field(self):
|
||||
"""Returns the requested field instead of id."""
|
||||
result = get_field_by_attr(self.roles, "id", self.role_id_2, field="name")
|
||||
assert result == "user"
|
||||
|
||||
def test_no_match_raises_stop_iteration(self):
|
||||
"""Propagates StopIteration from get_obj_by_attr when no match found."""
|
||||
with pytest.raises(StopIteration, match="No object with name=missing"):
|
||||
get_field_by_attr(self.roles, "name", "missing")
|
||||
|
||||
|
||||
class TestGetPrimaryKey:
|
||||
"""Unit tests for the _get_primary_key helper (composite PK paths)."""
|
||||
|
||||
|
||||
@@ -1,17 +1,18 @@
|
||||
"""Tests for fastapi_toolsets.pytest module."""
|
||||
|
||||
import uuid
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
from fastapi import Depends, FastAPI
|
||||
from httpx import AsyncClient
|
||||
from sqlalchemy import select, text
|
||||
from sqlalchemy import ForeignKey, String, select, text
|
||||
from sqlalchemy.engine import make_url
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.orm import selectinload
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
||||
|
||||
from fastapi_toolsets.db import get_transaction
|
||||
from fastapi_toolsets.fixtures import Context, FixtureRegistry
|
||||
from fastapi_toolsets.fixtures import Context, FixtureRegistry, LoadStrategy
|
||||
from fastapi_toolsets.pytest import (
|
||||
create_async_client,
|
||||
create_db_session,
|
||||
@@ -19,9 +20,23 @@ from fastapi_toolsets.pytest import (
|
||||
register_fixtures,
|
||||
worker_database_url,
|
||||
)
|
||||
from fastapi_toolsets.pytest.plugin import (
|
||||
_get_primary_key,
|
||||
_relationship_load_options,
|
||||
_reload_with_relationships,
|
||||
)
|
||||
from fastapi_toolsets.pytest.utils import _get_xdist_worker
|
||||
|
||||
from .conftest import DATABASE_URL, Base, Role, RoleCrud, User, UserCrud
|
||||
from .conftest import (
|
||||
DATABASE_URL,
|
||||
Base,
|
||||
IntRole,
|
||||
Permission,
|
||||
Role,
|
||||
RoleCrud,
|
||||
User,
|
||||
UserCrud,
|
||||
)
|
||||
|
||||
test_registry = FixtureRegistry()
|
||||
|
||||
@@ -136,14 +151,8 @@ class TestGeneratedFixtures:
|
||||
async def test_fixture_relationships_work(
|
||||
self, db_session: AsyncSession, fixture_users: list[User]
|
||||
):
|
||||
"""Loaded fixtures have working relationships."""
|
||||
# Load user with role relationship
|
||||
user = await UserCrud.get(
|
||||
db_session,
|
||||
[User.id == USER_ADMIN_ID],
|
||||
load_options=[selectinload(User.role)],
|
||||
)
|
||||
|
||||
"""Loaded fixtures have working relationships directly accessible."""
|
||||
user = next(u for u in fixture_users if u.id == USER_ADMIN_ID)
|
||||
assert user.role is not None
|
||||
assert user.role.name == "plugin_admin"
|
||||
|
||||
@@ -177,6 +186,15 @@ class TestGeneratedFixtures:
|
||||
assert users[0].username == "plugin_admin"
|
||||
assert users[1].username == "plugin_user"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_fixture_auto_loads_relationships(
|
||||
self, db_session: AsyncSession, fixture_users: list[User]
|
||||
):
|
||||
"""Fixtures automatically eager-load all direct relationships."""
|
||||
user = next(u for u in fixture_users if u.username == "plugin_admin")
|
||||
assert user.role is not None
|
||||
assert user.role.name == "plugin_admin"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_multiple_fixtures_in_same_test(
|
||||
self,
|
||||
@@ -516,3 +534,192 @@ class TestCreateWorkerDatabase:
|
||||
)
|
||||
assert result.scalar() is None
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
class _LocalBase(DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class _Group(_LocalBase):
|
||||
__tablename__ = "_test_groups"
|
||||
id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
|
||||
name: Mapped[str] = mapped_column(String(50))
|
||||
|
||||
|
||||
class _CompositeItem(_LocalBase):
|
||||
"""Model with composite PK and a relationship — exercises the fallback path."""
|
||||
|
||||
__tablename__ = "_test_composite_items"
|
||||
group_id: Mapped[uuid.UUID] = mapped_column(
|
||||
ForeignKey("_test_groups.id"), primary_key=True
|
||||
)
|
||||
item_code: Mapped[str] = mapped_column(String(50), primary_key=True)
|
||||
group: Mapped["_Group"] = relationship()
|
||||
|
||||
|
||||
class TestGetPrimaryKey:
|
||||
"""Unit tests for _get_primary_key — no DB needed."""
|
||||
|
||||
def test_single_pk_returns_value(self):
|
||||
rid = uuid.UUID("00000000-0000-0000-0000-000000000001")
|
||||
role = Role(id=rid, name="x")
|
||||
assert _get_primary_key(role) == rid
|
||||
|
||||
def test_composite_pk_all_set_returns_tuple(self):
|
||||
perm = Permission(subject="posts", action="read")
|
||||
assert _get_primary_key(perm) == ("posts", "read")
|
||||
|
||||
def test_composite_pk_partial_none_returns_none(self):
|
||||
perm = Permission(subject=None, action="read")
|
||||
assert _get_primary_key(perm) is None
|
||||
|
||||
def test_composite_pk_all_none_returns_none(self):
|
||||
perm = Permission(subject=None, action=None)
|
||||
assert _get_primary_key(perm) is None
|
||||
|
||||
|
||||
class TestRelationshipLoadOptions:
|
||||
"""Unit tests for _relationship_load_options — no DB needed."""
|
||||
|
||||
def test_empty_for_model_with_no_relationships(self):
|
||||
assert _relationship_load_options(IntRole) == []
|
||||
|
||||
def test_returns_options_for_model_with_relationships(self):
|
||||
opts = _relationship_load_options(User)
|
||||
assert len(opts) >= 1
|
||||
|
||||
|
||||
class TestFixtureStrategies:
|
||||
"""Integration tests covering INSERT, SKIP_EXISTING, empty fixture, no-rels model."""
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_empty_fixture_returns_empty_list(self, db_session: AsyncSession):
|
||||
"""Fixture function returning [] produces an empty list."""
|
||||
registry = FixtureRegistry()
|
||||
|
||||
@registry.register()
|
||||
def empty() -> list[Role]:
|
||||
return []
|
||||
|
||||
local_ns: dict = {}
|
||||
register_fixtures(registry, local_ns, session_fixture="db_session")
|
||||
inner = local_ns["fixture_empty"].__wrapped__ # type: ignore[attr-defined]
|
||||
result = await inner(db_session=db_session)
|
||||
assert result == []
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_insert_strategy_no_relationships(self, db_session: AsyncSession):
|
||||
"""INSERT strategy adds instances; model with no rels skips reload (line 135)."""
|
||||
registry = FixtureRegistry()
|
||||
|
||||
@registry.register()
|
||||
def int_roles() -> list[IntRole]:
|
||||
return [IntRole(name="insert_role")]
|
||||
|
||||
local_ns: dict = {}
|
||||
register_fixtures(
|
||||
registry,
|
||||
local_ns,
|
||||
session_fixture="db_session",
|
||||
strategy=LoadStrategy.INSERT,
|
||||
)
|
||||
inner = local_ns["fixture_int_roles"].__wrapped__ # type: ignore[attr-defined]
|
||||
result = await inner(db_session=db_session)
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "insert_role"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_skip_existing_inserts_new_record(self, db_session: AsyncSession):
|
||||
"""SKIP_EXISTING inserts when the record does not yet exist."""
|
||||
registry = FixtureRegistry()
|
||||
role_id = uuid.uuid4()
|
||||
|
||||
@registry.register()
|
||||
def new_roles() -> list[Role]:
|
||||
return [Role(id=role_id, name="skip_new")]
|
||||
|
||||
local_ns: dict = {}
|
||||
register_fixtures(
|
||||
registry,
|
||||
local_ns,
|
||||
session_fixture="db_session",
|
||||
strategy=LoadStrategy.SKIP_EXISTING,
|
||||
)
|
||||
inner = local_ns["fixture_new_roles"].__wrapped__ # type: ignore[attr-defined]
|
||||
result = await inner(db_session=db_session)
|
||||
assert len(result) == 1
|
||||
assert result[0].id == role_id
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_skip_existing_returns_existing_record(
|
||||
self, db_session: AsyncSession
|
||||
):
|
||||
"""SKIP_EXISTING returns the existing DB record when PK already present."""
|
||||
role_id = uuid.uuid4()
|
||||
existing = Role(id=role_id, name="already_there")
|
||||
db_session.add(existing)
|
||||
await db_session.flush()
|
||||
|
||||
registry = FixtureRegistry()
|
||||
|
||||
@registry.register()
|
||||
def dup_roles() -> list[Role]:
|
||||
return [Role(id=role_id, name="should_not_overwrite")]
|
||||
|
||||
local_ns: dict = {}
|
||||
register_fixtures(
|
||||
registry,
|
||||
local_ns,
|
||||
session_fixture="db_session",
|
||||
strategy=LoadStrategy.SKIP_EXISTING,
|
||||
)
|
||||
inner = local_ns["fixture_dup_roles"].__wrapped__ # type: ignore[attr-defined]
|
||||
result = await inner(db_session=db_session)
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "already_there"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_skip_existing_null_pk_inserts(self, db_session: AsyncSession):
|
||||
"""SKIP_EXISTING with null PK (auto-increment) falls through to session.add()."""
|
||||
registry = FixtureRegistry()
|
||||
|
||||
@registry.register()
|
||||
def auto_roles() -> list[IntRole]:
|
||||
return [IntRole(name="auto_int")]
|
||||
|
||||
local_ns: dict = {}
|
||||
register_fixtures(
|
||||
registry,
|
||||
local_ns,
|
||||
session_fixture="db_session",
|
||||
strategy=LoadStrategy.SKIP_EXISTING,
|
||||
)
|
||||
inner = local_ns["fixture_auto_roles"].__wrapped__ # type: ignore[attr-defined]
|
||||
result = await inner(db_session=db_session)
|
||||
assert len(result) == 1
|
||||
assert result[0].name == "auto_int"
|
||||
|
||||
|
||||
class TestReloadWithRelationshipsCompositePK:
|
||||
"""Integration test for _reload_with_relationships composite-PK fallback."""
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_composite_pk_fallback_loads_relationships(self):
|
||||
"""Models with composite PKs are reloaded per-instance via session.get()."""
|
||||
async with create_db_session(DATABASE_URL, _LocalBase) as session:
|
||||
group = _Group(id=uuid.uuid4(), name="g1")
|
||||
session.add(group)
|
||||
await session.flush()
|
||||
|
||||
item = _CompositeItem(group_id=group.id, item_code="A")
|
||||
session.add(item)
|
||||
await session.flush()
|
||||
|
||||
load_opts = _relationship_load_options(_CompositeItem)
|
||||
assert load_opts # _CompositeItem has 'group' relationship
|
||||
|
||||
reloaded = await _reload_with_relationships(session, [item], load_opts)
|
||||
assert len(reloaded) == 1
|
||||
reloaded_item = cast(_CompositeItem, reloaded[0])
|
||||
assert reloaded_item.group is not None
|
||||
assert reloaded_item.group.name == "g1"
|
||||
|
||||
38
uv.lock
generated
38
uv.lock
generated
@@ -251,7 +251,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "fastapi-toolsets"
|
||||
version = "3.0.3"
|
||||
version = "3.1.0"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "asyncpg" },
|
||||
@@ -740,26 +740,26 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "prek"
|
||||
version = "0.3.8"
|
||||
version = "0.3.9"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/62/ee/03e8180e3fda9de25b6480bd15cc2bde40d573868d50648b0e527b35562f/prek-0.3.8.tar.gz", hash = "sha256:434a214256516f187a3ab15f869d950243be66b94ad47987ee4281b69643a2d9", size = 400224, upload-time = "2026-03-23T08:23:35.981Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/15/ff/5b7a2a9c4fa3dd2ffc8b13a9ec22aa550deda5b39ab273f8e02863b12642/prek-0.3.9.tar.gz", hash = "sha256:f82b92d81f42f1f90a47f5fbbf492373e25ef1f790080215b2722dd6da66510e", size = 423801, upload-time = "2026-04-13T12:30:38.191Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/00/84/40d2ddf362d12c4cd4a25a8c89a862edf87cdfbf1422aa41aac8e315d409/prek-0.3.8-py3-none-linux_armv6l.whl", hash = "sha256:6fb646ada60658fa6dd7771b2e0fb097f005151be222f869dada3eb26d79ed33", size = 5226646, upload-time = "2026-03-23T08:23:18.306Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/e1/52/7308a033fa43b7e8e188797bd2b3b017c0f0adda70fa7af575b1f43ea888/prek-0.3.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:f3d7fdadb15efc19c09953c7a33cf2061a70f367d1e1957358d3ad5cc49d0616", size = 5620104, upload-time = "2026-03-23T08:23:40.053Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/ff/b1/f106ac000a91511a9cd80169868daf2f5b693480ef5232cec5517a38a512/prek-0.3.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:72728c3295e79ca443f8c1ec037d2a5b914ec73a358f69cf1bc1964511876bf8", size = 5199867, upload-time = "2026-03-23T08:23:38.066Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/b3/e9/970713f4b019f69de9844e1bab37b8ddb67558e410916f4eb5869a696165/prek-0.3.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:48efc28f2f53b5b8087efca9daaed91572d62df97d5f24a1c7a087fecb5017de", size = 5441801, upload-time = "2026-03-23T08:23:32.617Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/12/a4/7ef44032b181753e19452ec3b09abb3a32607cf6b0a0508f0604becaaf2b/prek-0.3.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f6ca9d63bacbc448a5c18e955c78d3ac5176c3a17c3baacdd949b1a623e08a36", size = 5155107, upload-time = "2026-03-23T08:23:31.021Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/bd/77/4d9c8985dbba84149760785dfe07093ea1e29d710257dfb7c89615e2234c/prek-0.3.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1000f7029696b4fe712fb1fefd4c55b9c4de72b65509c8e50296370a06f9dc3f", size = 5566541, upload-time = "2026-03-23T08:23:45.694Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/1a/1a/81e6769ac1f7f8346d09ce2ab0b47cf06466acd9ff72e87e5d1f0d98cd32/prek-0.3.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6ff0bed0e2c1286522987d982168a86cbbd0d069d840506a46c9fda983515517", size = 6552991, upload-time = "2026-03-23T08:23:21.958Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/6f/fa/ce2df0dd2dc75a9437a52463239d0782998943d7b04e191fb89b83016c34/prek-0.3.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4fb087ac0ffda3ac65bbbae9a38326a7fd27ee007bb4a94323ce1eb539d8bbec", size = 5832972, upload-time = "2026-03-23T08:23:20.258Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/18/6b/9d4269df9073216d296244595a21c253b6475dfc9076c0bd2906be7a436c/prek-0.3.8-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:2e1e5e206ff7b31bd079cce525daddc96cd6bc544d20dc128921ad92f7a4c85d", size = 5448371, upload-time = "2026-03-23T08:23:41.835Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/60/1d/1e4d8a78abefa5b9d086e5a9f1638a74b5e540eec8a648d9946707701f29/prek-0.3.8-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:dcea3fe23832a4481bccb7c45f55650cb233be7c805602e788bb7dba60f2d861", size = 5270546, upload-time = "2026-03-23T08:23:24.231Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/77/07/34f36551a6319ae36e272bea63a42f59d41d2d47ab0d5fb00eb7b4e88e87/prek-0.3.8-py3-none-musllinux_1_1_armv7l.whl", hash = "sha256:4d25e647e9682f6818ab5c31e7a4b842993c14782a6ffcd128d22b784e0d677f", size = 5124032, upload-time = "2026-03-23T08:23:26.368Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/e3/01/6d544009bb655e709993411796af77339f439526db4f3b3509c583ad8eb9/prek-0.3.8-py3-none-musllinux_1_1_i686.whl", hash = "sha256:de528b82935e33074815acff3c7c86026754d1212136295bc88fe9c43b4231d5", size = 5432245, upload-time = "2026-03-23T08:23:47.877Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/54/96/1237ee269e9bfa283ffadbcba1f401f48a47aed2b2563eb1002740d6079d/prek-0.3.8-py3-none-musllinux_1_1_x86_64.whl", hash = "sha256:6d660f1c25a126e6d9f682fe61449441226514f412a4469f5d71f8f8cad56db2", size = 5950550, upload-time = "2026-03-23T08:23:43.8Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/ca/6b/a574411459049bc691047c9912f375deda10c44a707b6ce98df2b658f0b3/prek-0.3.8-py3-none-win32.whl", hash = "sha256:b0c291c577615d9f8450421dff0b32bfd77a6b0d223ee4115a1f820cb636fdf1", size = 4949501, upload-time = "2026-03-23T08:23:16.338Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/0c/b4/46b59fe49f635acd9f6530778ce577f9d8b49452835726a5311ffc902c67/prek-0.3.8-py3-none-win_amd64.whl", hash = "sha256:bc147fdbdd4ec33fc7a987b893ecb69b1413ac100d95c9889a70f3fd58c73d06", size = 5346551, upload-time = "2026-03-23T08:23:34.501Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/53/05/9cca1708bb8c65264124eb4b04251e0f65ce5bfc707080bb6b492d5a0df7/prek-0.3.8-py3-none-win_arm64.whl", hash = "sha256:a2614647aeafa817a5802ccb9561e92eedc20dcf840639a1b00826e2c2442515", size = 5190872, upload-time = "2026-03-23T08:23:29.463Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/3c/08/c11a6b7834b461223763b6b1552f32c9199393685d52d555de621e900ee7/prek-0.3.9-py3-none-linux_armv6l.whl", hash = "sha256:3ed793d51bfaa27bddb64d525d7acb77a7c8644f549412d82252e3eb0b88aad8", size = 5337784, upload-time = "2026-04-13T12:30:46.044Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/15/d9/974b02832a645c6411069c713e3191ce807f9962006da108e4727efd2fa1/prek-0.3.9-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:399c58400c0bd0b82a93a3c09dc1bfd88d8d0cfb242d414d2ed247187b06ead1", size = 5713864, upload-time = "2026-04-13T12:30:27.007Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/40/e1/4ed14bef15eb30039a75177b0807ac007095a5a110284706ccf900a8d512/prek-0.3.9-py3-none-macosx_11_0_arm64.whl", hash = "sha256:e2ea1ffb124e92f081b8e2ca5b5a623a733efb3be0c5b1f4b7ffe2ee17d1f20c", size = 5290437, upload-time = "2026-04-13T12:30:30.658Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/67/80/d5c3015e9da161dede566bfeef41f098f92470613157daa4f7377ab08d58/prek-0.3.9-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:aaf639f95b7301639298311d8d44aad0d0b4864e9736083ad3c71ce9765d37ab", size = 5536208, upload-time = "2026-04-13T12:30:47.964Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/c8/54/8cdc5eb1018437d7828740defd322e7a96459c02fc8961160c4120325313/prek-0.3.9-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:ff104863b187fa443ea8451ca55d51e2c6e94f99f00d88784b5c3c4c623f1ebe", size = 5251785, upload-time = "2026-04-13T12:30:39.78Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/bd/e2/a5fc35a0fd3167224a000ca1b6235ecbdea0ac77e24af5979a75b0e6b5a4/prek-0.3.9-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:039ecaf87c63a3e67cca645ebd5bc5eb6aafa6c9d929e9a27b2921e7849d7ef9", size = 5668548, upload-time = "2026-04-13T12:30:24.914Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/09/e8/a189ee79f401c259f66f8af587f899d4d5bfb04e0ca371bfd01e49871007/prek-0.3.9-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3bde2a3d045705095983c7f78ba04f72a7565fe1c2b4e85f5628502a254754ff", size = 6660927, upload-time = "2026-04-13T12:30:44.495Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/a4/5a/54117316e98ff62a14911ad1488a3a0945530242a2ce3e92f7a40b6ccc02/prek-0.3.9-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:28a0960a21543563e2c8e19aaad176cc8423a87aac3c914d0f313030d7a9244a", size = 5932244, upload-time = "2026-04-13T12:30:49.532Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/a7/f9/e88d4361f59be7adeeb3a8a3819d69d286d86fe6f7606840af6734362675/prek-0.3.9-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:0dfb5d5171d7523271909246ee306b4dc3d5b63752e7dd7c7e8a8908fc9490d1", size = 5542139, upload-time = "2026-04-13T12:30:41.266Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/11/1f/204837115087bb8d063bda754a7fe975428c5d5b6548c30dd749f8ab85d4/prek-0.3.9-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:82b791bd36c1430c84d3ae7220a85152babc7eaf00f70adcb961bd594e756ba3", size = 5392519, upload-time = "2026-04-13T12:30:32.603Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/bd/00/de57b5795e670b6d38e7eda6d9ac6fd6d757ca22f725e5054b042104cd53/prek-0.3.9-py3-none-musllinux_1_1_armv7l.whl", hash = "sha256:6eac6d2f736b041118f053a1487abed468a70dd85a8688eaf87bb42d3dcecf20", size = 5222780, upload-time = "2026-04-13T12:30:36.576Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/f5/14/0bc055c305d92980b151f2ec00c14d28fe94c6d51180ca07fded28771cbf/prek-0.3.9-py3-none-musllinux_1_1_i686.whl", hash = "sha256:5517e46e761367a3759b3168eabc120840ffbca9dfbc53187167298a98f87dc4", size = 5524310, upload-time = "2026-04-13T12:30:34.469Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/b9/d1/eebc2b69be0de36cd84adbe0a0710f4deb468a90e30525be027d6db02d54/prek-0.3.9-py3-none-musllinux_1_1_x86_64.whl", hash = "sha256:92024778cf78683ca32687bb249ab6a7d5c33887b5ee1d1a9f6d0c14228f4cf3", size = 6043751, upload-time = "2026-04-13T12:30:29.101Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/46/cb/be98c04e702cbc0b0328cd745ff4634ace69ad5a84461bde36f88a7be873/prek-0.3.9-py3-none-win32.whl", hash = "sha256:7f89c55e5f480f5d073769e319924ad69d4bf9f98c5cb46a83082e26e634c958", size = 5045940, upload-time = "2026-04-13T12:30:42.882Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/a6/b6/b51771d69f6282e34edeb73f23d956da34f2cabbb5ba16ba175cc0a056f9/prek-0.3.9-py3-none-win_amd64.whl", hash = "sha256:7722f3372eaa83b147e70a43cb7b9fe2128c13d0c78d8a1cdbf2a8ec2ee071eb", size = 5435204, upload-time = "2026-04-13T12:30:51.482Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/30/8a/f8a87c15b095460eccd67c8d89a086b7a37aac8d363f89544b8ce6ec653d/prek-0.3.9-py3-none-win_arm64.whl", hash = "sha256:0bced6278d6cc8a4b46048979e36bc9da034611dc8facd77ab123177b833a929", size = 5279552, upload-time = "2026-04-13T12:30:53.011Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
Reference in New Issue
Block a user