Skip to content

Chapter 15: Fact Events and the Reporting Pipeline

The finance team needs a flat reporting database showing the current state of every account for regulatory filings. They do not want to process individual deposits and withdrawals — they want a single event per state change that contains the complete, current account state.

This is what fact events provide.

Delta Events vs. Fact Events

Delta Events Fact Events
Content What changed Complete current state
Example "Deposit of $500 made" "Account now has $1,500 balance"
Raised by raise_() in domain methods Auto-generated by the repository
Stream fidelis::account-{id} fidelis::account-fact-{id}
Use for Aggregate replay, handlers Reporting, external consumers

Delta events describe what happened. Fact events describe where things stand.

Enabling Fact Events

Add fact_events=True to the aggregate:

@domain.aggregate(is_event_sourced=True, fact_events=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")
    ...

Now every time the aggregate is persisted via repo.add(), Protean auto-generates a fact event containing the full aggregate state. You do not raise fact events manually — they are a byproduct of persistence.

The Reporting Projector

A projector that consumes fact events is simpler than one that consumes delta events, because every fact event is a complete state snapshot:

@domain.projection
class AccountReport:
    """A projection built entirely from fact events.

    Each fact event carries the full aggregate state, so the projector
    simply overwrites the projection record — no incremental calculations.
    """

    account_id: Identifier(identifier=True, required=True)
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")
    last_updated: DateTime()
@domain.event_handler(
    part_of=Account,
    stream_category="fidelis::account-fact",
)
class AccountReportHandler:
    """Maintains the AccountReport projection from Account fact events.

    Because fact events carry the complete aggregate state, the handler
    either creates or fully replaces the projection record on every event.

    Fact events are consumed as Messages (not typed domain events) since
    the auto-generated fact event class is created at runtime.
    """

    @handle("Fidelis.AccountFactEvent.v1")
    def on_account_fact_event(self, event):
        repo = current_domain.repository_for(AccountReport)

        try:
            report = repo.get(event.id)
            report.account_number = event.account_number
            report.holder_name = event.holder_name
            report.balance = event.balance
            report.status = event.status
        except Exception:
            report = AccountReport(
                account_id=event.id,
                account_number=event.account_number,
                holder_name=event.holder_name,
                balance=event.balance,
                status=event.status,
            )

        repo.add(report)

Notice:

  • The projector subscribes to stream_categories=["fidelis::account-fact"] — the fact event stream, not the regular event stream.
  • The handler simply overwrites the projection with the latest state. No need to apply deltas.

When to Use Fact Events

  • Reporting pipelines that need current state without replaying event history.
  • External consumers that do not understand your delta event schema.
  • Data warehouses and analytics systems that want snapshots.
  • Cross-bounded-context communication where the consumer needs a complete picture.

Do not use fact events for:

  • Aggregate replay — that uses delta events.
  • Event handlers that react to specific changes — "a large deposit was made" requires DepositMade, not a fact event.

What We Built

  • fact_events=True on the Account aggregate.
  • Auto-generated fact events containing complete aggregate state.
  • An AccountReportProjector consuming fact events for regulatory reporting.
  • Understanding of when to use delta events vs. fact events.

In the next chapter, an auditor asks to trace the complete chain of events triggered by a suspicious deposit.

Full Source

"""Chapter 15: Fact Events

Demonstrates how fact events provide a snapshot of aggregate state after every
change.  Fact events are auto-generated when an aggregate is configured with
``fact_events=True``.  They flow through a separate ``<aggregate>-fact``
stream, making them ideal for building projections that only need the latest
state rather than reconstructing it from individual domain events.
"""

from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import DateTime, Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.aggregate(is_event_sourced=True, fact_events=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


@domain.projection
class AccountReport:
    """A projection built entirely from fact events.

    Each fact event carries the full aggregate state, so the projector
    simply overwrites the projection record — no incremental calculations.
    """

    account_id: Identifier(identifier=True, required=True)
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")
    last_updated: DateTime()


@domain.event_handler(
    part_of=Account,
    stream_category="fidelis::account-fact",
)
class AccountReportHandler:
    """Maintains the AccountReport projection from Account fact events.

    Because fact events carry the complete aggregate state, the handler
    either creates or fully replaces the projection record on every event.

    Fact events are consumed as Messages (not typed domain events) since
    the auto-generated fact event class is created at runtime.
    """

    @handle("Fidelis.AccountFactEvent.v1")
    def on_account_fact_event(self, event):
        repo = current_domain.repository_for(AccountReport)

        try:
            report = repo.get(event.id)
            report.account_number = event.account_number
            report.holder_name = event.holder_name
            report.balance = event.balance
            report.status = event.status
        except Exception:
            report = AccountReport(
                account_id=event.id,
                account_number=event.account_number,
                holder_name=event.holder_name,
                balance=event.balance,
                status=event.status,
            )

        repo.add(report)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account with $1000
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make a deposit
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )
        print("Deposit of $500.00 made")

        # Make a withdrawal
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=200.00,
                reference="groceries",
            )
        )
        print("Withdrawal of $200.00 made")

        # Verify account state from the event-sourced aggregate
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nAggregate balance: ${account.balance:.2f}")
        assert account.balance == 1300.00  # 1000 + 500 - 200

        # Verify fact events were generated in the event store
        fact_stream = f"{Account.meta_.stream_category}-fact-{account_id}"
        fact_messages = domain.event_store.store.read(fact_stream)
        print(f"\nFact events in stream: {len(fact_messages)}")
        for msg in fact_messages:
            event = msg.to_domain_object()
            print(f"  Balance: ${event.balance:.2f}, Status: {event.status}")

        assert len(fact_messages) == 3  # One per state change
        # Last fact event has the final state
        last_fact = fact_messages[-1].to_domain_object()
        assert last_fact.balance == 1300.00
        assert last_fact.status == "ACTIVE"

        # When the server runs, AccountReportHandler processes these
        # fact events to maintain the AccountReport projection.
        print("\nAll checks passed!")

Next

Chapter 16: Following the Trail — Message Tracing →