Files
sqlalchemy-pgview/tests/test_auto_refresh.py
2026-02-08 10:09:48 +01:00

370 lines
12 KiB
Python

"""Tests for auto-refresh functionality."""
from __future__ import annotations

import os
from decimal import Decimal
from typing import TYPE_CHECKING

import pytest
from sqlalchemy import (
    Column,
    Integer,
    MetaData,
    Numeric,
    String,
    Table,
    create_engine,
    func,
    insert,
    select,
    text,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

from sqlalchemy_pgview import AutoRefreshContext, MaterializedView
from sqlalchemy_pgview.ddl import DropMaterializedView

if TYPE_CHECKING:
    from collections.abc import Iterator

    from sqlalchemy.engine import Engine
@pytest.fixture
def pg_engine() -> Iterator[Engine]:
    """Yield a PostgreSQL engine for a test and dispose of it afterwards.

    The connection URL is taken from the ``POSTGRES_URL`` environment
    variable; when it is unset a local test database URL is used.

    Yields:
        The engine created for the requesting test.
    """
    url = os.environ.get("POSTGRES_URL", "postgresql://test:test@localhost:5432/testdb")
    engine = create_engine(url)
    try:
        yield engine
    finally:
        # A new engine is created for every test; release its pooled
        # connections so they do not accumulate across the test run.
        engine.dispose()
class TestAutoRefreshContext:
    """Tests for AutoRefreshContext (Core usage)."""

    def test_auto_refresh_context_refreshes_on_exit(self, pg_engine: Engine) -> None:
        """Test that AutoRefreshContext refreshes view on successful exit.

        Two rows are inserted inside the context; once the ``with`` block has
        ended, the view must report both rows and their summed total.
        """
        metadata = MetaData()
        orders = Table(
            "test_orders_arc",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("total", Numeric(10, 2)),
        )
        order_stats = MaterializedView(
            "test_order_stats_arc",
            select(
                func.count(orders.c.id).label("order_count"),
                func.sum(orders.c.total).label("total_revenue"),
            ),
            metadata=metadata,
        )

        try:
            metadata.create_all(pg_engine)

            with pg_engine.begin() as conn:
                # Initial state - empty
                result = conn.execute(select(order_stats.as_table())).fetchone()
                assert result.order_count == 0

                # Use AutoRefreshContext to auto-refresh after changes
                with AutoRefreshContext(conn, order_stats, orders):
                    conn.execute(insert(orders).values(id=1, total=100))
                    conn.execute(insert(orders).values(id=2, total=200))
                # View is refreshed here

                # Check the view was refreshed
                result = conn.execute(select(order_stats.as_table())).fetchone()
                assert result.order_count == 2
                assert result.total_revenue == Decimal("300")
        finally:
            # Always remove the view and table, even when an assertion failed.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(order_stats, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_auto_refresh_context_no_refresh_on_exception(
        self, pg_engine: Engine
    ) -> None:
        """Test that AutoRefreshContext doesn't refresh on exception.

        An error raised inside the context must propagate to the caller, and
        the view must afterwards still show the row count it had before.
        """
        metadata = MetaData()
        items = Table(
            "test_items_arc",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("value", Integer),
        )
        item_stats = MaterializedView(
            "test_item_stats_arc",
            select(func.count(items.c.id).label("item_count")),
            metadata=metadata,
        )

        try:
            metadata.create_all(pg_engine)

            with pg_engine.begin() as conn:
                # Insert initial data and refresh
                conn.execute(insert(items).values(id=1, value=10))
                item_stats.refresh(conn)
                result = conn.execute(select(item_stats.as_table())).fetchone()
                assert result.item_count == 1

            # Try with exception - should not refresh.
            # pytest.raises (rather than a bare try/except-pass) also fails
            # the test if the context manager swallows the error.
            with pytest.raises(ValueError, match="Simulated error"):
                with pg_engine.begin() as conn:
                    with AutoRefreshContext(conn, item_stats, items):
                        conn.execute(insert(items).values(id=2, value=20))
                        raise ValueError("Simulated error")

            # View should still show old data (not refreshed due to exception)
            with pg_engine.connect() as conn:
                result = conn.execute(select(item_stats.as_table())).fetchone()
                # Note: The insert was rolled back, so count is still 1
                assert result.item_count == 1
        finally:
            # Always remove the view and table, even when an assertion failed.
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(item_stats, if_exists=True))
            metadata.drop_all(pg_engine)
class TestAutoRefreshORM:
    """Tests for auto_refresh_on with ORM.

    Every test declares its own ``DeclarativeBase``, model(s), materialized
    view and ``Session`` subclass, registers the view with
    ``auto_refresh_on`` for that session class, and drops all created
    objects in a ``finally`` block.
    """

    def test_auto_refresh_on_commit(self, pg_engine: Engine) -> None:
        """Test that materialized view refreshes after ORM commit."""

        class Base(DeclarativeBase):
            pass

        class Product(Base):
            __tablename__ = "test_products_ar"

            id: Mapped[int] = mapped_column(primary_key=True)
            name: Mapped[str] = mapped_column(String(100))
            price: Mapped[Decimal] = mapped_column(Numeric(10, 2))

        metadata = Base.metadata

        # Create MV based on products table
        product_stats = MaterializedView(
            "test_product_stats_ar",
            select(
                func.count(Product.id).label("product_count"),
                func.avg(Product.price).label("avg_price"),
            ),
            metadata=metadata,
        )

        # Create a custom Session class for this test
        class TestSession(Session):
            pass

        # Enable auto-refresh
        product_stats.auto_refresh_on(TestSession, Product.__table__)

        try:
            metadata.create_all(pg_engine)

            # Initial state
            with pg_engine.connect() as conn:
                result = conn.execute(select(product_stats.as_table())).fetchone()
                assert result.product_count == 0

            # Add product via ORM
            with TestSession(pg_engine) as session:
                session.add(Product(id=1, name="Widget", price=Decimal("19.99")))
                session.commit()  # Should trigger refresh

            # Check view was refreshed
            with pg_engine.connect() as conn:
                result = conn.execute(select(product_stats.as_table())).fetchone()
                assert result.product_count == 1
                assert result.avg_price == Decimal("19.99")

            # Add more products
            with TestSession(pg_engine) as session:
                session.add(Product(id=2, name="Gadget", price=Decimal("29.99")))
                session.add(Product(id=3, name="Gizmo", price=Decimal("9.99")))
                session.commit()

            # Check view updated
            with pg_engine.connect() as conn:
                result = conn.execute(select(product_stats.as_table())).fetchone()
                assert result.product_count == 3
        finally:
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(product_stats, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_auto_refresh_on_update(self, pg_engine: Engine) -> None:
        """Test that materialized view refreshes after ORM update."""

        class Base(DeclarativeBase):
            pass

        class Counter(Base):
            __tablename__ = "test_counters_ar"

            id: Mapped[int] = mapped_column(primary_key=True)
            value: Mapped[int] = mapped_column(Integer)

        metadata = Base.metadata

        counter_sum = MaterializedView(
            "test_counter_sum_ar",
            select(func.sum(Counter.value).label("total")),
            metadata=metadata,
        )

        class TestSession(Session):
            pass

        counter_sum.auto_refresh_on(TestSession, Counter.__table__)

        try:
            metadata.create_all(pg_engine)

            # Add initial data
            with TestSession(pg_engine) as session:
                session.add(Counter(id=1, value=10))
                session.add(Counter(id=2, value=20))
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(counter_sum.as_table())).fetchone()
                assert result.total == 30

            # Update via ORM
            with TestSession(pg_engine) as session:
                counter = session.get(Counter, 1)
                counter.value = 50
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(counter_sum.as_table())).fetchone()
                assert result.total == 70  # 50 + 20
        finally:
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(counter_sum, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_auto_refresh_on_delete(self, pg_engine: Engine) -> None:
        """Test that materialized view refreshes after ORM delete."""

        class Base(DeclarativeBase):
            pass

        class Item(Base):
            __tablename__ = "test_items_ar_del"

            id: Mapped[int] = mapped_column(primary_key=True)

        metadata = Base.metadata

        # The column is deliberately not labelled "count": a result Row is a
        # sequence, so ``row.count`` resolves to the sequence method instead
        # of the column value and could never compare equal to an integer.
        item_count = MaterializedView(
            "test_item_count_ar",
            select(func.count(Item.id).label("item_count")),
            metadata=metadata,
        )

        class TestSession(Session):
            pass

        item_count.auto_refresh_on(TestSession, Item.__table__)

        try:
            metadata.create_all(pg_engine)

            # Add initial data
            with TestSession(pg_engine) as session:
                session.add(Item(id=1))
                session.add(Item(id=2))
                session.add(Item(id=3))
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(item_count.as_table())).fetchone()
                assert result.item_count == 3

            # Delete via ORM
            with TestSession(pg_engine) as session:
                item = session.get(Item, 2)
                session.delete(item)
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(item_count.as_table())).fetchone()
                assert result.item_count == 2
        finally:
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(item_count, if_exists=True))
            metadata.drop_all(pg_engine)

    def test_no_refresh_on_unrelated_table(self, pg_engine: Engine) -> None:
        """Test that unrelated table changes don't trigger refresh."""

        class Base(DeclarativeBase):
            pass

        class TableA(Base):
            __tablename__ = "test_table_a_ar"

            id: Mapped[int] = mapped_column(primary_key=True)
            value: Mapped[int] = mapped_column(Integer)

        class TableB(Base):
            __tablename__ = "test_table_b_ar"

            id: Mapped[int] = mapped_column(primary_key=True)

        metadata = Base.metadata

        # MV only watches TableA
        table_a_sum = MaterializedView(
            "test_table_a_sum_ar",
            select(func.sum(TableA.value).label("total")),
            metadata=metadata,
        )

        class TestSession(Session):
            pass

        # Only watch TableA
        table_a_sum.auto_refresh_on(TestSession, TableA.__table__)

        try:
            metadata.create_all(pg_engine)

            # Add to TableA
            with TestSession(pg_engine) as session:
                session.add(TableA(id=1, value=100))
                session.commit()

            with pg_engine.connect() as conn:
                result = conn.execute(select(table_a_sum.as_table())).fetchone()
                assert result.total == 100

            # Add to TableB (should not trigger refresh)
            # First, manually make the view stale: this insert goes through a
            # plain engine connection instead of a TestSession.
            with pg_engine.begin() as conn:
                conn.execute(text("INSERT INTO test_table_a_ar (id, value) VALUES (2, 200)"))

            with TestSession(pg_engine) as session:
                session.add(TableB(id=1))
                session.commit()

            # View should still show stale data (100, not 300)
            # because TableB changes don't trigger refresh
            with pg_engine.connect() as conn:
                result = conn.execute(select(table_a_sum.as_table())).fetchone()
                assert result.total == 100  # Still stale
        finally:
            with pg_engine.begin() as conn:
                conn.execute(DropMaterializedView(table_a_sum, if_exists=True))
            metadata.drop_all(pg_engine)