1 Commits

Author SHA1 Message Date
d3vyce
c8c263ca8f fix: don't use default DB if pytest-xdist is not present (#51) 2026-02-11 17:07:15 +01:00
2 changed files with 84 additions and 36 deletions

View File

@@ -117,46 +117,57 @@ async def create_db_session(
await engine.dispose()
def _get_xdist_worker() -> str | None:
"""Return the pytest-xdist worker name, or ``None`` when not running under xdist.
def _get_xdist_worker(default_test_db: str) -> str:
"""Return the pytest-xdist worker name, or *default_test_db* when not running under xdist.
Reads the ``PYTEST_XDIST_WORKER`` environment variable that xdist sets
automatically in each worker process (e.g. ``"gw0"``, ``"gw1"``).
When xdist is not installed or not active, the variable is absent and
``None`` is returned.
*default_test_db* is returned instead.
Args:
default_test_db: Fallback value returned when ``PYTEST_XDIST_WORKER``
is not set.
"""
return os.environ.get("PYTEST_XDIST_WORKER")
return os.environ.get("PYTEST_XDIST_WORKER", default_test_db)
def worker_database_url(database_url: str) -> str:
def worker_database_url(database_url: str, default_test_db: str) -> str:
"""Derive a per-worker database URL for pytest-xdist parallel runs.
Appends ``_{worker_name}`` to the database name so each xdist worker
operates on its own database. When not running under xdist the
original URL is returned unchanged.
operates on its own database. When not running under xdist,
``_{default_test_db}`` is appended instead.
The worker name is read from the ``PYTEST_XDIST_WORKER`` environment
variable (set automatically by xdist in each worker process).
Args:
database_url: Original database connection URL.
default_test_db: Suffix appended to the database name when
``PYTEST_XDIST_WORKER`` is not set.
Returns:
A database URL with the worker-specific database name, or the
original URL when not running under xdist.
A database URL with a worker- or default-specific database name.
Example:
```python
# With PYTEST_XDIST_WORKER="gw0":
url = worker_database_url(
"postgresql+asyncpg://user:pass@localhost/test_db"
"postgresql+asyncpg://user:pass@localhost/test_db",
default_test_db="test",
)
# "postgresql+asyncpg://user:pass@localhost/test_db_gw0"
# Without PYTEST_XDIST_WORKER:
url = worker_database_url(
"postgresql+asyncpg://user:pass@localhost/test_db",
default_test_db="test",
)
# "postgresql+asyncpg://user:pass@localhost/test_db_test"
```
"""
worker = _get_xdist_worker()
if worker is None:
return database_url
worker = _get_xdist_worker(default_test_db=default_test_db)
url = make_url(database_url)
url = url.set(database=f"{url.database}_{worker}")
@@ -166,6 +177,7 @@ def worker_database_url(database_url: str) -> str:
@asynccontextmanager
async def create_worker_database(
database_url: str,
default_test_db: str = "test_db",
) -> AsyncGenerator[str, None]:
"""Create and drop a per-worker database for pytest-xdist isolation.
@@ -174,11 +186,13 @@ async def create_worker_database(
creates a dedicated database for the worker, and yields the worker-specific
URL. On cleanup the worker database is dropped.
When not running under xdist (``PYTEST_XDIST_WORKER`` is unset), the
original URL is yielded without any database creation or teardown.
When running under xdist the database name is suffixed with the worker
name (e.g. ``_gw0``). Otherwise it is suffixed with *default_test_db*.
Args:
database_url: Original database connection URL.
default_test_db: Suffix appended to the database name when
``PYTEST_XDIST_WORKER`` is not set. Defaults to ``"test_db"``.
Yields:
The worker-specific database URL.
@@ -203,11 +217,9 @@ async def create_worker_database(
await cleanup_tables(session, Base)
```
"""
if _get_xdist_worker() is None:
yield database_url
return
worker_url = worker_database_url(database_url)
worker_url = worker_database_url(
database_url=database_url, default_test_db=default_test_db
)
worker_db_name = make_url(worker_url).database
engine = create_async_engine(

View File

@@ -299,40 +299,45 @@ class TestCreateDbSession:
class TestGetXdistWorker:
"""Tests for get_xdist_worker helper."""
"""Tests for _get_xdist_worker helper."""
def test_returns_none_without_env_var(self, monkeypatch: pytest.MonkeyPatch):
"""Returns None when PYTEST_XDIST_WORKER is not set."""
def test_returns_default_test_db_without_env_var(
self, monkeypatch: pytest.MonkeyPatch
):
"""Returns default_test_db when PYTEST_XDIST_WORKER is not set."""
monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False)
assert _get_xdist_worker() is None
assert _get_xdist_worker("my_default") == "my_default"
def test_returns_worker_name(self, monkeypatch: pytest.MonkeyPatch):
"""Returns the worker name from the environment variable."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw0")
assert _get_xdist_worker() == "gw0"
assert _get_xdist_worker("ignored") == "gw0"
class TestWorkerDatabaseUrl:
"""Tests for worker_database_url helper."""
def test_returns_original_url_without_xdist(self, monkeypatch: pytest.MonkeyPatch):
"""URL is returned unchanged when not running under xdist."""
def test_appends_default_test_db_without_xdist(
self, monkeypatch: pytest.MonkeyPatch
):
"""default_test_db is appended when not running under xdist."""
monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False)
url = "postgresql+asyncpg://user:pass@localhost:5432/mydb"
assert worker_database_url(url) == url
result = worker_database_url(url, default_test_db="fallback")
assert make_url(result).database == "mydb_fallback"
def test_appends_worker_id_to_database_name(self, monkeypatch: pytest.MonkeyPatch):
"""Worker name is appended to the database name."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw0")
url = "postgresql+asyncpg://user:pass@localhost:5432/db"
result = worker_database_url(url)
result = worker_database_url(url, default_test_db="unused")
assert make_url(result).database == "db_gw0"
def test_preserves_url_components(self, monkeypatch: pytest.MonkeyPatch):
"""Host, port, username, password, and driver are preserved."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw2")
url = "postgresql+asyncpg://myuser:secret@dbhost:6543/testdb"
result = make_url(worker_database_url(url))
result = make_url(worker_database_url(url, default_test_db="unused"))
assert result.drivername == "postgresql+asyncpg"
assert result.username == "myuser"
@@ -346,13 +351,40 @@ class TestCreateWorkerDatabase:
"""Tests for create_worker_database context manager."""
@pytest.mark.anyio
async def test_yields_original_url_without_xdist(
async def test_creates_default_db_without_xdist(
self, monkeypatch: pytest.MonkeyPatch
):
"""Without xdist, yields the original URL without database operations."""
"""Without xdist, creates a database suffixed with default_test_db."""
monkeypatch.delenv("PYTEST_XDIST_WORKER", raising=False)
async with create_worker_database(DATABASE_URL) as url:
assert url == DATABASE_URL
default_test_db = "no_xdist_default"
expected_db = make_url(
worker_database_url(DATABASE_URL, default_test_db=default_test_db)
).database
async with create_worker_database(
DATABASE_URL, default_test_db=default_test_db
) as url:
assert make_url(url).database == expected_db
# Verify the database exists while inside the context
engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
result = await conn.execute(
text("SELECT 1 FROM pg_database WHERE datname = :name"),
{"name": expected_db},
)
assert result.scalar() == 1
await engine.dispose()
# After context exit the database should be dropped
engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT")
async with engine.connect() as conn:
result = await conn.execute(
text("SELECT 1 FROM pg_database WHERE datname = :name"),
{"name": expected_db},
)
assert result.scalar() is None
await engine.dispose()
@pytest.mark.anyio
async def test_creates_and_drops_worker_database(
@@ -360,7 +392,9 @@ class TestCreateWorkerDatabase:
):
"""Worker database exists inside the context and is dropped after."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw_test_create")
expected_db = make_url(worker_database_url(DATABASE_URL)).database
expected_db = make_url(
worker_database_url(DATABASE_URL, default_test_db="unused")
).database
async with create_worker_database(DATABASE_URL) as url:
assert make_url(url).database == expected_db
@@ -389,7 +423,9 @@ class TestCreateWorkerDatabase:
async def test_cleans_up_stale_database(self, monkeypatch: pytest.MonkeyPatch):
"""A pre-existing worker database is dropped and recreated."""
monkeypatch.setenv("PYTEST_XDIST_WORKER", "gw_test_stale")
expected_db = make_url(worker_database_url(DATABASE_URL)).database
expected_db = make_url(
worker_database_url(DATABASE_URL, default_test_db="unused")
).database
# Pre-create the database to simulate a stale leftover
engine = create_async_engine(DATABASE_URL, isolation_level="AUTOCOMMIT")