Skip to content

Chapter 11: Projections and Projectors — Read-Optimized Views

Our domain model is great for enforcing business rules, but it may not be optimal for queries. A book catalog listing does not need invariants or associations — it needs fast, flat data. Projections provide read-optimized views, and projectors build them from events.

Why Projections?

In CQRS (Command Query Responsibility Segregation), the write side and read side have different needs:

Write Side (Aggregates) Read Side (Projections)
Rich domain model Flat, denormalized data
Business rules & invariants Fast queries
Normalized structure Shaped for specific consumers
Modified through commands Built from events

A projection is a denormalized view — data pre-shaped for a specific query pattern. Instead of joining tables or traversing associations, you query the projection directly.

Defining Projections

A projection looks like a simplified aggregate — just fields and an identifier:

@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    book_id = Identifier(identifier=True, required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price = Float()

Key points:

  • @domain.projection — registers a read model
  • identifier=True — every projection needs an identity field
  • No associations — projections are flat. No HasMany, HasOne, or ValueObject fields allowed
  • No business logic — projections are pure data containers

Building Projectors

A projector listens for events and builds/updates projections:

@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        repo = current_domain.repository_for(BookCatalog)

Key points:

  • @domain.projector(projector_for=BookCatalog, aggregates=[Book]) — binds the projector to a projection and specifies which aggregates' events it processes.
  • @on(BookAdded) — the handler that runs when a BookAdded event arrives. @on is an alias for @handle, specific to projectors.
  • Create on add, update on changeon_book_added creates a new catalog entry; on_price_updated modifies an existing one.

The projector acts as a translator: it takes events from the write side and transforms them into the shape needed by the read side.

Multiple Projections from Same Events

One of the great benefits of projections is building different views from the same events. For example:

  • BookCatalog — for customers browsing the store (title, author, price)
  • BookInventory — for warehouse staff (title, stock count, warehouse location)
  • AuthorDirectory — for the "Authors" page (author name, book count)

Each projection has its own projector that listens to the same events but builds a different view.

Querying Projections

Projections use the same repository and query API as aggregates:

# Get a specific entry
entry = domain.repository_for(BookCatalog).get(book_id)

# Query all entries
entries = domain.repository_for(BookCatalog)._dao.query.all()

# Filter
fiction = domain.repository_for(BookCatalog)._dao.query.filter(
    author="George Orwell"
).all()

Because projections are flat and denormalized, queries are fast — no joins, no traversals.

End-to-End: Write → Event → Projection → Read

Here is the complete CQRS flow:

sequenceDiagram
    participant App
    participant Book as Book Aggregate
    participant Repo as Repository
    participant P as BookCatalogProjector
    participant Cat as BookCatalog

    App->>Book: Book.add_to_catalog(...)
    Book->>Book: raise_(BookAdded)
    App->>Repo: repo.add(book)
    Repo->>P: Dispatch BookAdded
    P->>Cat: Create catalog entry
    App->>Cat: catalog_repo.get(book_id)
    Cat-->>App: {title, author, price}

Putting It Together

domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Add books — events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection — optimized for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Update a price — projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

Run it:

$ python bookshelf.py
=== Adding Books ===

=== Book Catalog (Projection) ===
Total entries: 3
  The Great Gatsby by F. Scott Fitzgerald  $12.99
  Brave New World by Aldous Huxley  $14.99
  1984 by George Orwell  $11.99

=== Updating Price ===
Updated: The Great Gatsby  $15.99

All checks passed!

The catalog projection is automatically maintained by the projector. When a book is added or its price changes, the catalog updates immediately (in sync mode) or shortly after (in async mode).

Full Source

from protean import Domain
from protean.core.projector import on
from protean.fields import Float, Identifier, String, Text
from protean.utils.globals import current_domain

domain = Domain()

domain.config["event_processing"] = "sync"


@domain.aggregate
class Book:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price = Float(default=0.0)
    description = Text()

    @classmethod
    def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
        book = cls(
            title=title,
            author=author,
            isbn=isbn,
            price=price,
            description=description,
        )
        book.raise_(
            BookAdded(
                book_id=book.id,
                title=book.title,
                author=book.author,
                price=price,
                isbn=isbn or "",
            )
        )
        return book

    def update_price(self, new_price: float):
        self.price = new_price
        self.raise_(
            BookPriceUpdated(
                book_id=self.id,
                new_price=new_price,
            )
        )


@domain.event(part_of=Book)
class BookAdded:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price = Float()
    isbn = String(max_length=13)


@domain.event(part_of=Book)
class BookPriceUpdated:
    book_id = Identifier(required=True)
    new_price = Float(required=True)




@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    book_id = Identifier(identifier=True, required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price = Float()
    isbn = String(max_length=13)




@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        repo = current_domain.repository_for(BookCatalog)
        entry = repo.get(event.book_id)
        entry.price = event.new_price
        repo.add(entry)




domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Add books — events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection — optimized for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Update a price — projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")

Summary

In this chapter you learned:

  • Projections are read-optimized, denormalized views defined with @domain.projection.
  • Projectors listen for events and build/update projections, using @domain.projector with @on(EventClass) handlers.
  • Projections have no business logic, no associations — just flat data optimized for queries.
  • Multiple projections can be built from the same events for different consumers.
  • This is the CQRS pattern — separating the write model (aggregates) from the read model (projections).

We have now covered all the domain elements. In the next part, we will move from in-memory to real infrastructure — databases, message brokers, and the Protean server.

Next

Chapter 12: Configuration and Real Databases →