From baebf022f651536efa460b41b1511feb57e884c0 Mon Sep 17 00:00:00 2001 From: d3vyce <44915747+d3vyce@users.noreply.github.com> Date: Mon, 9 Mar 2026 17:15:50 +0100 Subject: [PATCH] feat: move db related function from pytest to db module (#119) --- docs/module/db.md | 27 +++ docs/module/pytest.md | 20 +- docs/reference/db.md | 6 + docs/reference/pytest.md | 2 - src/fastapi_toolsets/db.py | 71 ++++++- src/fastapi_toolsets/pytest/utils.py | 288 ++++++++++++--------------- tests/test_db.py | 88 +++++++- tests/test_pytest.py | 56 +----- 8 files changed, 335 insertions(+), 223 deletions(-) diff --git a/docs/module/db.md b/docs/module/db.md index 5116369..09b2be1 100644 --- a/docs/module/db.md +++ b/docs/module/db.md @@ -87,6 +87,33 @@ await wait_for_row_change( ) ``` +## Creating a database + +[`create_database`](../reference/db.md#fastapi_toolsets.db.create_database) creates a database at a given URL. It connects to *server_url* and issues a `CREATE DATABASE` statement: + +```python +from fastapi_toolsets.db import create_database + +SERVER_URL = "postgresql+asyncpg://postgres:postgres@localhost/postgres" + +await create_database(db_name="myapp_test", server_url=SERVER_URL) +``` + +For test isolation with automatic cleanup, use [`create_worker_database`](../reference/pytest.md#fastapi_toolsets.pytest.utils.create_worker_database) from the `pytest` module instead — it handles drop-before, create, and drop-after automatically. + +## Cleaning up tables + +[`cleanup_tables`](../reference/db.md#fastapi_toolsets.db.cleanup_tables) truncates all tables: + +```python +from fastapi_toolsets.db import cleanup_tables + +@pytest.fixture(autouse=True) +async def clean(db_session): + yield + await cleanup_tables(session=db_session, base=Base) +``` + --- [:material-api: API Reference](../reference/db.md) diff --git a/docs/module/pytest.md b/docs/module/pytest.md index 8bac694..52824b8 100644 --- a/docs/module/pytest.md +++ b/docs/module/pytest.md @@ -40,10 +40,10 @@ async def http_client(db_session): ## Database sessions in tests -Use [`create_db_session`](../reference/pytest.md#fastapi_toolsets.pytest.utils.create_db_session) to create an isolated `AsyncSession` for a test: +Use [`create_db_session`](../reference/pytest.md#fastapi_toolsets.pytest.utils.create_db_session) to create an isolated `AsyncSession` for a test, combined with [`create_worker_database`](../reference/pytest.md#fastapi_toolsets.pytest.utils.create_worker_database) to set up a per-worker database: ```python -from fastapi_toolsets.pytest import create_db_session, create_worker_database +from fastapi_toolsets.pytest import create_worker_database, create_db_session @pytest.fixture(scope="session") async def worker_db_url(): @@ -64,16 +64,28 @@ async def db_session(worker_db_url): !!! info In this example, the database is reset between each test using the argument `cleanup=True`. +Use [`worker_database_url`](../reference/pytest.md#fastapi_toolsets.pytest.utils.worker_database_url) to derive the per-worker URL manually if needed: + +```python +from fastapi_toolsets.pytest import worker_database_url + +url = worker_database_url("postgresql+asyncpg://user:pass@localhost/test_db", default_test_db="test") +# e.g. "postgresql+asyncpg://user:pass@localhost/test_db_gw0" under xdist +``` + ## Parallel testing with pytest-xdist The examples above are already compatible with parallel test execution with `pytest-xdist`. ## Cleaning up tables -If you want to manually clean up a database you can use [`cleanup_tables`](../reference/pytest.md#fastapi_toolsets.pytest.utils.cleanup_tables), this will truncates all tables between tests for fast isolation: +!!! warning + Since `V2.1.0` `cleanup_tables` now live in `fastapi_toolsets.db`. For backward compatibility the function is still available in `fastapi_toolsets.pytest`, but this will be remove in `V3.0.0`. + +If you want to manually clean up a database you can use [`cleanup_tables`](../reference/db.md#fastapi_toolsets.db.cleanup_tables), this will truncate all tables between tests for fast isolation: ```python -from fastapi_toolsets.pytest import cleanup_tables +from fastapi_toolsets.db import cleanup_tables @pytest.fixture(autouse=True) async def clean(db_session): diff --git a/docs/reference/db.md b/docs/reference/db.md index 1fafc70..e34a790 100644 --- a/docs/reference/db.md +++ b/docs/reference/db.md @@ -7,6 +7,8 @@ You can import them directly from `fastapi_toolsets.db`: ```python from fastapi_toolsets.db import ( LockMode, + cleanup_tables, + create_database, create_db_dependency, create_db_context, get_transaction, @@ -26,3 +28,7 @@ from fastapi_toolsets.db import ( ## ::: fastapi_toolsets.db.lock_tables ## ::: fastapi_toolsets.db.wait_for_row_change + +## ::: fastapi_toolsets.db.create_database + +## ::: fastapi_toolsets.db.cleanup_tables diff --git a/docs/reference/pytest.md b/docs/reference/pytest.md index faaefcb..06fe2f1 100644 --- a/docs/reference/pytest.md +++ b/docs/reference/pytest.md @@ -24,5 +24,3 @@ from fastapi_toolsets.pytest import ( ## ::: fastapi_toolsets.pytest.utils.worker_database_url ## ::: fastapi_toolsets.pytest.utils.create_worker_database - -## ::: fastapi_toolsets.pytest.utils.cleanup_tables diff --git a/src/fastapi_toolsets/db.py b/src/fastapi_toolsets/db.py index 641a5a9..fdeaab0 100644 --- a/src/fastapi_toolsets/db.py +++ b/src/fastapi_toolsets/db.py @@ -7,17 +7,19 @@ from enum import Enum from typing import Any, TypeVar from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import DeclarativeBase from .exceptions import NotFoundError __all__ = [ "LockMode", + "cleanup_tables", + "create_database", "create_db_context", "create_db_dependency", - "lock_tables", "get_transaction", + "lock_tables", "wait_for_row_change", ] @@ -188,6 +190,71 @@ async def lock_tables( yield session +async def create_database( + db_name: str, + *, + server_url: str, +) -> None: + """Create a database. + + Connects to *server_url* using ``AUTOCOMMIT`` isolation and issues a + ``CREATE DATABASE`` statement for *db_name*. + + Args: + db_name: Name of the database to create. + server_url: URL used for server-level DDL (must point to an existing + database on the same server). + + Example: + ```python + from fastapi_toolsets.db import create_database + + SERVER_URL = "postgresql+asyncpg://postgres:postgres@localhost/postgres" + await create_database("myapp_test", server_url=SERVER_URL) + ``` + """ + engine = create_async_engine(server_url, isolation_level="AUTOCOMMIT") + try: + async with engine.connect() as conn: + await conn.execute(text(f"CREATE DATABASE {db_name}")) + finally: + await engine.dispose() + + +async def cleanup_tables( + session: AsyncSession, + base: type[DeclarativeBase], +) -> None: + """Truncate all tables for fast between-test cleanup. + + Executes a single ``TRUNCATE … RESTART IDENTITY CASCADE`` statement + across every table in *base*'s metadata, which is significantly faster + than dropping and re-creating tables between tests. + + This is a no-op when the metadata contains no tables. + + Args: + session: An active async database session. + base: SQLAlchemy DeclarativeBase class containing model metadata. + + Example: + ```python + @pytest.fixture + async def db_session(worker_db_url): + async with create_db_session(worker_db_url, Base) as session: + yield session + await cleanup_tables(session, Base) + ``` + """ + tables = base.metadata.sorted_tables + if not tables: + return + + table_names = ", ".join(f'"{t.name}"' for t in tables) + await session.execute(text(f"TRUNCATE {table_names} RESTART IDENTITY CASCADE")) + await session.commit() + + _M = TypeVar("_M", bound=DeclarativeBase) diff --git a/src/fastapi_toolsets/pytest/utils.py b/src/fastapi_toolsets/pytest/utils.py index 01f7615..28d23db 100644 --- a/src/fastapi_toolsets/pytest/utils.py +++ b/src/fastapi_toolsets/pytest/utils.py @@ -1,12 +1,12 @@ """Pytest helper utilities for FastAPI testing.""" import os +import warnings from collections.abc import AsyncGenerator, Callable from contextlib import asynccontextmanager from typing import Any from httpx import ASGITransport, AsyncClient -from sqlalchemy import text from sqlalchemy.engine import make_url from sqlalchemy.ext.asyncio import ( AsyncSession, @@ -15,7 +15,134 @@ from sqlalchemy.ext.asyncio import ( ) from sqlalchemy.orm import DeclarativeBase -from ..db import create_db_context +from sqlalchemy import text + +from ..db import ( + cleanup_tables as _cleanup_tables, + create_database, + create_db_context, +) + + +async def cleanup_tables( + session: AsyncSession, + base: type[DeclarativeBase], +) -> None: + """Truncate all tables for fast between-test cleanup. + + .. deprecated:: + Import ``cleanup_tables`` from ``fastapi_toolsets.db`` instead. + This re-export will be removed in v3.0.0. + """ + warnings.warn( + "Importing cleanup_tables from fastapi_toolsets.pytest is deprecated " + "and will be removed in v3.0.0. " + "Use 'from fastapi_toolsets.db import cleanup_tables' instead.", + DeprecationWarning, + stacklevel=2, + ) + await _cleanup_tables(session=session, base=base) + + +def _get_xdist_worker(default_test_db: str) -> str: + """Return the pytest-xdist worker name, or *default_test_db* when not running under xdist. + + Reads the ``PYTEST_XDIST_WORKER`` environment variable that xdist sets + automatically in each worker process (e.g. ``"gw0"``, ``"gw1"``). + When xdist is not installed or not active, the variable is absent and + *default_test_db* is returned instead. + + Args: + default_test_db: Fallback value returned when ``PYTEST_XDIST_WORKER`` + is not set. + """ + return os.environ.get("PYTEST_XDIST_WORKER", default_test_db) + + +def worker_database_url(database_url: str, default_test_db: str) -> str: + """Derive a per-worker database URL for pytest-xdist parallel runs. + + Appends ``_{worker_name}`` to the database name so each xdist worker + operates on its own database. When not running under xdist, + ``_{default_test_db}`` is appended instead. + + The worker name is read from the ``PYTEST_XDIST_WORKER`` environment + variable (set automatically by xdist in each worker process). + + Args: + database_url: Original database connection URL. + default_test_db: Suffix appended to the database name when + ``PYTEST_XDIST_WORKER`` is not set. + + Returns: + A database URL with a worker- or default-specific database name. + """ + worker = _get_xdist_worker(default_test_db=default_test_db) + + url = make_url(database_url) + url = url.set(database=f"{url.database}_{worker}") + return url.render_as_string(hide_password=False) + + +@asynccontextmanager +async def create_worker_database( + database_url: str, + default_test_db: str = "test_db", +) -> AsyncGenerator[str, None]: + """Create and drop a per-worker database for pytest-xdist isolation. + + Derives a worker-specific database URL using :func:`worker_database_url`, + then delegates to :func:`~fastapi_toolsets.db.create_database` to create + and drop it. Intended for use as a **session-scoped** fixture. + + When running under xdist the database name is suffixed with the worker + name (e.g. ``_gw0``). Otherwise it is suffixed with *default_test_db*. + + Args: + database_url: Original database connection URL (used as the server + connection and as the base for the worker database name). + default_test_db: Suffix appended to the database name when + ``PYTEST_XDIST_WORKER`` is not set. Defaults to ``"test_db"``. + + Yields: + The worker-specific database URL. + + Example: + ```python + from fastapi_toolsets.pytest import create_worker_database, create_db_session + + DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost/test_db" + + @pytest.fixture(scope="session") + async def worker_db_url(): + async with create_worker_database(DATABASE_URL) as url: + yield url + + @pytest.fixture + async def db_session(worker_db_url): + async with create_db_session( + worker_db_url, Base, cleanup=True + ) as session: + yield session + ``` + """ + worker_url = worker_database_url( + database_url=database_url, default_test_db=default_test_db + ) + worker_db_name: str = make_url(worker_url).database # type: ignore[assignment] + + engine = create_async_engine(database_url, isolation_level="AUTOCOMMIT") + try: + async with engine.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS {worker_db_name}")) + await create_database(db_name=worker_db_name, server_url=database_url) + + yield worker_url + + async with engine.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS {worker_db_name}")) + finally: + await engine.dispose() @asynccontextmanager @@ -156,160 +283,3 @@ async def create_db_session( await conn.run_sync(base.metadata.drop_all) finally: await engine.dispose() - - -def _get_xdist_worker(default_test_db: str) -> str: - """Return the pytest-xdist worker name, or *default_test_db* when not running under xdist. - - Reads the ``PYTEST_XDIST_WORKER`` environment variable that xdist sets - automatically in each worker process (e.g. ``"gw0"``, ``"gw1"``). - When xdist is not installed or not active, the variable is absent and - *default_test_db* is returned instead. - - Args: - default_test_db: Fallback value returned when ``PYTEST_XDIST_WORKER`` - is not set. - """ - return os.environ.get("PYTEST_XDIST_WORKER", default_test_db) - - -def worker_database_url(database_url: str, default_test_db: str) -> str: - """Derive a per-worker database URL for pytest-xdist parallel runs. - - Appends ``_{worker_name}`` to the database name so each xdist worker - operates on its own database. When not running under xdist, - ``_{default_test_db}`` is appended instead. - - The worker name is read from the ``PYTEST_XDIST_WORKER`` environment - variable (set automatically by xdist in each worker process). - - Args: - database_url: Original database connection URL. - default_test_db: Suffix appended to the database name when - ``PYTEST_XDIST_WORKER`` is not set. - - Returns: - A database URL with a worker- or default-specific database name. - - Example: - ```python - # With PYTEST_XDIST_WORKER="gw0": - url = worker_database_url( - "postgresql+asyncpg://user:pass@localhost/test_db", - default_test_db="test", - ) - # "postgresql+asyncpg://user:pass@localhost/test_db_gw0" - - # Without PYTEST_XDIST_WORKER: - url = worker_database_url( - "postgresql+asyncpg://user:pass@localhost/test_db", - default_test_db="test", - ) - # "postgresql+asyncpg://user:pass@localhost/test_db_test" - ``` - """ - worker = _get_xdist_worker(default_test_db=default_test_db) - - url = make_url(database_url) - url = url.set(database=f"{url.database}_{worker}") - return url.render_as_string(hide_password=False) - - -@asynccontextmanager -async def create_worker_database( - database_url: str, - default_test_db: str = "test_db", -) -> AsyncGenerator[str, None]: - """Create and drop a per-worker database for pytest-xdist isolation. - - Intended for use as a **session-scoped** fixture. Connects to the server - using the original *database_url* (with ``AUTOCOMMIT`` isolation for DDL), - creates a dedicated database for the worker, and yields the worker-specific - URL. On cleanup the worker database is dropped. - - When running under xdist the database name is suffixed with the worker - name (e.g. ``_gw0``). Otherwise it is suffixed with *default_test_db*. - - Args: - database_url: Original database connection URL. - default_test_db: Suffix appended to the database name when - ``PYTEST_XDIST_WORKER`` is not set. Defaults to ``"test_db"``. - - Yields: - The worker-specific database URL. - - Example: - ```python - from fastapi_toolsets.pytest import ( - create_worker_database, create_db_session, - ) - - DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost/test_db" - - @pytest.fixture(scope="session") - async def worker_db_url(): - async with create_worker_database(DATABASE_URL) as url: - yield url - - @pytest.fixture - async def db_session(worker_db_url): - async with create_db_session( - worker_db_url, Base, cleanup=True - ) as session: - yield session - ``` - """ - worker_url = worker_database_url( - database_url=database_url, default_test_db=default_test_db - ) - worker_db_name = make_url(worker_url).database - - engine = create_async_engine( - database_url, - isolation_level="AUTOCOMMIT", - ) - try: - async with engine.connect() as conn: - await conn.execute(text(f"DROP DATABASE IF EXISTS {worker_db_name}")) - await conn.execute(text(f"CREATE DATABASE {worker_db_name}")) - - yield worker_url - - async with engine.connect() as conn: - await conn.execute(text(f"DROP DATABASE IF EXISTS {worker_db_name}")) - finally: - await engine.dispose() - - -async def cleanup_tables( - session: AsyncSession, - base: type[DeclarativeBase], -) -> None: - """Truncate all tables for fast between-test cleanup. - - Executes a single ``TRUNCATE … RESTART IDENTITY CASCADE`` statement - across every table in *base*'s metadata, which is significantly faster - than dropping and re-creating tables between tests. - - This is a no-op when the metadata contains no tables. - - Args: - session: An active async database session. - base: SQLAlchemy DeclarativeBase class containing model metadata. - - Example: - ```python - @pytest.fixture - async def db_session(worker_db_url): - async with create_db_session(worker_db_url, Base) as session: - yield session - await cleanup_tables(session, Base) - ``` - """ - tables = base.metadata.sorted_tables - if not tables: - return - - table_names = ", ".join(f'"{t.name}"' for t in tables) - await session.execute(text(f"TRUNCATE {table_names} RESTART IDENTITY CASCADE")) - await session.commit() diff --git a/tests/test_db.py b/tests/test_db.py index 9bd537d..0cfad0b 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -4,10 +4,15 @@ import asyncio import uuid import pytest +from sqlalchemy import text +from sqlalchemy.engine import make_url from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import DeclarativeBase from fastapi_toolsets.db import ( LockMode, + cleanup_tables, + create_database, create_db_context, create_db_dependency, get_transaction, @@ -15,8 +20,9 @@ from fastapi_toolsets.db import ( wait_for_row_change, ) from fastapi_toolsets.exceptions import NotFoundError +from fastapi_toolsets.pytest import create_db_session -from .conftest import DATABASE_URL, Base, Role, RoleCrud, User +from .conftest import DATABASE_URL, Base, Role, RoleCrud, User, UserCrud class TestCreateDbDependency: @@ -344,3 +350,83 @@ class TestWaitForRowChange: with pytest.raises(NotFoundError): await wait_for_row_change(db_session, Role, role.id, interval=0.05) await delete_task + + +class TestCreateDatabase: + """Tests for create_database.""" + + @pytest.mark.anyio + async def test_creates_database(self): + """Database is created by create_database.""" + target_url = ( + make_url(DATABASE_URL) + .set(database="test_create_db_general") + .render_as_string(hide_password=False) + ) + expected_db: str = make_url(target_url).database # type: ignore[assignment] + + engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") + try: + async with engine.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS {expected_db}")) + + await create_database(db_name=expected_db, server_url=DATABASE_URL) + + async with engine.connect() as conn: + result = await conn.execute( + text("SELECT 1 FROM pg_database WHERE datname = :name"), + {"name": expected_db}, + ) + assert result.scalar() == 1 + + # Cleanup + async with engine.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS {expected_db}")) + finally: + await engine.dispose() + + +class TestCleanupTables: + """Tests for cleanup_tables helper.""" + + @pytest.mark.anyio + async def test_truncates_all_tables(self): + """All table rows are removed after cleanup_tables.""" + async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session: + role = Role(id=uuid.uuid4(), name="cleanup_role") + session.add(role) + await session.flush() + + user = User( + id=uuid.uuid4(), + username="cleanup_user", + email="cleanup@test.com", + role_id=role.id, + ) + session.add(user) + await session.commit() + + # Verify rows exist + roles_count = await RoleCrud.count(session) + users_count = await UserCrud.count(session) + assert roles_count == 1 + assert users_count == 1 + + await cleanup_tables(session, Base) + + # Verify tables are empty + roles_count = await RoleCrud.count(session) + users_count = await UserCrud.count(session) + assert roles_count == 0 + assert users_count == 0 + + @pytest.mark.anyio + async def test_noop_for_empty_metadata(self): + """cleanup_tables does not raise when metadata has no tables.""" + + class EmptyBase(DeclarativeBase): + pass + + async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session: + # Should not raise + await cleanup_tables(session, EmptyBase) diff --git a/tests/test_pytest.py b/tests/test_pytest.py index d51947b..7791f78 100644 --- a/tests/test_pytest.py +++ b/tests/test_pytest.py @@ -8,11 +8,10 @@ from httpx import AsyncClient from sqlalchemy import select, text from sqlalchemy.engine import make_url from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine -from sqlalchemy.orm import DeclarativeBase, selectinload +from sqlalchemy.orm import selectinload from fastapi_toolsets.fixtures import Context, FixtureRegistry from fastapi_toolsets.pytest import ( - cleanup_tables, create_async_client, create_db_session, create_worker_database, @@ -406,7 +405,6 @@ class TestCreateWorkerDatabase: ) as url: assert make_url(url).database == expected_db - # Verify the database exists while inside the context engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") async with engine.connect() as conn: result = await conn.execute( @@ -416,7 +414,6 @@ class TestCreateWorkerDatabase: assert result.scalar() == 1 await engine.dispose() - # After context exit the database should be dropped engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") async with engine.connect() as conn: result = await conn.execute( @@ -439,7 +436,6 @@ class TestCreateWorkerDatabase: async with create_worker_database(DATABASE_URL) as url: assert make_url(url).database == expected_db - # Verify the database exists while inside the context engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") async with engine.connect() as conn: result = await conn.execute( @@ -449,7 +445,6 @@ class TestCreateWorkerDatabase: assert result.scalar() == 1 await engine.dispose() - # After context exit the database should be dropped engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") async with engine.connect() as conn: result = await conn.execute( @@ -467,18 +462,15 @@ class TestCreateWorkerDatabase: worker_database_url(DATABASE_URL, default_test_db="unused") ).database - # Pre-create the database to simulate a stale leftover engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") async with engine.connect() as conn: await conn.execute(text(f"DROP DATABASE IF EXISTS {expected_db}")) await conn.execute(text(f"CREATE DATABASE {expected_db}")) await engine.dispose() - # Should succeed despite the database already existing async with create_worker_database(DATABASE_URL) as url: assert make_url(url).database == expected_db - # Verify cleanup after context exit engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") async with engine.connect() as conn: result = await conn.execute( @@ -487,49 +479,3 @@ class TestCreateWorkerDatabase: ) assert result.scalar() is None await engine.dispose() - - -class TestCleanupTables: - """Tests for cleanup_tables helper.""" - - @pytest.mark.anyio - async def test_truncates_all_tables(self): - """All table rows are removed after cleanup_tables.""" - async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session: - role = Role(id=uuid.uuid4(), name="cleanup_role") - session.add(role) - await session.flush() - - user = User( - id=uuid.uuid4(), - username="cleanup_user", - email="cleanup@test.com", - role_id=role.id, - ) - session.add(user) - await session.commit() - - # Verify rows exist - roles_count = await RoleCrud.count(session) - users_count = await UserCrud.count(session) - assert roles_count == 1 - assert users_count == 1 - - await cleanup_tables(session, Base) - - # Verify tables are empty - roles_count = await RoleCrud.count(session) - users_count = await UserCrud.count(session) - assert roles_count == 0 - assert users_count == 0 - - @pytest.mark.anyio - async def test_noop_for_empty_metadata(self): - """cleanup_tables does not raise when metadata has no tables.""" - - class EmptyBase(DeclarativeBase): - pass - - async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session: - # Should not raise - await cleanup_tables(session, EmptyBase)