diff --git a/docs/module/crud.md b/docs/module/crud.md index 40ca127..374d66d 100644 --- a/docs/module/crud.md +++ b/docs/module/crud.md @@ -29,6 +29,9 @@ user = await UserCrud.create(session=session, obj=UserCreateSchema(username="ali # Get one (raises NotFoundError if not found) user = await UserCrud.get(session=session, filters=[User.id == user_id]) +# Get one or None (never raises) +user = await UserCrud.get_or_none(session=session, filters=[User.id == user_id]) + # Get first or None user = await UserCrud.first(session=session, filters=[User.email == email]) @@ -46,6 +49,36 @@ count = await UserCrud.count(session=session, filters=[User.is_active == True]) exists = await UserCrud.exists(session=session, filters=[User.email == email]) ``` +## Fetching a single record + +Three methods fetch a single record — choose based on how you want to handle the "not found" case and whether you need strict uniqueness: + +| Method | Not found | Multiple results | +|---|---|---| +| `get` | raises `NotFoundError` | raises `MultipleResultsFound` | +| `get_or_none` | returns `None` | raises `MultipleResultsFound` | +| `first` | returns `None` | returns the first match silently | + +Use `get` when the record must exist (e.g. a detail endpoint that should return 404): + +```python +user = await UserCrud.get(session=session, filters=[User.id == user_id]) +``` + +Use `get_or_none` when the record may not exist but you still want strict uniqueness enforcement: + +```python +user = await UserCrud.get_or_none(session=session, filters=[User.email == email]) +if user is None: + ... # handle missing case without catching an exception +``` + +Use `first` when you only care about any one match and don't need uniqueness: + +```python +user = await UserCrud.first(session=session, filters=[User.is_active == True]) +``` + ## Pagination !!! info "Added in `v1.1` (only offset_pagination via `paginate` if ` Response[SchemaType] | None: ... + + @overload + @classmethod + async def get_or_none( # pragma: no cover + cls: type[Self], + session: AsyncSession, + filters: list[Any], + *, + joins: JoinType | None = None, + outer_join: bool = False, + with_for_update: bool = False, + load_options: list[ExecutableOption] | None = None, + schema: None = ..., + ) -> ModelType | None: ... + + @classmethod + async def get_or_none( + cls: type[Self], + session: AsyncSession, + filters: list[Any], + *, + joins: JoinType | None = None, + outer_join: bool = False, + with_for_update: bool = False, + load_options: list[ExecutableOption] | None = None, + schema: type[BaseModel] | None = None, + ) -> ModelType | Response[Any] | None: + """Get exactly one record, or ``None`` if not found. + + Like :meth:`get` but returns ``None`` instead of raising + :class:`~fastapi_toolsets.exceptions.NotFoundError` when no record + matches the filters. + + Args: + session: DB async session + filters: List of SQLAlchemy filter conditions + joins: List of (model, condition) tuples for joining related tables + outer_join: Use LEFT OUTER JOIN instead of INNER JOIN + with_for_update: Lock the row for update + load_options: SQLAlchemy loader options (e.g., selectinload) + schema: Pydantic schema to serialize the result into. When provided, + the result is automatically wrapped in a ``Response[schema]``. + + Returns: + Model instance, ``Response[schema]`` when ``schema`` is given, + or ``None`` when no record matches. + + Raises: + MultipleResultsFound: If more than one record found + """ q = select(cls.model) q = _apply_joins(q, joins, outer_join) q = q.where(and_(*filters)) @@ -418,12 +494,40 @@ class AsyncCrud(Generic[ModelType]): q = q.with_for_update() result = await session.execute(q) item = result.unique().scalar_one_or_none() - if not item: - raise NotFoundError() - result = cast(ModelType, item) + if item is None: + return None + db_model = cast(ModelType, item) if schema: - return Response(data=schema.model_validate(result)) - return result + return Response(data=schema.model_validate(db_model)) + return db_model + + @overload + @classmethod + async def first( # pragma: no cover + cls: type[Self], + session: AsyncSession, + filters: list[Any] | None = None, + *, + joins: JoinType | None = None, + outer_join: bool = False, + with_for_update: bool = False, + load_options: list[ExecutableOption] | None = None, + schema: type[SchemaType], + ) -> Response[SchemaType] | None: ... + + @overload + @classmethod + async def first( # pragma: no cover + cls: type[Self], + session: AsyncSession, + filters: list[Any] | None = None, + *, + joins: JoinType | None = None, + outer_join: bool = False, + with_for_update: bool = False, + load_options: list[ExecutableOption] | None = None, + schema: None = ..., + ) -> ModelType | None: ... @classmethod async def first( @@ -433,8 +537,10 @@ class AsyncCrud(Generic[ModelType]): *, joins: JoinType | None = None, outer_join: bool = False, + with_for_update: bool = False, load_options: list[ExecutableOption] | None = None, - ) -> ModelType | None: + schema: type[BaseModel] | None = None, + ) -> ModelType | Response[Any] | None: """Get the first matching record, or None. Args: @@ -442,10 +548,14 @@ class AsyncCrud(Generic[ModelType]): filters: List of SQLAlchemy filter conditions joins: List of (model, condition) tuples for joining related tables outer_join: Use LEFT OUTER JOIN instead of INNER JOIN - load_options: SQLAlchemy loader options + with_for_update: Lock the row for update + load_options: SQLAlchemy loader options (e.g., selectinload) + schema: Pydantic schema to serialize the result into. When provided, + the result is automatically wrapped in a ``Response[schema]``. Returns: - Model instance or None + Model instance, ``Response[schema]`` when ``schema`` is given, + or ``None`` when no record matches. """ q = select(cls.model) q = _apply_joins(q, joins, outer_join) @@ -453,8 +563,16 @@ class AsyncCrud(Generic[ModelType]): q = q.where(and_(*filters)) if resolved := cls._resolve_load_options(load_options): q = q.options(*resolved) + if with_for_update: + q = q.with_for_update() result = await session.execute(q) - return cast(ModelType | None, result.unique().scalars().first()) + item = result.unique().scalars().first() + if item is None: + return None + db_model = cast(ModelType, item) + if schema: + return Response(data=schema.model_validate(db_model)) + return db_model @classmethod async def get_multi( diff --git a/tests/test_crud.py b/tests/test_crud.py index 79506c4..f63f21a 100644 --- a/tests/test_crud.py +++ b/tests/test_crud.py @@ -295,6 +295,100 @@ class TestCrudGet: assert user.username == "active" +class TestCrudGetOrNone: + """Tests for CRUD get_or_none operations.""" + + @pytest.mark.anyio + async def test_returns_record_when_found(self, db_session: AsyncSession): + """get_or_none returns the record when it exists.""" + created = await RoleCrud.create(db_session, RoleCreate(name="admin")) + fetched = await RoleCrud.get_or_none(db_session, [Role.id == created.id]) + + assert fetched is not None + assert fetched.id == created.id + assert fetched.name == "admin" + + @pytest.mark.anyio + async def test_returns_none_when_not_found(self, db_session: AsyncSession): + """get_or_none returns None instead of raising NotFoundError.""" + result = await RoleCrud.get_or_none(db_session, [Role.id == uuid.uuid4()]) + assert result is None + + @pytest.mark.anyio + async def test_with_schema_returns_response_when_found( + self, db_session: AsyncSession + ): + """get_or_none with schema returns Response[schema] when found.""" + from fastapi_toolsets.schemas import Response + + created = await RoleCrud.create(db_session, RoleCreate(name="editor")) + result = await RoleCrud.get_or_none( + db_session, [Role.id == created.id], schema=RoleRead + ) + + assert isinstance(result, Response) + assert isinstance(result.data, RoleRead) + assert result.data.name == "editor" + + @pytest.mark.anyio + async def test_with_schema_returns_none_when_not_found( + self, db_session: AsyncSession + ): + """get_or_none with schema returns None (not Response) when not found.""" + result = await RoleCrud.get_or_none( + db_session, [Role.id == uuid.uuid4()], schema=RoleRead + ) + assert result is None + + @pytest.mark.anyio + async def test_with_load_options(self, db_session: AsyncSession): + """get_or_none respects load_options.""" + from sqlalchemy.orm import selectinload + + role = await RoleCrud.create(db_session, RoleCreate(name="member")) + user = await UserCrud.create( + db_session, + UserCreate(username="alice", email="alice@test.com", role_id=role.id), + ) + + fetched = await UserCrud.get_or_none( + db_session, + [User.id == user.id], + load_options=[selectinload(User.role)], + ) + + assert fetched is not None + assert fetched.role is not None + assert fetched.role.name == "member" + + @pytest.mark.anyio + async def test_with_join(self, db_session: AsyncSession): + """get_or_none respects joins.""" + user = await UserCrud.create( + db_session, UserCreate(username="author", email="author@test.com") + ) + await PostCrud.create( + db_session, + PostCreate(title="Published", author_id=user.id, is_published=True), + ) + + fetched = await UserCrud.get_or_none( + db_session, + [User.id == user.id, Post.is_published == True], # noqa: E712 + joins=[(Post, Post.author_id == User.id)], + ) + assert fetched is not None + assert fetched.id == user.id + + # Filter that matches no join — returns None + missing = await UserCrud.get_or_none( + db_session, + [User.id == user.id, Post.is_published == False], # noqa: E712 + joins=[(Post, Post.author_id == User.id)], + ) + assert missing is None + + class TestCrudFirst: """Tests for CRUD first operations.""" @@ -322,6 +416,38 @@ class TestCrudFirst: role = await RoleCrud.first(db_session) assert role is not None + @pytest.mark.anyio + async def test_first_with_schema(self, db_session: AsyncSession): + """First with schema returns a Response wrapping the serialized record.""" + await RoleCrud.create(db_session, RoleCreate(name="admin")) + + result = await RoleCrud.first( + db_session, [Role.name == "admin"], schema=RoleRead + ) + + assert result is not None + assert result.data is not None + assert result.data.name == "admin" + + @pytest.mark.anyio + async def test_first_with_schema_not_found(self, db_session: AsyncSession): + """First with schema returns None when no record matches.""" + result = await RoleCrud.first( + db_session, [Role.name == "ghost"], schema=RoleRead + ) + assert result is None + + @pytest.mark.anyio + async def test_first_with_for_update(self, db_session: AsyncSession): + """First with with_for_update locks the row.""" + await RoleCrud.create(db_session, RoleCreate(name="admin")) + + role = await RoleCrud.first( + db_session, [Role.name == "admin"], with_for_update=True + ) + assert role is not None + assert role.name == "admin" + class TestCrudGetMulti: """Tests for CRUD get_multi operations."""