mirror of
https://github.com/d3vyce/fastapi-toolsets.git
synced 2026-04-16 14:46:24 +02:00
Compare commits
3 Commits
5a1493266e
...
v2.2.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
dde5183e68
|
|||
|
|
e4250a9910 | ||
|
|
4800941934 |
@@ -22,6 +22,8 @@ UserCrud = CrudFactory(model=User)
|
|||||||
|
|
||||||
## Basic operations
|
## Basic operations
|
||||||
|
|
||||||
|
!!! info "`get_or_none` added in `v2.2`"
|
||||||
|
|
||||||
```python
|
```python
|
||||||
# Create
|
# Create
|
||||||
user = await UserCrud.create(session=session, obj=UserCreateSchema(username="alice"))
|
user = await UserCrud.create(session=session, obj=UserCreateSchema(username="alice"))
|
||||||
@@ -29,6 +31,9 @@ user = await UserCrud.create(session=session, obj=UserCreateSchema(username="ali
|
|||||||
# Get one (raises NotFoundError if not found)
|
# Get one (raises NotFoundError if not found)
|
||||||
user = await UserCrud.get(session=session, filters=[User.id == user_id])
|
user = await UserCrud.get(session=session, filters=[User.id == user_id])
|
||||||
|
|
||||||
|
# Get one or None (never raises)
|
||||||
|
user = await UserCrud.get_or_none(session=session, filters=[User.id == user_id])
|
||||||
|
|
||||||
# Get first or None
|
# Get first or None
|
||||||
user = await UserCrud.first(session=session, filters=[User.email == email])
|
user = await UserCrud.first(session=session, filters=[User.email == email])
|
||||||
|
|
||||||
@@ -46,6 +51,36 @@ count = await UserCrud.count(session=session, filters=[User.is_active == True])
|
|||||||
exists = await UserCrud.exists(session=session, filters=[User.email == email])
|
exists = await UserCrud.exists(session=session, filters=[User.email == email])
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Fetching a single record
|
||||||
|
|
||||||
|
Three methods fetch a single record — choose based on how you want to handle the "not found" case and whether you need strict uniqueness:
|
||||||
|
|
||||||
|
| Method | Not found | Multiple results |
|
||||||
|
|---|---|---|
|
||||||
|
| `get` | raises `NotFoundError` | raises `MultipleResultsFound` |
|
||||||
|
| `get_or_none` | returns `None` | raises `MultipleResultsFound` |
|
||||||
|
| `first` | returns `None` | returns the first match silently |
|
||||||
|
|
||||||
|
Use `get` when the record must exist (e.g. a detail endpoint that should return 404):
|
||||||
|
|
||||||
|
```python
|
||||||
|
user = await UserCrud.get(session=session, filters=[User.id == user_id])
|
||||||
|
```
|
||||||
|
|
||||||
|
Use `get_or_none` when the record may not exist but you still want strict uniqueness enforcement:
|
||||||
|
|
||||||
|
```python
|
||||||
|
user = await UserCrud.get_or_none(session=session, filters=[User.email == email])
|
||||||
|
if user is None:
|
||||||
|
... # handle missing case without catching an exception
|
||||||
|
```
|
||||||
|
|
||||||
|
Use `first` when you only care about any one match and don't need uniqueness:
|
||||||
|
|
||||||
|
```python
|
||||||
|
user = await UserCrud.first(session=session, filters=[User.is_active == True])
|
||||||
|
```
|
||||||
|
|
||||||
## Pagination
|
## Pagination
|
||||||
|
|
||||||
!!! info "Added in `v1.1` (only offset_pagination via `paginate` if `<v1.1`)"
|
!!! info "Added in `v1.1` (only offset_pagination via `paginate` if `<v1.1`)"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[project]
|
[project]
|
||||||
name = "fastapi-toolsets"
|
name = "fastapi-toolsets"
|
||||||
version = "2.1.0"
|
version = "2.2.0"
|
||||||
description = "Production-ready utilities for FastAPI applications"
|
description = "Production-ready utilities for FastAPI applications"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|||||||
@@ -21,4 +21,4 @@ Example usage:
|
|||||||
return Response(data={"user": user.username}, message="Success")
|
return Response(data={"user": user.username}, message="Success")
|
||||||
"""
|
"""
|
||||||
|
|
||||||
__version__ = "2.1.0"
|
__version__ = "2.2.0"
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
|
|||||||
from fastapi import Query
|
from fastapi import Query
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from sqlalchemy import Date, DateTime, Float, Integer, Numeric, Uuid, and_, func, select
|
from sqlalchemy import Date, DateTime, Float, Integer, Numeric, Uuid, and_, func, select
|
||||||
from sqlalchemy import delete as sql_delete
|
|
||||||
from sqlalchemy.dialects.postgresql import insert
|
from sqlalchemy.dialects.postgresql import insert
|
||||||
from sqlalchemy.exc import NoResultFound
|
from sqlalchemy.exc import NoResultFound
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
@@ -410,6 +409,82 @@ class AsyncCrud(Generic[ModelType]):
|
|||||||
NotFoundError: If no record found
|
NotFoundError: If no record found
|
||||||
MultipleResultsFound: If more than one record found
|
MultipleResultsFound: If more than one record found
|
||||||
"""
|
"""
|
||||||
|
result = await cls.get_or_none(
|
||||||
|
session,
|
||||||
|
filters,
|
||||||
|
joins=joins,
|
||||||
|
outer_join=outer_join,
|
||||||
|
with_for_update=with_for_update,
|
||||||
|
load_options=load_options,
|
||||||
|
schema=schema,
|
||||||
|
)
|
||||||
|
if result is None:
|
||||||
|
raise NotFoundError()
|
||||||
|
return result
|
||||||
|
|
||||||
|
@overload
|
||||||
|
@classmethod
|
||||||
|
async def get_or_none( # pragma: no cover
|
||||||
|
cls: type[Self],
|
||||||
|
session: AsyncSession,
|
||||||
|
filters: list[Any],
|
||||||
|
*,
|
||||||
|
joins: JoinType | None = None,
|
||||||
|
outer_join: bool = False,
|
||||||
|
with_for_update: bool = False,
|
||||||
|
load_options: list[ExecutableOption] | None = None,
|
||||||
|
schema: type[SchemaType],
|
||||||
|
) -> Response[SchemaType] | None: ...
|
||||||
|
|
||||||
|
@overload
|
||||||
|
@classmethod
|
||||||
|
async def get_or_none( # pragma: no cover
|
||||||
|
cls: type[Self],
|
||||||
|
session: AsyncSession,
|
||||||
|
filters: list[Any],
|
||||||
|
*,
|
||||||
|
joins: JoinType | None = None,
|
||||||
|
outer_join: bool = False,
|
||||||
|
with_for_update: bool = False,
|
||||||
|
load_options: list[ExecutableOption] | None = None,
|
||||||
|
schema: None = ...,
|
||||||
|
) -> ModelType | None: ...
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def get_or_none(
|
||||||
|
cls: type[Self],
|
||||||
|
session: AsyncSession,
|
||||||
|
filters: list[Any],
|
||||||
|
*,
|
||||||
|
joins: JoinType | None = None,
|
||||||
|
outer_join: bool = False,
|
||||||
|
with_for_update: bool = False,
|
||||||
|
load_options: list[ExecutableOption] | None = None,
|
||||||
|
schema: type[BaseModel] | None = None,
|
||||||
|
) -> ModelType | Response[Any] | None:
|
||||||
|
"""Get exactly one record, or ``None`` if not found.
|
||||||
|
|
||||||
|
Like :meth:`get` but returns ``None`` instead of raising
|
||||||
|
:class:`~fastapi_toolsets.exceptions.NotFoundError` when no record
|
||||||
|
matches the filters.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
session: DB async session
|
||||||
|
filters: List of SQLAlchemy filter conditions
|
||||||
|
joins: List of (model, condition) tuples for joining related tables
|
||||||
|
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
|
||||||
|
with_for_update: Lock the row for update
|
||||||
|
load_options: SQLAlchemy loader options (e.g., selectinload)
|
||||||
|
schema: Pydantic schema to serialize the result into. When provided,
|
||||||
|
the result is automatically wrapped in a ``Response[schema]``.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Model instance, ``Response[schema]`` when ``schema`` is given,
|
||||||
|
or ``None`` when no record matches.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
MultipleResultsFound: If more than one record found
|
||||||
|
"""
|
||||||
q = select(cls.model)
|
q = select(cls.model)
|
||||||
q = _apply_joins(q, joins, outer_join)
|
q = _apply_joins(q, joins, outer_join)
|
||||||
q = q.where(and_(*filters))
|
q = q.where(and_(*filters))
|
||||||
@@ -419,12 +494,40 @@ class AsyncCrud(Generic[ModelType]):
|
|||||||
q = q.with_for_update()
|
q = q.with_for_update()
|
||||||
result = await session.execute(q)
|
result = await session.execute(q)
|
||||||
item = result.unique().scalar_one_or_none()
|
item = result.unique().scalar_one_or_none()
|
||||||
if not item:
|
if item is None:
|
||||||
raise NotFoundError()
|
return None
|
||||||
result = cast(ModelType, item)
|
db_model = cast(ModelType, item)
|
||||||
if schema:
|
if schema:
|
||||||
return Response(data=schema.model_validate(result))
|
return Response(data=schema.model_validate(db_model))
|
||||||
return result
|
return db_model
|
||||||
|
|
||||||
|
@overload
|
||||||
|
@classmethod
|
||||||
|
async def first( # pragma: no cover
|
||||||
|
cls: type[Self],
|
||||||
|
session: AsyncSession,
|
||||||
|
filters: list[Any] | None = None,
|
||||||
|
*,
|
||||||
|
joins: JoinType | None = None,
|
||||||
|
outer_join: bool = False,
|
||||||
|
with_for_update: bool = False,
|
||||||
|
load_options: list[ExecutableOption] | None = None,
|
||||||
|
schema: type[SchemaType],
|
||||||
|
) -> Response[SchemaType] | None: ...
|
||||||
|
|
||||||
|
@overload
|
||||||
|
@classmethod
|
||||||
|
async def first( # pragma: no cover
|
||||||
|
cls: type[Self],
|
||||||
|
session: AsyncSession,
|
||||||
|
filters: list[Any] | None = None,
|
||||||
|
*,
|
||||||
|
joins: JoinType | None = None,
|
||||||
|
outer_join: bool = False,
|
||||||
|
with_for_update: bool = False,
|
||||||
|
load_options: list[ExecutableOption] | None = None,
|
||||||
|
schema: None = ...,
|
||||||
|
) -> ModelType | None: ...
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def first(
|
async def first(
|
||||||
@@ -434,8 +537,10 @@ class AsyncCrud(Generic[ModelType]):
|
|||||||
*,
|
*,
|
||||||
joins: JoinType | None = None,
|
joins: JoinType | None = None,
|
||||||
outer_join: bool = False,
|
outer_join: bool = False,
|
||||||
|
with_for_update: bool = False,
|
||||||
load_options: list[ExecutableOption] | None = None,
|
load_options: list[ExecutableOption] | None = None,
|
||||||
) -> ModelType | None:
|
schema: type[BaseModel] | None = None,
|
||||||
|
) -> ModelType | Response[Any] | None:
|
||||||
"""Get the first matching record, or None.
|
"""Get the first matching record, or None.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@@ -443,10 +548,14 @@ class AsyncCrud(Generic[ModelType]):
|
|||||||
filters: List of SQLAlchemy filter conditions
|
filters: List of SQLAlchemy filter conditions
|
||||||
joins: List of (model, condition) tuples for joining related tables
|
joins: List of (model, condition) tuples for joining related tables
|
||||||
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
|
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
|
||||||
load_options: SQLAlchemy loader options
|
with_for_update: Lock the row for update
|
||||||
|
load_options: SQLAlchemy loader options (e.g., selectinload)
|
||||||
|
schema: Pydantic schema to serialize the result into. When provided,
|
||||||
|
the result is automatically wrapped in a ``Response[schema]``.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Model instance or None
|
Model instance, ``Response[schema]`` when ``schema`` is given,
|
||||||
|
or ``None`` when no record matches.
|
||||||
"""
|
"""
|
||||||
q = select(cls.model)
|
q = select(cls.model)
|
||||||
q = _apply_joins(q, joins, outer_join)
|
q = _apply_joins(q, joins, outer_join)
|
||||||
@@ -454,8 +563,16 @@ class AsyncCrud(Generic[ModelType]):
|
|||||||
q = q.where(and_(*filters))
|
q = q.where(and_(*filters))
|
||||||
if resolved := cls._resolve_load_options(load_options):
|
if resolved := cls._resolve_load_options(load_options):
|
||||||
q = q.options(*resolved)
|
q = q.options(*resolved)
|
||||||
|
if with_for_update:
|
||||||
|
q = q.with_for_update()
|
||||||
result = await session.execute(q)
|
result = await session.execute(q)
|
||||||
return cast(ModelType | None, result.unique().scalars().first())
|
item = result.unique().scalars().first()
|
||||||
|
if item is None:
|
||||||
|
return None
|
||||||
|
db_model = cast(ModelType, item)
|
||||||
|
if schema:
|
||||||
|
return Response(data=schema.model_validate(db_model))
|
||||||
|
return db_model
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def get_multi(
|
async def get_multi(
|
||||||
@@ -674,8 +791,10 @@ class AsyncCrud(Generic[ModelType]):
|
|||||||
``None``, or ``Response[None]`` when ``return_response=True``.
|
``None``, or ``Response[None]`` when ``return_response=True``.
|
||||||
"""
|
"""
|
||||||
async with get_transaction(session):
|
async with get_transaction(session):
|
||||||
q = sql_delete(cls.model).where(and_(*filters))
|
result = await session.execute(select(cls.model).where(and_(*filters)))
|
||||||
await session.execute(q)
|
objects = result.scalars().all()
|
||||||
|
for obj in objects:
|
||||||
|
await session.delete(obj)
|
||||||
if return_response:
|
if return_response:
|
||||||
return Response(data=None)
|
return Response(data=None)
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ from .conftest import (
|
|||||||
RoleCursorCrud,
|
RoleCursorCrud,
|
||||||
RoleRead,
|
RoleRead,
|
||||||
RoleUpdate,
|
RoleUpdate,
|
||||||
|
Tag,
|
||||||
TagCreate,
|
TagCreate,
|
||||||
TagCrud,
|
TagCrud,
|
||||||
User,
|
User,
|
||||||
@@ -294,6 +295,100 @@ class TestCrudGet:
|
|||||||
assert user.username == "active"
|
assert user.username == "active"
|
||||||
|
|
||||||
|
|
||||||
|
class TestCrudGetOrNone:
|
||||||
|
"""Tests for CRUD get_or_none operations."""
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_returns_record_when_found(self, db_session: AsyncSession):
|
||||||
|
"""get_or_none returns the record when it exists."""
|
||||||
|
created = await RoleCrud.create(db_session, RoleCreate(name="admin"))
|
||||||
|
fetched = await RoleCrud.get_or_none(db_session, [Role.id == created.id])
|
||||||
|
|
||||||
|
assert fetched is not None
|
||||||
|
assert fetched.id == created.id
|
||||||
|
assert fetched.name == "admin"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_returns_none_when_not_found(self, db_session: AsyncSession):
|
||||||
|
"""get_or_none returns None instead of raising NotFoundError."""
|
||||||
|
result = await RoleCrud.get_or_none(db_session, [Role.id == uuid.uuid4()])
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_with_schema_returns_response_when_found(
|
||||||
|
self, db_session: AsyncSession
|
||||||
|
):
|
||||||
|
"""get_or_none with schema returns Response[schema] when found."""
|
||||||
|
from fastapi_toolsets.schemas import Response
|
||||||
|
|
||||||
|
created = await RoleCrud.create(db_session, RoleCreate(name="editor"))
|
||||||
|
result = await RoleCrud.get_or_none(
|
||||||
|
db_session, [Role.id == created.id], schema=RoleRead
|
||||||
|
)
|
||||||
|
|
||||||
|
assert isinstance(result, Response)
|
||||||
|
assert isinstance(result.data, RoleRead)
|
||||||
|
assert result.data.name == "editor"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_with_schema_returns_none_when_not_found(
|
||||||
|
self, db_session: AsyncSession
|
||||||
|
):
|
||||||
|
"""get_or_none with schema returns None (not Response) when not found."""
|
||||||
|
result = await RoleCrud.get_or_none(
|
||||||
|
db_session, [Role.id == uuid.uuid4()], schema=RoleRead
|
||||||
|
)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_with_load_options(self, db_session: AsyncSession):
|
||||||
|
"""get_or_none respects load_options."""
|
||||||
|
from sqlalchemy.orm import selectinload
|
||||||
|
|
||||||
|
role = await RoleCrud.create(db_session, RoleCreate(name="member"))
|
||||||
|
user = await UserCrud.create(
|
||||||
|
db_session,
|
||||||
|
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
|
||||||
|
)
|
||||||
|
|
||||||
|
fetched = await UserCrud.get_or_none(
|
||||||
|
db_session,
|
||||||
|
[User.id == user.id],
|
||||||
|
load_options=[selectinload(User.role)],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert fetched is not None
|
||||||
|
assert fetched.role is not None
|
||||||
|
assert fetched.role.name == "member"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_with_join(self, db_session: AsyncSession):
|
||||||
|
"""get_or_none respects joins."""
|
||||||
|
user = await UserCrud.create(
|
||||||
|
db_session, UserCreate(username="author", email="author@test.com")
|
||||||
|
)
|
||||||
|
await PostCrud.create(
|
||||||
|
db_session,
|
||||||
|
PostCreate(title="Published", author_id=user.id, is_published=True),
|
||||||
|
)
|
||||||
|
|
||||||
|
fetched = await UserCrud.get_or_none(
|
||||||
|
db_session,
|
||||||
|
[User.id == user.id, Post.is_published == True], # noqa: E712
|
||||||
|
joins=[(Post, Post.author_id == User.id)],
|
||||||
|
)
|
||||||
|
assert fetched is not None
|
||||||
|
assert fetched.id == user.id
|
||||||
|
|
||||||
|
# Filter that matches no join — returns None
|
||||||
|
missing = await UserCrud.get_or_none(
|
||||||
|
db_session,
|
||||||
|
[User.id == user.id, Post.is_published == False], # noqa: E712
|
||||||
|
joins=[(Post, Post.author_id == User.id)],
|
||||||
|
)
|
||||||
|
assert missing is None
|
||||||
|
|
||||||
|
|
||||||
class TestCrudFirst:
|
class TestCrudFirst:
|
||||||
"""Tests for CRUD first operations."""
|
"""Tests for CRUD first operations."""
|
||||||
|
|
||||||
@@ -321,6 +416,38 @@ class TestCrudFirst:
|
|||||||
role = await RoleCrud.first(db_session)
|
role = await RoleCrud.first(db_session)
|
||||||
assert role is not None
|
assert role is not None
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_first_with_schema(self, db_session: AsyncSession):
|
||||||
|
"""First with schema returns a Response wrapping the serialized record."""
|
||||||
|
await RoleCrud.create(db_session, RoleCreate(name="admin"))
|
||||||
|
|
||||||
|
result = await RoleCrud.first(
|
||||||
|
db_session, [Role.name == "admin"], schema=RoleRead
|
||||||
|
)
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert result.data is not None
|
||||||
|
assert result.data.name == "admin"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_first_with_schema_not_found(self, db_session: AsyncSession):
|
||||||
|
"""First with schema returns None when no record matches."""
|
||||||
|
result = await RoleCrud.first(
|
||||||
|
db_session, [Role.name == "ghost"], schema=RoleRead
|
||||||
|
)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_first_with_for_update(self, db_session: AsyncSession):
|
||||||
|
"""First with with_for_update locks the row."""
|
||||||
|
await RoleCrud.create(db_session, RoleCreate(name="admin"))
|
||||||
|
|
||||||
|
role = await RoleCrud.first(
|
||||||
|
db_session, [Role.name == "admin"], with_for_update=True
|
||||||
|
)
|
||||||
|
assert role is not None
|
||||||
|
assert role.name == "admin"
|
||||||
|
|
||||||
|
|
||||||
class TestCrudGetMulti:
|
class TestCrudGetMulti:
|
||||||
"""Tests for CRUD get_multi operations."""
|
"""Tests for CRUD get_multi operations."""
|
||||||
@@ -480,6 +607,69 @@ class TestCrudDelete:
|
|||||||
assert result.data is None
|
assert result.data is None
|
||||||
assert await RoleCrud.first(db_session, [Role.id == role.id]) is None
|
assert await RoleCrud.first(db_session, [Role.id == role.id]) is None
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_delete_m2m_cascade(self, db_session: AsyncSession):
|
||||||
|
"""Deleting a record with M2M relationships cleans up the association table."""
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
user = await UserCrud.create(
|
||||||
|
db_session, UserCreate(username="author", email="author@test.com")
|
||||||
|
)
|
||||||
|
tag1 = await TagCrud.create(db_session, TagCreate(name="python"))
|
||||||
|
tag2 = await TagCrud.create(db_session, TagCreate(name="fastapi"))
|
||||||
|
|
||||||
|
post = await PostM2MCrud.create(
|
||||||
|
db_session,
|
||||||
|
PostM2MCreate(
|
||||||
|
title="M2M Delete Test",
|
||||||
|
author_id=user.id,
|
||||||
|
tag_ids=[tag1.id, tag2.id],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
await PostM2MCrud.delete(db_session, [Post.id == post.id])
|
||||||
|
|
||||||
|
# Post is gone
|
||||||
|
assert await PostCrud.first(db_session, [Post.id == post.id]) is None
|
||||||
|
|
||||||
|
# Association rows are gone — tags themselves must still exist
|
||||||
|
assert await TagCrud.first(db_session, [Tag.id == tag1.id]) is not None
|
||||||
|
assert await TagCrud.first(db_session, [Tag.id == tag2.id]) is not None
|
||||||
|
|
||||||
|
# No orphaned rows in post_tags
|
||||||
|
result = await db_session.execute(
|
||||||
|
text("SELECT COUNT(*) FROM post_tags WHERE post_id = :pid").bindparams(
|
||||||
|
pid=post.id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert result.scalar() == 0
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_delete_m2m_does_not_delete_related_records(
|
||||||
|
self, db_session: AsyncSession
|
||||||
|
):
|
||||||
|
"""Deleting a post with M2M tags must not delete the tags themselves."""
|
||||||
|
user = await UserCrud.create(
|
||||||
|
db_session, UserCreate(username="author2", email="author2@test.com")
|
||||||
|
)
|
||||||
|
tag = await TagCrud.create(db_session, TagCreate(name="shared_tag"))
|
||||||
|
|
||||||
|
post1 = await PostM2MCrud.create(
|
||||||
|
db_session,
|
||||||
|
PostM2MCreate(title="Post 1", author_id=user.id, tag_ids=[tag.id]),
|
||||||
|
)
|
||||||
|
post2 = await PostM2MCrud.create(
|
||||||
|
db_session,
|
||||||
|
PostM2MCreate(title="Post 2", author_id=user.id, tag_ids=[tag.id]),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Delete only post1
|
||||||
|
await PostM2MCrud.delete(db_session, [Post.id == post1.id])
|
||||||
|
|
||||||
|
# Tag and post2 still exist
|
||||||
|
assert await TagCrud.first(db_session, [Tag.id == tag.id]) is not None
|
||||||
|
assert await PostCrud.first(db_session, [Post.id == post2.id]) is not None
|
||||||
|
|
||||||
|
|
||||||
class TestCrudExists:
|
class TestCrudExists:
|
||||||
"""Tests for CRUD exists operations."""
|
"""Tests for CRUD exists operations."""
|
||||||
|
|||||||
2
uv.lock
generated
2
uv.lock
generated
@@ -251,7 +251,7 @@ wheels = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fastapi-toolsets"
|
name = "fastapi-toolsets"
|
||||||
version = "2.1.0"
|
version = "2.2.0"
|
||||||
source = { editable = "." }
|
source = { editable = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "asyncpg" },
|
{ name = "asyncpg" },
|
||||||
|
|||||||
Reference in New Issue
Block a user