Chapter 7: Reacting to Events
The compliance team mandates that every deposit over $10,000 triggers a suspicious-activity alert. The marketing team wants a welcome email when a new account opens. These are side effects that do not belong in the aggregate or the projector. In this chapter we will create event handlers — decoupled listeners that react to events and trigger external actions.
Event Handlers vs. Projectors
| Projectors | Event Handlers | |
|---|---|---|
| Purpose | Maintain read models | Trigger side effects |
| State | Update projections | Stateless |
| Decorator | @on(EventType) |
@handle(EventType) |
| Output | Projection writes | Commands, API calls, logs |
Both consume events, but their responsibilities are different.
The Compliance Alert Handler
@domain.event_handler(part_of=Account)
class ComplianceAlertHandler:
@handle(DepositMade)
def on_large_deposit(self, event: DepositMade):
if event.amount >= 10000:
print(
f" [COMPLIANCE] Large deposit alert: "
f"${event.amount:.2f} into account {event.account_id}"
)
This handler watches all DepositMade events. When one exceeds $10,000,
it prints an alert. In a real system, this would create a
ComplianceAlert aggregate or call an external service.
The Notification Handler
@domain.event_handler(part_of=Account)
class NotificationHandler:
@handle(AccountOpened)
def on_account_opened(self, event: AccountOpened):
self.id = event.account_id
print(
f" [NOTIFICATION] Welcome, {event.holder_name}! "
f"Your account {event.account_number} is now active."
)
@handle(WithdrawalMade)
def on_large_withdrawal(self, event: WithdrawalMade):
if event.amount >= 5000:
print(
f" [NOTIFICATION] Large withdrawal alert: "
f"${event.amount:.2f} from account {event.account_id}"
)
A single event handler can handle multiple event types. The
NotificationHandler listens for both AccountOpened (welcome message)
and WithdrawalMade (large withdrawal alert).
Multiple Consumers, One Event
A key principle of event-driven architecture: a single event can be consumed by multiple listeners. When a large deposit occurs:
- The AccountSummaryProjector updates the dashboard
- The ComplianceAlertHandler triggers an alert
- The NotificationHandler could send a receipt
None of these consumers know about each other. They are decoupled by the event.
Trying It Out
if __name__ == "__main__":
with domain.domain_context():
# Open an account — triggers welcome notification
print("=== Opening Account ===")
account_id = domain.process(
OpenAccount(
account_number="ACC-001",
holder_name="Alice Johnson",
opening_deposit=500.00,
)
)
# Make a $15,000 deposit — triggers compliance alert
print("\n=== Large Deposit ===")
domain.process(
MakeDeposit(
account_id=account_id,
amount=15000.00,
reference="wire transfer",
)
)
# Make a $6,000 withdrawal — triggers notification alert
print("\n=== Large Withdrawal ===")
domain.process(
MakeWithdrawal(
account_id=account_id,
amount=6000.00,
reference="property payment",
)
)
# Verify final balance
repo = current_domain.repository_for(Account)
account = repo.get(account_id)
print(f"\nFinal balance: ${account.balance:.2f}") # 500 + 15000 - 6000 = 9500
assert account.balance == 9500.00
print("\nAll checks passed!")
$ python fidelis.py
[NOTIFICATION] Welcome Alice Johnson! Account ACC-001 is ready.
Account opened: acc-001-uuid...
[COMPLIANCE ALERT] Large deposit: $15,000.00 to account acc-001-uuid
[NOTIFICATION] Large withdrawal alert: $6,000.00
Final balance: $10,000.00
Issuing Commands from Handlers
Event handlers can also issue commands to trigger work in other aggregates. For example, a compliance handler could create an investigation:
@handle(DepositMade)
def on_large_deposit(self, event: DepositMade):
if event.amount >= 10_000:
current_domain.process(
CreateComplianceInvestigation(
account_id=event.account_id,
amount=event.amount,
)
)
This is how cross-aggregate coordination works without coupling — the Account aggregate knows nothing about the Compliance aggregate. The event handler bridges them.
What We Built
- ComplianceAlertHandler — reacts to large deposits.
- NotificationHandler — sends welcome messages and withdrawal alerts.
- Event handlers that are stateless and decoupled from the aggregate.
- The pattern for issuing commands from handlers for cross-aggregate coordination.
Our system now reacts to events. But everything still runs synchronously — a slow compliance check blocks the deposit response. In the next chapter, we will switch to asynchronous processing with Redis and the Protean server.
Full Source
from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain
domain = Domain("fidelis")
@domain.event(part_of="Account")
class AccountOpened:
account_id: Identifier(required=True)
account_number: String(required=True)
holder_name: String(required=True)
opening_deposit: Float(required=True)
@domain.event(part_of="Account")
class DepositMade:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.event(part_of="Account")
class WithdrawalMade:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.event(part_of="Account")
class AccountClosed:
account_id: Identifier(required=True)
reason: String()
@domain.aggregate(is_event_sourced=True)
class Account:
account_number: String(max_length=20, required=True)
holder_name: String(max_length=100, required=True)
balance: Float(default=0.0)
status: String(max_length=20, default="ACTIVE")
@invariant.post
def balance_must_not_be_negative(self):
if self.balance is not None and self.balance < 0:
raise ValidationError(
{"balance": ["Insufficient funds: balance cannot be negative"]}
)
@invariant.post
def closed_account_must_have_zero_balance(self):
if self.status == "CLOSED" and self.balance != 0:
raise ValidationError(
{"status": ["Cannot close account with non-zero balance"]}
)
@classmethod
def open(cls, account_number: str, holder_name: str, opening_deposit: float):
account = cls._create_new()
account.raise_(
AccountOpened(
account_id=str(account.id),
account_number=account_number,
holder_name=holder_name,
opening_deposit=opening_deposit,
)
)
return account
def deposit(self, amount: float, reference: str = None) -> None:
if amount <= 0:
raise ValidationError({"amount": ["Deposit amount must be positive"]})
self.raise_(
DepositMade(
account_id=str(self.id),
amount=amount,
reference=reference,
)
)
def withdraw(self, amount: float, reference: str = None) -> None:
if amount <= 0:
raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
self.raise_(
WithdrawalMade(
account_id=str(self.id),
amount=amount,
reference=reference,
)
)
def close(self, reason: str = None) -> None:
self.raise_(
AccountClosed(
account_id=str(self.id),
reason=reason,
)
)
@apply
def on_account_opened(self, event: AccountOpened):
self.id = event.account_id
self.account_number = event.account_number
self.holder_name = event.holder_name
self.balance = event.opening_deposit
self.status = "ACTIVE"
@apply
def on_deposit_made(self, event: DepositMade):
self.balance += event.amount
@apply
def on_withdrawal_made(self, event: WithdrawalMade):
self.balance -= event.amount
@apply
def on_account_closed(self, event: AccountClosed):
self.status = "CLOSED"
@domain.command(part_of=Account)
class OpenAccount:
account_number: String(required=True)
holder_name: String(required=True)
opening_deposit: Float(required=True)
@domain.command(part_of=Account)
class MakeDeposit:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.command(part_of=Account)
class MakeWithdrawal:
account_id: Identifier(required=True)
amount: Float(required=True)
reference: String()
@domain.command(part_of=Account)
class CloseAccount:
account_id: Identifier(required=True)
reason: String()
@domain.command_handler(part_of=Account)
class AccountCommandHandler:
@handle(OpenAccount)
def handle_open_account(self, command: OpenAccount):
account = Account.open(
account_number=command.account_number,
holder_name=command.holder_name,
opening_deposit=command.opening_deposit,
)
current_domain.repository_for(Account).add(account)
return str(account.id)
@handle(MakeDeposit)
def handle_make_deposit(self, command: MakeDeposit):
repo = current_domain.repository_for(Account)
account = repo.get(command.account_id)
account.deposit(command.amount, reference=command.reference)
repo.add(account)
@handle(MakeWithdrawal)
def handle_make_withdrawal(self, command: MakeWithdrawal):
repo = current_domain.repository_for(Account)
account = repo.get(command.account_id)
account.withdraw(command.amount, reference=command.reference)
repo.add(account)
@handle(CloseAccount)
def handle_close_account(self, command: CloseAccount):
repo = current_domain.repository_for(Account)
account = repo.get(command.account_id)
account.close(reason=command.reason)
repo.add(account)
@domain.event_handler(part_of=Account)
class ComplianceAlertHandler:
@handle(DepositMade)
def on_large_deposit(self, event: DepositMade):
if event.amount >= 10000:
print(
f" [COMPLIANCE] Large deposit alert: "
f"${event.amount:.2f} into account {event.account_id}"
)
@domain.event_handler(part_of=Account)
class NotificationHandler:
@handle(AccountOpened)
def on_account_opened(self, event: AccountOpened):
self.id = event.account_id
print(
f" [NOTIFICATION] Welcome, {event.holder_name}! "
f"Your account {event.account_number} is now active."
)
@handle(WithdrawalMade)
def on_large_withdrawal(self, event: WithdrawalMade):
if event.amount >= 5000:
print(
f" [NOTIFICATION] Large withdrawal alert: "
f"${event.amount:.2f} from account {event.account_id}"
)
domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"
if __name__ == "__main__":
with domain.domain_context():
# Open an account — triggers welcome notification
print("=== Opening Account ===")
account_id = domain.process(
OpenAccount(
account_number="ACC-001",
holder_name="Alice Johnson",
opening_deposit=500.00,
)
)
# Make a $15,000 deposit — triggers compliance alert
print("\n=== Large Deposit ===")
domain.process(
MakeDeposit(
account_id=account_id,
amount=15000.00,
reference="wire transfer",
)
)
# Make a $6,000 withdrawal — triggers notification alert
print("\n=== Large Withdrawal ===")
domain.process(
MakeWithdrawal(
account_id=account_id,
amount=6000.00,
reference="property payment",
)
)
# Verify final balance
repo = current_domain.repository_for(Account)
account = repo.get(account_id)
print(f"\nFinal balance: ${account.balance:.2f}") # 500 + 15000 - 6000 = 9500
assert account.balance == 9500.00
print("\nAll checks passed!")