Skip to content

Chapter 7: Projections and Projectors

In this chapter we will create a BookCatalog projection — a read-optimized view that stays in sync with our Book aggregate through events.

Why Projections?

Our aggregates enforce business rules, but for listing books in a catalog we want flat, fast data — no nested value objects, no business logic. Projections give us that.

Defining a Projection

A projection is a flat data structure optimized for queries:

@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing.

    Holds one flat record per book, using only simple field types.
    """

    # Primary key of the catalog entry (``identifier=True``).
    book_id: Identifier(identifier=True, required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    # Neither of the remaining fields is declared ``required``.
    price: Float()
    isbn: String(max_length=13)

Notice that the projection has only simple fields — String, Float, Identifier. No associations, no value objects. The identifier=True on book_id marks it as the primary key.

Building a Projector

A projector listens to events and maintains the projection:

@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Keeps ``BookCatalog`` entries in step with events raised by ``Book``."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        """Store a new catalog entry built from the ``BookAdded`` payload."""
        catalog_repo = current_domain.repository_for(BookCatalog)
        catalog_repo.add(
            BookCatalog(
                book_id=event.book_id,
                title=event.title,
                author=event.author,
                price=event.price,
                isbn=event.isbn,
            )
        )

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        """Load the book's catalog entry and save it with the new price."""
        catalog_repo = current_domain.repository_for(BookCatalog)
        row = catalog_repo.get(event.book_id)
        row.price = event.new_price
        catalog_repo.add(row)

The projector is registered with projector_for=BookCatalog (the projection it maintains) and aggregates=[Book] (the aggregates whose events it listens to). The @on() decorator specifies which event triggers each method.

When a BookAdded event fires, on_book_added creates a catalog entry. When BookPriceUpdated fires, on_price_updated updates the existing entry.

Querying the Projection

Projections are queried the same way as aggregates — through a repository:

if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Create and save each book in turn — events trigger the projector
        print("=== Adding Books ===")
        book_details = [
            {
                "title": "The Great Gatsby",
                "author": "F. Scott Fitzgerald",
                "isbn": "9780743273565",
                "price": 12.99,
            },
            {
                "title": "Brave New World",
                "author": "Aldous Huxley",
                "isbn": "9780060850524",
                "price": 14.99,
            },
            {
                "title": "1984",
                "author": "George Orwell",
                "isbn": "9780451524935",
                "price": 11.99,
            },
        ]
        added_books = []
        for details in book_details:
            book = Book.add_to_catalog(**details)
            book_repo.add(book)
            added_books.append(book)

        # The first book is the one whose price changes below
        gatsby = added_books[0]

        # Read the projection back — the flat view meant for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Change the price on the aggregate — the projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")

Run it:

$ python bookshelf.py
=== Adding Books ===

=== Book Catalog (Projection) ===
Total entries: 3
  The Great Gatsby by F. Scott Fitzgerald — $12.99
  Brave New World by Aldous Huxley — $14.99
  1984 by George Orwell — $11.99

=== Updating Price ===
Updated: The Great Gatsby — $15.99

All checks passed!

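Notice that we never updated the projection directly — the projector reacted to events and kept it in sync automatically. Because the full source below sets event_processing to "sync", the projector has already handled each event by the time we query, so the catalog always reflects the latest state of the Book aggregate.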

What We Built

  • A BookCatalog projection — flat, query-optimized data.
  • A BookCatalogProjector — listens to Book events and maintains the projection.
  • Automatic sync: adding or updating a book immediately updates the catalog.

In the next chapter, we will switch from in-memory storage to a real PostgreSQL database.

Full Source

from protean import Domain
from protean.core.projector import on
from protean.fields import Float, Identifier, String, Text
from protean.utils.globals import current_domain

# The domain object; the decorated classes below are registered on it.
domain = Domain()

# Process events synchronously. NOTE(review): presumably this is what lets the
# projector run as soon as an aggregate is saved — confirm against Protean's
# configuration documentation.
domain.config["event_processing"] = "sync"


@domain.aggregate
class Book:
    """A book aggregate that raises events when it is added or repriced."""

    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)
    description: Text()

    @classmethod
    def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
        """Create a book and record a ``BookAdded`` event on it.

        The event carries the caller-supplied ``price`` and substitutes an
        empty string for a falsy ``isbn``. The new book is returned without
        being saved to any repository.
        """
        new_book = cls(
            title=title,
            author=author,
            isbn=isbn,
            price=price,
            description=description,
        )
        added = BookAdded(
            book_id=new_book.id,
            title=new_book.title,
            author=new_book.author,
            price=price,
            isbn=isbn or "",
        )
        new_book.raise_(added)
        return new_book

    def update_price(self, new_price: float):
        """Set the book's price and record a ``BookPriceUpdated`` event."""
        self.price = new_price
        price_change = BookPriceUpdated(book_id=self.id, new_price=new_price)
        self.raise_(price_change)


@domain.event(part_of=Book)
class BookAdded:
    """Event raised by ``Book.add_to_catalog`` for a newly created book."""

    # Identifier of the book that raised the event.
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    # ``Book.add_to_catalog`` passes "" here when no ISBN was supplied.
    isbn: String(max_length=13)


@domain.event(part_of=Book)
class BookPriceUpdated:
    """Event raised by ``Book.update_price`` after the price is changed."""

    # Identifier of the book whose price changed.
    book_id: Identifier(required=True)
    # The price that was just assigned to the book.
    new_price: Float(required=True)




@domain.projection
class BookCatalog:
    """A read-optimized view of the book catalog for browsing.

    Holds one flat record per book, using only simple field types.
    """

    # Primary key of the catalog entry (``identifier=True``).
    book_id: Identifier(identifier=True, required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    # Neither of the remaining fields is declared ``required``.
    price: Float()
    isbn: String(max_length=13)




@domain.projector(projector_for=BookCatalog, aggregates=[Book])
class BookCatalogProjector:
    """Keeps ``BookCatalog`` entries in step with events raised by ``Book``."""

    @on(BookAdded)
    def on_book_added(self, event: BookAdded):
        """Store a new catalog entry built from the ``BookAdded`` payload."""
        catalog_repo = current_domain.repository_for(BookCatalog)
        catalog_repo.add(
            BookCatalog(
                book_id=event.book_id,
                title=event.title,
                author=event.author,
                price=event.price,
                isbn=event.isbn,
            )
        )

    @on(BookPriceUpdated)
    def on_price_updated(self, event: BookPriceUpdated):
        """Load the book's catalog entry and save it with the new price."""
        catalog_repo = current_domain.repository_for(BookCatalog)
        row = catalog_repo.get(event.book_id)
        row.price = event.new_price
        catalog_repo.add(row)




# Initialize the domain once every element above has been declared.
# NOTE(review): traverse=False presumably skips scanning other modules for
# domain elements — confirm against Protean's Domain.init documentation.
domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)
        catalog_repo = domain.repository_for(BookCatalog)

        # Create and save each book in turn — events trigger the projector
        print("=== Adding Books ===")
        book_details = [
            {
                "title": "The Great Gatsby",
                "author": "F. Scott Fitzgerald",
                "isbn": "9780743273565",
                "price": 12.99,
            },
            {
                "title": "Brave New World",
                "author": "Aldous Huxley",
                "isbn": "9780060850524",
                "price": 14.99,
            },
            {
                "title": "1984",
                "author": "George Orwell",
                "isbn": "9780451524935",
                "price": 11.99,
            },
        ]
        added_books = []
        for details in book_details:
            book = Book.add_to_catalog(**details)
            book_repo.add(book)
            added_books.append(book)

        # The first book is the one whose price changes below
        gatsby = added_books[0]

        # Read the projection back — the flat view meant for browsing
        print("\n=== Book Catalog (Projection) ===")
        all_entries = catalog_repo._dao.query.all()
        print(f"Total entries: {all_entries.total}")
        for entry in all_entries.items:
            print(f"  {entry.title} by {entry.author} — ${entry.price}")

        # Change the price on the aggregate — the projector updates the catalog
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        updated_entry = catalog_repo.get(gatsby.id)
        print(f"Updated: {updated_entry.title} — ${updated_entry.price}")

        # Verify
        assert all_entries.total == 3
        assert updated_entry.price == 15.99
        print("\nAll checks passed!")

Next

Chapter 8: Connecting a Real Database →