
Chapter 15: Fact Events and the Reporting Pipeline

The marketing team wants a reporting dashboard showing current book information. They don't want to process individual BookAdded and BookPriceUpdated events — they want a single event per change containing the complete, current book state.

Delta Events vs. Fact Events

So far, our events have been delta events — they describe what changed:

Event             Content
BookAdded         book_id, title, author, price
BookPriceUpdated  book_id, new_price

A consumer processing BookPriceUpdated only knows the new price — not the title, author, or any other field. To build a complete picture, it must process every event from the beginning.
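
To make the cost of delta-only consumption concrete, here is a minimal plain-Python sketch of a consumer rebuilding book state by replaying delta events from the start. It does not use Protean's API: the dict-based event shape, the sample `b1` identifier, and the `replay` helper are all assumptions made for illustration.

```python
from typing import Any

# Hypothetical delta events, modeled as plain dicts for illustration only.
events: list[dict[str, Any]] = [
    {"type": "BookAdded", "book_id": "b1", "title": "The Great Gatsby",
     "author": "F. Scott Fitzgerald", "price": 12.99},
    {"type": "BookPriceUpdated", "book_id": "b1", "new_price": 15.99},
]


def replay(events: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Fold delta events, oldest first, into the current state per book."""
    state: dict[str, dict[str, Any]] = {}
    for event in events:
        if event["type"] == "BookAdded":
            state[event["book_id"]] = {
                "title": event["title"],
                "author": event["author"],
                "price": event["price"],
            }
        elif event["type"] == "BookPriceUpdated":
            # The delta carries only the new price; title and author must
            # already be known from an earlier BookAdded event.
            state[event["book_id"]]["price"] = event["new_price"]
    return state


books = replay(events)
print(books["b1"])
```

If the consumer missed the BookAdded event, the price update would have nothing to apply to, which is why replay has to start from the beginning.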

Fact events solve this. A fact event contains the complete current state of the aggregate after each change:

Event          Content
BookFactEvent  id, title, author, isbn, price, description

Every time a Book is persisted, Protean automatically generates a fact event with the full state.
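
In contrast to replaying deltas, a consumer of fact events needs only the most recent event per book. The sketch below shows this with plain dicts rather than Protean classes. The field names follow the BookFactEvent content listed above, while the sample data and the `latest_state` helper are hypothetical.

```python
from typing import Any

# Hypothetical fact events as plain dicts: each one carries the full state.
fact_events: list[dict[str, Any]] = [
    {"id": "b1", "title": "The Great Gatsby", "author": "F. Scott Fitzgerald",
     "isbn": "9780743273565", "price": 12.99, "description": ""},
    {"id": "b1", "title": "The Great Gatsby", "author": "F. Scott Fitzgerald",
     "isbn": "9780743273565", "price": 15.99, "description": ""},
]


def latest_state(fact_events: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Keep only the newest fact per book; later events overwrite earlier ones."""
    current: dict[str, dict[str, Any]] = {}
    for fact in fact_events:
        current[fact["id"]] = fact
    return current


current = latest_state(fact_events)
print(current["b1"]["price"])
```

Processing only the final event gives the same result as processing the whole list, so a consumer that joins late does not need the earlier history.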

Enabling Fact Events

Add fact_events=True to the Book aggregate:

@domain.aggregate(fact_events=True)
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)
    description: Text()

    @classmethod
    def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
        book = cls(
            title=title,
            author=author,
            isbn=isbn,
            price=price,
            description=description,
        )
        book.raise_(
            BookAdded(
                book_id=book.id,
                title=book.title,
                author=book.author,
                price=price,
            )
        )
        return book

    def update_price(self, new_price: float):
        self.price = new_price

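That's it. Protean now generates a BookFactEvent automatically whenever a Book is persisted. Note that update_price raises no delta event of its own; a fact event is still produced for the price change because fact events are tied to persistence, not to raise_.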

Building a Report Projection

The marketing dashboard needs a BookReport projection populated from fact events:

@domain.projection
class BookReport:
    """Marketing dashboard projection — populated from fact events.

    When running with ``protean server`` (async processing), the
    ``BookReportHandler`` below maintains this projection automatically.
    """

    book_id: Identifier(identifier=True, required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)

Consuming Fact Events

Fact events flow through a separate stream: <domain>::book-fact instead of <domain>::book. An event handler declared with part_of=Book receives both delta and fact events, so the handler below checks the event class name and ignores anything that is not a fact event. When running the async server (protean server), the handler is invoked automatically:

@domain.event_handler(part_of=Book)
class BookReportHandler:
    """Consumes fact events to maintain the BookReport projection.

    Each fact event contains the complete current state, so we simply
    overwrite the projection entry — no need to apply deltas.

    When running with async event processing (``protean server``), this
    handler is automatically subscribed to the ``<domain>::book-fact``
    stream and invoked for every fact event.
    """

    @handle("$any")
    def on_book_fact(self, event):
        # Only process fact events (ignore delta events like BookAdded)
        if not event.__class__.__name__.endswith("FactEvent"):
            return

        repo = current_domain.repository_for(BookReport)

        try:
            report = repo.get(event.id)
            report.title = event.title
            report.author = event.author
            report.price = event.price
            report.isbn = event.isbn or ""
        except Exception:
            report = BookReport(
                book_id=event.id,
                title=event.title,
                author=event.author,
                price=event.price,
                isbn=event.isbn or "",
            )

        repo.add(report)

The handler receives the complete state and can simply overwrite the projection — no need to track deltas.
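
Because each fact event is a full snapshot, the overwrite is also idempotent: handling the same fact twice leaves the projection unchanged. The following sketch illustrates that property with an in-memory dict standing in for the projection repository. `ReportRow`, `upsert`, and the dict-based store are assumptions for illustration, not Protean APIs.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ReportRow:
    """Stand-in for the BookReport projection (illustrative only)."""
    book_id: str
    title: str
    author: str
    price: float
    isbn: str = ""


def upsert(store: dict[str, ReportRow], fact: dict[str, Any]) -> None:
    """Overwrite the row for this book with the snapshot in the fact event."""
    store[fact["id"]] = ReportRow(
        book_id=fact["id"],
        title=fact["title"],
        author=fact["author"],
        price=fact["price"],
        isbn=fact.get("isbn") or "",  # Same None-to-"" fallback as the handler
    )


store: dict[str, ReportRow] = {}
fact = {"id": "b1", "title": "The Great Gatsby",
        "author": "F. Scott Fitzgerald", "price": 15.99, "isbn": None}

upsert(store, fact)
first = store["b1"]
upsert(store, fact)  # Redelivery of the same fact changes nothing.
print(store["b1"] == first)
```

The sketch covers repeated delivery only. Out-of-order delivery of two different facts is a separate concern that it does not address.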

Verifying

if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)

        # Add a book — triggers both the BookAdded event and a fact event
        print("=== Adding a Book ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        # Update price — triggers a new fact event with complete state
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        # Read fact events from the event store
        fact_stream = f"{Book.meta_.stream_category}-fact-{gatsby.id}"
        fact_messages = domain.event_store.store.read(fact_stream)
        print(f"\nFact events in stream: {len(fact_messages)}")
        for msg in fact_messages:
            event = msg.to_domain_object()
            print(f"  Title: {event.title}, Price: ${event.price}")

        assert len(fact_messages) == 2  # One per state change
        last_fact = fact_messages[-1].to_domain_object()
        assert last_fact.price == 15.99
        assert last_fact.title == "The Great Gatsby"

        print("\nAll checks passed!")

Every time a book is added or updated, a fact event with the complete current state is written to the event store. The marketing team never needs to understand the internal event schema — they just consume fact events.
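
The stream name read in the verification script follows the pattern `<stream category>-fact-<aggregate id>`. The helper below restates that string convention in isolation. `fact_stream_name` is a hypothetical function, and the category `bookshelf::book` and the id `42` are assumed example values of the `<domain>::book` form.

```python
def fact_stream_name(stream_category: str, aggregate_id: str) -> str:
    """Build a per-aggregate fact stream name in the script's format."""
    return f"{stream_category}-fact-{aggregate_id}"


# "bookshelf::book" and "42" are assumed example values, not from the source.
name = fact_stream_name("bookshelf::book", "42")
print(name)
```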

What We Built

  • Fact events with fact_events=True on the Book aggregate.
  • A BookReport projection for the marketing dashboard.
  • A BookReportHandler consuming fact events from the event store.
  • Understanding of delta vs. fact events and when to use each.

Part III is complete! The bookstore now runs asynchronously, validates inventory before shipping, integrates with external suppliers, and feeds a marketing dashboard. In the next chapter, we will enter production operations — starting with message tracing.

Full Source

"""Chapter 15: Fact Events and the Reporting Pipeline

Demonstrates how fact events provide a snapshot of aggregate state after every
change.  Fact events are auto-generated when an aggregate is configured with
``fact_events=True``.  They flow through a separate ``<aggregate>-fact``
stream, making them ideal for building projections that only need the latest
state rather than reconstructing it from individual domain events.
"""

from protean import Domain, handle
from protean.fields import Float, Identifier, String, Text
from protean.utils.globals import current_domain

domain = Domain()
domain.config["event_processing"] = "sync"


@domain.event(part_of="Book")
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()


@domain.aggregate(fact_events=True)
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)
    description: Text()

    @classmethod
    def add_to_catalog(cls, title, author, isbn=None, price=0.0, description=""):
        book = cls(
            title=title,
            author=author,
            isbn=isbn,
            price=price,
            description=description,
        )
        book.raise_(
            BookAdded(
                book_id=book.id,
                title=book.title,
                author=book.author,
                price=price,
            )
        )
        return book

    def update_price(self, new_price: float):
        self.price = new_price




@domain.projection
class BookReport:
    """Marketing dashboard projection — populated from fact events.

    When running with ``protean server`` (async processing), the
    ``BookReportHandler`` below maintains this projection automatically.
    """

    book_id: Identifier(identifier=True, required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price: Float()
    isbn: String(max_length=13)




@domain.event_handler(part_of=Book)
class BookReportHandler:
    """Consumes fact events to maintain the BookReport projection.

    Each fact event contains the complete current state, so we simply
    overwrite the projection entry — no need to apply deltas.

    When running with async event processing (``protean server``), this
    handler is automatically subscribed to the ``<domain>::book-fact``
    stream and invoked for every fact event.
    """

    @handle("$any")
    def on_book_fact(self, event):
        # Only process fact events (ignore delta events like BookAdded)
        if not event.__class__.__name__.endswith("FactEvent"):
            return

        repo = current_domain.repository_for(BookReport)

        try:
            report = repo.get(event.id)
            report.title = event.title
            report.author = event.author
            report.price = event.price
            report.isbn = event.isbn or ""
        except Exception:
            report = BookReport(
                book_id=event.id,
                title=event.title,
                author=event.author,
                price=event.price,
                isbn=event.isbn or "",
            )

        repo.add(report)




domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        book_repo = domain.repository_for(Book)

        # Add a book — triggers both the BookAdded event and a fact event
        print("=== Adding a Book ===")
        gatsby = Book.add_to_catalog(
            title="The Great Gatsby",
            author="F. Scott Fitzgerald",
            isbn="9780743273565",
            price=12.99,
        )
        book_repo.add(gatsby)

        # Update price — triggers a new fact event with complete state
        print("\n=== Updating Price ===")
        gatsby.update_price(15.99)
        book_repo.add(gatsby)

        # Read fact events from the event store
        fact_stream = f"{Book.meta_.stream_category}-fact-{gatsby.id}"
        fact_messages = domain.event_store.store.read(fact_stream)
        print(f"\nFact events in stream: {len(fact_messages)}")
        for msg in fact_messages:
            event = msg.to_domain_object()
            print(f"  Title: {event.title}, Price: ${event.price}")

        assert len(fact_messages) == 2  # One per state change
        last_fact = fact_messages[-1].to_domain_object()
        assert last_fact.price == 15.99
        assert last_fact.title == "The Great Gatsby"

        print("\nAll checks passed!")

Next

Chapter 16: Following the Trail — Message Tracing →