mirror of
https://github.com/d3vyce/sqlalchemy-pgview.git
synced 2026-03-01 19:50:46 +01:00
449 lines
16 KiB
Python
449 lines
16 KiB
Python
"""Tests for views with one-to-many and many-to-many relationships."""
|
|
|
|
from decimal import Decimal
|
|
|
|
from sqlalchemy import Table, func, select
|
|
from sqlalchemy.engine import Engine
|
|
|
|
from sqlalchemy_pgview import (
|
|
CreateMaterializedView,
|
|
CreateView,
|
|
DropMaterializedView,
|
|
DropView,
|
|
MaterializedView,
|
|
View,
|
|
)
|
|
|
|
|
|
class TestOneToManyViews:
    """Exercise plain and materialized views defined over one-to-many joins."""

    def test_author_book_count_view(
        self, pg_engine: Engine, pg_one_to_many_tables: dict[str, Table]
    ) -> None:
        """Aggregate books per author (1:N) and check the per-author counts."""
        authors = pg_one_to_many_tables["authors"]
        books = pg_one_to_many_tables["books"]

        # View definition: one row per author with book count and summed price.
        authors_with_books = authors.outerjoin(books, authors.c.id == books.c.author_id)
        definition = (
            select(
                authors.c.id,
                authors.c.name,
                authors.c.country,
                func.count(books.c.id).label("book_count"),
                func.coalesce(func.sum(books.c.price), 0).label("total_price"),
            )
            .select_from(authors_with_books)
            .group_by(authors.c.id, authors.c.name, authors.c.country)
        )
        author_stats = View("author_book_stats", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(author_stats, or_replace=True))

            # Read the view back ordered by author name.
            view_table = author_stats.as_table()
            query = select(view_table).order_by(view_table.c.name)
            result = conn.execute(query).fetchall()

            assert len(result) == 3

            # Expected (name, book_count) pairs in name order:
            # Gabriel Garcia Marquez - 1 book, George Orwell - 2 books,
            # Haruki Murakami - 2 books.
            expected = [
                ("Gabriel Garcia Marquez", 1),
                ("George Orwell", 2),
                ("Haruki Murakami", 2),
            ]
            for row, (name, book_count) in zip(result, expected):
                assert row.name == name
                assert row.book_count == book_count

            conn.execute(DropView(author_stats, if_exists=True))

    def test_book_review_stats_view(
        self, pg_engine: Engine, pg_one_to_many_tables: dict[str, Table]
    ) -> None:
        """Aggregate reviews per book (books -> reviews) and check the top row."""
        books = pg_one_to_many_tables["books"]
        reviews = pg_one_to_many_tables["reviews"]

        # View definition: one row per book with review count and mean rating.
        books_with_reviews = books.outerjoin(reviews, books.c.id == reviews.c.book_id)
        definition = (
            select(
                books.c.id,
                books.c.title,
                books.c.price,
                func.count(reviews.c.id).label("review_count"),
                func.coalesce(func.avg(reviews.c.rating), 0).label("avg_rating"),
            )
            .select_from(books_with_reviews)
            .group_by(books.c.id, books.c.title, books.c.price)
        )
        book_review_stats = View("book_review_stats", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(book_review_stats, or_replace=True))

            # Most-reviewed book first.
            view_table = book_review_stats.as_table()
            query = select(view_table).order_by(view_table.c.review_count.desc())
            result = conn.execute(query).fetchall()

            assert len(result) == 5

            # 1984 has 3 reviews
            most_reviewed = result[0]
            assert most_reviewed.title == "1984"
            assert most_reviewed.review_count == 3
            assert float(most_reviewed.avg_rating) > 4.0

            conn.execute(DropView(book_review_stats, if_exists=True))

    def test_chained_one_to_many_view(
        self, pg_engine: Engine, pg_one_to_many_tables: dict[str, Table]
    ) -> None:
        """Aggregate reviews per author across two 1:N hops (author -> books -> reviews)."""
        authors = pg_one_to_many_tables["authors"]
        books = pg_one_to_many_tables["books"]
        reviews = pg_one_to_many_tables["reviews"]

        # View definition: review statistics rolled up over all of an author's books.
        authors_with_books = authors.outerjoin(books, authors.c.id == books.c.author_id)
        authors_books_reviews = authors_with_books.outerjoin(
            reviews, books.c.id == reviews.c.book_id
        )
        definition = (
            select(
                authors.c.id,
                authors.c.name,
                func.count(reviews.c.id).label("total_reviews"),
                func.coalesce(func.avg(reviews.c.rating), 0).label("avg_rating"),
            )
            .select_from(authors_books_reviews)
            .group_by(authors.c.id, authors.c.name)
        )
        author_review_summary = View("author_review_summary", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(author_review_summary, or_replace=True))

            # Most-reviewed author first.
            view_table = author_review_summary.as_table()
            query = select(view_table).order_by(view_table.c.total_reviews.desc())
            result = conn.execute(query).fetchall()

            assert len(result) == 3

            # George Orwell has most reviews (1984: 3 + Animal Farm: 1 = 4)
            top_author = result[0]
            assert top_author.name == "George Orwell"
            assert top_author.total_reviews == 4

            conn.execute(DropView(author_review_summary, if_exists=True))

    def test_one_to_many_materialized_view(
        self, pg_engine: Engine, pg_one_to_many_tables: dict[str, Table]
    ) -> None:
        """Create a materialized 1:N aggregate and check its row count and top value."""
        authors = pg_one_to_many_tables["authors"]
        books = pg_one_to_many_tables["books"]

        # Materialized view definition: book count per author.
        authors_with_books = authors.outerjoin(books, authors.c.id == books.c.author_id)
        definition = (
            select(
                authors.c.id,
                authors.c.name,
                func.count(books.c.id).label("book_count"),
            )
            .select_from(authors_with_books)
            .group_by(authors.c.id, authors.c.name)
        )
        author_stats_mv = MaterializedView("author_stats_mv", definition, with_data=True)

        with pg_engine.begin() as conn:
            conn.execute(CreateMaterializedView(author_stats_mv))

            # Largest book count first.
            mv_table = author_stats_mv.as_table()
            query = select(mv_table).order_by(mv_table.c.book_count.desc())
            result = conn.execute(query).fetchall()

            assert len(result) == 3
            # Two authors tie on 2 books, so only the count is checked here
            # (the first row is Orwell or Murakami).
            assert result[0].book_count == 2

            conn.execute(DropMaterializedView(author_stats_mv, if_exists=True))
|
|
|
|
|
|
class TestManyToManyViews:
    """Exercise plain and materialized views defined over many-to-many joins."""

    def test_student_course_count_view(
        self, pg_engine: Engine, pg_many_to_many_tables: dict[str, Table]
    ) -> None:
        """Aggregate enrollments per student (M:N) and check the course counts."""
        students = pg_many_to_many_tables["students"]
        student_courses = pg_many_to_many_tables["student_courses"]

        # View definition: one row per student with course count and GPA.
        students_with_enrollments = students.outerjoin(
            student_courses, students.c.id == student_courses.c.student_id
        )
        definition = (
            select(
                students.c.id,
                students.c.name,
                students.c.email,
                func.count(student_courses.c.course_id).label("course_count"),
                func.coalesce(func.avg(student_courses.c.grade), 0).label("gpa"),
            )
            .select_from(students_with_enrollments)
            .group_by(students.c.id, students.c.name, students.c.email)
        )
        student_stats = View("student_stats", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(student_stats, or_replace=True))

            # Most-enrolled student first.
            view_table = student_stats.as_table()
            query = select(view_table).order_by(view_table.c.course_count.desc())
            result = conn.execute(query).fetchall()

            assert len(result) == 3

            # Expected (name, course_count) pairs in descending count order:
            # Alice is enrolled in 3 courses, Bob in 2, Charlie in 1.
            expected = [("Alice", 3), ("Bob", 2), ("Charlie", 1)]
            for row, (name, course_count) in zip(result, expected):
                assert row.name == name
                assert row.course_count == course_count

            conn.execute(DropView(student_stats, if_exists=True))

    def test_course_enrollment_view(
        self, pg_engine: Engine, pg_many_to_many_tables: dict[str, Table]
    ) -> None:
        """Aggregate enrollments per course (the reverse M:N direction)."""
        courses = pg_many_to_many_tables["courses"]
        student_courses = pg_many_to_many_tables["student_courses"]

        # View definition: one row per course with student count and mean grade.
        courses_with_enrollments = courses.outerjoin(
            student_courses, courses.c.id == student_courses.c.course_id
        )
        definition = (
            select(
                courses.c.id,
                courses.c.name,
                courses.c.credits,
                func.count(student_courses.c.student_id).label("student_count"),
                func.coalesce(func.avg(student_courses.c.grade), 0).label("avg_grade"),
            )
            .select_from(courses_with_enrollments)
            .group_by(courses.c.id, courses.c.name, courses.c.credits)
        )
        course_enrollment = View("course_enrollment", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(course_enrollment, or_replace=True))

            # Read the view back ordered by course name.
            view_table = course_enrollment.as_table()
            query = select(view_table).order_by(view_table.c.name)
            result = conn.execute(query).fetchall()

            assert len(result) == 3

            # Expected (name, student_count) pairs in name order:
            # Database Systems - 2 students (Alice, Bob)
            # Machine Learning - 2 students (Alice, Bob)
            # Web Development - 2 students (Alice, Charlie)
            expected = [
                ("Database Systems", 2),
                ("Machine Learning", 2),
                ("Web Development", 2),
            ]
            for row, (name, student_count) in zip(result, expected):
                assert row.name == name
                assert row.student_count == student_count

            conn.execute(DropView(course_enrollment, if_exists=True))

    def test_course_tags_view(
        self, pg_engine: Engine, pg_many_to_many_tables: dict[str, Table]
    ) -> None:
        """Join courses to tags through the association table and aggregate tags."""
        courses = pg_many_to_many_tables["courses"]
        course_tags = pg_many_to_many_tables["course_tags"]
        tags = pg_many_to_many_tables["tags"]

        # View definition: one row per course with tag count and joined tag names.
        courses_with_links = courses.outerjoin(
            course_tags, courses.c.id == course_tags.c.course_id
        )
        courses_with_tags = courses_with_links.outerjoin(
            tags, course_tags.c.tag_id == tags.c.id
        )
        definition = (
            select(
                courses.c.id,
                courses.c.name,
                func.count(tags.c.id).label("tag_count"),
                func.string_agg(tags.c.name, ", ").label("tag_names"),
            )
            .select_from(courses_with_tags)
            .group_by(courses.c.id, courses.c.name)
        )
        course_with_tags = View("course_with_tags", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(course_with_tags, or_replace=True))

            # Most-tagged course first.
            view_table = course_with_tags.as_table()
            query = select(view_table).order_by(view_table.c.tag_count.desc())
            result = conn.execute(query).fetchall()

            assert len(result) == 3

            # Machine Learning has 2 tags (data, ai)
            most_tagged = result[0]
            assert most_tagged.name == "Machine Learning"
            assert most_tagged.tag_count == 2

            conn.execute(DropView(course_with_tags, if_exists=True))

    def test_student_courses_detailed_view(
        self, pg_engine: Engine, pg_many_to_many_tables: dict[str, Table]
    ) -> None:
        """Flatten the M:N relationship into one row per enrollment (no aggregation)."""
        students = pg_many_to_many_tables["students"]
        courses = pg_many_to_many_tables["courses"]
        student_courses = pg_many_to_many_tables["student_courses"]

        # View definition: detailed enrollment records via inner joins on both sides.
        enrollments_with_students = student_courses.join(
            students, student_courses.c.student_id == students.c.id
        )
        full_enrollments = enrollments_with_students.join(
            courses, student_courses.c.course_id == courses.c.id
        )
        definition = select(
            students.c.name.label("student_name"),
            students.c.email,
            courses.c.name.label("course_name"),
            courses.c.credits,
            student_courses.c.grade,
        ).select_from(full_enrollments)
        enrollment_details = View("enrollment_details", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(enrollment_details, or_replace=True))

            view_table = enrollment_details.as_table()
            result = conn.execute(select(view_table)).fetchall()

            # Total enrollments: 6
            assert len(result) == 6

            # Check Alice's Web Development grade
            alice_web = []
            for row in result:
                if row.student_name == "Alice" and row.course_name == "Web Development":
                    alice_web.append(row)
            assert len(alice_web) == 1
            assert alice_web[0].grade == Decimal("4.00")

            conn.execute(DropView(enrollment_details, if_exists=True))

    def test_many_to_many_materialized_view(
        self, pg_engine: Engine, pg_many_to_many_tables: dict[str, Table]
    ) -> None:
        """Create a materialized M:N aggregate and check which student ranks first."""
        students = pg_many_to_many_tables["students"]
        student_courses = pg_many_to_many_tables["student_courses"]

        # Materialized view definition: GPA per student, rounded to 2 decimals.
        enrolled_students = students.join(
            student_courses, students.c.id == student_courses.c.student_id
        )
        definition = (
            select(
                students.c.id,
                students.c.name,
                func.round(func.avg(student_courses.c.grade), 2).label("gpa"),
            )
            .select_from(enrolled_students)
            .group_by(students.c.id, students.c.name)
        )
        student_gpa_mv = MaterializedView("student_gpa_mv", definition, with_data=True)

        with pg_engine.begin() as conn:
            conn.execute(CreateMaterializedView(student_gpa_mv))

            # Highest GPA first.
            mv_table = student_gpa_mv.as_table()
            query = select(mv_table).order_by(mv_table.c.gpa.desc())
            result = conn.execute(query).fetchall()

            assert len(result) == 3

            # Alice has highest GPA (3.8 + 4.0 + 3.5) / 3 = 3.77
            assert result[0].name == "Alice"

            conn.execute(DropMaterializedView(student_gpa_mv, if_exists=True))

    def test_double_many_to_many_view(
        self, pg_engine: Engine, pg_many_to_many_tables: dict[str, Table]
    ) -> None:
        """Chain two M:N relationships (students -> courses -> tags) in one view."""
        students = pg_many_to_many_tables["students"]
        courses = pg_many_to_many_tables["courses"]
        student_courses = pg_many_to_many_tables["student_courses"]
        course_tags = pg_many_to_many_tables["course_tags"]
        tags = pg_many_to_many_tables["tags"]

        # View definition: number of distinct tags reachable from each student's courses.
        students_to_tags = (
            students.join(student_courses, students.c.id == student_courses.c.student_id)
            .join(courses, student_courses.c.course_id == courses.c.id)
            .join(course_tags, courses.c.id == course_tags.c.course_id)
            .join(tags, course_tags.c.tag_id == tags.c.id)
        )
        definition = (
            select(
                students.c.id.label("student_id"),
                students.c.name.label("student_name"),
                func.count(func.distinct(tags.c.id)).label("unique_tags"),
            )
            .select_from(students_to_tags)
            .group_by(students.c.id, students.c.name)
        )
        student_tags = View("student_tags", definition)

        with pg_engine.begin() as conn:
            conn.execute(CreateView(student_tags, or_replace=True))

            # Student with the most distinct tags first.
            view_table = student_tags.as_table()
            query = select(view_table).order_by(view_table.c.unique_tags.desc())
            result = conn.execute(query).fetchall()

            assert len(result) == 3

            # Alice takes all 3 courses, which have tags: data, programming, ai
            top_student = result[0]
            assert top_student.student_name == "Alice"
            assert top_student.unique_tags == 3

            conn.execute(DropView(student_tags, if_exists=True))
|