Chapter 6: The Account Dashboard
The product team needs a customer-facing dashboard showing account summaries — current balance, transaction count, last activity. Reading this from the event store by replaying events every time is not practical. In this chapter we will build a projection — a denormalized, read-optimized view — and a projector that keeps it up to date automatically as events occur.
The Write Model vs. Read Model
Our Account aggregate is the write model — it enforces business
rules and records events. But querying it requires replaying events from
the event store, which gets expensive as the history grows.
A projection is a read model — a flat, query-optimized table designed for a specific view. The projector listens to events and updates the projection automatically.
Defining the Projection
@domain.projection
class AccountSummary:
"""A read-optimized view of account data for the dashboard."""
account_id: Identifier(identifier=True, required=True)
account_number: String(max_length=20, required=True)
holder_name: String(max_length=100, required=True)
balance: Float(default=0.0)
transaction_count: Integer(default=0)
last_transaction_at: DateTime()
Notice:
- Projections use basic field types only — no
HasMany, noValueObject. They are flat database tables. identifier=Truemarks the projection's primary key.- The projection stores exactly the data the dashboard needs.
Defining the Projector
@domain.projector(projector_for=AccountSummary, aggregates=[Account])
class AccountSummaryProjector:
"""Maintains the AccountSummary projection from Account events."""
@on(AccountOpened)
def on_account_opened(self, event: AccountOpened):
self.id = event.account_id
summary = AccountSummary(
account_id=event.account_id,
account_number=event.account_number,
holder_name=event.holder_name,
balance=event.opening_deposit,
transaction_count=1,
last_transaction_at=event._metadata.headers.time,
)
current_domain.repository_for(AccountSummary).add(summary)
@on(DepositMade)
def on_deposit_made(self, event: DepositMade):
repo = current_domain.repository_for(AccountSummary)
summary = repo.get(event.account_id)
summary.balance += event.amount
summary.transaction_count += 1
summary.last_transaction_at = event._metadata.headers.time
repo.add(summary)
@on(WithdrawalMade)
def on_withdrawal_made(self, event: WithdrawalMade):
repo = current_domain.repository_for(AccountSummary)
summary = repo.get(event.account_id)
summary.balance -= event.amount
summary.transaction_count += 1
summary.last_transaction_at = event._metadata.headers.time
repo.add(summary)
Key concepts:
projector_for=AccountSummarylinks this projector to its projection.aggregates=[Account]tells the projector which event streams to subscribe to.@on(EventType)maps each event to a handler method — an alias for@handlethat reads better in projector context.- Each handler loads, updates, and saves the projection through a repository.
Projections are Mutable
Unlike events (which are immutable facts), projections are mutable views. The projector loads, modifies, and saves them on every event — always overwriting with the latest derived state.
Trying It Out
if __name__ == "__main__":
with domain.domain_context():
# Open an account with $1000
account_id = domain.process(
OpenAccount(
account_number="ACC-001",
holder_name="Alice Johnson",
opening_deposit=1000.00,
)
)
print(f"Account opened: {account_id}")
# Make a deposit
domain.process(
MakeDeposit(
account_id=account_id,
amount=500.00,
reference="paycheck",
)
)
print("Deposit of $500.00 made")
# Check the projection
summary_repo = current_domain.repository_for(AccountSummary)
summary = summary_repo.get(account_id)
print("\n=== Account Dashboard ===")
print(f"Account: {summary.account_number}")
print(f"Holder: {summary.holder_name}")
print(f"Balance: ${summary.balance:.2f}")
print(f"Transactions: {summary.transaction_count}")
print(f"Last activity: {summary.last_transaction_at}")
assert summary.balance == 1500.00
assert summary.transaction_count == 2
print("\nAll checks passed!")
$ python fidelis.py
Account: Alice Johnson (ACC-001)
Balance: $1500.00
Dashboard:
Balance: $1500.00
Transactions: 1
The projection was updated automatically by the projector as events flowed through the system.
What We Built
- An AccountSummary projection — a flat, query-optimized read model.
- An AccountSummaryProjector that listens to account events and updates the projection.
- The
@ondecorator for mapping events to projector handlers. - A clear separation between the write model (aggregate) and read model (projection).
In the next chapter, we will add event handlers for side effects like compliance alerts and notifications — reactions that do not update a projection but trigger external actions.
Full Source
from protean import Domain, apply, handle, invariant
from protean.core.projector import on
from protean.exceptions import ValidationError
from protean.fields import DateTime, Float, Identifier, Integer, String
from protean.utils.globals import current_domain
domain = Domain("fidelis")
@domain.event(part_of="Account")
class AccountOpened:
account_id: Identifier(required=True)
account_number: String(required=True)
holder_name: String(required=True)
opening_deposit: Float(required=True)
@domain.event(part_of="Account")
class DepositMade:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.event(part_of="Account")
class WithdrawalMade:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.event(part_of="Account")
class AccountClosed:
account_id: Identifier(required=True)
reason: String()
@domain.aggregate(is_event_sourced=True)
class Account:
account_number: String(max_length=20, required=True)
holder_name: String(max_length=100, required=True)
balance: Float(default=0.0)
status: String(max_length=20, default="ACTIVE")
@invariant.post
def balance_must_not_be_negative(self):
if self.balance is not None and self.balance < 0:
raise ValidationError(
{"balance": ["Insufficient funds: balance cannot be negative"]}
)
@invariant.post
def closed_account_must_have_zero_balance(self):
if self.status == "CLOSED" and self.balance != 0:
raise ValidationError(
{"status": ["Cannot close account with non-zero balance"]}
)
@classmethod
def open(cls, account_number: str, holder_name: str, opening_deposit: float):
account = cls._create_new()
account.raise_(
AccountOpened(
account_id=str(account.id),
account_number=account_number,
holder_name=holder_name,
opening_deposit=opening_deposit,
)
)
return account
def deposit(self, amount: float, reference: str = None) -> None:
if amount <= 0:
raise ValidationError({"amount": ["Deposit amount must be positive"]})
self.raise_(
DepositMade(
account_id=str(self.id),
amount=amount,
reference=reference,
)
)
def withdraw(self, amount: float, reference: str = None) -> None:
if amount <= 0:
raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
self.raise_(
WithdrawalMade(
account_id=str(self.id),
amount=amount,
reference=reference,
)
)
def close(self, reason: str = None) -> None:
self.raise_(
AccountClosed(
account_id=str(self.id),
reason=reason,
)
)
@apply
def on_account_opened(self, event: AccountOpened):
self.id = event.account_id
self.account_number = event.account_number
self.holder_name = event.holder_name
self.balance = event.opening_deposit
self.status = "ACTIVE"
@apply
def on_deposit_made(self, event: DepositMade):
self.balance += event.amount
@apply
def on_withdrawal_made(self, event: WithdrawalMade):
self.balance -= event.amount
@apply
def on_account_closed(self, event: AccountClosed):
self.status = "CLOSED"
@domain.command(part_of=Account)
class OpenAccount:
account_number: String(required=True)
holder_name: String(required=True)
opening_deposit: Float(required=True)
@domain.command(part_of=Account)
class MakeDeposit:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.command(part_of=Account)
class MakeWithdrawal:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.command(part_of=Account)
class CloseAccount:
account_id: Identifier(required=True)
reason: String()
@domain.command_handler(part_of=Account)
class AccountCommandHandler:
@handle(OpenAccount)
def handle_open_account(self, command: OpenAccount):
account = Account.open(
account_number=command.account_number,
holder_name=command.holder_name,
opening_deposit=command.opening_deposit,
)
current_domain.repository_for(Account).add(account)
return str(account.id)
@handle(MakeDeposit)
def handle_make_deposit(self, command: MakeDeposit):
repo = current_domain.repository_for(Account)
account = repo.get(command.account_id)
account.deposit(command.amount, reference=command.reference)
repo.add(account)
@handle(MakeWithdrawal)
def handle_make_withdrawal(self, command: MakeWithdrawal):
repo = current_domain.repository_for(Account)
account = repo.get(command.account_id)
account.withdraw(command.amount, reference=command.reference)
repo.add(account)
@handle(CloseAccount)
def handle_close_account(self, command: CloseAccount):
repo = current_domain.repository_for(Account)
account = repo.get(command.account_id)
account.close(reason=command.reason)
repo.add(account)
@domain.projection
class AccountSummary:
"""A read-optimized view of account data for the dashboard."""
account_id: Identifier(identifier=True, required=True)
account_number: String(max_length=20, required=True)
holder_name: String(max_length=100, required=True)
balance: Float(default=0.0)
transaction_count: Integer(default=0)
last_transaction_at: DateTime()
@domain.projector(projector_for=AccountSummary, aggregates=[Account])
class AccountSummaryProjector:
"""Maintains the AccountSummary projection from Account events."""
@on(AccountOpened)
def on_account_opened(self, event: AccountOpened):
self.id = event.account_id
summary = AccountSummary(
account_id=event.account_id,
account_number=event.account_number,
holder_name=event.holder_name,
balance=event.opening_deposit,
transaction_count=1,
last_transaction_at=event._metadata.headers.time,
)
current_domain.repository_for(AccountSummary).add(summary)
@on(DepositMade)
def on_deposit_made(self, event: DepositMade):
repo = current_domain.repository_for(AccountSummary)
summary = repo.get(event.account_id)
summary.balance += event.amount
summary.transaction_count += 1
summary.last_transaction_at = event._metadata.headers.time
repo.add(summary)
@on(WithdrawalMade)
def on_withdrawal_made(self, event: WithdrawalMade):
repo = current_domain.repository_for(AccountSummary)
summary = repo.get(event.account_id)
summary.balance -= event.amount
summary.transaction_count += 1
summary.last_transaction_at = event._metadata.headers.time
repo.add(summary)
domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"
if __name__ == "__main__":
with domain.domain_context():
# Open an account with $1000
account_id = domain.process(
OpenAccount(
account_number="ACC-001",
holder_name="Alice Johnson",
opening_deposit=1000.00,
)
)
print(f"Account opened: {account_id}")
# Make a deposit
domain.process(
MakeDeposit(
account_id=account_id,
amount=500.00,
reference="paycheck",
)
)
print("Deposit of $500.00 made")
# Check the projection
summary_repo = current_domain.repository_for(AccountSummary)
summary = summary_repo.get(account_id)
print("\n=== Account Dashboard ===")
print(f"Account: {summary.account_number}")
print(f"Holder: {summary.holder_name}")
print(f"Balance: ${summary.balance:.2f}")
print(f"Transactions: {summary.transaction_count}")
print(f"Last activity: {summary.last_transaction_at}")
assert summary.balance == 1500.00
assert summary.transaction_count == 2
print("\nAll checks passed!")