mirror of
https://github.com/d3vyce/fastapi-toolsets.git
synced 2026-04-16 14:46:24 +02:00
Compare commits
5 Commits
51d3917421
...
06cec296a5
| Author | SHA1 | Date | |
|---|---|---|---|
|
06cec296a5
|
|||
|
ea9c31956f
|
|||
|
181ca35137
|
|||
|
|
5c1487c24a | ||
|
|
ebaa61525f |
@@ -1,6 +1,10 @@
|
|||||||
"""Generic async CRUD operations for SQLAlchemy models."""
|
"""Generic async CRUD operations for SQLAlchemy models."""
|
||||||
|
|
||||||
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
|
from ..exceptions import (
|
||||||
|
InvalidFacetFilterError,
|
||||||
|
NoSearchableFieldsError,
|
||||||
|
UnsupportedFacetTypeError,
|
||||||
|
)
|
||||||
from ..schemas import PaginationType
|
from ..schemas import PaginationType
|
||||||
from ..types import (
|
from ..types import (
|
||||||
FacetFieldType,
|
FacetFieldType,
|
||||||
@@ -25,4 +29,5 @@ __all__ = [
|
|||||||
"PaginationType",
|
"PaginationType",
|
||||||
"SearchConfig",
|
"SearchConfig",
|
||||||
"SearchFieldType",
|
"SearchFieldType",
|
||||||
|
"UnsupportedFacetTypeError",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -6,12 +6,27 @@ from collections.abc import Sequence
|
|||||||
from dataclasses import dataclass, replace
|
from dataclasses import dataclass, replace
|
||||||
from typing import TYPE_CHECKING, Any, Literal
|
from typing import TYPE_CHECKING, Any, Literal
|
||||||
|
|
||||||
from sqlalchemy import String, and_, or_, select
|
from sqlalchemy import String, and_, func, or_, select
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
from sqlalchemy.orm import DeclarativeBase
|
from sqlalchemy.orm import DeclarativeBase
|
||||||
from sqlalchemy.orm.attributes import InstrumentedAttribute
|
from sqlalchemy.orm.attributes import InstrumentedAttribute
|
||||||
|
from sqlalchemy.types import (
|
||||||
|
ARRAY,
|
||||||
|
Boolean,
|
||||||
|
Date,
|
||||||
|
DateTime,
|
||||||
|
Enum,
|
||||||
|
Integer,
|
||||||
|
Numeric,
|
||||||
|
Time,
|
||||||
|
Uuid,
|
||||||
|
)
|
||||||
|
|
||||||
from ..exceptions import InvalidFacetFilterError, NoSearchableFieldsError
|
from ..exceptions import (
|
||||||
|
InvalidFacetFilterError,
|
||||||
|
NoSearchableFieldsError,
|
||||||
|
UnsupportedFacetTypeError,
|
||||||
|
)
|
||||||
from ..types import FacetFieldType, SearchFieldType
|
from ..types import FacetFieldType, SearchFieldType
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -201,7 +216,14 @@ async def build_facets(
|
|||||||
rels = ()
|
rels = ()
|
||||||
column = field
|
column = field
|
||||||
|
|
||||||
q = select(column).select_from(model).distinct()
|
col_type = column.property.columns[0].type
|
||||||
|
is_array = isinstance(col_type, ARRAY)
|
||||||
|
|
||||||
|
if is_array:
|
||||||
|
unnested = func.unnest(column).label(column.key)
|
||||||
|
q = select(unnested).select_from(model).distinct()
|
||||||
|
else:
|
||||||
|
q = select(column).select_from(model).distinct()
|
||||||
|
|
||||||
# Apply base joins (already done on main query, but needed here independently)
|
# Apply base joins (already done on main query, but needed here independently)
|
||||||
for rel in base_joins or []:
|
for rel in base_joins or []:
|
||||||
@@ -215,7 +237,10 @@ async def build_facets(
|
|||||||
if base_filters:
|
if base_filters:
|
||||||
q = q.where(and_(*base_filters))
|
q = q.where(and_(*base_filters))
|
||||||
|
|
||||||
q = q.order_by(column)
|
if is_array:
|
||||||
|
q = q.order_by(unnested)
|
||||||
|
else:
|
||||||
|
q = q.order_by(column)
|
||||||
result = await session.execute(q)
|
result = await session.execute(q)
|
||||||
values = [row[0] for row in result.all() if row[0] is not None]
|
values = [row[0] for row in result.all() if row[0] is not None]
|
||||||
return key, values
|
return key, values
|
||||||
@@ -226,6 +251,10 @@ async def build_facets(
|
|||||||
return dict(pairs)
|
return dict(pairs)
|
||||||
|
|
||||||
|
|
||||||
|
_EQUALITY_TYPES = (String, Integer, Numeric, Date, DateTime, Time, Enum, Uuid)
|
||||||
|
"""Column types that support equality / IN filtering in build_filter_by."""
|
||||||
|
|
||||||
|
|
||||||
def build_filter_by(
|
def build_filter_by(
|
||||||
filter_by: dict[str, Any],
|
filter_by: dict[str, Any],
|
||||||
facet_fields: Sequence[FacetFieldType],
|
facet_fields: Sequence[FacetFieldType],
|
||||||
@@ -271,9 +300,23 @@ def build_filter_by(
|
|||||||
joins.append(rel)
|
joins.append(rel)
|
||||||
added_join_keys.add(rel_key)
|
added_join_keys.add(rel_key)
|
||||||
|
|
||||||
if isinstance(value, list):
|
col_type = column.property.columns[0].type
|
||||||
filters.append(column.in_(value))
|
if isinstance(col_type, ARRAY):
|
||||||
|
if isinstance(value, list):
|
||||||
|
filters.append(column.overlap(value))
|
||||||
|
else:
|
||||||
|
filters.append(column.any(value))
|
||||||
|
elif isinstance(col_type, Boolean):
|
||||||
|
if isinstance(value, list):
|
||||||
|
filters.append(column.in_(value))
|
||||||
|
else:
|
||||||
|
filters.append(column.is_(value))
|
||||||
|
elif isinstance(col_type, _EQUALITY_TYPES):
|
||||||
|
if isinstance(value, list):
|
||||||
|
filters.append(column.in_(value))
|
||||||
|
else:
|
||||||
|
filters.append(column == value)
|
||||||
else:
|
else:
|
||||||
filters.append(column == value)
|
raise UnsupportedFacetTypeError(key, type(col_type).__name__)
|
||||||
|
|
||||||
return filters, joins
|
return filters, joins
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ from .exceptions import (
|
|||||||
NoSearchableFieldsError,
|
NoSearchableFieldsError,
|
||||||
NotFoundError,
|
NotFoundError,
|
||||||
UnauthorizedError,
|
UnauthorizedError,
|
||||||
|
UnsupportedFacetTypeError,
|
||||||
generate_error_responses,
|
generate_error_responses,
|
||||||
)
|
)
|
||||||
from .handler import init_exceptions_handlers
|
from .handler import init_exceptions_handlers
|
||||||
@@ -26,4 +27,5 @@ __all__ = [
|
|||||||
"NoSearchableFieldsError",
|
"NoSearchableFieldsError",
|
||||||
"NotFoundError",
|
"NotFoundError",
|
||||||
"UnauthorizedError",
|
"UnauthorizedError",
|
||||||
|
"UnsupportedFacetTypeError",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -144,6 +144,34 @@ class InvalidFacetFilterError(ApiException):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class UnsupportedFacetTypeError(ApiException):
|
||||||
|
"""Raised when a facet field has a column type not supported by filter_by."""
|
||||||
|
|
||||||
|
api_error = ApiError(
|
||||||
|
code=400,
|
||||||
|
msg="Unsupported Facet Type",
|
||||||
|
desc="The column type is not supported for facet filtering.",
|
||||||
|
err_code="FACET-TYPE-400",
|
||||||
|
)
|
||||||
|
|
||||||
|
def __init__(self, key: str, col_type: str) -> None:
|
||||||
|
"""Initialize the exception.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key: The facet field key.
|
||||||
|
col_type: The unsupported column type name.
|
||||||
|
"""
|
||||||
|
self.key = key
|
||||||
|
self.col_type = col_type
|
||||||
|
super().__init__(
|
||||||
|
desc=(
|
||||||
|
f"Facet field '{key}' has unsupported column type '{col_type}'. "
|
||||||
|
f"Supported types: String, Integer, Numeric, Boolean, "
|
||||||
|
f"Date, DateTime, Time, Enum, Uuid, ARRAY."
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class InvalidOrderFieldError(ApiException):
|
class InvalidOrderFieldError(ApiException):
|
||||||
"""Raised when order_by contains a field not in the allowed order fields."""
|
"""Raised when order_by contains a field not in the allowed order fields."""
|
||||||
|
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ def _format_validation_error(
|
|||||||
)
|
)
|
||||||
|
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
|
||||||
content=error_response.model_dump(),
|
content=error_response.model_dump(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
"""Prometheus metrics endpoint for FastAPI applications."""
|
"""Prometheus metrics endpoint for FastAPI applications."""
|
||||||
|
|
||||||
import asyncio
|
import inspect
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
@@ -55,10 +55,10 @@ def init_metrics(
|
|||||||
|
|
||||||
# Partition collectors and cache env check at startup — both are stable for the app lifetime.
|
# Partition collectors and cache env check at startup — both are stable for the app lifetime.
|
||||||
async_collectors = [
|
async_collectors = [
|
||||||
c for c in registry.get_collectors() if asyncio.iscoroutinefunction(c.func)
|
c for c in registry.get_collectors() if inspect.iscoroutinefunction(c.func)
|
||||||
]
|
]
|
||||||
sync_collectors = [
|
sync_collectors = [
|
||||||
c for c in registry.get_collectors() if not asyncio.iscoroutinefunction(c.func)
|
c for c in registry.get_collectors() if not inspect.iscoroutinefunction(c.func)
|
||||||
]
|
]
|
||||||
multiprocess_mode = _is_multiprocess()
|
multiprocess_mode = _is_multiprocess()
|
||||||
|
|
||||||
|
|||||||
@@ -231,6 +231,13 @@ class EventSession(AsyncSession):
|
|||||||
k: v for k, v in field_changes.items() if k not in transient_ids
|
k: v for k, v in field_changes.items() if k not in transient_ids
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Suppress updates for deleted objects (row is gone, refresh would fail).
|
||||||
|
if deletes and field_changes:
|
||||||
|
deleted_ids = {id(o) for o, _ in deletes}
|
||||||
|
field_changes = {
|
||||||
|
k: v for k, v in field_changes.items() if k not in deleted_ids
|
||||||
|
}
|
||||||
|
|
||||||
# Suppress updates for newly created objects (CREATE-only semantics).
|
# Suppress updates for newly created objects (CREATE-only semantics).
|
||||||
if creates and field_changes:
|
if creates and field_changes:
|
||||||
create_ids = {id(o) for o in creates}
|
create_ids = {id(o) for o in creates}
|
||||||
|
|||||||
@@ -14,11 +14,13 @@ from sqlalchemy import (
|
|||||||
DateTime,
|
DateTime,
|
||||||
ForeignKey,
|
ForeignKey,
|
||||||
Integer,
|
Integer,
|
||||||
|
JSON,
|
||||||
Numeric,
|
Numeric,
|
||||||
String,
|
String,
|
||||||
Table,
|
Table,
|
||||||
Uuid,
|
Uuid,
|
||||||
)
|
)
|
||||||
|
from sqlalchemy.dialects.postgresql import ARRAY
|
||||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
||||||
|
|
||||||
@@ -137,6 +139,17 @@ class Post(Base):
|
|||||||
tags: Mapped[list[Tag]] = relationship(secondary=post_tags)
|
tags: Mapped[list[Tag]] = relationship(secondary=post_tags)
|
||||||
|
|
||||||
|
|
||||||
|
class Article(Base):
|
||||||
|
"""Test article model with ARRAY and JSON columns."""
|
||||||
|
|
||||||
|
__tablename__ = "articles"
|
||||||
|
|
||||||
|
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
|
||||||
|
title: Mapped[str] = mapped_column(String(200))
|
||||||
|
labels: Mapped[list[str]] = mapped_column(ARRAY(String))
|
||||||
|
metadata_: Mapped[dict | None] = mapped_column("metadata", JSON, nullable=True)
|
||||||
|
|
||||||
|
|
||||||
class RoleCreate(BaseModel):
|
class RoleCreate(BaseModel):
|
||||||
"""Schema for creating a role."""
|
"""Schema for creating a role."""
|
||||||
|
|
||||||
@@ -271,6 +284,23 @@ class ProductCreate(BaseModel):
|
|||||||
price: decimal.Decimal
|
price: decimal.Decimal
|
||||||
|
|
||||||
|
|
||||||
|
class ArticleCreate(BaseModel):
|
||||||
|
"""Schema for creating an article."""
|
||||||
|
|
||||||
|
id: uuid.UUID | None = None
|
||||||
|
title: str
|
||||||
|
labels: list[str] = []
|
||||||
|
|
||||||
|
|
||||||
|
class ArticleRead(PydanticBase):
|
||||||
|
"""Schema for reading an article."""
|
||||||
|
|
||||||
|
id: uuid.UUID
|
||||||
|
title: str
|
||||||
|
labels: list[str]
|
||||||
|
|
||||||
|
|
||||||
|
ArticleCrud = CrudFactory(Article)
|
||||||
RoleCrud = CrudFactory(Role)
|
RoleCrud = CrudFactory(Role)
|
||||||
RoleCursorCrud = CrudFactory(Role, cursor_column=Role.id)
|
RoleCursorCrud = CrudFactory(Role, cursor_column=Role.id)
|
||||||
IntRoleCursorCrud = CrudFactory(IntRole, cursor_column=IntRole.id)
|
IntRoleCursorCrud = CrudFactory(IntRole, cursor_column=IntRole.id)
|
||||||
|
|||||||
@@ -11,12 +11,17 @@ from fastapi_toolsets.crud import (
|
|||||||
CrudFactory,
|
CrudFactory,
|
||||||
InvalidFacetFilterError,
|
InvalidFacetFilterError,
|
||||||
SearchConfig,
|
SearchConfig,
|
||||||
|
UnsupportedFacetTypeError,
|
||||||
get_searchable_fields,
|
get_searchable_fields,
|
||||||
)
|
)
|
||||||
from fastapi_toolsets.exceptions import InvalidOrderFieldError
|
from fastapi_toolsets.exceptions import InvalidOrderFieldError
|
||||||
from fastapi_toolsets.schemas import OffsetPagination, PaginationType
|
from fastapi_toolsets.schemas import OffsetPagination, PaginationType
|
||||||
|
|
||||||
from .conftest import (
|
from .conftest import (
|
||||||
|
Article,
|
||||||
|
ArticleCreate,
|
||||||
|
ArticleCrud,
|
||||||
|
ArticleRead,
|
||||||
Role,
|
Role,
|
||||||
RoleCreate,
|
RoleCreate,
|
||||||
RoleCrud,
|
RoleCrud,
|
||||||
@@ -902,6 +907,128 @@ class TestFilterBy:
|
|||||||
assert len(result.data) == 1
|
assert len(result.data) == 1
|
||||||
assert result.data[0].username == "alice"
|
assert result.data[0].username == "alice"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_bool_filter_false(self, db_session: AsyncSession):
|
||||||
|
"""filter_by with a boolean False value correctly filters rows."""
|
||||||
|
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
|
||||||
|
await UserCrud.create(
|
||||||
|
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
|
||||||
|
)
|
||||||
|
await UserCrud.create(
|
||||||
|
db_session,
|
||||||
|
UserCreate(username="bob", email="b@test.com", is_active=False),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await UserBoolCrud.offset_paginate(
|
||||||
|
db_session, filter_by={"is_active": False}, schema=UserRead
|
||||||
|
)
|
||||||
|
|
||||||
|
assert isinstance(result.pagination, OffsetPagination)
|
||||||
|
assert result.pagination.total_count == 1
|
||||||
|
assert result.data[0].username == "bob"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_bool_filter_true(self, db_session: AsyncSession):
|
||||||
|
"""filter_by with a boolean True value correctly filters rows."""
|
||||||
|
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
|
||||||
|
await UserCrud.create(
|
||||||
|
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
|
||||||
|
)
|
||||||
|
await UserCrud.create(
|
||||||
|
db_session,
|
||||||
|
UserCreate(username="bob", email="b@test.com", is_active=False),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await UserBoolCrud.offset_paginate(
|
||||||
|
db_session, filter_by={"is_active": True}, schema=UserRead
|
||||||
|
)
|
||||||
|
|
||||||
|
assert isinstance(result.pagination, OffsetPagination)
|
||||||
|
assert result.pagination.total_count == 1
|
||||||
|
assert result.data[0].username == "alice"
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_bool_filter_list(self, db_session: AsyncSession):
|
||||||
|
"""filter_by with a list of booleans produces an IN clause."""
|
||||||
|
UserBoolCrud = CrudFactory(User, facet_fields=[User.is_active])
|
||||||
|
await UserCrud.create(
|
||||||
|
db_session, UserCreate(username="alice", email="a@test.com", is_active=True)
|
||||||
|
)
|
||||||
|
await UserCrud.create(
|
||||||
|
db_session,
|
||||||
|
UserCreate(username="bob", email="b@test.com", is_active=False),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await UserBoolCrud.offset_paginate(
|
||||||
|
db_session, filter_by={"is_active": [True, False]}, schema=UserRead
|
||||||
|
)
|
||||||
|
|
||||||
|
assert isinstance(result.pagination, OffsetPagination)
|
||||||
|
assert result.pagination.total_count == 2
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_array_contains_single_value(self, db_session: AsyncSession):
|
||||||
|
"""filter_by on an ARRAY column with a scalar checks containment."""
|
||||||
|
ArticleFacetCrud = CrudFactory(Article, facet_fields=[Article.labels])
|
||||||
|
await ArticleCrud.create(
|
||||||
|
db_session, ArticleCreate(title="Post 1", labels=["python", "fastapi"])
|
||||||
|
)
|
||||||
|
await ArticleCrud.create(
|
||||||
|
db_session, ArticleCreate(title="Post 2", labels=["rust", "axum"])
|
||||||
|
)
|
||||||
|
await ArticleCrud.create(
|
||||||
|
db_session, ArticleCreate(title="Post 3", labels=["python", "django"])
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await ArticleFacetCrud.offset_paginate(
|
||||||
|
db_session, filter_by={"labels": "python"}, schema=ArticleRead
|
||||||
|
)
|
||||||
|
|
||||||
|
assert isinstance(result.pagination, OffsetPagination)
|
||||||
|
assert result.pagination.total_count == 2
|
||||||
|
titles = {a.title for a in result.data}
|
||||||
|
assert titles == {"Post 1", "Post 3"}
|
||||||
|
# facet returns individual unnested values, not whole arrays
|
||||||
|
assert result.filter_attributes == {"labels": ["django", "fastapi", "python"]}
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_array_overlap_list_value(self, db_session: AsyncSession):
|
||||||
|
"""filter_by on an ARRAY column with a list checks overlap."""
|
||||||
|
ArticleFacetCrud = CrudFactory(Article, facet_fields=[Article.labels])
|
||||||
|
await ArticleCrud.create(
|
||||||
|
db_session, ArticleCreate(title="Post 1", labels=["python", "fastapi"])
|
||||||
|
)
|
||||||
|
await ArticleCrud.create(
|
||||||
|
db_session, ArticleCreate(title="Post 2", labels=["rust", "axum"])
|
||||||
|
)
|
||||||
|
await ArticleCrud.create(
|
||||||
|
db_session, ArticleCreate(title="Post 3", labels=["python", "django"])
|
||||||
|
)
|
||||||
|
|
||||||
|
result = await ArticleFacetCrud.offset_paginate(
|
||||||
|
db_session, filter_by={"labels": ["rust", "django"]}, schema=ArticleRead
|
||||||
|
)
|
||||||
|
|
||||||
|
assert isinstance(result.pagination, OffsetPagination)
|
||||||
|
assert result.pagination.total_count == 2
|
||||||
|
titles = {a.title for a in result.data}
|
||||||
|
assert titles == {"Post 2", "Post 3"}
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_unsupported_column_type_raises(self, db_session: AsyncSession):
|
||||||
|
"""filter_by on a JSON column raises UnsupportedFacetTypeError."""
|
||||||
|
ArticleJsonCrud = CrudFactory(Article, facet_fields=[Article.metadata_])
|
||||||
|
|
||||||
|
with pytest.raises(UnsupportedFacetTypeError) as exc_info:
|
||||||
|
await ArticleJsonCrud.offset_paginate(
|
||||||
|
db_session,
|
||||||
|
filter_by={"metadata_": {"key": "value"}},
|
||||||
|
schema=ArticleRead,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert exc_info.value.key == "metadata_"
|
||||||
|
assert "JSON" in exc_info.value.col_type
|
||||||
|
|
||||||
|
|
||||||
class TestFilterParamsSchema:
|
class TestFilterParamsSchema:
|
||||||
"""Tests for AsyncCrud.filter_params()."""
|
"""Tests for AsyncCrud.filter_params()."""
|
||||||
|
|||||||
@@ -1041,6 +1041,25 @@ class TestTransientObject:
|
|||||||
assert len(creates) == 1
|
assert len(creates) == 1
|
||||||
assert len(deletes) == 1
|
assert len(deletes) == 1
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_update_then_delete_suppresses_update_callback(self, mixin_session):
|
||||||
|
"""UPDATE callback is suppressed when the object is also deleted in the same transaction."""
|
||||||
|
obj = WatchedModel(status="initial", other="x")
|
||||||
|
mixin_session.add(obj)
|
||||||
|
await mixin_session.commit()
|
||||||
|
|
||||||
|
_test_events.clear()
|
||||||
|
|
||||||
|
obj.status = "changed"
|
||||||
|
await mixin_session.flush()
|
||||||
|
await mixin_session.delete(obj)
|
||||||
|
await mixin_session.commit()
|
||||||
|
|
||||||
|
updates = [e for e in _test_events if e["event"] == "update"]
|
||||||
|
deletes = [e for e in _test_events if e["event"] == "delete"]
|
||||||
|
assert updates == []
|
||||||
|
assert len(deletes) == 1
|
||||||
|
|
||||||
|
|
||||||
class TestPolymorphism:
|
class TestPolymorphism:
|
||||||
"""Event dispatch with STI (Single Table Inheritance)."""
|
"""Event dispatch with STI (Single Table Inheritance)."""
|
||||||
|
|||||||
Reference in New Issue
Block a user