mirror of
https://github.com/d3vyce/fastapi-toolsets.git
synced 2026-04-16 06:36:26 +02:00
Compare commits
4 Commits
v2.1.0
...
ca8718da3c
| Author | SHA1 | Date | |
|---|---|---|---|
|
ca8718da3c
|
|||
|
dde5183e68
|
|||
|
|
e4250a9910 | ||
|
|
4800941934 |
28
.github/workflows/docs.yml
vendored
28
.github/workflows/docs.yml
vendored
@@ -5,7 +5,7 @@ on:
|
||||
types: [published]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
contents: write
|
||||
pages: write
|
||||
id-token: write
|
||||
|
||||
@@ -16,9 +16,14 @@ jobs:
|
||||
url: ${{ steps.deployment.outputs.page_url }}
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/configure-pages@v5
|
||||
|
||||
- uses: actions/checkout@v6
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Configure git
|
||||
run: |
|
||||
git config user.name "github-actions[bot]"
|
||||
git config user.email "github-actions[bot]@users.noreply.github.com"
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v7
|
||||
@@ -26,9 +31,22 @@ jobs:
|
||||
- name: Set up Python
|
||||
run: uv python install 3.13
|
||||
|
||||
- run: uv sync --group dev
|
||||
- run: uv sync --group docs
|
||||
|
||||
- run: uv run zensical build --clean
|
||||
- name: Install mkdocs shim
|
||||
run: cp scripts/mkdocs .venv/bin/mkdocs && chmod +x .venv/bin/mkdocs
|
||||
|
||||
- name: Deploy docs version
|
||||
run: |
|
||||
VERSION="${GITHUB_REF_NAME#v}"
|
||||
MINOR_VERSION="${VERSION%.*}"
|
||||
uv run mike deploy --push --update-aliases "$MINOR_VERSION" latest
|
||||
uv run mike set-default --push latest
|
||||
|
||||
- name: Prepare site artifact
|
||||
run: git worktree add site gh-pages
|
||||
|
||||
- uses: actions/configure-pages@v5
|
||||
|
||||
- uses: actions/upload-pages-artifact@v4
|
||||
with:
|
||||
|
||||
@@ -22,6 +22,8 @@ UserCrud = CrudFactory(model=User)
|
||||
|
||||
## Basic operations
|
||||
|
||||
!!! info "`get_or_none` added in `v2.2`"
|
||||
|
||||
```python
|
||||
# Create
|
||||
user = await UserCrud.create(session=session, obj=UserCreateSchema(username="alice"))
|
||||
@@ -29,6 +31,9 @@ user = await UserCrud.create(session=session, obj=UserCreateSchema(username="ali
|
||||
# Get one (raises NotFoundError if not found)
|
||||
user = await UserCrud.get(session=session, filters=[User.id == user_id])
|
||||
|
||||
# Get one or None (never raises)
|
||||
user = await UserCrud.get_or_none(session=session, filters=[User.id == user_id])
|
||||
|
||||
# Get first or None
|
||||
user = await UserCrud.first(session=session, filters=[User.email == email])
|
||||
|
||||
@@ -46,6 +51,36 @@ count = await UserCrud.count(session=session, filters=[User.is_active == True])
|
||||
exists = await UserCrud.exists(session=session, filters=[User.email == email])
|
||||
```
|
||||
|
||||
## Fetching a single record
|
||||
|
||||
Three methods fetch a single record — choose based on how you want to handle the "not found" case and whether you need strict uniqueness:
|
||||
|
||||
| Method | Not found | Multiple results |
|
||||
|---|---|---|
|
||||
| `get` | raises `NotFoundError` | raises `MultipleResultsFound` |
|
||||
| `get_or_none` | returns `None` | raises `MultipleResultsFound` |
|
||||
| `first` | returns `None` | returns the first match silently |
|
||||
|
||||
Use `get` when the record must exist (e.g. a detail endpoint that should return 404):
|
||||
|
||||
```python
|
||||
user = await UserCrud.get(session=session, filters=[User.id == user_id])
|
||||
```
|
||||
|
||||
Use `get_or_none` when the record may not exist but you still want strict uniqueness enforcement:
|
||||
|
||||
```python
|
||||
user = await UserCrud.get_or_none(session=session, filters=[User.email == email])
|
||||
if user is None:
|
||||
... # handle missing case without catching an exception
|
||||
```
|
||||
|
||||
Use `first` when you only care about any one match and don't need uniqueness:
|
||||
|
||||
```python
|
||||
user = await UserCrud.first(session=session, filters=[User.is_active == True])
|
||||
```
|
||||
|
||||
## Pagination
|
||||
|
||||
!!! info "Added in `v1.1` (only offset_pagination via `paginate` if `<v1.1`)"
|
||||
|
||||
69
docs/overrides/partials/header.html
Normal file
69
docs/overrides/partials/header.html
Normal file
@@ -0,0 +1,69 @@
|
||||
{#-
|
||||
Override of partials/header.html — adds version selector slot missing in Zensical.
|
||||
-#}
|
||||
{% set class = "md-header" %}
|
||||
{% if "navigation.tabs.sticky" in features %}
|
||||
{% set class = class ~ " md-header--shadow md-header--lifted" %}
|
||||
{% elif "navigation.tabs" not in features %}
|
||||
{% set class = class ~ " md-header--shadow" %}
|
||||
{% endif %}
|
||||
<header class="{{ class }}" data-md-component="header">
|
||||
<nav class="md-header__inner md-grid" aria-label="{{ lang.t('header') }}">
|
||||
<a href="{{ config.extra.homepage | d(nav.homepage.url, true) | url }}" title="{{ config.site_name | e }}" class="md-header__button md-logo" aria-label="{{ config.site_name }}" data-md-component="logo">
|
||||
{% include "partials/logo.html" %}
|
||||
</a>
|
||||
<label class="md-header__button md-icon" for="__drawer">
|
||||
{% set icon = config.theme.icon.menu or "material/menu" %}
|
||||
{% include ".icons/" ~ icon ~ ".svg" %}
|
||||
</label>
|
||||
<div class="md-header__title" data-md-component="header-title">
|
||||
<div class="md-header__ellipsis">
|
||||
<div class="md-header__topic">
|
||||
<span class="md-ellipsis">
|
||||
{{ config.site_name }}
|
||||
</span>
|
||||
</div>
|
||||
<div class="md-header__topic" data-md-component="header-topic">
|
||||
<span class="md-ellipsis">
|
||||
{% if page.meta and page.meta.title %}
|
||||
{{ page.meta.title }}
|
||||
{% else %}
|
||||
{{ page.title }}
|
||||
{% endif %}
|
||||
</span>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% if config.theme.palette %}
|
||||
{% if not config.theme.palette is mapping %}
|
||||
{% include "partials/palette.html" %}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% if not config.theme.palette is mapping %}
|
||||
{% include "partials/javascripts/palette.html" %}
|
||||
{% endif %}
|
||||
{% if config.extra.alternate %}
|
||||
{% include "partials/alternate.html" %}
|
||||
{% endif %}
|
||||
{% if "search" in config.plugins %}
|
||||
{% set search = config.plugins["search"] | attr("config") %}
|
||||
{% if search.enabled %}
|
||||
<label class="md-header__button md-icon" for="__search">
|
||||
{% set icon = config.theme.icon.search or "material/magnify" %}
|
||||
{% include ".icons/" ~ icon ~ ".svg" %}
|
||||
</label>
|
||||
{% include "partials/search.html" %}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
<div class="md-header__source">
|
||||
{% if config.repo_url %}
|
||||
{% include "partials/source.html" %}
|
||||
{% endif %}
|
||||
</div>
|
||||
</nav>
|
||||
{% if "navigation.tabs.sticky" in features %}
|
||||
{% if "navigation.tabs" in features %}
|
||||
{% include "partials/tabs.html" %}
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
</header>
|
||||
100
docs/stylesheets/version.css
Normal file
100
docs/stylesheets/version.css
Normal file
@@ -0,0 +1,100 @@
|
||||
/* Version selector styles for Zensical modern theme (backported from classic theme).
|
||||
The JS appends .md-version into .md-header__topic, so we make that flex. */
|
||||
.md-header__topic:has(.md-version) {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
|
||||
:root {
|
||||
--md-version-icon: url('data:image/svg+xml;charset=utf-8,<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 320 512"><path fill="currentColor" d="M140.3 376.8c12.6 10.2 31.1 9.5 42.8-2.2l128-128c9.2-9.2 11.9-22.9 6.9-34.9S301.4 192 288.5 192h-256c-12.9 0-24.6 7.8-29.6 19.8s-2.2 25.7 7 34.8l128 128z"/></svg>');
|
||||
}
|
||||
|
||||
.md-version {
|
||||
flex-shrink: 0;
|
||||
font-size: .8rem;
|
||||
height: 2.4rem;
|
||||
}
|
||||
|
||||
[dir=ltr] .md-version__current { margin-left: 1.4rem; margin-right: .4rem; }
|
||||
[dir=rtl] .md-version__current { margin-left: .4rem; margin-right: 1.4rem; }
|
||||
|
||||
.md-version__current {
|
||||
color: inherit;
|
||||
cursor: pointer;
|
||||
outline: none;
|
||||
position: relative;
|
||||
top: .05rem;
|
||||
}
|
||||
|
||||
[dir=ltr] .md-version__current:after { margin-left: .4rem; }
|
||||
[dir=rtl] .md-version__current:after { margin-right: .4rem; }
|
||||
|
||||
.md-version__current:after {
|
||||
background-color: currentcolor;
|
||||
content: "";
|
||||
display: inline-block;
|
||||
height: .6rem;
|
||||
-webkit-mask-image: var(--md-version-icon);
|
||||
mask-image: var(--md-version-icon);
|
||||
-webkit-mask-position: center;
|
||||
mask-position: center;
|
||||
-webkit-mask-repeat: no-repeat;
|
||||
mask-repeat: no-repeat;
|
||||
-webkit-mask-size: contain;
|
||||
mask-size: contain;
|
||||
width: .4rem;
|
||||
}
|
||||
|
||||
.md-version__alias {
|
||||
margin-left: .3rem;
|
||||
opacity: .7;
|
||||
}
|
||||
|
||||
.md-version__list {
|
||||
background-color: var(--md-default-bg-color);
|
||||
border-radius: .1rem;
|
||||
box-shadow: var(--md-shadow-z2);
|
||||
color: var(--md-default-fg-color);
|
||||
list-style-type: none;
|
||||
margin: .2rem .8rem;
|
||||
max-height: 0;
|
||||
opacity: 0;
|
||||
overflow: auto;
|
||||
padding: 0;
|
||||
position: absolute;
|
||||
scroll-snap-type: y mandatory;
|
||||
top: .15rem;
|
||||
transition: max-height 0ms .5s, opacity .25s .25s;
|
||||
z-index: 3;
|
||||
}
|
||||
|
||||
.md-version:focus-within .md-version__list,
|
||||
.md-version:hover .md-version__list {
|
||||
max-height: 10rem;
|
||||
opacity: 1;
|
||||
transition: max-height 0ms, opacity .25s;
|
||||
}
|
||||
|
||||
.md-version:hover .md-version__list { animation: hoverfix .25s forwards; }
|
||||
.md-version:focus-within .md-version__list { animation: none; }
|
||||
|
||||
.md-version__item { line-height: 1.8rem; }
|
||||
|
||||
[dir=ltr] .md-version__link { padding-left: .6rem; padding-right: 1.2rem; }
|
||||
[dir=rtl] .md-version__link { padding-left: 1.2rem; padding-right: .6rem; }
|
||||
|
||||
.md-version__link {
|
||||
cursor: pointer;
|
||||
display: block;
|
||||
outline: none;
|
||||
scroll-snap-align: start;
|
||||
transition: color .25s, background-color .25s;
|
||||
white-space: nowrap;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.md-version__link:focus,
|
||||
.md-version__link:hover { color: var(--md-accent-fg-color); }
|
||||
|
||||
.md-version__link:focus { background-color: var(--md-default-fg-color--lightest); }
|
||||
5
mkdocs.yml
Normal file
5
mkdocs.yml
Normal file
@@ -0,0 +1,5 @@
|
||||
# Minimal stub for mike compatibility.
|
||||
# The actual build is handled by zensical (see scripts/mkdocs shim).
|
||||
site_name: FastAPI Toolsets
|
||||
docs_dir: docs
|
||||
site_dir: site
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "fastapi-toolsets"
|
||||
version = "2.1.0"
|
||||
version = "2.2.0"
|
||||
description = "Production-ready utilities for FastAPI applications"
|
||||
readme = "README.md"
|
||||
license = "MIT"
|
||||
|
||||
60
scripts/mkdocs
Executable file
60
scripts/mkdocs
Executable file
@@ -0,0 +1,60 @@
|
||||
#!/usr/bin/env python3
|
||||
"""mkdocs shim for mike compatibility.
|
||||
|
||||
mike parses mkdocs.yml (valid YAML stub) for its Python internals, then calls
|
||||
`mkdocs build --config-file <mike-injected-temp.yml>` as a subprocess.
|
||||
|
||||
This shim intercepts that subprocess call, ignores the temp config, and
|
||||
delegates the actual build to `zensical build -f zensical.toml` instead.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = sys.argv[1:]
|
||||
|
||||
# mike calls `mkdocs --version` to embed in the commit message
|
||||
if args and args[0] == "--version":
|
||||
print("mkdocs, version 1.0.0 (zensical shim)")
|
||||
return
|
||||
|
||||
if not args or args[0] != "build":
|
||||
result = subprocess.run(["python3", "-m", "mkdocs"] + args)
|
||||
sys.exit(result.returncode)
|
||||
|
||||
config_file = "mkdocs.yml"
|
||||
clean = False
|
||||
|
||||
i = 1
|
||||
while i < len(args):
|
||||
if args[i] in ("-f", "--config-file") and i + 1 < len(args):
|
||||
config_file = args[i + 1]
|
||||
i += 2
|
||||
elif args[i] in ("-c", "--clean"):
|
||||
clean = True
|
||||
i += 1
|
||||
elif args[i] == "--dirty":
|
||||
i += 1
|
||||
else:
|
||||
i += 1
|
||||
|
||||
# mike creates a temp file prefixed with "mike-mkdocs"; always delegate
|
||||
# the actual build to zensical regardless of which config was passed.
|
||||
del config_file # unused — zensical auto-discovers zensical.toml
|
||||
|
||||
cmd = ["zensical", "build"]
|
||||
if clean:
|
||||
cmd.append("--clean")
|
||||
|
||||
env = os.environ.copy()
|
||||
result = subprocess.run(cmd, env=env)
|
||||
sys.exit(result.returncode)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -21,4 +21,4 @@ Example usage:
|
||||
return Response(data={"user": user.username}, message="Success")
|
||||
"""
|
||||
|
||||
__version__ = "2.1.0"
|
||||
__version__ = "2.2.0"
|
||||
|
||||
@@ -14,7 +14,6 @@ from typing import Any, ClassVar, Generic, Literal, Self, cast, overload
|
||||
from fastapi import Query
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import Date, DateTime, Float, Integer, Numeric, Uuid, and_, func, select
|
||||
from sqlalchemy import delete as sql_delete
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
from sqlalchemy.exc import NoResultFound
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
@@ -410,6 +409,82 @@ class AsyncCrud(Generic[ModelType]):
|
||||
NotFoundError: If no record found
|
||||
MultipleResultsFound: If more than one record found
|
||||
"""
|
||||
result = await cls.get_or_none(
|
||||
session,
|
||||
filters,
|
||||
joins=joins,
|
||||
outer_join=outer_join,
|
||||
with_for_update=with_for_update,
|
||||
load_options=load_options,
|
||||
schema=schema,
|
||||
)
|
||||
if result is None:
|
||||
raise NotFoundError()
|
||||
return result
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
async def get_or_none( # pragma: no cover
|
||||
cls: type[Self],
|
||||
session: AsyncSession,
|
||||
filters: list[Any],
|
||||
*,
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
with_for_update: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
schema: type[SchemaType],
|
||||
) -> Response[SchemaType] | None: ...
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
async def get_or_none( # pragma: no cover
|
||||
cls: type[Self],
|
||||
session: AsyncSession,
|
||||
filters: list[Any],
|
||||
*,
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
with_for_update: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
schema: None = ...,
|
||||
) -> ModelType | None: ...
|
||||
|
||||
@classmethod
|
||||
async def get_or_none(
|
||||
cls: type[Self],
|
||||
session: AsyncSession,
|
||||
filters: list[Any],
|
||||
*,
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
with_for_update: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
schema: type[BaseModel] | None = None,
|
||||
) -> ModelType | Response[Any] | None:
|
||||
"""Get exactly one record, or ``None`` if not found.
|
||||
|
||||
Like :meth:`get` but returns ``None`` instead of raising
|
||||
:class:`~fastapi_toolsets.exceptions.NotFoundError` when no record
|
||||
matches the filters.
|
||||
|
||||
Args:
|
||||
session: DB async session
|
||||
filters: List of SQLAlchemy filter conditions
|
||||
joins: List of (model, condition) tuples for joining related tables
|
||||
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
|
||||
with_for_update: Lock the row for update
|
||||
load_options: SQLAlchemy loader options (e.g., selectinload)
|
||||
schema: Pydantic schema to serialize the result into. When provided,
|
||||
the result is automatically wrapped in a ``Response[schema]``.
|
||||
|
||||
Returns:
|
||||
Model instance, ``Response[schema]`` when ``schema`` is given,
|
||||
or ``None`` when no record matches.
|
||||
|
||||
Raises:
|
||||
MultipleResultsFound: If more than one record found
|
||||
"""
|
||||
q = select(cls.model)
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
q = q.where(and_(*filters))
|
||||
@@ -419,12 +494,40 @@ class AsyncCrud(Generic[ModelType]):
|
||||
q = q.with_for_update()
|
||||
result = await session.execute(q)
|
||||
item = result.unique().scalar_one_or_none()
|
||||
if not item:
|
||||
raise NotFoundError()
|
||||
result = cast(ModelType, item)
|
||||
if item is None:
|
||||
return None
|
||||
db_model = cast(ModelType, item)
|
||||
if schema:
|
||||
return Response(data=schema.model_validate(result))
|
||||
return result
|
||||
return Response(data=schema.model_validate(db_model))
|
||||
return db_model
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
async def first( # pragma: no cover
|
||||
cls: type[Self],
|
||||
session: AsyncSession,
|
||||
filters: list[Any] | None = None,
|
||||
*,
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
with_for_update: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
schema: type[SchemaType],
|
||||
) -> Response[SchemaType] | None: ...
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
async def first( # pragma: no cover
|
||||
cls: type[Self],
|
||||
session: AsyncSession,
|
||||
filters: list[Any] | None = None,
|
||||
*,
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
with_for_update: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
schema: None = ...,
|
||||
) -> ModelType | None: ...
|
||||
|
||||
@classmethod
|
||||
async def first(
|
||||
@@ -434,8 +537,10 @@ class AsyncCrud(Generic[ModelType]):
|
||||
*,
|
||||
joins: JoinType | None = None,
|
||||
outer_join: bool = False,
|
||||
with_for_update: bool = False,
|
||||
load_options: list[ExecutableOption] | None = None,
|
||||
) -> ModelType | None:
|
||||
schema: type[BaseModel] | None = None,
|
||||
) -> ModelType | Response[Any] | None:
|
||||
"""Get the first matching record, or None.
|
||||
|
||||
Args:
|
||||
@@ -443,10 +548,14 @@ class AsyncCrud(Generic[ModelType]):
|
||||
filters: List of SQLAlchemy filter conditions
|
||||
joins: List of (model, condition) tuples for joining related tables
|
||||
outer_join: Use LEFT OUTER JOIN instead of INNER JOIN
|
||||
load_options: SQLAlchemy loader options
|
||||
with_for_update: Lock the row for update
|
||||
load_options: SQLAlchemy loader options (e.g., selectinload)
|
||||
schema: Pydantic schema to serialize the result into. When provided,
|
||||
the result is automatically wrapped in a ``Response[schema]``.
|
||||
|
||||
Returns:
|
||||
Model instance or None
|
||||
Model instance, ``Response[schema]`` when ``schema`` is given,
|
||||
or ``None`` when no record matches.
|
||||
"""
|
||||
q = select(cls.model)
|
||||
q = _apply_joins(q, joins, outer_join)
|
||||
@@ -454,8 +563,16 @@ class AsyncCrud(Generic[ModelType]):
|
||||
q = q.where(and_(*filters))
|
||||
if resolved := cls._resolve_load_options(load_options):
|
||||
q = q.options(*resolved)
|
||||
if with_for_update:
|
||||
q = q.with_for_update()
|
||||
result = await session.execute(q)
|
||||
return cast(ModelType | None, result.unique().scalars().first())
|
||||
item = result.unique().scalars().first()
|
||||
if item is None:
|
||||
return None
|
||||
db_model = cast(ModelType, item)
|
||||
if schema:
|
||||
return Response(data=schema.model_validate(db_model))
|
||||
return db_model
|
||||
|
||||
@classmethod
|
||||
async def get_multi(
|
||||
@@ -674,8 +791,10 @@ class AsyncCrud(Generic[ModelType]):
|
||||
``None``, or ``Response[None]`` when ``return_response=True``.
|
||||
"""
|
||||
async with get_transaction(session):
|
||||
q = sql_delete(cls.model).where(and_(*filters))
|
||||
await session.execute(q)
|
||||
result = await session.execute(select(cls.model).where(and_(*filters)))
|
||||
objects = result.scalars().all()
|
||||
for obj in objects:
|
||||
await session.delete(obj)
|
||||
if return_response:
|
||||
return Response(data=None)
|
||||
return None
|
||||
|
||||
@@ -35,6 +35,7 @@ from .conftest import (
|
||||
RoleCursorCrud,
|
||||
RoleRead,
|
||||
RoleUpdate,
|
||||
Tag,
|
||||
TagCreate,
|
||||
TagCrud,
|
||||
User,
|
||||
@@ -294,6 +295,100 @@ class TestCrudGet:
|
||||
assert user.username == "active"
|
||||
|
||||
|
||||
class TestCrudGetOrNone:
|
||||
"""Tests for CRUD get_or_none operations."""
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_returns_record_when_found(self, db_session: AsyncSession):
|
||||
"""get_or_none returns the record when it exists."""
|
||||
created = await RoleCrud.create(db_session, RoleCreate(name="admin"))
|
||||
fetched = await RoleCrud.get_or_none(db_session, [Role.id == created.id])
|
||||
|
||||
assert fetched is not None
|
||||
assert fetched.id == created.id
|
||||
assert fetched.name == "admin"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_returns_none_when_not_found(self, db_session: AsyncSession):
|
||||
"""get_or_none returns None instead of raising NotFoundError."""
|
||||
result = await RoleCrud.get_or_none(db_session, [Role.id == uuid.uuid4()])
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_with_schema_returns_response_when_found(
|
||||
self, db_session: AsyncSession
|
||||
):
|
||||
"""get_or_none with schema returns Response[schema] when found."""
|
||||
from fastapi_toolsets.schemas import Response
|
||||
|
||||
created = await RoleCrud.create(db_session, RoleCreate(name="editor"))
|
||||
result = await RoleCrud.get_or_none(
|
||||
db_session, [Role.id == created.id], schema=RoleRead
|
||||
)
|
||||
|
||||
assert isinstance(result, Response)
|
||||
assert isinstance(result.data, RoleRead)
|
||||
assert result.data.name == "editor"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_with_schema_returns_none_when_not_found(
|
||||
self, db_session: AsyncSession
|
||||
):
|
||||
"""get_or_none with schema returns None (not Response) when not found."""
|
||||
result = await RoleCrud.get_or_none(
|
||||
db_session, [Role.id == uuid.uuid4()], schema=RoleRead
|
||||
)
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_with_load_options(self, db_session: AsyncSession):
|
||||
"""get_or_none respects load_options."""
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
role = await RoleCrud.create(db_session, RoleCreate(name="member"))
|
||||
user = await UserCrud.create(
|
||||
db_session,
|
||||
UserCreate(username="alice", email="alice@test.com", role_id=role.id),
|
||||
)
|
||||
|
||||
fetched = await UserCrud.get_or_none(
|
||||
db_session,
|
||||
[User.id == user.id],
|
||||
load_options=[selectinload(User.role)],
|
||||
)
|
||||
|
||||
assert fetched is not None
|
||||
assert fetched.role is not None
|
||||
assert fetched.role.name == "member"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_with_join(self, db_session: AsyncSession):
|
||||
"""get_or_none respects joins."""
|
||||
user = await UserCrud.create(
|
||||
db_session, UserCreate(username="author", email="author@test.com")
|
||||
)
|
||||
await PostCrud.create(
|
||||
db_session,
|
||||
PostCreate(title="Published", author_id=user.id, is_published=True),
|
||||
)
|
||||
|
||||
fetched = await UserCrud.get_or_none(
|
||||
db_session,
|
||||
[User.id == user.id, Post.is_published == True], # noqa: E712
|
||||
joins=[(Post, Post.author_id == User.id)],
|
||||
)
|
||||
assert fetched is not None
|
||||
assert fetched.id == user.id
|
||||
|
||||
# Filter that matches no join — returns None
|
||||
missing = await UserCrud.get_or_none(
|
||||
db_session,
|
||||
[User.id == user.id, Post.is_published == False], # noqa: E712
|
||||
joins=[(Post, Post.author_id == User.id)],
|
||||
)
|
||||
assert missing is None
|
||||
|
||||
|
||||
class TestCrudFirst:
|
||||
"""Tests for CRUD first operations."""
|
||||
|
||||
@@ -321,6 +416,38 @@ class TestCrudFirst:
|
||||
role = await RoleCrud.first(db_session)
|
||||
assert role is not None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_first_with_schema(self, db_session: AsyncSession):
|
||||
"""First with schema returns a Response wrapping the serialized record."""
|
||||
await RoleCrud.create(db_session, RoleCreate(name="admin"))
|
||||
|
||||
result = await RoleCrud.first(
|
||||
db_session, [Role.name == "admin"], schema=RoleRead
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
assert result.data is not None
|
||||
assert result.data.name == "admin"
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_first_with_schema_not_found(self, db_session: AsyncSession):
|
||||
"""First with schema returns None when no record matches."""
|
||||
result = await RoleCrud.first(
|
||||
db_session, [Role.name == "ghost"], schema=RoleRead
|
||||
)
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_first_with_for_update(self, db_session: AsyncSession):
|
||||
"""First with with_for_update locks the row."""
|
||||
await RoleCrud.create(db_session, RoleCreate(name="admin"))
|
||||
|
||||
role = await RoleCrud.first(
|
||||
db_session, [Role.name == "admin"], with_for_update=True
|
||||
)
|
||||
assert role is not None
|
||||
assert role.name == "admin"
|
||||
|
||||
|
||||
class TestCrudGetMulti:
|
||||
"""Tests for CRUD get_multi operations."""
|
||||
@@ -480,6 +607,69 @@ class TestCrudDelete:
|
||||
assert result.data is None
|
||||
assert await RoleCrud.first(db_session, [Role.id == role.id]) is None
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_delete_m2m_cascade(self, db_session: AsyncSession):
|
||||
"""Deleting a record with M2M relationships cleans up the association table."""
|
||||
from sqlalchemy import text
|
||||
|
||||
user = await UserCrud.create(
|
||||
db_session, UserCreate(username="author", email="author@test.com")
|
||||
)
|
||||
tag1 = await TagCrud.create(db_session, TagCreate(name="python"))
|
||||
tag2 = await TagCrud.create(db_session, TagCreate(name="fastapi"))
|
||||
|
||||
post = await PostM2MCrud.create(
|
||||
db_session,
|
||||
PostM2MCreate(
|
||||
title="M2M Delete Test",
|
||||
author_id=user.id,
|
||||
tag_ids=[tag1.id, tag2.id],
|
||||
),
|
||||
)
|
||||
|
||||
await PostM2MCrud.delete(db_session, [Post.id == post.id])
|
||||
|
||||
# Post is gone
|
||||
assert await PostCrud.first(db_session, [Post.id == post.id]) is None
|
||||
|
||||
# Association rows are gone — tags themselves must still exist
|
||||
assert await TagCrud.first(db_session, [Tag.id == tag1.id]) is not None
|
||||
assert await TagCrud.first(db_session, [Tag.id == tag2.id]) is not None
|
||||
|
||||
# No orphaned rows in post_tags
|
||||
result = await db_session.execute(
|
||||
text("SELECT COUNT(*) FROM post_tags WHERE post_id = :pid").bindparams(
|
||||
pid=post.id
|
||||
)
|
||||
)
|
||||
assert result.scalar() == 0
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_delete_m2m_does_not_delete_related_records(
|
||||
self, db_session: AsyncSession
|
||||
):
|
||||
"""Deleting a post with M2M tags must not delete the tags themselves."""
|
||||
user = await UserCrud.create(
|
||||
db_session, UserCreate(username="author2", email="author2@test.com")
|
||||
)
|
||||
tag = await TagCrud.create(db_session, TagCreate(name="shared_tag"))
|
||||
|
||||
post1 = await PostM2MCrud.create(
|
||||
db_session,
|
||||
PostM2MCreate(title="Post 1", author_id=user.id, tag_ids=[tag.id]),
|
||||
)
|
||||
post2 = await PostM2MCrud.create(
|
||||
db_session,
|
||||
PostM2MCreate(title="Post 2", author_id=user.id, tag_ids=[tag.id]),
|
||||
)
|
||||
|
||||
# Delete only post1
|
||||
await PostM2MCrud.delete(db_session, [Post.id == post1.id])
|
||||
|
||||
# Tag and post2 still exist
|
||||
assert await TagCrud.first(db_session, [Tag.id == tag.id]) is not None
|
||||
assert await PostCrud.first(db_session, [Post.id == post2.id]) is not None
|
||||
|
||||
|
||||
class TestCrudExists:
|
||||
"""Tests for CRUD exists operations."""
|
||||
|
||||
Reference in New Issue
Block a user