Chapter 7: Projections and Projectors
In this chapter we will create a BookCatalog projection — a
read-optimized view that stays in sync with our Book aggregate through
events.
Why Projections?
Our aggregates enforce business rules, but for listing books in a catalog we want flat, fast data — no nested value objects, no business logic. Projections give us that.
Defining a Projection
A projection is a flat data structure optimized for queries:
@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    # identifier=True makes book_id the identity field of each catalog entry.
    book_id: Identifier(identifier=True, required=True)
    # Only simple scalar fields follow: no associations, no value objects.
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)
Notice that the projection has only simple fields — String, Float,
Identifier. No associations, no value objects. The identifier=True
on book_id marks it as the primary key.
Building a Projector
A projector listens to events and maintains the projection:
@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        """Create a new BookCatalog entry from the data carried by BookAdded."""
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        """Copy the new price from BookPriceUpdated onto the existing entry."""
        repo = current_domain.repository_for(BookCatalog)
        # The entry is looked up by the book's identifier carried in the event.
        entry = repo.get(event.book_id)
        entry.price = event.new_price
        # Hand the modified entry back to the repository to save the change.
        repo.add(entry)
The projector is registered with projector_for=BookCatalog (the
projection it maintains) and aggregates=[Book] (the aggregates whose
events it listens to). The @on() decorator specifies which event
triggers each method.
When a BookAdded event fires, on_book_added creates a catalog entry.
When BookPriceUpdated fires, on_price_updated updates the existing
entry.
Querying the Projection
Projections are queried the same way as aggregates — through a repository:
if __name__ == "__main__":
    # Everything below runs inside an active domain context.
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Add books — events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection — optimized for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f" {entry.title} by {entry.author} — ${entry.price}")

        # Update a price — projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        # Read the entry back from the projection, not from the aggregate.
        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")
Run it:
$ python bookshelf.py
=== Adding Books ===
=== Book Catalog (Projection) ===
Total entries: 3
The Great Gatsby by F. Scott Fitzgerald — $12.99
Brave New World by Aldous Huxley — $14.99
1984 by George Orwell — $11.99
=== Updating Price ===
Updated: The Great Gatsby — $15.99
All checks passed!
Notice that we never updated the projection directly — the projector reacted to events and kept it in sync automatically. Because the full source below sets event_processing to "sync", the catalog reflects the latest state of the Book aggregate as soon as each change is saved.
What We Built
- A BookCatalog projection — flat, query-optimized data.
- A BookCatalogProjector — listens to Book events and maintains the projection.
- Automatic sync: adding or updating a book immediately updates the catalog.
In the next chapter, we will switch from in-memory storage to a real PostgreSQL database.
Full Source
from protean import Domain
from protean.core.projector import on
from protean.fields import Float, Identifier, String, Text
from protean.utils.globals import current_domain
# The single Domain instance that every element in this file registers with.
domain = Domain()
# NOTE(review): "sync" presumably makes event handlers (such as the projector
# below) run inline when an aggregate is saved — confirm against Protean's docs.
domain.config["event_processing"] = "sync"
@domain.aggregate
class Book:
    """A book in the catalog; raises events when added or repriced."""

    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)
    description: Text()

    @classmethod
    def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
        """Create a Book and raise a BookAdded event describing it.

        Returns the new Book instance; the caller is responsible for adding
        it to a repository.
        """
        book = cls(
            title=title,
            author=author,
            isbn=isbn,
            price=price,
            description=description,
        )
        book.raise_(
            BookAdded(
                book_id=book.id,
                title=book.title,
                author=book.author,
                price=price,
                # The event carries an empty string when no ISBN was supplied.
                isbn=isbn or "",
            )
        )
        return book

    def update_price(self, new_price: float):
        """Set a new price and raise a BookPriceUpdated event for it."""
        self.price = new_price
        self.raise_(
            BookPriceUpdated(
                book_id=self.id,
                new_price=new_price,
            )
        )
@domain.event(part_of=Book)
class BookAdded:
    """Event raised by Book.add_to_catalog when a new book is created."""

    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)
@domain.event(part_of=Book)
class BookPriceUpdated:
    """Event raised by Book.update_price when a book's price changes."""

    book_id: Identifier(required=True)
    new_price: Float(required=True)
@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    # identifier=True makes book_id the identity field of each catalog entry.
    book_id: Identifier(identifier=True, required=True)
    # Only simple scalar fields follow: no associations, no value objects.
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)
@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        """Create a new BookCatalog entry from the data carried by BookAdded."""
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        """Copy the new price from BookPriceUpdated onto the existing entry."""
        repo = current_domain.repository_for(BookCatalog)
        # The entry is looked up by the book's identifier carried in the event.
        entry = repo.get(event.book_id)
        entry.price = event.new_price
        # Hand the modified entry back to the repository to save the change.
        repo.add(entry)
# Initialize the domain once all of its elements have been declared above.
# NOTE(review): traverse=False presumably turns off automatic discovery of
# elements in other modules — confirm against Protean's Domain.init docs.
domain.init(traverse=False)
if __name__ == "__main__":
    # Everything below runs inside an active domain context.
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Add books — events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection — optimized for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f" {entry.title} by {entry.author} — ${entry.price}")

        # Update a price — projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        # Read the entry back from the projection, not from the aggregate.
        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")