Chapter 6: The Account Dashboard

The product team needs a customer-facing dashboard showing account summaries — current balance, transaction count, last activity. Reading this from the event store by replaying events every time is not practical. In this chapter we will build a projection — a denormalized, read-optimized view — and a projector that keeps it up to date automatically as events occur.

The Write Model vs. Read Model

Our Account aggregate is the write model — it enforces business rules and records events. But querying it requires replaying events from the event store, which gets expensive as the history grows.

A projection is a read model — a flat, query-optimized table designed for a specific view. The projector listens to events and updates the projection automatically.
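
To make the cost difference concrete, here is a framework-free sketch in plain Python. It does not use Protean's API: the `Event` tuple, `replay_balance`, `project`, and the `summary` dict are hypothetical names chosen only for illustration. The write-side read folds over the whole history each time, while the read-side value is kept current one event at a time and read back directly.

```python
from typing import NamedTuple


class Event(NamedTuple):
    """Hypothetical stand-in for a stored domain event."""
    kind: str      # "opened", "deposit", or "withdrawal"
    amount: float


def replay_balance(events: list[Event]) -> float:
    """Write-model style read: fold over every event, O(len(events))."""
    balance = 0.0
    for event in events:
        if event.kind in ("opened", "deposit"):
            balance += event.amount
        elif event.kind == "withdrawal":
            balance -= event.amount
    return balance


def project(summary: dict, event: Event) -> None:
    """Read-model style update: apply one event to a precomputed row."""
    sign = -1 if event.kind == "withdrawal" else 1
    summary["balance"] += sign * event.amount
    summary["transaction_count"] += 1


history = [
    Event("opened", 1000.0),
    Event("deposit", 500.0),
    Event("withdrawal", 200.0),
]

# The projection is updated once per event, as it occurs.
summary = {"balance": 0.0, "transaction_count": 0}
for event in history:
    project(summary, event)

# Both paths agree, but reading `summary` never touches the history.
assert replay_balance(history) == summary["balance"]
```

The replay cost grows with the length of the history, whereas the dashboard read stays a single lookup no matter how many events the account has accumulated.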

Defining the Projection

@domain.projection
class AccountSummary:
    """A read-optimized view of account data for the dashboard."""

    account_id: Identifier(identifier=True, required=True)
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    transaction_count: Integer(default=0)
    last_transaction_at: DateTime()

Notice:

  • Projections use basic field types only — no HasMany, no ValueObject. They are flat database tables.
  • identifier=True marks the projection's primary key.
  • The projection stores exactly the data the dashboard needs.
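
As a rough picture of what "flat" means, the sketch below builds an equivalent single table with Python's standard `sqlite3` module. This is an assumption made for illustration only: the table name, column types, and SQL are hypothetical, and the layout Protean actually produces depends on the database provider configured for the domain.

```python
import sqlite3

# Hypothetical schema: column names mirror the AccountSummary fields.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE account_summary (
        account_id          TEXT PRIMARY KEY,   -- identifier=True
        account_number      TEXT NOT NULL,      -- required=True
        holder_name         TEXT NOT NULL,      -- required=True
        balance             REAL DEFAULT 0.0,
        transaction_count   INTEGER DEFAULT 0,
        last_transaction_at TEXT
    )
    """
)
conn.execute(
    "INSERT INTO account_summary (account_id, account_number, holder_name) "
    "VALUES (?, ?, ?)",
    ("acc-1", "ACC-001", "Alice Johnson"),
)

# The dashboard query is one primary-key lookup: no joins, no replay.
row = conn.execute(
    "SELECT account_number, holder_name, balance, transaction_count "
    "FROM account_summary WHERE account_id = ?",
    ("acc-1",),
).fetchone()
```

Every column holds a scalar value, which is why nested constructs such as HasMany or ValueObject have no place in a projection.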

Defining the Projector

@domain.projector(projector_for=AccountSummary, aggregates=[Account])
class AccountSummaryProjector:
    """Maintains the AccountSummary projection from Account events."""

    @on(AccountOpened)
    def on_account_opened(self, event: AccountOpened):
        summary = AccountSummary(
            account_id=event.account_id,
            account_number=event.account_number,
            holder_name=event.holder_name,
            balance=event.opening_deposit,
            transaction_count=1,
            last_transaction_at=event._metadata.headers.time,
        )
        current_domain.repository_for(AccountSummary).add(summary)

    @on(DepositMade)
    def on_deposit_made(self, event: DepositMade):
        repo = current_domain.repository_for(AccountSummary)
        summary = repo.get(event.account_id)
        summary.balance += event.amount
        summary.transaction_count += 1
        summary.last_transaction_at = event._metadata.headers.time
        repo.add(summary)

    @on(WithdrawalMade)
    def on_withdrawal_made(self, event: WithdrawalMade):
        repo = current_domain.repository_for(AccountSummary)
        summary = repo.get(event.account_id)
        summary.balance -= event.amount
        summary.transaction_count += 1
        summary.last_transaction_at = event._metadata.headers.time
        repo.add(summary)

Key concepts:

  • projector_for=AccountSummary links this projector to its projection.
  • aggregates=[Account] tells the projector which event streams to subscribe to.
  • @on(EventType) maps each event to a handler method — an alias for @handle that reads better in projector context.
  • Each handler loads, updates, and saves the projection through a repository.
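
The event-to-handler mapping can be pictured with a toy dispatcher. The sketch below is not Protean's implementation: the `on` decorator defined here, the `_handles` attribute, the `BalanceProjector` class, and its `dispatch` method are all hypothetical, and the events are plain dataclasses rather than Protean events. It only shows the general idea of tagging methods with an event class and routing each incoming event to the matching method.

```python
from dataclasses import dataclass


def on(event_cls):
    """Toy decorator: tag a method with the event class it handles."""
    def decorator(method):
        method._handles = event_cls
        return method
    return decorator


@dataclass(frozen=True)
class DepositMade:
    account_id: str
    amount: float


@dataclass(frozen=True)
class WithdrawalMade:
    account_id: str
    amount: float


class BalanceProjector:
    """Hypothetical projector that keeps balances in a plain dict."""

    def __init__(self):
        self.balances: dict[str, float] = {}

    @on(DepositMade)
    def on_deposit_made(self, event):
        current = self.balances.get(event.account_id, 0.0)
        self.balances[event.account_id] = current + event.amount

    @on(WithdrawalMade)
    def on_withdrawal_made(self, event):
        current = self.balances.get(event.account_id, 0.0)
        self.balances[event.account_id] = current - event.amount

    def dispatch(self, event) -> None:
        """Route an event to the method tagged with its class, if any."""
        for name in dir(self):
            method = getattr(self, name)
            if getattr(method, "_handles", None) is type(event):
                method(event)


projector = BalanceProjector()
projector.dispatch(DepositMade("acc-1", 500.0))
projector.dispatch(WithdrawalMade("acc-1", 120.0))
```

Events with no tagged handler are simply ignored here, which is one reasonable choice for a read model that cares about only some of an aggregate's events.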

Projections Are Mutable

Unlike events (which are immutable facts), projections are mutable views. The projector loads, modifies, and saves them on every event — always overwriting with the latest derived state.
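
The contrast can be sketched with plain dataclasses. This is an illustration under stated assumptions, not Protean code: `AccountSummaryRow` is a hypothetical stand-in for the projection, and the frozen dataclass only imitates event immutability, which Protean enforces through its own mechanism.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class DepositMade:
    """Immutable fact: fields cannot be reassigned after creation."""
    account_id: str
    amount: float


@dataclass
class AccountSummaryRow:
    """Mutable view: the projector rewrites it as events arrive."""
    account_id: str
    balance: float = 0.0
    transaction_count: int = 0


event = DepositMade(account_id="acc-1", amount=500.0)
row = AccountSummaryRow(account_id="acc-1", balance=1000.0, transaction_count=1)

# Updating the projection in place is the normal path.
row.balance += event.amount
row.transaction_count += 1

# Attempting to edit the event is rejected.
try:
    event.amount = 0.0
    event_was_mutated = True
except FrozenInstanceError:
    event_was_mutated = False
```

The event keeps recording what happened, while the row only reflects the current state derived from it.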

Trying It Out

if __name__ == "__main__":
    with domain.domain_context():
        # Open an account with $1000
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make a deposit
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )
        print("Deposit of $500.00 made")

        # Check the projection
        summary_repo = current_domain.repository_for(AccountSummary)
        summary = summary_repo.get(account_id)
        print("\n=== Account Dashboard ===")
        print(f"Account: {summary.account_number}")
        print(f"Holder: {summary.holder_name}")
        print(f"Balance: ${summary.balance:.2f}")
        print(f"Transactions: {summary.transaction_count}")
        print(f"Last activity: {summary.last_transaction_at}")

        assert summary.balance == 1500.00
        assert summary.transaction_count == 2
        print("\nAll checks passed!")
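
$ python fidelis.py
Account opened: <generated account id>
Deposit of $500.00 made

=== Account Dashboard ===
Account: ACC-001
Holder: Alice Johnson
Balance: $1500.00
Transactions: 2
Last activity: <timestamp of the deposit event>

All checks passed!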

The projection was updated automatically by the projector as events flowed through the system.

What We Built

  • An AccountSummary projection — a flat, query-optimized read model.
  • An AccountSummaryProjector that listens to account events and updates the projection.
  • The @on decorator for mapping events to projector handlers.
  • A clear separation between the write model (aggregate) and read model (projection).

In the next chapter, we will add event handlers for side effects like compliance alerts and notifications — reactions that do not update a projection but trigger external actions.

Full Source

from protean import Domain, apply, handle, invariant
from protean.core.projector import on
from protean.exceptions import ValidationError
from protean.fields import DateTime, Float, Identifier, Integer, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class AccountClosed:
    account_id: Identifier(required=True)
    reason: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @invariant.post
    def closed_account_must_have_zero_balance(self):
        if self.status == "CLOSED" and self.balance != 0:
            raise ValidationError(
                {"status": ["Cannot close account with non-zero balance"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def close(self, reason: str = None) -> None:
        self.raise_(
            AccountClosed(
                account_id=str(self.id),
                reason=reason,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

    @apply
    def on_account_closed(self, event: AccountClosed):
        self.status = "CLOSED"


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class CloseAccount:
    account_id: Identifier(required=True)
    reason: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)

    @handle(CloseAccount)
    def handle_close_account(self, command: CloseAccount):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.close(reason=command.reason)
        repo.add(account)


@domain.projection
class AccountSummary:
    """A read-optimized view of account data for the dashboard."""

    account_id: Identifier(identifier=True, required=True)
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    transaction_count: Integer(default=0)
    last_transaction_at: DateTime()


@domain.projector(projector_for=AccountSummary, aggregates=[Account])
class AccountSummaryProjector:
    """Maintains the AccountSummary projection from Account events."""

    @on(AccountOpened)
    def on_account_opened(self, event: AccountOpened):
        summary = AccountSummary(
            account_id=event.account_id,
            account_number=event.account_number,
            holder_name=event.holder_name,
            balance=event.opening_deposit,
            transaction_count=1,
            last_transaction_at=event._metadata.headers.time,
        )
        current_domain.repository_for(AccountSummary).add(summary)

    @on(DepositMade)
    def on_deposit_made(self, event: DepositMade):
        repo = current_domain.repository_for(AccountSummary)
        summary = repo.get(event.account_id)
        summary.balance += event.amount
        summary.transaction_count += 1
        summary.last_transaction_at = event._metadata.headers.time
        repo.add(summary)

    @on(WithdrawalMade)
    def on_withdrawal_made(self, event: WithdrawalMade):
        repo = current_domain.repository_for(AccountSummary)
        summary = repo.get(event.account_id)
        summary.balance -= event.amount
        summary.transaction_count += 1
        summary.last_transaction_at = event._metadata.headers.time
        repo.add(summary)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account with $1000
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make a deposit
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )
        print("Deposit of $500.00 made")

        # Check the projection
        summary_repo = current_domain.repository_for(AccountSummary)
        summary = summary_repo.get(account_id)
        print("\n=== Account Dashboard ===")
        print(f"Account: {summary.account_number}")
        print(f"Holder: {summary.holder_name}")
        print(f"Balance: ${summary.balance:.2f}")
        print(f"Transactions: {summary.transaction_count}")
        print(f"Last activity: {summary.last_transaction_at}")

        assert summary.balance == 1500.00
        assert summary.transaction_count == 2
        print("\nAll checks passed!")
