Initial commit

This commit is contained in:
2026-01-25 16:11:44 +01:00
commit 762ed35341
29 changed files with 5072 additions and 0 deletions

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Tests for fastapi-utils package."""

199
tests/conftest.py Normal file
View File

@@ -0,0 +1,199 @@
"""Shared pytest fixtures for fastapi-utils tests."""
import os
import pytest
from pydantic import BaseModel
from sqlalchemy import ForeignKey, String
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from fastapi_toolsets.crud import CrudFactory
# PostgreSQL connection URL from environment or default for local development
DATABASE_URL = os.getenv(
"TEST_DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/fastapi_toolsets_test",
)
# =============================================================================
# Test Models
# =============================================================================
class Base(DeclarativeBase):
"""Base class for test models."""
pass
class Role(Base):
"""Test role model."""
__tablename__ = "roles"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
users: Mapped[list["User"]] = relationship(back_populates="role")
class User(Base):
"""Test user model."""
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True)
email: Mapped[str] = mapped_column(String(100), unique=True)
is_active: Mapped[bool] = mapped_column(default=True)
role_id: Mapped[int | None] = mapped_column(ForeignKey("roles.id"), nullable=True)
role: Mapped[Role | None] = relationship(back_populates="users")
class Post(Base):
"""Test post model."""
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
content: Mapped[str] = mapped_column(String(1000), default="")
is_published: Mapped[bool] = mapped_column(default=False)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# =============================================================================
# Test Schemas
# =============================================================================
class RoleCreate(BaseModel):
"""Schema for creating a role."""
id: int | None = None
name: str
class RoleUpdate(BaseModel):
"""Schema for updating a role."""
name: str | None = None
class UserCreate(BaseModel):
"""Schema for creating a user."""
id: int | None = None
username: str
email: str
is_active: bool = True
role_id: int | None = None
class UserUpdate(BaseModel):
"""Schema for updating a user."""
username: str | None = None
email: str | None = None
is_active: bool | None = None
role_id: int | None = None
class PostCreate(BaseModel):
"""Schema for creating a post."""
id: int | None = None
title: str
content: str = ""
is_published: bool = False
author_id: int
class PostUpdate(BaseModel):
"""Schema for updating a post."""
title: str | None = None
content: str | None = None
is_published: bool | None = None
# =============================================================================
# CRUD Classes
# =============================================================================
RoleCrud = CrudFactory(Role)
UserCrud = CrudFactory(User)
PostCrud = CrudFactory(Post)
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture
def anyio_backend():
"""Use asyncio for async tests."""
return "asyncio"
@pytest.fixture(scope="function")
async def engine():
"""Create a PostgreSQL test database engine."""
engine = create_async_engine(DATABASE_URL, echo=False)
yield engine
await engine.dispose()
@pytest.fixture(scope="function")
async def db_session(engine) -> AsyncSession:
"""Create a test database session with tables.
Creates all tables before the test and drops them after.
Each test gets a clean database state.
"""
# Create tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# Create session
session_factory = async_sessionmaker(engine, expire_on_commit=False)
session = session_factory()
try:
yield session
finally:
await session.close()
# Drop tables after test
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
@pytest.fixture
def sample_role_data() -> RoleCreate:
"""Sample role creation data."""
return RoleCreate(name="admin")
@pytest.fixture
def sample_user_data() -> UserCreate:
"""Sample user creation data."""
return UserCreate(
username="testuser",
email="test@example.com",
is_active=True,
)
@pytest.fixture
def sample_post_data() -> PostCreate:
"""Sample post creation data."""
return PostCreate(
title="Test Post",
content="Test content",
is_published=True,
author_id=1,
)

475
tests/test_crud.py Normal file
View File

@@ -0,0 +1,475 @@
"""Tests for fastapi_toolsets.crud module."""
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_toolsets.crud import AsyncCrud, CrudFactory
from fastapi_toolsets.exceptions import NotFoundError
from .conftest import (
Role,
RoleCreate,
RoleCrud,
RoleUpdate,
User,
UserCreate,
UserCrud,
UserUpdate,
)
class TestCrudFactory:
"""Tests for CrudFactory."""
def test_creates_crud_class(self):
"""CrudFactory creates a properly configured CRUD class."""
crud = CrudFactory(User)
assert issubclass(crud, AsyncCrud)
assert crud.model is User
def test_creates_unique_classes(self):
"""Each call creates a unique class."""
crud1 = CrudFactory(User)
crud2 = CrudFactory(User)
assert crud1 is not crud2
def test_class_name_includes_model(self):
"""Generated class name includes model name."""
crud = CrudFactory(User)
assert "User" in crud.__name__
class TestCrudCreate:
"""Tests for CRUD create operations."""
@pytest.mark.anyio
async def test_create_single_record(self, db_session: AsyncSession):
"""Create a single record."""
data = RoleCreate(name="admin")
role = await RoleCrud.create(db_session, data)
assert role.id is not None
assert role.name == "admin"
@pytest.mark.anyio
async def test_create_with_relationship(self, db_session: AsyncSession):
"""Create records with foreign key relationships."""
role = await RoleCrud.create(db_session, RoleCreate(name="user"))
user_data = UserCreate(
username="john",
email="john@example.com",
role_id=role.id,
)
user = await UserCrud.create(db_session, user_data)
assert user.role_id == role.id
@pytest.mark.anyio
async def test_create_with_defaults(self, db_session: AsyncSession):
"""Create uses model defaults."""
user_data = UserCreate(username="jane", email="jane@example.com")
user = await UserCrud.create(db_session, user_data)
assert user.is_active is True
class TestCrudGet:
"""Tests for CRUD get operations."""
@pytest.mark.anyio
async def test_get_existing_record(self, db_session: AsyncSession):
"""Get an existing record by filter."""
created = await RoleCrud.create(db_session, RoleCreate(name="admin"))
fetched = await RoleCrud.get(db_session, [Role.id == created.id])
assert fetched.id == created.id
assert fetched.name == "admin"
@pytest.mark.anyio
async def test_get_raises_not_found(self, db_session: AsyncSession):
"""Get raises NotFoundError for missing records."""
with pytest.raises(NotFoundError):
await RoleCrud.get(db_session, [Role.id == 99999])
@pytest.mark.anyio
async def test_get_with_multiple_filters(self, db_session: AsyncSession):
"""Get with multiple filter conditions."""
await UserCrud.create(
db_session,
UserCreate(username="active", email="active@test.com", is_active=True),
)
await UserCrud.create(
db_session,
UserCreate(username="inactive", email="inactive@test.com", is_active=False),
)
user = await UserCrud.get(
db_session,
[User.username == "active", User.is_active == True], # noqa: E712
)
assert user.username == "active"
class TestCrudFirst:
"""Tests for CRUD first operations."""
@pytest.mark.anyio
async def test_first_returns_record(self, db_session: AsyncSession):
"""First returns the first matching record."""
await RoleCrud.create(db_session, RoleCreate(name="admin"))
role = await RoleCrud.first(db_session, [Role.name == "admin"])
assert role is not None
assert role.name == "admin"
@pytest.mark.anyio
async def test_first_returns_none_when_not_found(self, db_session: AsyncSession):
"""First returns None for missing records."""
role = await RoleCrud.first(db_session, [Role.name == "nonexistent"])
assert role is None
@pytest.mark.anyio
async def test_first_without_filters(self, db_session: AsyncSession):
"""First without filters returns any record."""
await RoleCrud.create(db_session, RoleCreate(name="role1"))
await RoleCrud.create(db_session, RoleCreate(name="role2"))
role = await RoleCrud.first(db_session)
assert role is not None
class TestCrudGetMulti:
"""Tests for CRUD get_multi operations."""
@pytest.mark.anyio
async def test_get_multi_returns_all(self, db_session: AsyncSession):
"""Get multiple records."""
await RoleCrud.create(db_session, RoleCreate(name="admin"))
await RoleCrud.create(db_session, RoleCreate(name="user"))
await RoleCrud.create(db_session, RoleCreate(name="guest"))
roles = await RoleCrud.get_multi(db_session)
assert len(roles) == 3
@pytest.mark.anyio
async def test_get_multi_with_filters(self, db_session: AsyncSession):
"""Get multiple with filter."""
await UserCrud.create(
db_session,
UserCreate(username="active1", email="a1@test.com", is_active=True),
)
await UserCrud.create(
db_session,
UserCreate(username="active2", email="a2@test.com", is_active=True),
)
await UserCrud.create(
db_session,
UserCreate(username="inactive", email="i@test.com", is_active=False),
)
active_users = await UserCrud.get_multi(
db_session,
filters=[User.is_active == True], # noqa: E712
)
assert len(active_users) == 2
@pytest.mark.anyio
async def test_get_multi_with_limit(self, db_session: AsyncSession):
"""Get multiple with limit."""
for i in range(5):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i}"))
roles = await RoleCrud.get_multi(db_session, limit=3)
assert len(roles) == 3
@pytest.mark.anyio
async def test_get_multi_with_offset(self, db_session: AsyncSession):
"""Get multiple with offset."""
for i in range(5):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i}"))
roles = await RoleCrud.get_multi(db_session, offset=2)
assert len(roles) == 3
@pytest.mark.anyio
async def test_get_multi_with_order_by(self, db_session: AsyncSession):
"""Get multiple with ordering."""
await RoleCrud.create(db_session, RoleCreate(name="charlie"))
await RoleCrud.create(db_session, RoleCreate(name="alpha"))
await RoleCrud.create(db_session, RoleCreate(name="bravo"))
roles = await RoleCrud.get_multi(db_session, order_by=Role.name)
names = [r.name for r in roles]
assert names == ["alpha", "bravo", "charlie"]
class TestCrudUpdate:
"""Tests for CRUD update operations."""
@pytest.mark.anyio
async def test_update_record(self, db_session: AsyncSession):
"""Update an existing record."""
role = await RoleCrud.create(db_session, RoleCreate(name="old_name"))
updated = await RoleCrud.update(
db_session,
RoleUpdate(name="new_name"),
[Role.id == role.id],
)
assert updated.name == "new_name"
assert updated.id == role.id
@pytest.mark.anyio
async def test_update_raises_not_found(self, db_session: AsyncSession):
"""Update raises NotFoundError for missing records."""
with pytest.raises(NotFoundError):
await RoleCrud.update(
db_session,
RoleUpdate(name="new"),
[Role.id == 99999],
)
@pytest.mark.anyio
async def test_update_excludes_unset(self, db_session: AsyncSession):
"""Update excludes unset fields by default."""
user = await UserCrud.create(
db_session,
UserCreate(username="john", email="john@test.com", is_active=True),
)
updated = await UserCrud.update(
db_session,
UserUpdate(username="johnny"),
[User.id == user.id],
)
assert updated.username == "johnny"
assert updated.email == "john@test.com"
assert updated.is_active is True
class TestCrudDelete:
"""Tests for CRUD delete operations."""
@pytest.mark.anyio
async def test_delete_record(self, db_session: AsyncSession):
"""Delete an existing record."""
role = await RoleCrud.create(db_session, RoleCreate(name="to_delete"))
result = await RoleCrud.delete(db_session, [Role.id == role.id])
assert result is True
assert await RoleCrud.first(db_session, [Role.id == role.id]) is None
@pytest.mark.anyio
async def test_delete_multiple_records(self, db_session: AsyncSession):
"""Delete multiple records with filter."""
await UserCrud.create(
db_session,
UserCreate(username="u1", email="u1@test.com", is_active=False),
)
await UserCrud.create(
db_session,
UserCreate(username="u2", email="u2@test.com", is_active=False),
)
await UserCrud.create(
db_session,
UserCreate(username="u3", email="u3@test.com", is_active=True),
)
await UserCrud.delete(db_session, [User.is_active == False]) # noqa: E712
remaining = await UserCrud.get_multi(db_session)
assert len(remaining) == 1
assert remaining[0].username == "u3"
class TestCrudExists:
"""Tests for CRUD exists operations."""
@pytest.mark.anyio
async def test_exists_returns_true(self, db_session: AsyncSession):
"""Exists returns True for existing records."""
await RoleCrud.create(db_session, RoleCreate(name="admin"))
assert await RoleCrud.exists(db_session, [Role.name == "admin"]) is True
@pytest.mark.anyio
async def test_exists_returns_false(self, db_session: AsyncSession):
"""Exists returns False for missing records."""
assert await RoleCrud.exists(db_session, [Role.name == "nonexistent"]) is False
class TestCrudCount:
"""Tests for CRUD count operations."""
@pytest.mark.anyio
async def test_count_all(self, db_session: AsyncSession):
"""Count all records."""
await RoleCrud.create(db_session, RoleCreate(name="role1"))
await RoleCrud.create(db_session, RoleCreate(name="role2"))
await RoleCrud.create(db_session, RoleCreate(name="role3"))
count = await RoleCrud.count(db_session)
assert count == 3
@pytest.mark.anyio
async def test_count_with_filter(self, db_session: AsyncSession):
"""Count records with filter."""
await UserCrud.create(
db_session,
UserCreate(username="a1", email="a1@test.com", is_active=True),
)
await UserCrud.create(
db_session,
UserCreate(username="a2", email="a2@test.com", is_active=True),
)
await UserCrud.create(
db_session,
UserCreate(username="i1", email="i1@test.com", is_active=False),
)
active_count = await UserCrud.count(
db_session,
filters=[User.is_active == True], # noqa: E712
)
assert active_count == 2
class TestCrudUpsert:
"""Tests for CRUD upsert operations (PostgreSQL-specific)."""
@pytest.mark.anyio
async def test_upsert_insert_new_record(self, db_session: AsyncSession):
"""Upsert inserts a new record when it doesn't exist."""
data = RoleCreate(id=1, name="upsert_new")
role = await RoleCrud.upsert(
db_session,
data,
index_elements=["id"],
)
assert role is not None
assert role.name == "upsert_new"
@pytest.mark.anyio
async def test_upsert_update_existing_record(self, db_session: AsyncSession):
"""Upsert updates an existing record."""
# First insert
data = RoleCreate(id=100, name="original_name")
await RoleCrud.upsert(db_session, data, index_elements=["id"])
# Upsert with update
updated_data = RoleCreate(id=100, name="updated_name")
role = await RoleCrud.upsert(
db_session,
updated_data,
index_elements=["id"],
set_=RoleUpdate(name="updated_name"),
)
assert role is not None
assert role.name == "updated_name"
# Verify only one record exists
count = await RoleCrud.count(db_session, [Role.id == 100])
assert count == 1
@pytest.mark.anyio
async def test_upsert_do_nothing_on_conflict(self, db_session: AsyncSession):
"""Upsert does nothing on conflict when set_ is not provided."""
# First insert
data = RoleCreate(id=200, name="do_nothing_original")
await RoleCrud.upsert(db_session, data, index_elements=["id"])
# Upsert without set_ (do nothing)
conflict_data = RoleCreate(id=200, name="do_nothing_conflict")
await RoleCrud.upsert(db_session, conflict_data, index_elements=["id"])
# Original value should be preserved
role = await RoleCrud.first(db_session, [Role.id == 200])
assert role is not None
assert role.name == "do_nothing_original"
@pytest.mark.anyio
async def test_upsert_with_unique_constraint(self, db_session: AsyncSession):
"""Upsert works with unique constraint columns."""
# Insert first role
data1 = RoleCreate(name="unique_role")
await RoleCrud.upsert(db_session, data1, index_elements=["name"])
# Upsert with same name - should update (or do nothing)
data2 = RoleCreate(name="unique_role")
role = await RoleCrud.upsert(db_session, data2, index_elements=["name"])
assert role is not None
assert role.name == "unique_role"
# Should still be only one record
count = await RoleCrud.count(db_session, [Role.name == "unique_role"])
assert count == 1
class TestCrudPaginate:
"""Tests for CRUD pagination."""
@pytest.mark.anyio
async def test_paginate_first_page(self, db_session: AsyncSession):
"""Paginate returns first page."""
for i in range(25):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
result = await RoleCrud.paginate(db_session, page=1, items_per_page=10)
assert len(result["data"]) == 10
assert result["pagination"]["total_count"] == 25
assert result["pagination"]["page"] == 1
assert result["pagination"]["items_per_page"] == 10
assert result["pagination"]["has_more"] is True
@pytest.mark.anyio
async def test_paginate_last_page(self, db_session: AsyncSession):
"""Paginate returns last page with has_more=False."""
for i in range(25):
await RoleCrud.create(db_session, RoleCreate(name=f"role{i:02d}"))
result = await RoleCrud.paginate(db_session, page=3, items_per_page=10)
assert len(result["data"]) == 5
assert result["pagination"]["has_more"] is False
@pytest.mark.anyio
async def test_paginate_with_filters(self, db_session: AsyncSession):
"""Paginate with filter conditions."""
for i in range(10):
await UserCrud.create(
db_session,
UserCreate(
username=f"user{i}",
email=f"user{i}@test.com",
is_active=i % 2 == 0,
),
)
result = await UserCrud.paginate(
db_session,
filters=[User.is_active == True], # noqa: E712
page=1,
items_per_page=10,
)
assert result["pagination"]["total_count"] == 5
@pytest.mark.anyio
async def test_paginate_with_ordering(self, db_session: AsyncSession):
"""Paginate with custom ordering."""
await RoleCrud.create(db_session, RoleCreate(name="charlie"))
await RoleCrud.create(db_session, RoleCreate(name="alpha"))
await RoleCrud.create(db_session, RoleCreate(name="bravo"))
result = await RoleCrud.paginate(
db_session,
order_by=Role.name,
page=1,
items_per_page=10,
)
names = [r.name for r in result["data"]]
assert names == ["alpha", "bravo", "charlie"]

243
tests/test_db.py Normal file
View File

@@ -0,0 +1,243 @@
"""Tests for fastapi_toolsets.db module."""
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from fastapi_toolsets.db import (
LockMode,
create_db_context,
create_db_dependency,
get_transaction,
lock_tables,
)
from .conftest import DATABASE_URL, Base, Role, RoleCrud, User
class TestCreateDbDependency:
"""Tests for create_db_dependency."""
@pytest.mark.anyio
async def test_yields_session(self):
"""Dependency yields a valid session."""
engine = create_async_engine(DATABASE_URL, echo=False)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
get_db = create_db_dependency(session_factory)
async for session in get_db():
assert isinstance(session, AsyncSession)
break
await engine.dispose()
@pytest.mark.anyio
async def test_auto_commits_transaction(self):
"""Dependency auto-commits if transaction is active."""
engine = create_async_engine(DATABASE_URL, echo=False)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
try:
get_db = create_db_dependency(session_factory)
async for session in get_db():
role = Role(name="test_role_dep")
session.add(role)
await session.flush()
async with session_factory() as verify_session:
result = await RoleCrud.first(
verify_session, [Role.name == "test_role_dep"]
)
assert result is not None
finally:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
class TestCreateDbContext:
"""Tests for create_db_context."""
@pytest.mark.anyio
async def test_context_manager_yields_session(self):
"""Context manager yields a valid session."""
engine = create_async_engine(DATABASE_URL, echo=False)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
get_db_context = create_db_context(session_factory)
async with get_db_context() as session:
assert isinstance(session, AsyncSession)
await engine.dispose()
@pytest.mark.anyio
async def test_context_manager_commits(self):
"""Context manager commits on exit."""
engine = create_async_engine(DATABASE_URL, echo=False)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
try:
get_db_context = create_db_context(session_factory)
async with get_db_context() as session:
role = Role(name="context_role")
session.add(role)
await session.flush()
async with session_factory() as verify_session:
result = await RoleCrud.first(
verify_session, [Role.name == "context_role"]
)
assert result is not None
finally:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
class TestGetTransaction:
"""Tests for get_transaction context manager."""
@pytest.mark.anyio
async def test_starts_transaction(self, db_session: AsyncSession):
"""get_transaction starts a new transaction."""
async with get_transaction(db_session):
role = Role(name="tx_role")
db_session.add(role)
result = await RoleCrud.first(db_session, [Role.name == "tx_role"])
assert result is not None
@pytest.mark.anyio
async def test_nested_transaction_uses_savepoint(self, db_session: AsyncSession):
"""Nested transactions use savepoints."""
async with get_transaction(db_session):
role1 = Role(name="outer_role")
db_session.add(role1)
await db_session.flush()
async with get_transaction(db_session):
role2 = Role(name="inner_role")
db_session.add(role2)
results = await RoleCrud.get_multi(db_session)
names = {r.name for r in results}
assert "outer_role" in names
assert "inner_role" in names
@pytest.mark.anyio
async def test_rollback_on_exception(self, db_session: AsyncSession):
"""Transaction rolls back on exception."""
try:
async with get_transaction(db_session):
role = Role(name="rollback_role")
db_session.add(role)
await db_session.flush()
raise ValueError("Simulated error")
except ValueError:
pass
result = await RoleCrud.first(db_session, [Role.name == "rollback_role"])
assert result is None
@pytest.mark.anyio
async def test_nested_rollback_preserves_outer(self, db_session: AsyncSession):
"""Nested rollback preserves outer transaction."""
async with get_transaction(db_session):
role1 = Role(name="preserved_role")
db_session.add(role1)
await db_session.flush()
try:
async with get_transaction(db_session):
role2 = Role(name="rolled_back_role")
db_session.add(role2)
await db_session.flush()
raise ValueError("Inner error")
except ValueError:
pass
outer = await RoleCrud.first(db_session, [Role.name == "preserved_role"])
inner = await RoleCrud.first(db_session, [Role.name == "rolled_back_role"])
assert outer is not None
assert inner is None
class TestLockMode:
"""Tests for LockMode enum."""
def test_lock_modes_exist(self):
"""All expected lock modes are defined."""
assert LockMode.ACCESS_SHARE == "ACCESS SHARE"
assert LockMode.ROW_SHARE == "ROW SHARE"
assert LockMode.ROW_EXCLUSIVE == "ROW EXCLUSIVE"
assert LockMode.SHARE_UPDATE_EXCLUSIVE == "SHARE UPDATE EXCLUSIVE"
assert LockMode.SHARE == "SHARE"
assert LockMode.SHARE_ROW_EXCLUSIVE == "SHARE ROW EXCLUSIVE"
assert LockMode.EXCLUSIVE == "EXCLUSIVE"
assert LockMode.ACCESS_EXCLUSIVE == "ACCESS EXCLUSIVE"
def test_lock_mode_is_string(self):
"""Lock modes are string enums."""
assert isinstance(LockMode.EXCLUSIVE, str)
assert LockMode.EXCLUSIVE.value == "EXCLUSIVE"
class TestLockTables:
"""Tests for lock_tables context manager (PostgreSQL-specific)."""
@pytest.mark.anyio
async def test_lock_single_table(self, db_session: AsyncSession):
"""Lock a single table."""
async with lock_tables(db_session, [Role]):
# Inside the lock, we can still perform operations
role = Role(name="locked_role")
db_session.add(role)
await db_session.flush()
# After lock is released, verify the data was committed
result = await RoleCrud.first(db_session, [Role.name == "locked_role"])
assert result is not None
@pytest.mark.anyio
async def test_lock_multiple_tables(self, db_session: AsyncSession):
"""Lock multiple tables."""
async with lock_tables(db_session, [Role, User]):
role = Role(name="multi_lock_role")
db_session.add(role)
await db_session.flush()
result = await RoleCrud.first(db_session, [Role.name == "multi_lock_role"])
assert result is not None
@pytest.mark.anyio
async def test_lock_with_custom_mode(self, db_session: AsyncSession):
"""Lock with custom lock mode."""
async with lock_tables(db_session, [Role], mode=LockMode.EXCLUSIVE):
role = Role(name="exclusive_lock_role")
db_session.add(role)
await db_session.flush()
result = await RoleCrud.first(db_session, [Role.name == "exclusive_lock_role"])
assert result is not None
@pytest.mark.anyio
async def test_lock_rollback_on_exception(self, db_session: AsyncSession):
"""Lock context rolls back on exception."""
try:
async with lock_tables(db_session, [Role]):
role = Role(name="lock_rollback_role")
db_session.add(role)
await db_session.flush()
raise ValueError("Simulated error")
except ValueError:
pass
result = await RoleCrud.first(db_session, [Role.name == "lock_rollback_role"])
assert result is None

265
tests/test_exceptions.py Normal file
View File

@@ -0,0 +1,265 @@
"""Tests for fastapi_toolsets.exceptions module."""
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from fastapi_toolsets.exceptions import (
ApiException,
ConflictError,
ForbiddenError,
NotFoundError,
UnauthorizedError,
generate_error_responses,
init_exceptions_handlers,
)
from fastapi_toolsets.schemas import ApiError
class TestApiException:
"""Tests for ApiException base class."""
def test_subclass_with_api_error(self):
"""Subclasses can define api_error."""
class CustomError(ApiException):
api_error = ApiError(
code=418,
msg="I'm a teapot",
desc="The server is a teapot.",
err_code="TEA-418",
)
error = CustomError()
assert error.api_error.code == 418
assert error.api_error.msg == "I'm a teapot"
assert str(error) == "I'm a teapot"
def test_custom_detail_message(self):
"""Custom detail overrides default message."""
class CustomError(ApiException):
api_error = ApiError(
code=400,
msg="Bad Request",
desc="Request was bad.",
err_code="BAD-400",
)
error = CustomError("Custom message")
assert str(error) == "Custom message"
class TestBuiltInExceptions:
"""Tests for built-in exception classes."""
def test_unauthorized_error(self):
"""UnauthorizedError has correct attributes."""
error = UnauthorizedError()
assert error.api_error.code == 401
assert error.api_error.err_code == "AUTH-401"
def test_forbidden_error(self):
"""ForbiddenError has correct attributes."""
error = ForbiddenError()
assert error.api_error.code == 403
assert error.api_error.err_code == "AUTH-403"
def test_not_found_error(self):
"""NotFoundError has correct attributes."""
error = NotFoundError()
assert error.api_error.code == 404
assert error.api_error.err_code == "RES-404"
def test_conflict_error(self):
"""ConflictError has correct attributes."""
error = ConflictError()
assert error.api_error.code == 409
assert error.api_error.err_code == "RES-409"
class TestGenerateErrorResponses:
"""Tests for generate_error_responses function."""
def test_generates_single_response(self):
"""Generates response for single exception."""
responses = generate_error_responses(NotFoundError)
assert 404 in responses
assert responses[404]["description"] == "Not Found"
def test_generates_multiple_responses(self):
"""Generates responses for multiple exceptions."""
responses = generate_error_responses(
UnauthorizedError,
ForbiddenError,
NotFoundError,
)
assert 401 in responses
assert 403 in responses
assert 404 in responses
def test_response_has_example(self):
"""Generated response includes example."""
responses = generate_error_responses(NotFoundError)
example = responses[404]["content"]["application/json"]["example"]
assert example["status"] == "FAIL"
assert example["error_code"] == "RES-404"
assert example["message"] == "Not Found"
class TestInitExceptionsHandlers:
"""Tests for init_exceptions_handlers function."""
def test_returns_app(self):
"""Returns the FastAPI app."""
app = FastAPI()
result = init_exceptions_handlers(app)
assert result is app
def test_handles_api_exception(self):
"""Handles ApiException with structured response."""
app = FastAPI()
init_exceptions_handlers(app)
@app.get("/error")
async def raise_error():
raise NotFoundError()
client = TestClient(app)
response = client.get("/error")
assert response.status_code == 404
data = response.json()
assert data["status"] == "FAIL"
assert data["error_code"] == "RES-404"
assert data["message"] == "Not Found"
def test_handles_validation_error(self):
"""Handles validation errors with structured response."""
from pydantic import BaseModel
app = FastAPI()
init_exceptions_handlers(app)
class Item(BaseModel):
name: str
price: float
@app.post("/items")
async def create_item(item: Item):
return item
client = TestClient(app)
response = client.post("/items", json={"name": 123})
assert response.status_code == 422
data = response.json()
assert data["status"] == "FAIL"
assert data["error_code"] == "VAL-422"
assert "errors" in data["data"]
def test_handles_generic_exception(self):
"""Handles unhandled exceptions with 500 response."""
app = FastAPI()
init_exceptions_handlers(app)
@app.get("/crash")
async def crash():
raise RuntimeError("Something went wrong")
client = TestClient(app, raise_server_exceptions=False)
response = client.get("/crash")
assert response.status_code == 500
data = response.json()
assert data["status"] == "FAIL"
assert data["error_code"] == "SERVER-500"
def test_custom_openapi_schema(self):
"""Customizes OpenAPI schema for 422 responses."""
app = FastAPI()
init_exceptions_handlers(app)
from pydantic import BaseModel
class Item(BaseModel):
name: str
@app.post("/items")
async def create_item(item: Item):
return item
openapi = app.openapi()
post_op = openapi["paths"]["/items"]["post"]
assert "422" in post_op["responses"]
resp_422 = post_op["responses"]["422"]
example = resp_422["content"]["application/json"]["example"]
assert example["error_code"] == "VAL-422"
class TestExceptionIntegration:
"""Integration tests for exception handling."""
@pytest.fixture
def app_with_routes(self):
"""Create app with test routes."""
app = FastAPI()
init_exceptions_handlers(app)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
if user_id == 404:
raise NotFoundError()
if user_id == 401:
raise UnauthorizedError()
if user_id == 403:
raise ForbiddenError()
if user_id == 409:
raise ConflictError()
return {"id": user_id}
return app
def test_not_found_response(self, app_with_routes):
"""NotFoundError returns 404."""
client = TestClient(app_with_routes)
response = client.get("/users/404")
assert response.status_code == 404
assert response.json()["error_code"] == "RES-404"
def test_unauthorized_response(self, app_with_routes):
"""UnauthorizedError returns 401."""
client = TestClient(app_with_routes)
response = client.get("/users/401")
assert response.status_code == 401
assert response.json()["error_code"] == "AUTH-401"
def test_forbidden_response(self, app_with_routes):
"""ForbiddenError returns 403."""
client = TestClient(app_with_routes)
response = client.get("/users/403")
assert response.status_code == 403
assert response.json()["error_code"] == "AUTH-403"
def test_conflict_response(self, app_with_routes):
"""ConflictError returns 409."""
client = TestClient(app_with_routes)
response = client.get("/users/409")
assert response.status_code == 409
assert response.json()["error_code"] == "RES-409"
def test_success_response(self, app_with_routes):
"""Successful requests return normally."""
client = TestClient(app_with_routes)
response = client.get("/users/1")
assert response.status_code == 200
assert response.json() == {"id": 1}

401
tests/test_fixtures.py Normal file
View File

@@ -0,0 +1,401 @@
"""Tests for fastapi_toolsets.fixtures module."""
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi_toolsets.fixtures import (
Context,
FixtureRegistry,
LoadStrategy,
load_fixtures,
load_fixtures_by_context,
)
from .conftest import Role, User
class TestContext:
"""Tests for Context enum."""
def test_base_context(self):
"""BASE context has correct value."""
assert Context.BASE.value == "base"
def test_production_context(self):
"""PRODUCTION context has correct value."""
assert Context.PRODUCTION.value == "production"
def test_development_context(self):
"""DEVELOPMENT context has correct value."""
assert Context.DEVELOPMENT.value == "development"
def test_testing_context(self):
"""TESTING context has correct value."""
assert Context.TESTING.value == "testing"
class TestLoadStrategy:
"""Tests for LoadStrategy enum."""
def test_insert_strategy(self):
"""INSERT strategy has correct value."""
assert LoadStrategy.INSERT.value == "insert"
def test_merge_strategy(self):
"""MERGE strategy has correct value."""
assert LoadStrategy.MERGE.value == "merge"
def test_skip_existing_strategy(self):
"""SKIP_EXISTING strategy has correct value."""
assert LoadStrategy.SKIP_EXISTING.value == "skip_existing"
class TestFixtureRegistry:
"""Tests for FixtureRegistry class."""
def test_register_with_decorator(self):
"""Register fixture with decorator."""
registry = FixtureRegistry()
@registry.register
def roles():
return [Role(id=1, name="admin")]
assert "roles" in [f.name for f in registry.get_all()]
def test_register_with_custom_name(self):
"""Register fixture with custom name."""
registry = FixtureRegistry()
@registry.register(name="custom_roles")
def roles():
return [Role(id=1, name="admin")]
fixture = registry.get("custom_roles")
assert fixture.name == "custom_roles"
def test_register_with_dependencies(self):
"""Register fixture with dependencies."""
registry = FixtureRegistry()
@registry.register
def roles():
return [Role(id=1, name="admin")]
@registry.register(depends_on=["roles"])
def users():
return [User(id=1, username="admin", email="admin@test.com", role_id=1)]
fixture = registry.get("users")
assert fixture.depends_on == ["roles"]
def test_register_with_contexts(self):
"""Register fixture with contexts."""
registry = FixtureRegistry()
@registry.register(contexts=[Context.TESTING])
def test_data():
return [Role(id=100, name="test")]
fixture = registry.get("test_data")
assert Context.TESTING.value in fixture.contexts
def test_get_raises_key_error(self):
"""Get raises KeyError for missing fixture."""
registry = FixtureRegistry()
with pytest.raises(KeyError, match="not found"):
registry.get("nonexistent")
def test_get_all(self):
"""Get all registered fixtures."""
registry = FixtureRegistry()
@registry.register
def fixture1():
return []
@registry.register
def fixture2():
return []
fixtures = registry.get_all()
names = {f.name for f in fixtures}
assert names == {"fixture1", "fixture2"}
def test_get_by_context(self):
"""Get fixtures by context."""
registry = FixtureRegistry()
@registry.register(contexts=[Context.BASE])
def base_data():
return []
@registry.register(contexts=[Context.TESTING])
def test_data():
return []
@registry.register(contexts=[Context.PRODUCTION])
def prod_data():
return []
testing_fixtures = registry.get_by_context(Context.TESTING)
names = {f.name for f in testing_fixtures}
assert names == {"test_data"}
class TestDependencyResolution:
"""Tests for fixture dependency resolution."""
def test_resolve_simple_dependency(self):
"""Resolve simple dependency chain."""
registry = FixtureRegistry()
@registry.register
def roles():
return []
@registry.register(depends_on=["roles"])
def users():
return []
order = registry.resolve_dependencies("users")
assert order == ["roles", "users"]
def test_resolve_multiple_dependencies(self):
"""Resolve multiple dependencies."""
registry = FixtureRegistry()
@registry.register
def roles():
return []
@registry.register
def permissions():
return []
@registry.register(depends_on=["roles", "permissions"])
def users():
return []
order = registry.resolve_dependencies("users")
assert "roles" in order
assert "permissions" in order
assert order.index("roles") < order.index("users")
assert order.index("permissions") < order.index("users")
def test_resolve_transitive_dependencies(self):
"""Resolve transitive dependencies."""
registry = FixtureRegistry()
@registry.register
def base():
return []
@registry.register(depends_on=["base"])
def middle():
return []
@registry.register(depends_on=["middle"])
def top():
return []
order = registry.resolve_dependencies("top")
assert order == ["base", "middle", "top"]
def test_detect_circular_dependency(self):
"""Detect circular dependencies."""
registry = FixtureRegistry()
@registry.register(depends_on=["b"])
def a():
return []
@registry.register(depends_on=["a"])
def b():
return []
with pytest.raises(ValueError, match="Circular dependency"):
registry.resolve_dependencies("a")
def test_resolve_context_dependencies(self):
"""Resolve all fixtures for a context with dependencies."""
registry = FixtureRegistry()
@registry.register(contexts=[Context.BASE])
def roles():
return []
@registry.register(depends_on=["roles"], contexts=[Context.TESTING])
def test_users():
return []
order = registry.resolve_context_dependencies(Context.BASE, Context.TESTING)
assert "roles" in order
assert "test_users" in order
assert order.index("roles") < order.index("test_users")
class TestLoadFixtures:
"""Tests for load_fixtures function."""
@pytest.mark.anyio
async def test_load_single_fixture(self, db_session: AsyncSession):
"""Load a single fixture."""
registry = FixtureRegistry()
@registry.register
def roles():
return [
Role(id=1, name="admin"),
Role(id=2, name="user"),
]
result = await load_fixtures(db_session, registry, "roles")
assert "roles" in result
assert len(result["roles"]) == 2
from .conftest import RoleCrud
count = await RoleCrud.count(db_session)
assert count == 2
@pytest.mark.anyio
async def test_load_with_dependencies(self, db_session: AsyncSession):
"""Load fixtures with dependencies."""
registry = FixtureRegistry()
@registry.register
def roles():
return [Role(id=1, name="admin")]
@registry.register(depends_on=["roles"])
def users():
return [User(id=1, username="admin", email="admin@test.com", role_id=1)]
result = await load_fixtures(db_session, registry, "users")
assert "roles" in result
assert "users" in result
from .conftest import RoleCrud, UserCrud
assert await RoleCrud.count(db_session) == 1
assert await UserCrud.count(db_session) == 1
@pytest.mark.anyio
async def test_load_with_merge_strategy(self, db_session: AsyncSession):
"""Load fixtures with MERGE strategy updates existing."""
registry = FixtureRegistry()
@registry.register
def roles():
return [Role(id=1, name="admin")]
await load_fixtures(db_session, registry, "roles", strategy=LoadStrategy.MERGE)
await load_fixtures(db_session, registry, "roles", strategy=LoadStrategy.MERGE)
from .conftest import RoleCrud
count = await RoleCrud.count(db_session)
assert count == 1
@pytest.mark.anyio
async def test_load_with_skip_existing_strategy(self, db_session: AsyncSession):
"""Load fixtures with SKIP_EXISTING strategy."""
registry = FixtureRegistry()
@registry.register
def roles():
return [Role(id=1, name="original")]
await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
@registry.register(name="roles_updated")
def roles_v2():
return [Role(id=1, name="updated")]
registry._fixtures["roles"] = registry._fixtures.pop("roles_updated")
await load_fixtures(
db_session, registry, "roles", strategy=LoadStrategy.SKIP_EXISTING
)
from .conftest import RoleCrud
role = await RoleCrud.first(db_session, [Role.id == 1])
assert role is not None
assert role.name == "original"
class TestLoadFixturesByContext:
"""Tests for load_fixtures_by_context function."""
@pytest.mark.anyio
async def test_load_by_single_context(self, db_session: AsyncSession):
"""Load fixtures by single context."""
registry = FixtureRegistry()
@registry.register(contexts=[Context.BASE])
def base_roles():
return [Role(id=1, name="base_role")]
@registry.register(contexts=[Context.TESTING])
def test_roles():
return [Role(id=100, name="test_role")]
await load_fixtures_by_context(db_session, registry, Context.BASE)
from .conftest import RoleCrud
count = await RoleCrud.count(db_session)
assert count == 1
role = await RoleCrud.first(db_session, [Role.id == 1])
assert role is not None
assert role.name == "base_role"
@pytest.mark.anyio
async def test_load_by_multiple_contexts(self, db_session: AsyncSession):
"""Load fixtures by multiple contexts."""
registry = FixtureRegistry()
@registry.register(contexts=[Context.BASE])
def base_roles():
return [Role(id=1, name="base_role")]
@registry.register(contexts=[Context.TESTING])
def test_roles():
return [Role(id=100, name="test_role")]
await load_fixtures_by_context(
db_session, registry, Context.BASE, Context.TESTING
)
from .conftest import RoleCrud
count = await RoleCrud.count(db_session)
assert count == 2
@pytest.mark.anyio
async def test_load_context_with_dependencies(self, db_session: AsyncSession):
"""Load context fixtures with cross-context dependencies."""
registry = FixtureRegistry()
@registry.register(contexts=[Context.BASE])
def roles():
return [Role(id=1, name="admin")]
@registry.register(depends_on=["roles"], contexts=[Context.TESTING])
def test_users():
return [User(id=1, username="tester", email="test@test.com", role_id=1)]
await load_fixtures_by_context(db_session, registry, Context.TESTING)
from .conftest import RoleCrud, UserCrud
assert await RoleCrud.count(db_session) == 1
assert await UserCrud.count(db_session) == 1

160
tests/test_pytest_plugin.py Normal file
View File

@@ -0,0 +1,160 @@
"""Tests for fastapi_toolsets.pytest_plugin module."""
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from fastapi_toolsets.fixtures import Context, FixtureRegistry, register_fixtures
from .conftest import Role, RoleCrud, User, UserCrud
test_registry = FixtureRegistry()
@test_registry.register(contexts=[Context.BASE])
def roles() -> list[Role]:
return [
Role(id=1000, name="plugin_admin"),
Role(id=1001, name="plugin_user"),
]
@test_registry.register(depends_on=["roles"], contexts=[Context.BASE])
def users() -> list[User]:
return [
User(id=1000, username="plugin_admin", email="padmin@test.com", role_id=1000),
User(id=1001, username="plugin_user", email="puser@test.com", role_id=1001),
]
@test_registry.register(depends_on=["users"], contexts=[Context.TESTING])
def extra_users() -> list[User]:
return [
User(id=1002, username="plugin_extra", email="pextra@test.com", role_id=1001),
]
register_fixtures(test_registry, globals())
class TestRegisterFixtures:
"""Tests for register_fixtures function."""
def test_creates_fixtures_in_namespace(self):
"""Fixtures are created in the namespace."""
assert "fixture_roles" in globals()
assert "fixture_users" in globals()
assert "fixture_extra_users" in globals()
def test_fixtures_are_callable(self):
"""Created fixtures are callable."""
assert callable(globals()["fixture_roles"])
assert callable(globals()["fixture_users"])
class TestGeneratedFixtures:
"""Tests for the generated pytest fixtures."""
@pytest.mark.anyio
async def test_fixture_loads_data(
self, db_session: AsyncSession, fixture_roles: list[Role]
):
"""Fixture loads data into database and returns it."""
assert len(fixture_roles) == 2
assert fixture_roles[0].name == "plugin_admin"
assert fixture_roles[1].name == "plugin_user"
# Verify data is in database
count = await RoleCrud.count(db_session, [Role.id >= 1000])
assert count == 2
@pytest.mark.anyio
async def test_fixture_with_dependency(
self, db_session: AsyncSession, fixture_users: list[User]
):
"""Fixture with dependency loads parent fixture first."""
# fixture_users depends on fixture_roles
# Both should be loaded
assert len(fixture_users) == 2
# Roles should also be in database
roles_count = await RoleCrud.count(db_session, [Role.id >= 1000])
assert roles_count == 2
# Users should be in database
users_count = await UserCrud.count(db_session, [User.id >= 1000])
assert users_count == 2
@pytest.mark.anyio
async def test_fixture_returns_models(
self, db_session: AsyncSession, fixture_users: list[User]
):
"""Fixture returns actual model instances."""
user = fixture_users[0]
assert isinstance(user, User)
assert user.id == 1000
assert user.username == "plugin_admin"
@pytest.mark.anyio
async def test_fixture_relationships_work(
self, db_session: AsyncSession, fixture_users: list[User]
):
"""Loaded fixtures have working relationships."""
# Load user with role relationship
user = await UserCrud.get(
db_session,
[User.id == 1000],
load_options=[selectinload(User.role)],
)
assert user.role is not None
assert user.role.name == "plugin_admin"
@pytest.mark.anyio
async def test_chained_dependencies(
self, db_session: AsyncSession, fixture_extra_users: list[User]
):
"""Chained dependencies are resolved correctly."""
# fixture_extra_users -> fixture_users -> fixture_roles
assert len(fixture_extra_users) == 1
# All fixtures should be loaded
roles_count = await RoleCrud.count(db_session, [Role.id >= 1000])
users_count = await UserCrud.count(db_session, [User.id >= 1000])
assert roles_count == 2
assert users_count == 3 # 2 from users + 1 from extra_users
@pytest.mark.anyio
async def test_can_query_loaded_data(
self, db_session: AsyncSession, fixture_users: list[User]
):
"""Can query the loaded fixture data."""
# Get all users loaded by fixture
users = await UserCrud.get_multi(
db_session,
filters=[User.id >= 1000],
order_by=User.id,
)
assert len(users) == 2
assert users[0].username == "plugin_admin"
assert users[1].username == "plugin_user"
@pytest.mark.anyio
async def test_multiple_fixtures_in_same_test(
self,
db_session: AsyncSession,
fixture_roles: list[Role],
fixture_users: list[User],
):
"""Multiple fixtures can be used in the same test."""
assert len(fixture_roles) == 2
assert len(fixture_users) == 2
# Both should be in database
roles = await RoleCrud.get_multi(db_session, filters=[Role.id >= 1000])
users = await UserCrud.get_multi(db_session, filters=[User.id >= 1000])
assert len(roles) == 2
assert len(users) == 2

284
tests/test_schemas.py Normal file
View File

@@ -0,0 +1,284 @@
"""Tests for fastapi_toolsets.schemas module."""
import pytest
from pydantic import ValidationError
from fastapi_toolsets.schemas import (
ApiError,
ErrorResponse,
PaginatedResponse,
Pagination,
Response,
ResponseStatus,
)
class TestResponseStatus:
"""Tests for ResponseStatus enum."""
def test_success_value(self):
"""SUCCESS has correct value."""
assert ResponseStatus.SUCCESS.value == "SUCCESS"
def test_fail_value(self):
"""FAIL has correct value."""
assert ResponseStatus.FAIL.value == "FAIL"
def test_is_string_enum(self):
"""ResponseStatus is a string enum."""
assert isinstance(ResponseStatus.SUCCESS, str)
class TestApiError:
"""Tests for ApiError schema."""
def test_create_api_error(self):
"""Create ApiError with all fields."""
error = ApiError(
code=404,
msg="Not Found",
desc="The resource was not found.",
err_code="RES-404",
)
assert error.code == 404
assert error.msg == "Not Found"
assert error.desc == "The resource was not found."
assert error.err_code == "RES-404"
def test_requires_all_fields(self):
"""ApiError requires all fields."""
with pytest.raises(ValidationError):
ApiError(code=404, msg="Not Found") # type: ignore
class TestResponse:
"""Tests for Response schema."""
def test_create_with_data(self):
"""Create Response with data."""
response = Response(data={"id": 1, "name": "test"})
assert response.data == {"id": 1, "name": "test"}
assert response.status == ResponseStatus.SUCCESS
assert response.message == "Success"
assert response.error_code is None
def test_create_with_custom_message(self):
"""Create Response with custom message."""
response = Response(data="result", message="Operation completed")
assert response.message == "Operation completed"
def test_create_with_none_data(self):
"""Create Response with None data."""
response = Response[dict](data=None)
assert response.data is None
assert response.status == ResponseStatus.SUCCESS
def test_generic_type_hint(self):
"""Response supports generic type hints."""
response: Response[list[str]] = Response(data=["a", "b", "c"])
assert response.data == ["a", "b", "c"]
def test_serialization(self):
"""Response serializes correctly."""
response = Response(data={"key": "value"}, message="Test")
data = response.model_dump()
assert data["status"] == "SUCCESS"
assert data["message"] == "Test"
assert data["data"] == {"key": "value"}
assert data["error_code"] is None
class TestErrorResponse:
"""Tests for ErrorResponse schema."""
def test_default_values(self):
"""ErrorResponse has correct defaults."""
response = ErrorResponse()
assert response.status == ResponseStatus.FAIL
assert response.data is None
def test_with_description(self):
"""ErrorResponse with description."""
response = ErrorResponse(
message="Bad Request",
description="The request was invalid.",
error_code="BAD-400",
)
assert response.message == "Bad Request"
assert response.description == "The request was invalid."
assert response.error_code == "BAD-400"
def test_serialization(self):
"""ErrorResponse serializes correctly."""
response = ErrorResponse(
message="Error",
description="Details",
error_code="ERR-500",
)
data = response.model_dump()
assert data["status"] == "FAIL"
assert data["description"] == "Details"
class TestPagination:
"""Tests for Pagination schema."""
def test_create_pagination(self):
"""Create Pagination with all fields."""
pagination = Pagination(
total_count=100,
items_per_page=10,
page=1,
has_more=True,
)
assert pagination.total_count == 100
assert pagination.items_per_page == 10
assert pagination.page == 1
assert pagination.has_more is True
def test_last_page_has_more_false(self):
"""Last page has has_more=False."""
pagination = Pagination(
total_count=25,
items_per_page=10,
page=3,
has_more=False,
)
assert pagination.has_more is False
def test_serialization(self):
"""Pagination serializes correctly."""
pagination = Pagination(
total_count=50,
items_per_page=20,
page=2,
has_more=True,
)
data = pagination.model_dump()
assert data["total_count"] == 50
assert data["items_per_page"] == 20
assert data["page"] == 2
assert data["has_more"] is True
class TestPaginatedResponse:
"""Tests for PaginatedResponse schema."""
def test_create_paginated_response(self):
"""Create PaginatedResponse with data and pagination."""
pagination = Pagination(
total_count=30,
items_per_page=10,
page=1,
has_more=True,
)
response = PaginatedResponse(
data=[{"id": 1}, {"id": 2}],
pagination=pagination,
)
assert len(response.data) == 2
assert response.pagination.total_count == 30
assert response.status == ResponseStatus.SUCCESS
def test_with_custom_message(self):
"""PaginatedResponse with custom message."""
pagination = Pagination(
total_count=5,
items_per_page=10,
page=1,
has_more=False,
)
response = PaginatedResponse(
data=[1, 2, 3, 4, 5],
pagination=pagination,
message="Found 5 items",
)
assert response.message == "Found 5 items"
def test_empty_data(self):
"""PaginatedResponse with empty data."""
pagination = Pagination(
total_count=0,
items_per_page=10,
page=1,
has_more=False,
)
response = PaginatedResponse[dict](
data=[],
pagination=pagination,
)
assert response.data == []
assert response.pagination.total_count == 0
def test_generic_type_hint(self):
"""PaginatedResponse supports generic type hints."""
class UserOut:
id: int
name: str
pagination = Pagination(
total_count=1,
items_per_page=10,
page=1,
has_more=False,
)
response: PaginatedResponse[dict] = PaginatedResponse(
data=[{"id": 1, "name": "test"}],
pagination=pagination,
)
assert response.data[0]["id"] == 1
def test_serialization(self):
"""PaginatedResponse serializes correctly."""
pagination = Pagination(
total_count=100,
items_per_page=10,
page=5,
has_more=True,
)
response = PaginatedResponse(
data=["item1", "item2"],
pagination=pagination,
message="Page 5",
)
data = response.model_dump()
assert data["status"] == "SUCCESS"
assert data["message"] == "Page 5"
assert data["data"] == ["item1", "item2"]
assert data["pagination"]["page"] == 5
class TestFromAttributes:
"""Tests for from_attributes config (ORM mode)."""
def test_response_from_orm_object(self):
"""Response can accept ORM-like objects."""
class FakeOrmObject:
def __init__(self):
self.id = 1
self.name = "test"
obj = FakeOrmObject()
response = Response(data=obj)
assert response.data.id == 1 # type: ignore
assert response.data.name == "test" # type: ignore