Chapter 21: Advanced Query Patterns
The storefront needs rich browsing capabilities: filter books by author,
sort by price, paginate results, and show inventory availability
alongside book information. In this chapter we will explore the full
power of domain.query_for() and build a cross-aggregate projection.
ReadOnlyQuerySet Deep Dive
domain.query_for() returns a ReadOnlyQuerySet that supports:
Filtering
# Exact match
results = domain.query_for(BookCatalog).filter(author="George Orwell").all()
# Multiple conditions (AND)
results = domain.query_for(BookCatalog).filter(
author="George Orwell",
price__lte=15.00,
).all()
Excluding
# Exclude specific values
results = domain.query_for(BookCatalog).exclude(author="Unknown").all()
Ordering
# Ascending
results = domain.query_for(BookCatalog).order_by("price").all()
# Descending (prefix with -)
results = domain.query_for(BookCatalog).order_by("-price").all()
# Multiple fields
results = domain.query_for(BookCatalog).order_by("author", "-price").all()
Pagination
# First page (10 items)
page1 = domain.query_for(BookCatalog).limit(10).offset(0).all()
# Second page
page2 = domain.query_for(BookCatalog).limit(10).offset(10).all()
Chaining
All methods can be chained:
results = (
domain.query_for(BookCatalog)
.filter(author="George Orwell")
.exclude(price__gt=20.00)
.order_by("-price")
.limit(10)
.all()
)
Read-Only Enforcement
ReadOnlyQuerySet blocks all mutation operations:
# These all raise ReadOnlyQuerySetError
domain.query_for(BookCatalog).update(price=0) # blocked
domain.query_for(BookCatalog).delete() # blocked
This enforces the CQRS principle: read models are read-only. Changes flow through commands on the write side.
Cross-Aggregate Projections
The storefront needs a unified view showing books with their stock levels — data from two aggregates (Book and Inventory).
The StorefrontView Projection
@domain.projection
class StorefrontView:
"""Cross-aggregate projection combining Book and Inventory data."""
book_id: Identifier(identifier=True, required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price: Float()
quantity: Integer(default=0)
in_stock: Boolean(default=False)
The StorefrontProjector
@domain.projector(
projector_for=StorefrontView,
aggregates=[Book, Inventory],
)
class StorefrontProjector:
"""Maintains the StorefrontView from Book and Inventory events."""
@on(BookAdded)
def on_book_added(self, event: BookAdded):
entry = StorefrontView(
book_id=event.book_id,
title=event.title,
author=event.author,
price=event.price,
quantity=0,
in_stock=False,
)
current_domain.repository_for(StorefrontView).add(entry)
@on(InventoryStocked)
def on_inventory_stocked(self, event: InventoryStocked):
repo = current_domain.repository_for(StorefrontView)
try:
entry = repo.get(event.book_id)
entry.quantity = event.quantity
entry.in_stock = event.quantity > 0
repo.add(entry)
except Exception:
pass # Book not yet in storefront
The projector subscribes to events from both Book and Inventory aggregates. It maintains a single denormalized view that the storefront can query directly.
Enhanced API Endpoints
Update the API with rich query support:
# bookshelf/api.py — enhanced storefront endpoint
# @app.get("/storefront")
def browse_storefront(
author: str = None,
in_stock: bool = None,
sort: str = "title",
limit: int = 20,
offset: int = 0,
):
qs = domain.query_for(StorefrontView)
if author:
qs = qs.filter(author=author)
if in_stock is not None:
qs = qs.filter(in_stock=in_stock)
qs = qs.order_by(sort).limit(limit).offset(offset)
results = qs.all()
return {
"entries": [
{
"book_id": str(e.book_id),
"title": e.title,
"author": e.author,
"price": e.price,
"quantity": e.quantity,
"in_stock": e.in_stock,
}
for e in results.items
],
"total": results.total,
}
Now the storefront can browse with filtering, sorting, and pagination:
# Browse all books in stock
$ curl "http://localhost:8000/storefront?in_stock=true"
# Search by author, sorted by price
$ curl "http://localhost:8000/storefront?author=Orwell&sort=-price"
# Paginate results
$ curl "http://localhost:8000/storefront?limit=10&offset=20"
What We Built
- A deep understanding of
ReadOnlyQuerySet— filtering, sorting, pagination, and read-only enforcement. - A
StorefrontViewcross-aggregate projection combining Book and Inventory data. - Enhanced API endpoints with query parameters for rich browsing.
In the final chapter, we will step back and look at the complete architecture.
Full Source
from protean import Domain
from protean.core.projector import on
from protean.fields import Boolean, Float, Identifier, Integer, String
from protean.utils.globals import current_domain
domain = Domain("bookshelf")
domain.config["event_processing"] = "sync"
@domain.aggregate
class Book:
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
isbn: String(max_length=13)
price: Float(default=0.0)
@domain.aggregate
class Inventory:
book_id: Identifier(required=True)
title: String(max_length=200, required=True)
quantity: Integer(default=0)
@domain.event(part_of=Book)
class BookAdded:
book_id: Identifier(required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price: Float()
@domain.event(part_of=Inventory)
class InventoryStocked:
book_id: Identifier(required=True)
title: String(max_length=200)
quantity: Integer()
@domain.projection
class StorefrontView:
"""Cross-aggregate projection combining Book and Inventory data."""
book_id: Identifier(identifier=True, required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price: Float()
quantity: Integer(default=0)
in_stock: Boolean(default=False)
@domain.projector(
projector_for=StorefrontView,
aggregates=[Book, Inventory],
)
class StorefrontProjector:
"""Maintains the StorefrontView from Book and Inventory events."""
@on(BookAdded)
def on_book_added(self, event: BookAdded):
entry = StorefrontView(
book_id=event.book_id,
title=event.title,
author=event.author,
price=event.price,
quantity=0,
in_stock=False,
)
current_domain.repository_for(StorefrontView).add(entry)
@on(InventoryStocked)
def on_inventory_stocked(self, event: InventoryStocked):
repo = current_domain.repository_for(StorefrontView)
try:
entry = repo.get(event.book_id)
entry.quantity = event.quantity
entry.in_stock = event.quantity > 0
repo.add(entry)
except Exception:
pass # Book not yet in storefront
domain.init(traverse=False)
# bookshelf/api.py — enhanced storefront endpoint
# @app.get("/storefront")
def browse_storefront(
author: str = None,
in_stock: bool = None,
sort: str = "title",
limit: int = 20,
offset: int = 0,
):
qs = domain.query_for(StorefrontView)
if author:
qs = qs.filter(author=author)
if in_stock is not None:
qs = qs.filter(in_stock=in_stock)
qs = qs.order_by(sort).limit(limit).offset(offset)
results = qs.all()
return {
"entries": [
{
"book_id": str(e.book_id),
"title": e.title,
"author": e.author,
"price": e.price,
"quantity": e.quantity,
"in_stock": e.in_stock,
}
for e in results.items
],
"total": results.total,
}