feat: add support for pytest-xdist (#47)

This commit is contained in:
d3vyce
2026-02-10 21:29:17 +01:00
committed by GitHub
parent 290b2a06ec
commit ced1a655f2
5 changed files with 357 additions and 5 deletions

View File

@@ -5,16 +5,21 @@ import uuid
import pytest
from fastapi import FastAPI
from httpx import AsyncClient
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy import select, text
from sqlalchemy.engine import make_url
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, selectinload
from fastapi_toolsets.fixtures import Context, FixtureRegistry
from fastapi_toolsets.pytest import (
cleanup_tables,
create_async_client,
create_db_session,
create_worker_database,
register_fixtures,
worker_database_url,
)
from fastapi_toolsets.pytest.utils import _get_xdist_worker
from .conftest import DATABASE_URL, Base, Role, RoleCrud, User, UserCrud
@@ -291,3 +296,164 @@ class TestCreateDbSession:
# Cleanup: drop tables manually
async with create_db_session(DATABASE_URL, Base, drop_tables=True) as _:
pass
class TestGetXdistWorker:
"""Tests for get_xdist_worker helper."""
def test_returns_none_without_env_var(self, monkeypatch: pytest.MonkeyPatch):
"""Returns None when PYTEST_XDIST_WORKER is not set."""
monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False)
assert _get_xdist_worker() is None
def test_returns_worker_name(self, monkeypatch: pytest.MonkeyPatch):
"""Returns the worker name from the environment variable."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw0")
assert _get_xdist_worker() == "gw0"
class TestWorkerDatabaseUrl:
"""Tests for worker_database_url helper."""
def test_returns_original_url_without_xdist(self, monkeypatch: pytest.MonkeyPatch):
"""URL is returned unchanged when not running under xdist."""
monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False)
url = "postgresql+asyncpg://user:pass@localhost:5432/mydb"
assert worker_database_url(url) == url
def test_appends_worker_id_to_database_name(self, monkeypatch: pytest.MonkeyPatch):
"""Worker name is appended to the database name."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw0")
url = "postgresql+asyncpg://user:pass@localhost:5432/db"
result = worker_database_url(url)
assert make_url(result).database == "db_gw0"
def test_preserves_url_components(self, monkeypatch: pytest.MonkeyPatch):
"""Host, port, username, password, and driver are preserved."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw2")
url = "postgresql+asyncpg://myuser:secret@dbhost:6543/testdb"
result = make_url(worker_database_url(url))
assert result.drivername == "postgresql+asyncpg"
assert result.username == "myuser"
assert result.password == "secret"
assert result.host == "dbhost"
assert result.port == 6543
assert result.database == "testdb_gw2"
class TestCreateWorkerDatabase:
"""Tests for create_worker_database context manager."""
@pytest.mark.anyio
async def test_yields_original_url_without_xdist(
self, monkeypatch: pytest.MonkeyPatch
):
"""Without xdist, yields the original URL without database operations."""
monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False)
async with create_worker_database(DATABASE_URL) as url:
assert url == DATABASE_URL
@pytest.mark.anyio
async def test_creates_and_drops_worker_database(
self, monkeypatch: pytest.MonkeyPatch
):
"""Worker database exists inside the context and is dropped after."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw_test_create")
expected_db = make_url(worker_database_url(DATABASE_URL)).database
async with create_worker_database(DATABASE_URL) as url:
assert make_url(url).database == expected_db
# Verify the database exists while inside the context
engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
result = await conn.execute(
text("SELECT 1 FROM pg_database WHERE datname = :name"),
{"name": expected_db},
)
assert result.scalar() == 1
await engine.dispose()
# After context exit the database should be dropped
engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
result = await conn.execute(
text("SELECT 1 FROM pg_database WHERE datname = :name"),
{"name": expected_db},
)
assert result.scalar() is None
await engine.dispose()
@pytest.mark.anyio
async def test_cleans_up_stale_database(self, monkeypatch: pytest.MonkeyPatch):
"""A pre-existing worker database is dropped and recreated."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw_test_stale")
expected_db = make_url(worker_database_url(DATABASE_URL)).database
# Pre-create the database to simulate a stale leftover
engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
await conn.execute(text(f"DROP DATABASE IF EXISTS {expected_db}"))
await conn.execute(text(f"CREATE DATABASE {expected_db}"))
await engine.dispose()
# Should succeed despite the database already existing
async with create_worker_database(DATABASE_URL) as url:
assert make_url(url).database == expected_db
# Verify cleanup after context exit
engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
result = await conn.execute(
text("SELECT 1 FROM pg_database WHERE datname = :name"),
{"name": expected_db},
)
assert result.scalar() is None
await engine.dispose()
class TestCleanupTables:
"""Tests for cleanup_tables helper."""
@pytest.mark.anyio
async def test_truncates_all_tables(self):
"""All table rows are removed after cleanup_tables."""
async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session:
role = Role(id=uuid.uuid4(), name="cleanup_role")
session.add(role)
await session.flush()
user = User(
id=uuid.uuid4(),
username="cleanup_user",
email="cleanup@test.com",
role_id=role.id,
)
session.add(user)
await session.commit()
# Verify rows exist
roles_count = await RoleCrud.count(session)
users_count = await UserCrud.count(session)
assert roles_count == 1
assert users_count == 1
await cleanup_tables(session, Base)
# Verify tables are empty
roles_count = await RoleCrud.count(session)
users_count = await UserCrud.count(session)
assert roles_count == 0
assert users_count == 0
@pytest.mark.anyio
async def test_noop_for_empty_metadata(self):
"""cleanup_tables does not raise when metadata has no tables."""
class EmptyBase(DeclarativeBase):
pass
async with create_db_session(DATABASE_URL, Base, drop_tables=True) as session:
# Should not raise
await cleanup_tables(session, EmptyBase)