Skip to content

Chapter 7: Projections and Projectors

In this chapter we will create a BookCatalog projection — a read-optimized view that stays in sync with our Book aggregate through events.

CQRS Concept

Projections are a CQRS-specific pattern — they separate your read model from your write model. In the pure DDD approach, you query aggregates directly through repositories. Projections become valuable when your read and write needs diverge.

Why Projections?

Our aggregates enforce business rules, but for listing books in a catalog we want flat, fast data — no nested value objects, no business logic. Projections give us that.

Defining a Projection

A projection is a flat data structure optimized for queries:

@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    book_id: Identifier(identifier=True, required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)

Notice that the projection has only simple fields — String, Float, Identifier. No associations or references. (ValueObject fields are also allowed in projections when you need to group related attributes.) The identifier=True on book_id marks it as the primary key.

Building a Projector

A projector listens to events and maintains the projection:

@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        repo = current_domain.repository_for(BookCatalog)
        entry = repo.get(event.book_id)
        entry.price = event.new_price
        repo.add(entry)

The projector is registered with projector_for=BookCatalog (the projection it maintains) and aggregates=[Book] (the aggregates whose events it listens to). The @on() decorator specifies which event triggers each method.

When a BookAdded event fires, on_book_added creates a catalog entry. When BookPriceUpdated fires, on_price_updated updates the existing entry.

Querying the Projection

Projections are queried using domain.view_for(), which returns a ReadView — the CQRS read-side API:

if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)

        # Add books — events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection — optimized for browsing
        print("\n=== Book Catalog (Projection) ===")
        catalog = domain.view_for(BookCatalog)
        all_entries = catalog.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Update a price — projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")

domain.view_for(BookCatalog) returns a ReadView with typed read-only methods: get() for a single record, query for a ReadOnlyQuerySet (filtering, sorting, pagination), find_by() for criteria-based lookup, count(), and exists(). All mutation operations are blocked — this enforces the CQRS principle: projections are read-only.

Run it:

$ python bookshelf.py
=== Adding Books ===

=== Book Catalog (Projection) ===
Total entries: 3
  The Great Gatsby by F. Scott Fitzgerald  $12.99
  Brave New World by Aldous Huxley  $14.99
  1984 by George Orwell  $11.99

=== Updating Price ===
Updated: The Great Gatsby  $15.99

All checks passed!

Notice that we never updated the projection directly — the projector reacted to events and kept it in sync automatically. The catalog always reflects the latest state of the Book aggregate.

What We Built

  • A BookCatalog projection — flat, query-optimized data.
  • A BookCatalogProjector — listens to Book events and maintains the projection.
  • domain.view_for() — the read-only view API for projections (with get(), query, find_by(), count(), exists()).
  • Automatic sync: adding or updating a book immediately updates the catalog.

In the next chapter, we will switch from in-memory storage to a real PostgreSQL database.

Full Source

from protean import Domain
from protean.core.projector import on
from protean.fields import Float, Identifier, String, Text
from protean.utils.globals import current_domain

domain = Domain()

domain.config["event_processing"] = "sync"


@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)
    description: Text()

    @classmethod
    def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
        book = cls(
            title=title,
            author=author,
            isbn=isbn,
            price=price,
            description=description,
        )
        book.raise_(
            BookAdded(
                book_id=book.id,
                title=book.title,
                author=book.author,
                price=price,
                isbn=isbn or "",
            )
        )
        return book

    def update_price(self, new_price: float):
        self.price = new_price
        self.raise_(
            BookPriceUpdated(
                book_id=self.id,
                new_price=new_price,
            )
        )


@domain.event(part_of=Book)
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)


@domain.event(part_of=Book)
class BookPriceUpdated:
    book_id: Identifier(required=True)
    new_price: Float(required=True)




@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing."""

    book_id: Identifier(identifier=True, required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)




@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Maintains the BookCatalog projection from Book events."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        catalog_entry = BookCatalog(
            book_id=event.book_id,
            title=event.title,
            author=event.author,
            price=event.price,
            isbn=event.isbn,
        )
        current_domain.repository_for(BookCatalog).add(catalog_entry)

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        repo = current_domain.repository_for(BookCatalog)
        entry = repo.get(event.book_id)
        entry.price = event.new_price
        repo.add(entry)




domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)

        # Add books — events trigger the projector
        print("=== Adding Books ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        brave = Book.add_to_catalog(
            title="Brave New World",
            author="Aldous Huxley",
            isbn="9780060850524",
            price=14.99,
        )
        book_repo.add(brave)

        orwell = Book.add_to_catalog(
            title="1984",
            author="George Orwell",
            isbn="9780451524935",
            price=11.99,
        )
        book_repo.add(orwell)

        # Query the projection — optimized for browsing
        print("\n=== Book Catalog (Projection) ===")
        catalog = domain.view_for(BookCatalog)
        all_entries = catalog.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Update a price — projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")

Next

Chapter 8: Connecting a Real Database →