From ced1a655f24e91778c777159df5ebd013a43137a Mon Sep 17 00:00:00 2001 From: d3vyce <44915747+d3vyce@users.noreply.github.com> Date: Tue, 10 Feb 2026 21:29:17 +0100 Subject: [PATCH] feat: add support for pytest-xdist (#47) --- pyproject.toml | 1 + src/fastapi_toolsets/pytest/__init__.py | 11 +- src/fastapi_toolsets/pytest/utils.py | 153 ++++++++++++++++++++- tests/test_pytest.py | 172 +++++++++++++++++++++++- uv.lock | 25 ++++ 5 files changed, 357 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index f14821e..b8d3ab4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,6 +49,7 @@ Issues = "https://github.com/d3vyce/fastapi-toolsets/issues" test = [ "pytest>=8.0.0", "pytest-anyio>=0.0.0", + "pytest-xdist>=3.0.0", "coverage>=7.0.0", "pytest-cov>=4.0.0", ] diff --git a/src/fastapi_toolsets/pytest/__init__.py b/src/fastapi_toolsets/pytest/__init__.py index 7040c89..086a176 100644 --- a/src/fastapi_toolsets/pytest/__init__.py +++ b/src/fastapi_toolsets/pytest/__init__.py @@ -1,8 +1,17 @@ from .plugin import register_fixtures -from .utils import create_async_client, create_db_session +from .utils import ( + cleanup_tables, + create_async_client, + create_db_session, + create_worker_database, + worker_database_url, +) __all__ = [ + "cleanup_tables", "create_async_client", "create_db_session", + "create_worker_database", "register_fixtures", + "worker_database_url", ] diff --git a/src/fastapi_toolsets/pytest/utils.py b/src/fastapi_toolsets/pytest/utils.py index c327738..887f488 100644 --- a/src/fastapi_toolsets/pytest/utils.py +++ b/src/fastapi_toolsets/pytest/utils.py @@ -1,11 +1,18 @@ """Pytest helper utilities for FastAPI testing.""" +import os from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from typing import Any from httpx import ASGITransport, AsyncClient -from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy import text +from sqlalchemy.engine import make_url +from sqlalchemy.ext.asyncio import ( + AsyncSession, + async_sessionmaker, + create_async_engine, +) from sqlalchemy.orm import DeclarativeBase from ..db import create_db_context @@ -108,3 +115,147 @@ async def create_db_session( await conn.run_sync(base.metadata.drop_all) finally: await engine.dispose() + + +def _get_xdist_worker() -> str | None: + """Return the pytest-xdist worker name, or ``None`` when not running under xdist. + + Reads the ``PYTEST_XDIST_WORKER`` environment variable that xdist sets + automatically in each worker process (e.g. ``"gw0"``, ``"gw1"``). + When xdist is not installed or not active, the variable is absent and + ``None`` is returned. + """ + return os.environ.get("PYTEST_XDIST_WORKER") + + +def worker_database_url(database_url: str) -> str: + """Derive a per-worker database URL for pytest-xdist parallel runs. + + Appends ``_{worker_name}`` to the database name so each xdist worker + operates on its own database. When not running under xdist the + original URL is returned unchanged. + + The worker name is read from the ``PYTEST_XDIST_WORKER`` environment + variable (set automatically by xdist in each worker process). + + Args: + database_url: Original database connection URL. + + Returns: + A database URL with the worker-specific database name, or the + original URL when not running under xdist. + + Example: + ```python + # With PYTEST_XDIST_WORKER="gw0": + url = worker_database_url( + "postgresql+asyncpg://user:pass@localhost/test_db" + ) + # "postgresql+asyncpg://user:pass@localhost/test_db_gw0" + ``` + """ + worker = _get_xdist_worker() + if worker is None: + return database_url + + url = make_url(database_url) + url = url.set(database=f"{url.database}_{worker}") + return url.render_as_string(hide_password=False) + + +@asynccontextmanager +async def create_worker_database( + database_url: str, +) -> AsyncGenerator[str, None]: + """Create and drop a per-worker database for pytest-xdist isolation. + + Intended for use as a **session-scoped** fixture. Connects to the server + using the original *database_url* (with ``AUTOCOMMIT`` isolation for DDL), + creates a dedicated database for the worker, and yields the worker-specific + URL. On cleanup the worker database is dropped. + + When not running under xdist (``PYTEST_XDIST_WORKER`` is unset), the + original URL is yielded without any database creation or teardown. + + Args: + database_url: Original database connection URL. + + Yields: + The worker-specific database URL. + + Example: + ```python + from fastapi_toolsets.pytest import ( + create_worker_database, create_db_session, cleanup_tables + ) + + DATABASE_URL = "postgresql+asyncpg://postgres:postgres@localhost/test_db" + + @pytest.fixture(scope="session") + async def worker_db_url(): + async with create_worker_database(DATABASE_URL) as url: + yield url + + @pytest.fixture + async def db_session(worker_db_url): + async with create_db_session(worker_db_url, Base) as session: + yield session + await cleanup_tables(session, Base) + ``` + """ + if _get_xdist_worker() is None: + yield database_url + return + + worker_url = worker_database_url(database_url) + worker_db_name = make_url(worker_url).database + + engine = create_async_engine( + database_url, + isolation_level="AUTOCOMMIT", + ) + try: + async with engine.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS {worker_db_name}")) + await conn.execute(text(f"CREATE DATABASE {worker_db_name}")) + + yield worker_url + + async with engine.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS {worker_db_name}")) + finally: + await engine.dispose() + + +async def cleanup_tables( + session: AsyncSession, + base: type[DeclarativeBase], +) -> None: + """Truncate all tables for fast between-test cleanup. + + Executes a single ``TRUNCATE … RESTART IDENTITY CASCADE`` statement + across every table in *base*'s metadata, which is significantly faster + than dropping and re-creating tables between tests. + + This is a no-op when the metadata contains no tables. + + Args: + session: An active async database session. + base: SQLAlchemy DeclarativeBase class containing model metadata. + + Example: + ```python + @pytest.fixture + async def db_session(worker_db_url): + async with create_db_session(worker_db_url, Base) as session: + yield session + await cleanup_tables(session, Base) + ``` + """ + tables = base.metadata.sorted_tables + if not tables: + return + + table_names = ", ".join(f'"{t.name}"' for t in tables) + await session.execute(text(f"TRUNCATE {table_names} RESTART IDENTITY CASCADE")) + await session.commit() diff --git a/tests/test_pytest.py b/tests/test_pytest.py index 453861c..35e75b6 100644 --- a/tests/test_pytest.py +++ b/tests/test_pytest.py @@ -5,16 +5,21 @@ import uuid import pytest from fastapi import FastAPI from httpx import AsyncClient -from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload +from sqlalchemy import select, text +from sqlalchemy.engine import make_url +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import DeclarativeBase, selectinload from fastapi_toolsets.fixtures import Context, FixtureRegistry from fastapi_toolsets.pytest import ( + cleanup_tables, create_async_client, create_db_session, + create_worker_database, register_fixtures, + worker_database_url, ) +from fastapi_toolsets.pytest.utils import _get_xdist_worker from .conftest import DATABASE_URL, Base, Role, RoleCrud, User, UserCrud @@ -291,3 +296,164 @@ class TestCreateDbSession: # Cleanup: drop tables manually async with create_db_session(DATABASE_URL, Base, drop_tables=True) as _: pass + + +class TestGetXdistWorker: + """Tests for get_xdist_worker helper.""" + + def test_returns_none_without_env_var(self, monkeypatch: pytest.MonkeyPatch): + """Returns None when PYTEST_XDIST_WORKER is not set.""" + monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False) + assert _get_xdist_worker() is None + + def test_returns_worker_name(self, monkeypatch: pytest.MonkeyPatch): + """Returns the worker name from the environment variable.""" + monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw0") + assert _get_xdist_worker() == "gw0" + + +class TestWorkerDatabaseUrl: + """Tests for worker_database_url helper.""" + + def test_returns_original_url_without_xdist(self, monkeypatch: pytest.MonkeyPatch): + """URL is returned unchanged when not running under xdist.""" + monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False) + url = "postgresql+asyncpg://user:pass@localhost:5432/mydb" + assert worker_database_url(url) == url + + def test_appends_worker_id_to_database_name(self, monkeypatch: pytest.MonkeyPatch): + """Worker name is appended to the database name.""" + monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw0") + url = "postgresql+asyncpg://user:pass@localhost:5432/db" + result = worker_database_url(url) + assert make_url(result).database == "db_gw0" + + def test_preserves_url_components(self, monkeypatch: pytest.MonkeyPatch): + """Host, port, username, password, and driver are preserved.""" + monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw2") + url = "postgresql+asyncpg://myuser:secret@dbhost:6543/testdb" + result = make_url(worker_database_url(url)) + + assert result.drivername == "postgresql+asyncpg" + assert result.username == "myuser" + assert result.password == "secret" + assert result.host == "dbhost" + assert result.port == 6543 + assert result.database == "testdb_gw2" + + +class TestCreateWorkerDatabase: + """Tests for create_worker_database context manager.""" + + @pytest.mark.anyio + async def test_yields_original_url_without_xdist( + self, monkeypatch: pytest.MonkeyPatch + ): + """Without xdist, yields the original URL without database operations.""" + monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False) + async with create_worker_database(DATABASE_URL) as url: + assert url == DATABASE_URL + + @pytest.mark.anyio + async def test_creates_and_drops_worker_database( + self, monkeypatch: pytest.MonkeyPatch + ): + """Worker database exists inside the context and is dropped after.""" + monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw_test_create") + expected_db = make_url(worker_database_url(DATABASE_URL)).database + + async with create_worker_database(DATABASE_URL) as url: + assert make_url(url).database == expected_db + + # Verify the database exists while inside the context + engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") + async with engine.connect() as conn: + result = await conn.execute( + text("SELECT 1 FROM pg_database WHERE datname = :name"), + {"name": expected_db}, + ) + assert result.scalar() == 1 + await engine.dispose() + + # After context exit the database should be dropped + engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") + async with engine.connect() as conn: + result = await conn.execute( + text("SELECT 1 FROM pg_database WHERE datname = :name"), + {"name": expected_db}, + ) + assert result.scalar() is None + await engine.dispose() + + @pytest.mark.anyio + async def test_cleans_up_stale_database(self, monkeypatch: pytest.MonkeyPatch): + """A pre-existing worker database is dropped and recreated.""" + monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw_test_stale") + expected_db = make_url(worker_database_url(DATABASE_URL)).database + + # Pre-create the database to simulate a stale leftover + engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") + async with engine.connect() as conn: + await conn.execute(text(f"DROP DATABASE IF EXISTS {expected_db}")) + await conn.execute(text(f"CREATE DATABASE {expected_db}")) + await engine.dispose() + + # Should succeed despite the database already existing + async with create_worker_database(DATABASE_URL) as url: + assert make_url(url).database == expected_db + + # Verify cleanup after context exit + engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT") + async with engine.connect() as conn: + result = await conn.execute( + text("SELECT 1 FROM pg_database WHERE datname = :name"), + {"name": expected_db}, + ) + assert result.scalar() is None + await engine.dispose() + + +class TestCleanupTables: + """Tests for cleanup_tables helper.""" + + @pytest.mark.anyio + async def test_truncates_all_tables(self): + """All table rows are removed after cleanup_tables.""" + async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session: + role = Role(id=uuid.uuid4(), name="cleanup_role") + session.add(role) + await session.flush() + + user = User( + id=uuid.uuid4(), + username="cleanup_user", + email="cleanup@test.com", + role_id=role.id, + ) + session.add(user) + await session.commit() + + # Verify rows exist + roles_count = await RoleCrud.count(session) + users_count = await UserCrud.count(session) + assert roles_count == 1 + assert users_count == 1 + + await cleanup_tables(session, Base) + + # Verify tables are empty + roles_count = await RoleCrud.count(session) + users_count = await UserCrud.count(session) + assert roles_count == 0 + assert users_count == 0 + + @pytest.mark.anyio + async def test_noop_for_empty_metadata(self): + """cleanup_tables does not raise when metadata has no tables.""" + + class EmptyBase(DeclarativeBase): + pass + + async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session: + # Should not raise + await cleanup_tables(session, EmptyBase) diff --git a/uv.lock b/uv.lock index 0af5b3e..fc4d60d 100644 --- a/uv.lock +++ b/uv.lock @@ -203,6 +203,15 @@ toml = [ { name = "tomli", marker = "python_full_version <= '3.11'" }, ] +[[package]] +name = "execnet" +version = "2.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/bf/89/780e11f9588d9e7128a3f87788354c7946a9cbb1401ad38a48c4db9a4f07/execnet-2.1.2.tar.gz", hash = "sha256:63d83bfdd9a23e35b9c6a3261412324f964c2ec8dcd8d3c6916ee9373e0befcd", size = 166622, upload-time = "2025-11-12T09:56:37.75Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ab/84/02fc1827e8cdded4aa65baef11296a9bbe595c474f0d6d758af082d849fd/execnet-2.1.2-py3-none-any.whl", hash = "sha256:67fba928dd5a544b783f6056f449e5e3931a5c378b128bc18501f7ea79e296ec", size = 40708, upload-time = "2025-11-12T09:56:36.333Z" }, +] + [[package]] name = "fastapi" version = "0.128.1" @@ -237,6 +246,7 @@ dev = [ { name = "pytest" }, { name = "pytest-anyio" }, { name = "pytest-cov" }, + { name = "pytest-xdist" }, { name = "ruff" }, { name = "ty" }, ] @@ -245,6 +255,7 @@ test = [ { name = "pytest" }, { name = "pytest-anyio" }, { name = "pytest-cov" }, + { name = "pytest-xdist" }, ] [package.metadata] @@ -258,6 +269,7 @@ requires-dist = [ { name = "pytest", marker = "extra == 'test'", specifier = ">=8.0.0" }, { name = "pytest-anyio", marker = "extra == 'test'", specifier = ">=0.0.0" }, { name = "pytest-cov", marker = "extra == 'test'", specifier = ">=4.0.0" }, + { name = "pytest-xdist", marker = "extra == 'test'", specifier = ">=3.0.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.1.0" }, { name = "sqlalchemy", extras = ["asyncio"], specifier = ">=2.0" }, { name = "ty", marker = "extra == 'dev'", specifier = ">=0.0.1a0" }, @@ -575,6 +587,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, ] +[[package]] +name = "pytest-xdist" +version = "3.8.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "execnet" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/78/b4/439b179d1ff526791eb921115fca8e44e596a13efeda518b9d845a619450/pytest_xdist-3.8.0.tar.gz", hash = "sha256:7e578125ec9bc6050861aa93f2d59f1d8d085595d6551c2c90b6f4fad8d3a9f1", size = 88069, upload-time = "2025-07-01T13:30:59.346Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ca/31/d4e37e9e550c2b92a9cbc2e4d0b7420a27224968580b5a447f420847c975/pytest_xdist-3.8.0-py3-none-any.whl", hash = "sha256:202ca578cfeb7370784a8c33d6d05bc6e13b4f25b5053c30a152269fd10f0b88", size = 46396, upload-time = "2025-07-01T13:30:56.632Z" }, +] + [[package]] name = "rich" version = "14.3.2"