Chapter 7: Projections and Projectors
In this chapter we will create a BookCatalog projection — a
read-optimized view that stays in sync with our Book aggregate through
events.
CQRS Concept
Projections are a CQRS-specific pattern — they separate your read model from your write model. In the pure DDD approach, you query aggregates directly through repositories. Projections become valuable when your read and write needs diverge.
Why Projections?
Our aggregates enforce business rules, but for listing books in a catalog we want flat, fast data — no nested value objects, no business logic. Projections give us that.
Defining a Projection
A projection is a flat data structure optimized for queries:
@domain.projection
class BookCatalog:
"""A read-optimized view of the book catalog for browsing."""
book_id: Identifier(identifier=True, required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price: Float()
isbn: String(max_length=13)
Notice that the projection has only simple fields — String, Float,
Identifier. No associations or references. (ValueObject fields are also
allowed in projections when you need to group related attributes.) The
identifier=True on book_id marks it as the primary key.
Building a Projector
A projector listens to events and maintains the projection:
@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
"""Maintains the BookCatalog projection from Book events."""
@on(BookAdded)
def on_book_added(self, event: BookAdded):
catalog_entry = BookCatalog(
book_id=event.book_id,
title=event.title,
author=event.author,
price=event.price,
isbn=event.isbn,
)
current_domain.repository_for(BookCatalog).add(catalog_entry)
@on(BookPriceUpdated)
def on_price_updated(self, event: BookPriceUpdated):
repo = current_domain.repository_for(BookCatalog)
entry = repo.get(event.book_id)
entry.price = event.new_price
repo.add(entry)
The projector is registered with projector_for=BookCatalog (the
projection it maintains) and aggregates=[Book] (the aggregates whose
events it listens to). The @on() decorator specifies which event
triggers each method.
When a BookAdded event fires, on_book_added creates a catalog entry.
When BookPriceUpdated fires, on_price_updated updates the existing
entry.
Querying the Projection
Projections are queried using domain.view_for(), which returns a
ReadView — the CQRS read-side API:
if __name__ == "__main__":
with domain.domain_context():
book_repo = domain.repository_for(Book)
# Add books — events trigger the projector
print("=== Adding Books ===")
gatsby = Book.add_to_catalog(
title="The Great Gatsby",
author="F. Scott Fitzgerald",
isbn="9780743273565",
price=12.99,
)
book_repo.add(gatsby)
brave = Book.add_to_catalog(
title="Brave New World",
author="Aldous Huxley",
isbn="9780060850524",
price=14.99,
)
book_repo.add(brave)
orwell = Book.add_to_catalog(
title="1984",
author="George Orwell",
isbn="9780451524935",
price=11.99,
)
book_repo.add(orwell)
# Query the projection — optimized for browsing
print("\n=== Book Catalog (Projection) ===")
catalog = domain.view_for(BookCatalog)
all_entries = catalog.query.all()
print(f"Total entries: {all_entries.total}")
for entry in all_entries.items:
print(f" {entry.title} by {entry.author} — ${entry.price}")
# Update a price — projector updates the catalog
print("\n=== Updating Price ===")
gatsby.update_price(15.99)
book_repo.add(gatsby)
updated_entry = catalog.get(gatsby.id)
print(f"Updated: {updated_entry.title} — ${updated_entry.price}")
# Verify
assert all_entries.total == 3
assert updated_entry.price == 15.99
print("\nAll checks passed!")
domain.view_for(BookCatalog) returns a ReadView with typed
read-only methods: get() for a single record, query for a
ReadOnlyQuerySet (filtering, sorting, pagination), find_by() for
criteria-based lookup, count(), and exists(). All mutation
operations are blocked — this enforces the CQRS principle: projections
are read-only.
Run it:
$ python bookshelf.py
=== Adding Books ===
=== Book Catalog (Projection) ===
Total entries: 3
The Great Gatsby by F. Scott Fitzgerald — $12.99
Brave New World by Aldous Huxley — $14.99
1984 by George Orwell — $11.99
=== Updating Price ===
Updated: The Great Gatsby — $15.99
All checks passed!
Notice that we never updated the projection directly — the projector reacted to events and kept it in sync automatically. The catalog always reflects the latest state of the Book aggregate.
What We Built
- A
BookCatalogprojection — flat, query-optimized data. - A
BookCatalogProjector— listens to Book events and maintains the projection. domain.view_for()— the read-only view API for projections (withget(),query,find_by(),count(),exists()).- Automatic sync: adding or updating a book immediately updates the catalog.
In the next chapter, we will switch from in-memory storage to a real PostgreSQL database.
Full Source
from protean import Domain
from protean.core.projector import on
from protean.fields import Float, Identifier, String, Text
from protean.utils.globals import current_domain
domain = Domain()
domain.config["event_processing"] = "sync"
@domain.aggregate
class Book:
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
isbn: String(max_length=13)
price: Float(default=0.0)
description: Text()
@classmethod
def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
book = cls(
title=title,
author=author,
isbn=isbn,
price=price,
description=description,
)
book.raise_(
BookAdded(
book_id=book.id,
title=book.title,
author=book.author,
price=price,
isbn=isbn or "",
)
)
return book
def update_price(self, new_price: float):
self.price = new_price
self.raise_(
BookPriceUpdated(
book_id=self.id,
new_price=new_price,
)
)
@domain.event(part_of=Book)
class BookAdded:
book_id: Identifier(required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price: Float()
isbn: String(max_length=13)
@domain.event(part_of=Book)
class BookPriceUpdated:
book_id: Identifier(required=True)
new_price: Float(required=True)
@domain.projection
class BookCatalog:
"""A read-optimized view of the book catalog for browsing."""
book_id: Identifier(identifier=True, required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price: Float()
isbn: String(max_length=13)
@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
"""Maintains the BookCatalog projection from Book events."""
@on(BookAdded)
def on_book_added(self, event: BookAdded):
catalog_entry = BookCatalog(
book_id=event.book_id,
title=event.title,
author=event.author,
price=event.price,
isbn=event.isbn,
)
current_domain.repository_for(BookCatalog).add(catalog_entry)
@on(BookPriceUpdated)
def on_price_updated(self, event: BookPriceUpdated):
repo = current_domain.repository_for(BookCatalog)
entry = repo.get(event.book_id)
entry.price = event.new_price
repo.add(entry)
domain.init(traverse=False)
if __name__ == "__main__":
with domain.domain_context():
book_repo = domain.repository_for(Book)
# Add books — events trigger the projector
print("=== Adding Books ===")
gatsby = Book.add_to_catalog(
title="The Great Gatsby",
author="F. Scott Fitzgerald",
isbn="9780743273565",
price=12.99,
)
book_repo.add(gatsby)
brave = Book.add_to_catalog(
title="Brave New World",
author="Aldous Huxley",
isbn="9780060850524",
price=14.99,
)
book_repo.add(brave)
orwell = Book.add_to_catalog(
title="1984",
author="George Orwell",
isbn="9780451524935",
price=11.99,
)
book_repo.add(orwell)
# Query the projection — optimized for browsing
print("\n=== Book Catalog (Projection) ===")
catalog = domain.view_for(BookCatalog)
all_entries = catalog.query.all()
print(f"Total entries: {all_entries.total}")
for entry in all_entries.items:
print(f" {entry.title} by {entry.author} — ${entry.price}")
# Update a price — projector updates the catalog
print("\n=== Updating Price ===")
gatsby.update_price(15.99)
book_repo.add(gatsby)
updated_entry = catalog.get(gatsby.id)
print(f"Updated: {updated_entry.title} — ${updated_entry.price}")
# Verify
assert all_entries.total == 3
assert updated_entry.price == 15.99
print("\nAll checks passed!")