feat: bring first() to parity with get()/get_or_none() — add with_for_update and schema support (#123)

This commit is contained in:
d3vyce
2026-03-10 19:34:18 +01:00
committed by GitHub
parent 4800941934
commit e4250a9910
3 changed files with 286 additions and 9 deletions

View File

@@ -29,6 +29,9 @@ user = await UserCrud.create(session=session, obj=UserCreateSchema(username="ali
# Get one (raises NotFoundError if not found)
user = await UserCrud.get(session=session, filters=[User.id == user_id])
# Get one or None (never raises)
user = await UserCrud.get_or_none(session=session, filters=[User.id == user_id])
# Get first or None
user = await UserCrud.first(session=session, filters=[User.email == email])
@@ -46,6 +49,36 @@ count = await UserCrud.count(session=session, filters=[User.is_active == True])
exists = await UserCrud.exists(session=session, filters=[User.email == email])
```
## Fetching a single record
Three methods fetch a single record — choose based on how you want to handle the "not found" case and whether you need strict uniqueness:
| Method | Not found | Multiple results |
|---|---|---|
| `get` | raises `NotFoundError` | raises `MultipleResultsFound` |
| `get_or_none` | returns `None` | raises `MultipleResultsFound` |
| `first` | returns `None` | returns the first match silently |
Use `get` when the record must exist (e.g. a detail endpoint that should return 404):
```python
user = await UserCrud.get(session=session, filters=[User.id == user_id])
```
Use `get_or_none` when the record may not exist but you still want strict uniqueness enforcement:
```python
user = await UserCrud.get_or_none(session=session, filters=[User.email == email])
if user is None:
... # handle missing case without catching an exception
```
Use `first` when you only care about any one match and don't need uniqueness:
```python
user = await UserCrud.first(session=session, filters=[User.is_active == True])
```
## Pagination
!!! info "Added in `v1.1` (only offset_pagination via `paginate` if `<v1.1`)"

View File

@@ -409,6 +409,82 @@ class AsyncCrud(Generic[ModelType]):
NotFoundError: If no record found
MultipleResultsFound: If more than one record found
"""
result = await cls.get_or_none(
session,
filters,
joins=joins,
outer_join=outer_join,
with_for_update=with_for_update,
load_options=load_options,
schema=schema,
)
if result is None:
raise NotFoundError()
return result
@overload
@classmethod
async def get_or_none( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any],
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: type[SchemaType],
) -> Response[SchemaType] | None: ...
@overload
@classmethod
async def get_or_none( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any],
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: None = ...,
) -> ModelType | None: ...
@classmethod
async def get_or_none(
cls: type[Self],
session: AsyncSession,
filters: list[Any],
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: type[BaseModel] | None = None,
) -> ModelType | Response[Any] | None:
"""Get exactly one record, or ``None`` if not found.
Like :meth:`get` but returns ``None`` instead of raising
:class:`~fastapi_toolsets.exceptions.NotFoundError` when no record
matches the filters.
Args:
session: DB async session
filters: List of SQLAlchemy filter conditions
joins: List of (model, condition) tuples for joining related tables
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
with_for_update: Lock the row for update
load_options: SQLAlchemy loader options (e.g., selectinload)
schema: Pydantic schema to serialize the result into. When provided,
the result is automatically wrapped in a ``Response[schema]``.
Returns:
Model instance, ``Response[schema]`` when ``schema`` is given,
or ``None`` when no record matches.
Raises:
MultipleResultsFound: If more than one record found
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
q = q.where(and_(*filters))
@@ -418,12 +494,40 @@ class AsyncCrud(Generic[ModelType]):
q = q.with_for_update()
result = await session.execute(q)
item = result.unique().scalar_one_or_none()
if not item:
raise NotFoundError()
result = cast(ModelType, item)
if item is None:
return None
db_model = cast(ModelType, item)
if schema:
return Response(data=schema.model_validate(result))
return result
return Response(data=schema.model_validate(db_model))
return db_model
@overload
@classmethod
async def first( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any] | None = None,
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: type[SchemaType],
) -> Response[SchemaType] | None: ...
@overload
@classmethod
async def first( # pragma: no cover
cls: type[Self],
session: AsyncSession,
filters: list[Any] | None = None,
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
schema: None = ...,
) -> ModelType | None: ...
@classmethod
async def first(
@@ -433,8 +537,10 @@ class AsyncCrud(Generic[ModelType]):
*,
joins: JoinType | None = None,
outer_join: bool = False,
with_for_update: bool = False,
load_options: list[ExecutableOption] | None = None,
) -> ModelType | None:
schema: type[BaseModel] | None = None,
) -> ModelType | Response[Any] | None:
"""Get the first matching record, or None.
Args:
@@ -442,10 +548,14 @@ class AsyncCrud(Generic[ModelType]):
filters: List of SQLAlchemy filter conditions
joins: List of (model, condition) tuples for joining related tables
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
load_options: SQLAlchemy loader options
with_for_update: Lock the row for update
load_options: SQLAlchemy loader options (e.g., selectinload)
schema: Pydantic schema to serialize the result into. When provided,
the result is automatically wrapped in a ``Response[schema]``.
Returns:
Model instance or None
Model instance, ``Response[schema]`` when ``schema`` is given,
or ``None`` when no record matches.
"""
q = select(cls.model)
q = _apply_joins(q, joins, outer_join)
@@ -453,8 +563,16 @@ class AsyncCrud(Generic[ModelType]):
q = q.where(and_(*filters))
if resolved := cls._resolve_load_options(load_options):
q = q.options(*resolved)
if with_for_update:
q = q.with_for_update()
result = await session.execute(q)
return cast(ModelType | None, result.unique().scalars().first())
item = result.unique().scalars().first()
if item is None:
return None
db_model = cast(ModelType, item)
if schema:
return Response(data=schema.model_validate(db_model))
return db_model
@classmethod
async def get_multi(

View File

@@ -295,6 +295,100 @@ class TestCrudGet:
assert user.username == "active"
class TestCrudGetOrNone:
"""Tests for CRUD get_or_none operations."""
@pytest.mark.anyio
async def test_returns_record_when_found(self, db_session: AsyncSession):
"""get_or_none returns the record when it exists."""
created = await RoleCrud.create(db_session, RoleCreate(name="admin"))
fetched = await RoleCrud.get_or_none(db_session, [Role.id == created.id])
assert fetched is not None
assert fetched.id == created.id
assert fetched.name == "admin"
@pytest.mark.anyio
async def test_returns_none_when_not_found(self, db_session: AsyncSession):
"""get_or_none returns None instead of raising NotFoundError."""
result = await RoleCrud.get_or_none(db_session, [Role.id == uuid.uuid4()])
assert result is None
@pytest.mark.anyio
async def test_with_schema_returns_response_when_found(
self, db_session: AsyncSession
):
"""get_or_none with schema returns Response[schema] when found."""
from fastapi_toolsets.schemas import Response
created = await RoleCrud.create(db_session, RoleCreate(name="editor"))
result = await RoleCrud.get_or_none(
db_session, [Role.id == created.id], schema=RoleRead
)
assert isinstance(result, Response)
assert isinstance(result.data, RoleRead)
assert result.data.name == "editor"
@pytest.mark.anyio
async def test_with_schema_returns_none_when_not_found(
self, db_session: AsyncSession
):
"""get_or_none with schema returns None (not Response) when not found."""
result = await RoleCrud.get_or_none(
db_session, [Role.id == uuid.uuid4()], schema=RoleRead
)
assert result is None
@pytest.mark.anyio
async def test_with_load_options(self, db_session: AsyncSession):
"""get_or_none respects load_options."""
from sqlalchemy.orm import selectinload
role = await RoleCrud.create(db_session, RoleCreate(name="member"))
user = await UserCrud.create(
db_session,
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
)
fetched = await UserCrud.get_or_none(
db_session,
[User.id == user.id],
load_options=[selectinload(User.role)],
)
assert fetched is not None
assert fetched.role is not None
assert fetched.role.name == "member"
@pytest.mark.anyio
async def test_with_join(self, db_session: AsyncSession):
"""get_or_none respects joins."""
user = await UserCrud.create(
db_session, UserCreate(username="author", email="author@test.com")
)
await PostCrud.create(
db_session,
PostCreate(title="Published", author_id=user.id, is_published=True),
)
fetched = await UserCrud.get_or_none(
db_session,
[User.id == user.id, Post.is_published == True], # noqa: E712
joins=[(Post, Post.author_id == User.id)],
)
assert fetched is not None
assert fetched.id == user.id
# Filter that matches no join — returns None
missing = await UserCrud.get_or_none(
db_session,
[User.id == user.id, Post.is_published == False], # noqa: E712
joins=[(Post, Post.author_id == User.id)],
)
assert missing is None
class TestCrudFirst:
"""Tests for CRUD first operations."""
@@ -322,6 +416,38 @@ class TestCrudFirst:
role = await RoleCrud.first(db_session)
assert role is not None
@pytest.mark.anyio
async def test_first_with_schema(self, db_session: AsyncSession):
"""First with schema returns a Response wrapping the serialized record."""
await RoleCrud.create(db_session, RoleCreate(name="admin"))
result = await RoleCrud.first(
db_session, [Role.name == "admin"], schema=RoleRead
)
assert result is not None
assert result.data is not None
assert result.data.name == "admin"
@pytest.mark.anyio
async def test_first_with_schema_not_found(self, db_session: AsyncSession):
"""First with schema returns None when no record matches."""
result = await RoleCrud.first(
db_session, [Role.name == "ghost"], schema=RoleRead
)
assert result is None
@pytest.mark.anyio
async def test_first_with_for_update(self, db_session: AsyncSession):
"""First with with_for_update locks the row."""
await RoleCrud.create(db_session, RoleCreate(name="admin"))
role = await RoleCrud.first(
db_session, [Role.name == "admin"], with_for_update=True
)
assert role is not None
assert role.name == "admin"
class TestCrudGetMulti:
"""Tests for CRUD get_multi operations."""