Chapter 11: Projections and Projectors — Read-Optimized Views
Our domain model is great for enforcing business rules, but it may not be optimal for queries. A book catalog listing does not need invariants or associations — it needs fast, flat data. Projections provide read-optimized views, and projectors build them from events.
Why Projections?
In CQRS (Command Query Responsibility Segregation), the write side and read side have different needs:
| Write Side (Aggregates) | Read Side (Projections) |
|---|---|
| Rich domain model | Flat, denormalized data |
| Business rules & invariants | Fast queries |
| Normalized structure | Shaped for specific consumers |
| Modified through commands | Built from events |
A projection is a denormalized view — data pre-shaped for a specific query pattern. Instead of joining tables or traversing associations, you query the projection directly.
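To make the contrast concrete, here is a minimal, framework-free sketch in plain Python. It does not use Protean; the `authors`, `books`, and `catalog_projection` dictionaries and both function names are hypothetical stand-ins, and the separate author table is an assumption made only to show what a join looks like.

```python
# Write-side shape (assumed for illustration): normalized records
# that reference each other by id.
authors = {"a1": {"name": "George Orwell"}}
books = {"b1": {"title": "1984", "author_id": "a1", "price": 11.99}}


def catalog_row_with_join(book_id: str) -> dict:
    """Assemble a listing row at query time by following the author reference."""
    book = books[book_id]
    author = authors[book["author_id"]]
    return {
        "book_id": book_id,
        "title": book["title"],
        "author": author["name"],
        "price": book["price"],
    }


# Read-side shape: the same row stored pre-joined, keyed by book id.
catalog_projection = {
    "b1": {"book_id": "b1", "title": "1984", "author": "George Orwell", "price": 11.99},
}


def catalog_row_from_projection(book_id: str) -> dict:
    """Return the row exactly as stored; there are no references to follow."""
    return catalog_projection[book_id]
```

Both functions return the same row. The difference is when the work happens: the join runs on every read, while the projection pays that cost once, when the row is written.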
Defining Projections
A projection looks like a simplified aggregate — just fields and an identifier:
@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    book_id = Identifier(identifier=True, required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price = Float()
    isbn = String(max_length=13)
Key points:
- @domain.projection: registers a read model.
- identifier=True: every projection needs an identity field.
- No associations: projections are flat. No HasMany, HasOne, or ValueObject fields are allowed.
- No business logic: projections are pure data containers.
Building Projectors
A projector listens for events and builds/updates projections:
@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        # Create a new catalog entry from the event payload.
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        # Load the existing entry, change its price, and save it back.
        repo = current_domain.repository_for(BookCatalog)
        entry = repo.get(event.book_id)
        entry.price = event.new_price
        repo.add(entry)
Key points:
- @domain.projector(projector_for=BookCatalog, aggregates=[Book]): binds the projector to a projection and specifies which aggregates' events it processes.
- @on(BookAdded): marks the handler that runs when a BookAdded event arrives. @on is an alias for @handle, specific to projectors.
- Create on add, update on change: on_book_added creates a new catalog entry; on_price_updated modifies an existing one.
The projector acts as a translator: it takes events from the write side and transforms them into the shape needed by the read side.
Multiple Projections from Same Events
One of the great benefits of projections is building different views from the same events. For example:
- BookCatalog — for customers browsing the store (title, author, price)
- BookInventory — for warehouse staff (title, stock count, warehouse location)
- AuthorDirectory — for the "Authors" page (author name, book count)
Each projection has its own projector that listens to the same events but builds a different view.
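The idea of several views fed by one event stream can be sketched without the framework. The following is a plain-Python illustration, not Protean code: events are modeled as dictionaries, and the names `events`, `project_catalog`, `project_author_directory`, and the sample book ids are all hypothetical.

```python
from collections import Counter

# Hypothetical event stream; each dict stands in for a BookAdded event.
events = [
    {"book_id": "b1", "title": "1984", "author": "George Orwell", "price": 11.99},
    {"book_id": "b2", "title": "Animal Farm", "author": "George Orwell", "price": 9.99},
    {"book_id": "b3", "title": "Brave New World", "author": "Aldous Huxley", "price": 14.99},
]

book_catalog = {}             # view 1: one row per book, for browsing
author_directory = Counter()  # view 2: book count per author


def project_catalog(event: dict) -> None:
    """Keep only the fields the browsing page needs."""
    book_catalog[event["book_id"]] = {
        "title": event["title"],
        "author": event["author"],
        "price": event["price"],
    }


def project_author_directory(event: dict) -> None:
    """Ignore book details; just count books per author."""
    author_directory[event["author"]] += 1


# Every projector sees every event and builds its own view from it.
for event in events:
    for project in (project_catalog, project_author_directory):
        project(event)
```

After the loop, `book_catalog` holds three rows while `author_directory` holds two counts. Neither view knows about the other, so a new view can be added later by writing one more projector and replaying the events.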
Querying Projections
Projections use the same repository and query API as aggregates:
# Get a specific entry by its identifier
entry = domain.repository_for(BookCatalog).get(book_id)

# Query all entries
entries = domain.repository_for(BookCatalog)._dao.query.all()

# Filter by a field value
orwell_books = domain.repository_for(BookCatalog)._dao.query.filter(
    author="George Orwell"
).all()
Because projections are flat and denormalized, queries are fast — no joins, no traversals.
End-to-End: Write → Event → Projection → Read
Here is the complete CQRS flow:
sequenceDiagram
participant App
participant Book as Book Aggregate
participant Repo as Repository
participant P as BookCatalogProjector
participant Cat as BookCatalog
App->>Book: Book.add_to_catalog(...)
Book->>Book: raise_(BookAdded)
App->>Repo: repo.add(book)
Repo->>P: Dispatch BookAdded
P->>Cat: Create catalog entry
App->>Cat: catalog_repo.get(book_id)
Cat-->>App: {title, author, price}
Putting It Together
domain.init(traverse=False)

if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Add books; the raised events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection, which is shaped for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Update a price; the projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")
Run it:
$ python bookshelf.py
=== Adding Books ===
=== Book Catalog (Projection) ===
Total entries: 3
The Great Gatsby by F. Scott Fitzgerald — $12.99
Brave New World by Aldous Huxley — $14.99
1984 by George Orwell — $11.99
=== Updating Price ===
Updated: The Great Gatsby — $15.99
All checks passed!
The catalog projection is automatically maintained by the projector. When a book is added or its price changes, the catalog updates immediately (in sync mode) or shortly after (in async mode).
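The difference between the two modes can be pictured with a toy model. This is a plain-Python sketch under stated assumptions, not a description of how Protean dispatches events: the `pending` queue stands in for whatever carries events to the projector in async mode, and `publish`, `drain_pending`, and `apply_price_update` are hypothetical names.

```python
from collections import deque

catalog = {"b1": {"title": "The Great Gatsby", "price": 12.99}}
pending = deque()  # stand-in for the channel that delivers events later


def apply_price_update(event: dict) -> None:
    """The projector's work: copy the new price into the read model."""
    catalog[event["book_id"]]["price"] = event["new_price"]


def publish(event: dict, mode: str) -> None:
    """Apply the event right away in sync mode, or queue it in async mode."""
    if mode == "sync":
        apply_price_update(event)
    else:
        pending.append(event)


def drain_pending() -> None:
    """What a background worker would do: process queued events in order."""
    while pending:
        apply_price_update(pending.popleft())


publish({"book_id": "b1", "new_price": 15.99}, mode="sync")
price_after_sync = catalog["b1"]["price"]      # already updated

publish({"book_id": "b1", "new_price": 17.99}, mode="async")
price_before_drain = catalog["b1"]["price"]    # read model is briefly stale
drain_pending()
price_after_drain = catalog["b1"]["price"]     # caught up
```

In the async case a read that arrives before the queue is drained sees the old price. That short window is the trade-off for moving projection work off the write path.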
Full Source
from protean import Domain
from protean.core.projector import on
from protean.fields import Float, Identifier, String, Text
from protean.utils.globals import current_domain
domain = Domain()
domain.config["event_processing"] = "sync"
@domain.aggregate
class Book:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price = Float(default=0.0)
    description = Text()

    @classmethod
    def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
        book = cls(
            title=title,
            author=author,
            isbn=isbn,
            price=price,
            description=description,
        )
        book.raise_(
            BookAdded(
                book_id=book.id,
                title=book.title,
                author=book.author,
                price=price,
                isbn=isbn or "",
            )
        )
        return book

    def update_price(self, new_price: float):
        self.price = new_price
        self.raise_(
            BookPriceUpdated(
                book_id=self.id,
                new_price=new_price,
            )
        )

@domain.event(part_of=Book)
class BookAdded:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price = Float()
    isbn = String(max_length=13)

@domain.event(part_of=Book)
class BookPriceUpdated:
    book_id = Identifier(required=True)
    new_price = Float(required=True)
@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    book_id = Identifier(identifier=True, required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price = Float()
    isbn = String(max_length=13)

@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        repo = current_domain.repository_for(BookCatalog)
        entry = repo.get(event.book_id)
        entry.price = event.new_price
        repo.add(entry)
domain.init(traverse=False)

if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Add books; the raised events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection, which is shaped for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Update a price; the projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")
Summary
In this chapter you learned:
- Projections are read-optimized, denormalized views defined with @domain.projection.
- Projectors listen for events and build/update projections, using @domain.projector with @on(EventClass) handlers.
- Projections have no business logic and no associations, just flat data optimized for queries.
- Multiple projections can be built from the same events for different consumers.
- This is the CQRS pattern — separating the write model (aggregates) from the read model (projections).
We have now covered all the domain elements. In the next part, we will move from in-memory to real infrastructure — databases, message brokers, and the Protean server.