mirror of
https://github.com/d3vyce/sqlalchemy-pgview.git
synced 2026-03-01 18:00:47 +01:00
370 lines
12 KiB
Python
370 lines
12 KiB
Python
"""Tests for auto-refresh functionality."""
|
|
|
|
from __future__ import annotations

import os
from decimal import Decimal
from typing import TYPE_CHECKING

import pytest
from sqlalchemy import (
    Column,
    Integer,
    MetaData,
    Numeric,
    String,
    Table,
    create_engine,
    func,
    insert,
    select,
    text,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

from sqlalchemy_pgview import AutoRefreshContext, MaterializedView
from sqlalchemy_pgview.ddl import DropMaterializedView

if TYPE_CHECKING:
    from collections.abc import Iterator

    from sqlalchemy.engine import Engine
|
|
|
|
|
|
@pytest.fixture
def pg_engine() -> Iterator[Engine]:
    """Yield a PostgreSQL engine for testing and dispose of it afterwards.

    The connection URL is read from the ``POSTGRES_URL`` environment variable;
    when it is unset, a local ``testdb`` database URL is used instead.

    Yields:
        The engine created for the requesting test.
    """
    url = os.environ.get("POSTGRES_URL", "postgresql://test:test@localhost:5432/testdb")
    engine = create_engine(url)
    yield engine
    # Teardown: release the engine's pooled connections once the test is done,
    # instead of leaving them open until garbage collection.
    engine.dispose()
|
|
|
|
|
|
class TestAutoRefreshContext:
    """Tests for AutoRefreshContext (Core usage)."""

    def test_auto_refresh_context_refreshes_on_exit(self, pg_engine: Engine) -> None:
        """Test that AutoRefreshContext refreshes view on successful exit."""
        metadata = MetaData()

        orders = Table(
            "test_orders_arc",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("total", Numeric(10, 2)),
        )

        order_stats = MaterializedView(
            "test_order_stats_arc",
            select(
                func.count(orders.c.id).label("order_count"),
                func.sum(orders.c.total).label("total_revenue"),
            ),
            metadata=metadata,
        )

        try:
            metadata.create_all(pg_engine)

            with pg_engine.begin() as conn:
                # Initial state - empty. The aggregate view holds exactly one
                # row, so ``one()`` fails loudly if that ever stops being true.
                result = conn.execute(select(order_stats.as_table())).one()
                assert result.order_count == 0

                # Use AutoRefreshContext to auto-refresh after changes
                with AutoRefreshContext(conn, order_stats, orders):
                    conn.execute(insert(orders).values(id=1, total=100))
                    conn.execute(insert(orders).values(id=2, total=200))
                    # View is refreshed when this block exits

                # Check the view was refreshed
                result = conn.execute(select(order_stats.as_table())).one()
                assert result.order_count == 2
                assert result.total_revenue == Decimal("300")

        finally:
            # Drop the view first, then the tables registered on the metadata.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(order_stats, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_auto_refresh_context_no_refresh_on_exception(
        self, pg_engine: Engine
    ) -> None:
        """Test that AutoRefreshContext doesn't refresh on exception."""
        metadata = MetaData()

        items = Table(
            "test_items_arc",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("value", Integer),
        )

        item_stats = MaterializedView(
            "test_item_stats_arc",
            select(func.count(items.c.id).label("item_count")),
            metadata=metadata,
        )

        try:
            metadata.create_all(pg_engine)

            with pg_engine.begin() as conn:
                # Insert initial data and refresh
                conn.execute(insert(items).values(id=1, value=10))
                item_stats.refresh(conn)

                result = conn.execute(select(item_stats.as_table())).one()
                assert result.item_count == 1

            # Try with exception - should not refresh. ``pytest.raises`` also
            # verifies that the error actually propagates out of the context
            # manager instead of being swallowed by it.
            with pytest.raises(ValueError, match="Simulated error"):
                with pg_engine.begin() as conn, AutoRefreshContext(
                    conn, item_stats, items
                ):
                    conn.execute(insert(items).values(id=2, value=20))
                    raise ValueError("Simulated error")

            with pg_engine.connect() as conn:
                # The failed transaction was rolled back, so the second insert
                # must not be visible in the base table.
                row_count = conn.execute(
                    select(func.count()).select_from(items)
                ).scalar_one()
                assert row_count == 1

                # View should still show old data (not refreshed due to exception)
                # NOTE(review): because the insert was rolled back, a refresh
                # would also yield 1 here; this check cannot by itself tell
                # "not refreshed" apart from "refreshed after rollback".
                result = conn.execute(select(item_stats.as_table())).one()
                assert result.item_count == 1

        finally:
            # Drop the view first, then the tables registered on the metadata.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(item_stats, if_exists=True))
            metadata.drop_all(pg_engine)
|
|
|
|
|
|
class TestAutoRefreshORM:
    """Tests for auto_refresh_on with ORM.

    Every test declares its own ``DeclarativeBase`` so the tables and the
    materialized view live in a metadata object private to that test, and
    its own ``Session`` subclass so ``auto_refresh_on`` is registered against
    a class no other test uses.
    """

    def test_auto_refresh_on_commit(self, pg_engine: Engine) -> None:
        """Test that materialized view refreshes after ORM commit."""

        class Base(DeclarativeBase):
            pass

        class Product(Base):
            __tablename__ = "test_products_ar"
            id: Mapped[int] = mapped_column(primary_key=True)
            name: Mapped[str] = mapped_column(String(100))
            price: Mapped[Decimal] = mapped_column(Numeric(10, 2))

        metadata = Base.metadata

        # Create MV based on products table
        product_stats = MaterializedView(
            "test_product_stats_ar",
            select(
                func.count(Product.id).label("product_count"),
                func.avg(Product.price).label("avg_price"),
            ),
            metadata=metadata,
        )

        # Create a custom Session class for this test
        class TestSession(Session):
            pass

        # Enable auto-refresh
        product_stats.auto_refresh_on(TestSession, Product.__table__)

        try:
            metadata.create_all(pg_engine)

            # Initial state. The aggregate view holds exactly one row, so
            # ``one()`` fails loudly if that ever stops being true.
            with pg_engine.connect() as conn:
                result = conn.execute(select(product_stats.as_table())).one()
                assert result.product_count == 0

            # Add product via ORM
            with TestSession(pg_engine) as session:
                session.add(Product(id=1, name="Widget", price=Decimal("19.99")))
                session.commit()  # Should trigger refresh

            # Check view was refreshed
            with pg_engine.connect() as conn:
                result = conn.execute(select(product_stats.as_table())).one()
                assert result.product_count == 1
                assert result.avg_price == Decimal("19.99")

            # Add more products
            with TestSession(pg_engine) as session:
                session.add(Product(id=2, name="Gadget", price=Decimal("29.99")))
                session.add(Product(id=3, name="Gizmo", price=Decimal("9.99")))
                session.commit()

            # Check view updated
            with pg_engine.connect() as conn:
                result = conn.execute(select(product_stats.as_table())).one()
                assert result.product_count == 3

        finally:
            # Drop the view first, then the tables registered on the metadata.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(product_stats, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_auto_refresh_on_update(self, pg_engine: Engine) -> None:
        """Test that materialized view refreshes after ORM update."""

        class Base(DeclarativeBase):
            pass

        class Counter(Base):
            __tablename__ = "test_counters_ar"
            id: Mapped[int] = mapped_column(primary_key=True)
            value: Mapped[int] = mapped_column(Integer)

        metadata = Base.metadata

        counter_sum = MaterializedView(
            "test_counter_sum_ar",
            select(func.sum(Counter.value).label("total")),
            metadata=metadata,
        )

        class TestSession(Session):
            pass

        counter_sum.auto_refresh_on(TestSession, Counter.__table__)

        try:
            metadata.create_all(pg_engine)

            # Add initial data
            with TestSession(pg_engine) as session:
                session.add(Counter(id=1, value=10))
                session.add(Counter(id=2, value=20))
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(counter_sum.as_table())).one()
                assert result.total == 30

            # Update via ORM
            with TestSession(pg_engine) as session:
                counter = session.get(Counter, 1)
                # ``Session.get`` returns None for a missing row; fail with a
                # clear assertion rather than an AttributeError below.
                assert counter is not None
                counter.value = 50
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(counter_sum.as_table())).one()
                assert result.total == 70  # 50 + 20

        finally:
            # Drop the view first, then the tables registered on the metadata.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(counter_sum, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_auto_refresh_on_delete(self, pg_engine: Engine) -> None:
        """Test that materialized view refreshes after ORM delete."""

        class Base(DeclarativeBase):
            pass

        class Item(Base):
            __tablename__ = "test_items_ar_del"
            id: Mapped[int] = mapped_column(primary_key=True)

        metadata = Base.metadata

        # The column is labelled "item_count" rather than "count": ``Row`` is a
        # sequence and already has a ``count`` method, so ``row.count`` would
        # resolve to that method instead of the column value.
        item_count = MaterializedView(
            "test_item_count_ar",
            select(func.count(Item.id).label("item_count")),
            metadata=metadata,
        )

        class TestSession(Session):
            pass

        item_count.auto_refresh_on(TestSession, Item.__table__)

        try:
            metadata.create_all(pg_engine)

            # Add initial data
            with TestSession(pg_engine) as session:
                session.add(Item(id=1))
                session.add(Item(id=2))
                session.add(Item(id=3))
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(item_count.as_table())).one()
                assert result.item_count == 3

            # Delete via ORM
            with TestSession(pg_engine) as session:
                item = session.get(Item, 2)
                # ``Session.get`` returns None for a missing row; make that an
                # explicit test failure before handing it to ``delete``.
                assert item is not None
                session.delete(item)
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(item_count.as_table())).one()
                assert result.item_count == 2

        finally:
            # Drop the view first, then the tables registered on the metadata.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(item_count, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_no_refresh_on_unrelated_table(self, pg_engine: Engine) -> None:
        """Test that unrelated table changes don't trigger refresh."""

        class Base(DeclarativeBase):
            pass

        class TableA(Base):
            __tablename__ = "test_table_a_ar"
            id: Mapped[int] = mapped_column(primary_key=True)
            value: Mapped[int] = mapped_column(Integer)

        class TableB(Base):
            __tablename__ = "test_table_b_ar"
            id: Mapped[int] = mapped_column(primary_key=True)

        metadata = Base.metadata

        # MV only watches TableA
        table_a_sum = MaterializedView(
            "test_table_a_sum_ar",
            select(func.sum(TableA.value).label("total")),
            metadata=metadata,
        )

        class TestSession(Session):
            pass

        # Only watch TableA
        table_a_sum.auto_refresh_on(TestSession, TableA.__table__)

        try:
            metadata.create_all(pg_engine)

            # Add to TableA
            with TestSession(pg_engine) as session:
                session.add(TableA(id=1, value=100))
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(table_a_sum.as_table())).one()
                assert result.total == 100

            # Add to TableB (should not trigger refresh)
            # First, manually make the view stale: this insert goes through a
            # plain engine connection rather than TestSession.
            with pg_engine.begin() as conn:
                conn.execute(text("INSERT INTO test_table_a_ar (id, value) VALUES (2, 200)"))

            with TestSession(pg_engine) as session:
                session.add(TableB(id=1))
                session.commit()

            # View should still show stale data (100, not 300)
            # because TableB changes don't trigger refresh
            with pg_engine.connect() as conn:
                result = conn.execute(select(table_a_sum.as_table())).one()
                assert result.total == 100  # Still stale

        finally:
            # Drop the view first, then the tables registered on the metadata.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(table_a_sum, if_exists=True))
            metadata.drop_all(pg_engine)
|