Compare commits

..

3 Commits

6 changed files with 359 additions and 15 deletions

View File

@@ -22,6 +22,8 @@ UserCrud = CrudFactory(model=User)
## Basic operations ## Basic operations
!!! info "`get_or_none` added in `v2.2`"
```python ```python
# Create # Create
user = await UserCrud.create(session=session, obj=UserCreateSchema(username="alice")) user = await UserCrud.create(session=session, obj=UserCreateSchema(username="alice"))
@@ -29,6 +31,9 @@ user = await UserCrud.create(session=session, obj=UserCreateSchema(username="ali
# Get one (raises NotFoundError if not found) # Get one (raises NotFoundError if not found)
user = await UserCrud.get(session=session, filters=[User.id == user_id]) user = await UserCrud.get(session=session, filters=[User.id == user_id])
# Get one or None (never raises)
user = await UserCrud.get_or_none(session=session, filters=[User.id == user_id])
# Get first or None # Get first or None
user = await UserCrud.first(session=session, filters=[User.email == email]) user = await UserCrud.first(session=session, filters=[User.email == email])
@@ -46,6 +51,36 @@ count = await UserCrud.count(session=session, filters=[User.is_active == True])
exists = await UserCrud.exists(session=session, filters=[User.email == email]) exists = await UserCrud.exists(session=session, filters=[User.email == email])
``` ```
## Fetching a single record
Three methods fetch a single record — choose based on how you want to handle the "not found" case and whether you need strict uniqueness:
| Method | Not found | Multiple results |
|---|---|---|
| `get` | raises `NotFoundError` | raises `MultipleResultsFound` |
| `get_or_none` | returns `None` | raises `MultipleResultsFound` |
| `first` | returns `None` | returns the first match silently |
Use `get` when the record must exist (e.g. a detail endpoint that should return 404):
```python
user = await UserCrud.get(session=session, filters=[User.id == user_id])
```
Use `get_or_none` when the record may not exist but you still want strict uniqueness enforcement:
```python
user = await UserCrud.get_or_none(session=session, filters=[User.email == email])
if user is None:
... # handle missing case without catching an exception
```
Use `first` when you only care about any one match and don't need uniqueness:
```python
user = await UserCrud.first(session=session, filters=[User.is_active == True])
```
## Pagination ## Pagination
!!! info "Added in `v1.1` (only offset_pagination via `paginate` if `<v1.1`)" !!! info "Added in `v1.1` (only offset_pagination via `paginate` if `<v1.1`)"

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "fastapi-toolsets" name = "fastapi-toolsets"
version = "2.1.0" version = "2.2.0"
description = "Production-ready utilities for FastAPI applications" description = "Production-ready utilities for FastAPI applications"
readme = "README.md" readme = "README.md"
license = "MIT" license = "MIT"

View File

@@ -21,4 +21,4 @@ Example usage:
return Response(data={"user": user.username}, message="Success") return Response(data={"user": user.username}, message="Success")
""" """
__version__ = "2.1.0" __version__ = "2.2.0"

View File

@@ -14,7 +14,6 @@ from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
from fastapi import Query from fastapi import Query
from pydantic import BaseModel from pydantic import BaseModel
from sqlalchemy import Date, DateTime, Float, Integer, Numeric, Uuid, and_, func, select from sqlalchemy import Date, DateTime, Float, Integer, Numeric, Uuid, and_, func, select
from sqlalchemy import delete as sql_delete
from sqlalchemy.dialects.postgresql import insert from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.exc import NoResultFound from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@@ -410,6 +409,82 @@ class AsyncCrud(Generic[ModelType]):
NotFoundError: If no record found NotFoundError: If no record found
MultipleResultsFound: If more than one record found MultipleResultsFound: If more than one record found
""" """
result = await cls.get_or_none(
session,
filters,
joins=joins,
outer_join=outer_join,
with_for_update=with_for_update,
load_options=load_options,
schema=schema,
)
if result is None:
raise NotFoundError()
return result
@overload
@classmethod
async def get_or_none( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any],
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: type[SchemaType],
) -> Response[SchemaType] | None: ...
@overload
@classmethod
async def get_or_none( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any],
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: None = ...,
) -> ModelType | None: ...
@classmethod
async def get_or_none(
cls: type[Self],
session: AsyncSession,
filters: list[Any],
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: type[BaseModel] | None = None,
) -> ModelType | Response[Any] | None:
"""Get exactly one record, or ``None`` if not found.
Like :meth:`get` but returns ``None`` instead of raising
:class:`~fastapi_toolsets.exceptions.NotFoundError` when no record
matches the filters.
Args:
session: DB async session
filters: List of SQLAlchemy filter conditions
joins: List of (model, condition) tuples for joining related tables
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
with_for_update: Lock the row for update
load_options: SQLAlchemy loader options (e.g., selectinload)
schema: Pydantic schema to serialize the result into. When provided,
the result is automatically wrapped in a ``Response[schema]``.
Returns:
Model instance, ``Response[schema]`` when ``schema`` is given,
or ``None`` when no record matches.
Raises:
MultipleResultsFound: If more than one record found
"""
q = select(cls.model) q = select(cls.model)
q = _apply_joins(q, joins, outer_join) q = _apply_joins(q, joins, outer_join)
q = q.where(and_(*filters)) q = q.where(and_(*filters))
@@ -419,12 +494,40 @@ class AsyncCrud(Generic[ModelType]):
q = q.with_for_update() q = q.with_for_update()
result = await session.execute(q) result = await session.execute(q)
item = result.unique().scalar_one_or_none() item = result.unique().scalar_one_or_none()
if not item: if item is None:
raise NotFoundError() return None
result = cast(ModelType, item) db_model = cast(ModelType, item)
if schema: if schema:
return Response(data=schema.model_validate(result)) return Response(data=schema.model_validate(db_model))
return result return db_model
@overload
@classmethod
async def first( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any] | None = None,
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: type[SchemaType],
) -> Response[SchemaType] | None: ...
@overload
@classmethod
async def first( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any] | None = None,
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: None = ...,
) -> ModelType | None: ...
@classmethod @classmethod
async def first( async def first(
@@ -434,8 +537,10 @@ class AsyncCrud(Generic[ModelType]):
*, *,
joins: JoinType | None = None, joins: JoinType | None = None,
outer_join: bool = False, outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None, load_options: list[ExecutableOption] | None = None,
) -> ModelType | None: schema: type[BaseModel] | None = None,
) -> ModelType | Response[Any] | None:
"""Get the first matching record, or None. """Get the first matching record, or None.
Args: Args:
@@ -443,10 +548,14 @@ class AsyncCrud(Generic[ModelType]):
filters: List of SQLAlchemy filter conditions filters: List of SQLAlchemy filter conditions
joins: List of (model, condition) tuples for joining related tables joins: List of (model, condition) tuples for joining related tables
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
load_options: SQLAlchemy loader options with_for_update: Lock the row for update
load_options: SQLAlchemy loader options (e.g., selectinload)
schema: Pydantic schema to serialize the result into. When provided,
the result is automatically wrapped in a ``Response[schema]``.
Returns: Returns:
Model instance or None Model instance, ``Response[schema]`` when ``schema`` is given,
or ``None`` when no record matches.
""" """
q = select(cls.model) q = select(cls.model)
q = _apply_joins(q, joins, outer_join) q = _apply_joins(q, joins, outer_join)
@@ -454,8 +563,16 @@ class AsyncCrud(Generic[ModelType]):
q = q.where(and_(*filters)) q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options): if resolved := cls._resolve_load_options(load_options):
q = q.options(*resolved) q = q.options(*resolved)
if with_for_update:
q = q.with_for_update()
result = await session.execute(q) result = await session.execute(q)
return cast(ModelType | None, result.unique().scalars().first()) item = result.unique().scalars().first()
if item is None:
return None
db_model = cast(ModelType, item)
if schema:
return Response(data=schema.model_validate(db_model))
return db_model
@classmethod @classmethod
async def get_multi( async def get_multi(
@@ -674,8 +791,10 @@ class AsyncCrud(Generic[ModelType]):
``None``, or ``Response[None]`` when ``return_response=True``. ``None``, or ``Response[None]`` when ``return_response=True``.
""" """
async with get_transaction(session): async with get_transaction(session):
q = sql_delete(cls.model).where(and_(*filters)) result = await session.execute(select(cls.model).where(and_(*filters)))
await session.execute(q) objects = result.scalars().all()
for obj in objects:
await session.delete(obj)
if return_response: if return_response:
return Response(data=None) return Response(data=None)
return None return None

View File

@@ -35,6 +35,7 @@ from .conftest import (
RoleCursorCrud, RoleCursorCrud,
RoleRead, RoleRead,
RoleUpdate, RoleUpdate,
Tag,
TagCreate, TagCreate,
TagCrud, TagCrud,
User, User,
@@ -294,6 +295,100 @@ class TestCrudGet:
assert user.username == "active" assert user.username == "active"
class TestCrudGetOrNone:
"""Tests for CRUD get_or_none operations."""
@pytest.mark.anyio
async def test_returns_record_when_found(self, db_session: AsyncSession):
"""get_or_none returns the record when it exists."""
created = await RoleCrud.create(db_session, RoleCreate(name="admin"))
fetched = await RoleCrud.get_or_none(db_session, [Role.id == created.id])
assert fetched is not None
assert fetched.id == created.id
assert fetched.name == "admin"
@pytest.mark.anyio
async def test_returns_none_when_not_found(self, db_session: AsyncSession):
"""get_or_none returns None instead of raising NotFoundError."""
result = await RoleCrud.get_or_none(db_session, [Role.id == uuid.uuid4()])
assert result is None
@pytest.mark.anyio
async def test_with_schema_returns_response_when_found(
self, db_session: AsyncSession
):
"""get_or_none with schema returns Response[schema] when found."""
from fastapi_toolsets.schemas import Response
created = await RoleCrud.create(db_session, RoleCreate(name="editor"))
result = await RoleCrud.get_or_none(
db_session, [Role.id == created.id], schema=RoleRead
)
assert isinstance(result, Response)
assert isinstance(result.data, RoleRead)
assert result.data.name == "editor"
@pytest.mark.anyio
async def test_with_schema_returns_none_when_not_found(
self, db_session: AsyncSession
):
"""get_or_none with schema returns None (not Response) when not found."""
result = await RoleCrud.get_or_none(
db_session, [Role.id == uuid.uuid4()], schema=RoleRead
)
assert result is None
@pytest.mark.anyio
async def test_with_load_options(self, db_session: AsyncSession):
"""get_or_none respects load_options."""
from sqlalchemy.orm import selectinload
role = await RoleCrud.create(db_session, RoleCreate(name="member"))
user = await UserCrud.create(
db_session,
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
)
fetched = await UserCrud.get_or_none(
db_session,
[User.id == user.id],
load_options=[selectinload(User.role)],
)
assert fetched is not None
assert fetched.role is not None
assert fetched.role.name == "member"
@pytest.mark.anyio
async def test_with_join(self, db_session: AsyncSession):
"""get_or_none respects joins."""
user = await UserCrud.create(
db_session, UserCreate(username="author", email="author@test.com")
)
await PostCrud.create(
db_session,
PostCreate(title="Published", author_id=user.id, is_published=True),
)
fetched = await UserCrud.get_or_none(
db_session,
[User.id == user.id, Post.is_published == True], # noqa: E712
joins=[(Post, Post.author_id == User.id)],
)
assert fetched is not None
assert fetched.id == user.id
# Filter that matches no join — returns None
missing = await UserCrud.get_or_none(
db_session,
[User.id == user.id, Post.is_published == False], # noqa: E712
joins=[(Post, Post.author_id == User.id)],
)
assert missing is None
class TestCrudFirst: class TestCrudFirst:
"""Tests for CRUD first operations.""" """Tests for CRUD first operations."""
@@ -321,6 +416,38 @@ class TestCrudFirst:
role = await RoleCrud.first(db_session) role = await RoleCrud.first(db_session)
assert role is not None assert role is not None
@pytest.mark.anyio
async def test_first_with_schema(self, db_session: AsyncSession):
"""First with schema returns a Response wrapping the serialized record."""
await RoleCrud.create(db_session, RoleCreate(name="admin"))
result = await RoleCrud.first(
db_session, [Role.name == "admin"], schema=RoleRead
)
assert result is not None
assert result.data is not None
assert result.data.name == "admin"
@pytest.mark.anyio
async def test_first_with_schema_not_found(self, db_session: AsyncSession):
"""First with schema returns None when no record matches."""
result = await RoleCrud.first(
db_session, [Role.name == "ghost"], schema=RoleRead
)
assert result is None
@pytest.mark.anyio
async def test_first_with_for_update(self, db_session: AsyncSession):
"""First with with_for_update locks the row."""
await RoleCrud.create(db_session, RoleCreate(name="admin"))
role = await RoleCrud.first(
db_session, [Role.name == "admin"], with_for_update=True
)
assert role is not None
assert role.name == "admin"
class TestCrudGetMulti: class TestCrudGetMulti:
"""Tests for CRUD get_multi operations.""" """Tests for CRUD get_multi operations."""
@@ -480,6 +607,69 @@ class TestCrudDelete:
assert result.data is None assert result.data is None
assert await RoleCrud.first(db_session, [Role.id == role.id]) is None assert await RoleCrud.first(db_session, [Role.id == role.id]) is None
@pytest.mark.anyio
async def test_delete_m2m_cascade(self, db_session: AsyncSession):
"""Deleting a record with M2M relationships cleans up the association table."""
from sqlalchemy import text
user = await UserCrud.create(
db_session, UserCreate(username="author", email="author@test.com")
)
tag1 = await TagCrud.create(db_session, TagCreate(name="python"))
tag2 = await TagCrud.create(db_session, TagCreate(name="fastapi"))
post = await PostM2MCrud.create(
db_session,
PostM2MCreate(
title="M2M Delete Test",
author_id=user.id,
tag_ids=[tag1.id, tag2.id],
),
)
await PostM2MCrud.delete(db_session, [Post.id == post.id])
# Post is gone
assert await PostCrud.first(db_session, [Post.id == post.id]) is None
# Association rows are gone — tags themselves must still exist
assert await TagCrud.first(db_session, [Tag.id == tag1.id]) is not None
assert await TagCrud.first(db_session, [Tag.id == tag2.id]) is not None
# No orphaned rows in post_tags
result = await db_session.execute(
text("SELECT COUNT(*) FROM post_tags WHERE post_id = :pid").bindparams(
pid=post.id
)
)
assert result.scalar() == 0
@pytest.mark.anyio
async def test_delete_m2m_does_not_delete_related_records(
self, db_session: AsyncSession
):
"""Deleting a post with M2M tags must not delete the tags themselves."""
user = await UserCrud.create(
db_session, UserCreate(username="author2", email="author2@test.com")
)
tag = await TagCrud.create(db_session, TagCreate(name="shared_tag"))
post1 = await PostM2MCrud.create(
db_session,
PostM2MCreate(title="Post 1", author_id=user.id, tag_ids=[tag.id]),
)
post2 = await PostM2MCrud.create(
db_session,
PostM2MCreate(title="Post 2", author_id=user.id, tag_ids=[tag.id]),
)
# Delete only post1
await PostM2MCrud.delete(db_session, [Post.id == post1.id])
# Tag and post2 still exist
assert await TagCrud.first(db_session, [Tag.id == tag.id]) is not None
assert await PostCrud.first(db_session, [Post.id == post2.id]) is not None
class TestCrudExists: class TestCrudExists:
"""Tests for CRUD exists operations.""" """Tests for CRUD exists operations."""

2
uv.lock generated
View File

@@ -251,7 +251,7 @@ wheels = [
[[package]] [[package]]
name = "fastapi-toolsets" name = "fastapi-toolsets"
version = "2.1.0" version = "2.2.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "asyncpg" }, { name = "asyncpg" },