Skip to content

Chapter 2: Deposits and Withdrawals

An account that can only be opened is not very useful. In this chapter we will add deposits and withdrawals — multiple event types flowing through a single aggregate. Along the way, we will see a core principle of Event Sourcing: all state changes flow exclusively through events.

New Events

A deposit and a withdrawal are two distinct facts:

@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()

Each event captures the data needed to describe what happened. The reference field is optional — it records why the transaction occurred (a paycheck, a refund, a grocery purchase).

Domain Methods and Apply Handlers

Now we add domain methods to the Account aggregate and the corresponding @apply handlers:

@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        if amount > self.balance:
            raise ValidationError({"amount": ["Insufficient funds"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

Notice the separation of concerns:

  • Domain methods (deposit(), withdraw()) contain validation logic — they check that the amount is positive and that funds are sufficient. If validation passes, they call raise_().
  • @apply handlers contain pure state mutations — they update the balance. No validation, no side effects, just state changes.

This separation is critical. During replay (when loading from the event store), only the @apply handlers run. The validation in deposit() and withdraw() does not re-execute — those events already happened and were validated at the time. Replay trusts the event history.

Multiple Events, One Aggregate

Let's exercise the full lifecycle:

if __name__ == "__main__":
    with domain.domain_context():
        # Open an account
        account = Account.open(
            account_number="ACC-001",
            holder_name="Alice Johnson",
            opening_deposit=1000.00,
        )

        # Make several transactions
        account.deposit(500.00, reference="paycheck")
        account.deposit(200.00, reference="refund")
        account.withdraw(150.00, reference="groceries")

        # Persist — all four events are written to the event store
        repo = domain.repository_for(Account)
        repo.add(account)

        # Retrieve — all four events are replayed
        loaded = repo.get(account.id)
        print(f"Account: {loaded.holder_name} ({loaded.account_number})")
        print(f"Balance: ${loaded.balance:.2f}")  # 1000 + 500 + 200 - 150 = 1550
        print(f"Version: {loaded._version}")

        # Each event incremented the version
        assert loaded.balance == 1550.00
        assert loaded._version == 3  # 0-indexed: events 0, 1, 2, 3

Run it:

$ python fidelis.py
Account: Alice Johnson (ACC-001)
Balance: $1550.00
Version: 3

The balance is $1,550.00: the opening deposit of $1,000 plus $500 and $200 in deposits, minus the $150 withdrawal. But this number was never stored — it was computed by replaying four events through their @apply handlers.

Version Tracking

Each event increments the aggregate's _version. After four events (opened, two deposits, one withdrawal), the version is 3 (0-indexed). The version serves as an optimistic concurrency check — if two processes try to modify the same account simultaneously, the second one will detect a version conflict.

Validation at the Domain Boundary

Try withdrawing more than the balance:

        # Try an invalid withdrawal
        try:
            loaded.withdraw(10000.00)
        except ValidationError as e:
            print(f"\nRejected: {e.messages}")

Output:

Rejected: {'amount': ['Insufficient funds']}

The validation in withdraw() catches the problem before any event is raised. No event means no state change. The aggregate remains consistent.

Note

This validation lives in the domain method, not in the @apply handler. In the next chapters we will move business rules into invariants — a more robust mechanism that validates state after every @apply handler runs.

What We Built

  • DepositMade and WithdrawalMade events describing financial transactions.
  • Domain methods that validate inputs and raise events.
  • @apply handlers that mutate state from events.
  • An aggregate that derives its balance from multiple events.
  • Version tracking that increments with each event.

The aggregate is starting to feel like a real ledger. Next, we will add commands and the processing pipeline so external systems can interact with our domain through typed contracts.

Full Source

from protean import Domain, apply
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()




@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        if amount > self.balance:
            raise ValidationError({"amount": ["Insufficient funds"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account
        account = Account.open(
            account_number="ACC-001",
            holder_name="Alice Johnson",
            opening_deposit=1000.00,
        )

        # Make several transactions
        account.deposit(500.00, reference="paycheck")
        account.deposit(200.00, reference="refund")
        account.withdraw(150.00, reference="groceries")

        # Persist — all four events are written to the event store
        repo = domain.repository_for(Account)
        repo.add(account)

        # Retrieve — all four events are replayed
        loaded = repo.get(account.id)
        print(f"Account: {loaded.holder_name} ({loaded.account_number})")
        print(f"Balance: ${loaded.balance:.2f}")  # 1000 + 500 + 200 - 150 = 1550
        print(f"Version: {loaded._version}")

        # Each event incremented the version
        assert loaded.balance == 1550.00
        assert loaded._version == 3  # 0-indexed: events 0, 1, 2, 3
        # Try an invalid withdrawal
        try:
            loaded.withdraw(10000.00)
        except ValidationError as e:
            print(f"\nRejected: {e.messages}")
        print("\nAll checks passed!")

Next

Chapter 3: Commands and the Processing Pipeline →