Skip to content

Chapter 7: Reacting to Events

The compliance team mandates that every deposit over $10,000 triggers a suspicious-activity alert. The marketing team wants a welcome email when a new account opens. These are side effects that do not belong in the aggregate or the projector. In this chapter we will create event handlers — decoupled listeners that react to events and trigger external actions.

Event Handlers vs. Projectors

Projectors Event Handlers
Purpose Maintain read models Trigger side effects
State Update projections Stateless
Decorator @on(EventType) @handle(EventType)
Output Projection writes Commands, API calls, logs

Both consume events, but their responsibilities are different.

The Compliance Alert Handler

@domain.event_handler(part_of=Account)
class ComplianceAlertHandler:
    @handle(DepositMade)
    def on_large_deposit(self, event: DepositMade):
        if event.amount >= 10000:
            print(
                f"  [COMPLIANCE] Large deposit alert: "
                f"${event.amount:.2f} into account {event.account_id}"
            )

This handler watches all DepositMade events. When one exceeds $10,000, it prints an alert. In a real system, this would create a ComplianceAlert aggregate or call an external service.

The Notification Handler

@domain.event_handler(part_of=Account)
class NotificationHandler:
    @handle(AccountOpened)
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        print(
            f"  [NOTIFICATION] Welcome, {event.holder_name}! "
            f"Your account {event.account_number} is now active."
        )

    @handle(WithdrawalMade)
    def on_large_withdrawal(self, event: WithdrawalMade):
        if event.amount >= 5000:
            print(
                f"  [NOTIFICATION] Large withdrawal alert: "
                f"${event.amount:.2f} from account {event.account_id}"
            )

A single event handler can handle multiple event types. The NotificationHandler listens for both AccountOpened (welcome message) and WithdrawalMade (large withdrawal alert).

Multiple Consumers, One Event

A key principle of event-driven architecture: a single event can be consumed by multiple listeners. When a large deposit occurs:

  1. The AccountSummaryProjector updates the dashboard
  2. The ComplianceAlertHandler triggers an alert
  3. The NotificationHandler could send a receipt

None of these consumers know about each other. They are decoupled by the event.

Trying It Out

if __name__ == "__main__":
    with domain.domain_context():
        # Open an account — triggers welcome notification
        print("=== Opening Account ===")
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=500.00,
            )
        )

        # Make a $15,000 deposit — triggers compliance alert
        print("\n=== Large Deposit ===")
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=15000.00,
                reference="wire transfer",
            )
        )

        # Make a $6,000 withdrawal — triggers notification alert
        print("\n=== Large Withdrawal ===")
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=6000.00,
                reference="property payment",
            )
        )

        # Verify final balance
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nFinal balance: ${account.balance:.2f}")  # 500 + 15000 - 6000 = 9500

        assert account.balance == 9500.00
        print("\nAll checks passed!")
$ python fidelis.py
[NOTIFICATION] Welcome Alice Johnson! Account ACC-001 is ready.
Account opened: acc-001-uuid...
[COMPLIANCE ALERT] Large deposit: $15,000.00 to account acc-001-uuid
[NOTIFICATION] Large withdrawal alert: $6,000.00
Final balance: $10,000.00

Issuing Commands from Handlers

Event handlers can also issue commands to trigger work in other aggregates. For example, a compliance handler could create an investigation:

@handle(DepositMade)
def on_large_deposit(self, event: DepositMade):
    if event.amount >= 10_000:
        current_domain.process(
            CreateComplianceInvestigation(
                account_id=event.account_id,
                amount=event.amount,
            )
        )

This is how cross-aggregate coordination works without coupling — the Account aggregate knows nothing about the Compliance aggregate. The event handler bridges them.

What We Built

  • ComplianceAlertHandler — reacts to large deposits.
  • NotificationHandler — sends welcome messages and withdrawal alerts.
  • Event handlers that are stateless and decoupled from the aggregate.
  • The pattern for issuing commands from handlers for cross-aggregate coordination.

Our system now reacts to events. But everything still runs synchronously — a slow compliance check blocks the deposit response. In the next chapter, we will switch to asynchronous processing with Redis and the Protean server.

Full Source

from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class AccountClosed:
    account_id: Identifier(required=True)
    reason: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @invariant.post
    def closed_account_must_have_zero_balance(self):
        if self.status == "CLOSED" and self.balance != 0:
            raise ValidationError(
                {"status": ["Cannot close account with non-zero balance"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def close(self, reason: str = None) -> None:
        self.raise_(
            AccountClosed(
                account_id=str(self.id),
                reason=reason,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

    @apply
    def on_account_closed(self, event: AccountClosed):
        self.status = "CLOSED"


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class CloseAccount:
    account_id: Identifier(required=True)
    reason: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)

    @handle(CloseAccount)
    def handle_close_account(self, command: CloseAccount):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.close(reason=command.reason)
        repo.add(account)


@domain.event_handler(part_of=Account)
class ComplianceAlertHandler:
    @handle(DepositMade)
    def on_large_deposit(self, event: DepositMade):
        if event.amount >= 10000:
            print(
                f"  [COMPLIANCE] Large deposit alert: "
                f"${event.amount:.2f} into account {event.account_id}"
            )


@domain.event_handler(part_of=Account)
class NotificationHandler:
    @handle(AccountOpened)
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        print(
            f"  [NOTIFICATION] Welcome, {event.holder_name}! "
            f"Your account {event.account_number} is now active."
        )

    @handle(WithdrawalMade)
    def on_large_withdrawal(self, event: WithdrawalMade):
        if event.amount >= 5000:
            print(
                f"  [NOTIFICATION] Large withdrawal alert: "
                f"${event.amount:.2f} from account {event.account_id}"
            )


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account — triggers welcome notification
        print("=== Opening Account ===")
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=500.00,
            )
        )

        # Make a $15,000 deposit — triggers compliance alert
        print("\n=== Large Deposit ===")
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=15000.00,
                reference="wire transfer",
            )
        )

        # Make a $6,000 withdrawal — triggers notification alert
        print("\n=== Large Withdrawal ===")
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=6000.00,
                reference="property payment",
            )
        )

        # Verify final balance
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nFinal balance: ${account.balance:.2f}")  # 500 + 15000 - 6000 = 9500

        assert account.balance == 9500.00
        print("\nAll checks passed!")

Next

Chapter 8: Going Async — The Server →