Chapter 8: Event Handlers — Reacting to Change
In the previous chapter we raised events from our aggregates, but nothing reacted to them. In this chapter we add event handlers — decoupled components that process events to trigger side effects like sending notifications or updating other aggregates.
Why Event Handlers?
When an order is confirmed, several things should happen: send a
confirmation email, reserve inventory, update analytics. But the Order
aggregate should not know about any of these. Its job is to manage order
state, not send emails.
Event handlers solve this by reacting to events without coupling:
graph LR
O[Order.confirm] -->|raises| E[OrderConfirmed]
E --> H1[Send Email]
E --> H2[Reserve Inventory]
E --> H3[Update Analytics]
style E fill:#fce4ec
The aggregate raises the event. Handlers independently decide what to do with it.
Introducing the Inventory Aggregate
Before we build handlers, let's add a simple Inventory aggregate that
tracks stock for each book:
@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    quantity = Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount
We will use event handlers to keep inventory in sync automatically.
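As a quick illustration of how `adjust_stock` behaves, the sketch below uses a plain dataclass as a stand-in for the aggregate. `InventorySketch` is a hypothetical name and not a Protean class; it assumes only that the method adds a signed amount to `quantity`, as in the definition above.

```python
from dataclasses import dataclass


@dataclass
class InventorySketch:
    """Plain-Python stand-in for the Inventory aggregate (not a Protean class)."""
    book_id: str
    title: str
    quantity: int = 0

    def adjust_stock(self, amount: int) -> None:
        # Positive amounts add copies, negative amounts remove them.
        self.quantity += amount


inv = InventorySketch(book_id="b-1", title="The Great Gatsby", quantity=10)
inv.adjust_stock(-2)  # for example, two copies leave the shelf
inv.adjust_stock(5)   # for example, a delivery arrives
```

Note that nothing here stops the quantity from going negative; the method as defined in this chapter has no such guard either.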
Defining Event Handlers
An event handler is a class that processes one or more events:
@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f" [Inventory] Stocked 10 copies of '{event.title}'")
Key points:
- @domain.event_handler(part_of=Book) registers the handler within the Book aggregate's context.
- @handle(BookAdded) marks the method that processes BookAdded events.
- Inside the handler, we can create or update other aggregates. Here we create an Inventory record for the new book.
Multiple Handlers in One Class
A single handler class can process multiple events:
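@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f" [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f" [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )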
Each @handle method processes a different event type. The handler
class acts as a logical grouping of related reactions.
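One way to picture how a single class can route several event types is a marker decorator plus a lookup by type. The sketch below is plain Python under that assumption and does not describe Protean's internals; `handles`, `dispatch`, and `OrderReactions` are hypothetical names chosen to avoid confusion with the real `handle` decorator.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: str


@dataclass(frozen=True)
class OrderShipped:
    order_id: str


def handles(event_type):
    """Hypothetical marker: tag a method with the event type it processes."""
    def decorator(method):
        method._handles = event_type
        return method
    return decorator


class OrderReactions:
    def __init__(self):
        self.log = []

    @handles(OrderConfirmed)
    def on_confirmed(self, event):
        self.log.append(f"confirmed {event.order_id}")

    @handles(OrderShipped)
    def on_shipped(self, event):
        self.log.append(f"shipped {event.order_id}")


def dispatch(handler, event):
    """Invoke each method on handler that is tagged for type(event)."""
    for name in dir(handler):
        method = getattr(handler, name)
        if callable(method) and getattr(method, "_handles", None) is type(event):
            method(event)


reactions = OrderReactions()
dispatch(reactions, OrderShipped(order_id="o-1"))
dispatch(reactions, OrderConfirmed(order_id="o-1"))
```

Each event reaches only the method tagged for its type, so the class stays a grouping of related reactions rather than one large conditional.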
How Events Flow
When event processing is set to "sync", events are dispatched
immediately after the aggregate is persisted:
sequenceDiagram
participant App
participant Repo as Repository
participant EH as Event Handler
App->>Repo: repo.add(book) [with BookAdded event]
Repo->>Repo: Persist aggregate
Repo->>EH: Dispatch BookAdded
EH->>EH: Create Inventory record
EH-->>Repo: Persist Inventory
The flow is:
- You call repo.add(book). The book has a pending BookAdded event.
- The repository persists the book.
- The repository dispatches pending events to handlers.
- BookEventHandler.on_book_added creates an Inventory record.
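The persist-then-dispatch ordering can be sketched with an in-memory repository. This is an illustration in plain Python, assuming a `pending_events` list on each aggregate and a handler map on the repository; both are hypothetical simplifications and not Protean's actual data structures.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class BookAdded:
    book_id: str
    title: str


@dataclass
class Book:
    id: str
    title: str
    pending_events: list = field(default_factory=list)

    def add_to_catalog(self):
        # Raising an event only records it; nothing is dispatched yet.
        self.pending_events.append(BookAdded(book_id=self.id, title=self.title))


@dataclass
class Inventory:
    book_id: str
    title: str
    quantity: int = 0
    pending_events: list = field(default_factory=list)


class InMemoryRepository:
    """Hypothetical repository: persist first, then dispatch pending events."""

    def __init__(self, handlers):
        self.rows = []
        self.handlers = handlers  # event type -> list of callables

    def add(self, aggregate):
        self.rows.append(aggregate)  # persist the aggregate
        events, aggregate.pending_events = aggregate.pending_events, []
        for event in events:  # then dispatch what it raised
            for handler in self.handlers.get(type(event), []):
                handler(event)


inventory_repo = InMemoryRepository(handlers={})


def on_book_added(event):
    # The handler creates and persists a second aggregate.
    inventory_repo.add(
        Inventory(book_id=event.book_id, title=event.title, quantity=10)
    )


book_repo = InMemoryRepository(handlers={BookAdded: [on_book_added]})

book = Book(id="b-1", title="The Great Gatsby")
book.add_to_catalog()
book_repo.add(book)  # persists the book, then runs on_book_added
```

By the time `book_repo.add(book)` returns, the inventory record already exists, which is the defining property of synchronous processing.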
Sync vs Async Processing
We have been using synchronous event processing:
domain.config["event_processing"] = "sync"
In production, you will switch to asynchronous processing:
domain.config["event_processing"] = "async"
With async processing, events are written to a message store (event store or broker) and processed by the Protean server — a background process that subscribes to event streams and dispatches to handlers.
This gives you:
- Scalability: handlers run in separate processes
- Resilience: failed handlers can retry
- Decoupling: event producers and consumers are independent
We will set this up in Chapter 13.
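Until then, the idea can be sketched in plain Python: the producer only appends to a store, and a separate worker step delivers events later and retries failures. `MessageStore`, `process_pending`, and `flaky_notifier` are hypothetical names, and the in-memory queue stands in for a real event store or broker; this is not the Protean server.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: str


class MessageStore:
    """Hypothetical in-memory stand-in for an event store or broker."""

    def __init__(self):
        self._queue = deque()

    def append(self, event):
        self._queue.append((event, 0))  # (event, failed attempts so far)

    def process_pending(self, handler, max_attempts=3):
        """Deliver queued events; re-queue failures until max_attempts is reached."""
        gave_up = []
        while self._queue:
            event, attempts = self._queue.popleft()
            try:
                handler(event)
            except Exception:
                if attempts + 1 < max_attempts:
                    self._queue.append((event, attempts + 1))
                else:
                    gave_up.append(event)
        return gave_up


calls = []


def flaky_notifier(event):
    """Fails on the first delivery, succeeds on the retry."""
    calls.append(event.order_id)
    if len(calls) == 1:
        raise ConnectionError("mail server unavailable")


store = MessageStore()
store.append(OrderConfirmed(order_id="o-1"))  # the producer returns immediately
handled_at_publish_time = len(calls)           # still 0: no handler has run
gave_up_on = store.process_pending(flaky_notifier)  # the worker runs later
```

The producer never sees the handler's failure; the retry happens entirely on the consumer side.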
End-to-End Flow
Let's trace the complete flow from adding a book to seeing inventory created automatically:
@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id


domain.init(traverse=False)

if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event → inventory created
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f" Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")
Run it:
$ python bookshelf.py
Adding book to catalog...
[Inventory] Stocked 10 copies of 'The Great Gatsby'
Inventory: The Great Gatsby, qty=10
Placing an order...
Confirming order...
[Notification] Order e5f6... confirmed for Alice Johnson
Shipping order...
[Notification] Order e5f6... shipped to Alice Johnson
All checks passed!
Notice how the inventory was created automatically when the book was added, and notifications were triggered when the order was confirmed and shipped — all through event handlers.
Full Source
from enum import Enum

from protean import Domain, handle
from protean.fields import (
    Float,
    HasMany,
    Identifier,
    Integer,
    String,
    Text,
    ValueObject,
)
from protean.utils.globals import current_domain

domain = Domain()
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency = String(max_length=3, default="USD")
    amount = Float(required=True)


@domain.aggregate
class Book:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price = ValueObject(Money)
    description = Text()

    def add_to_catalog(self):
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )


@domain.event(part_of=Book)
class BookAdded:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price_amount = Float()


class OrderStatus(Enum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"


@domain.aggregate
class Order:
    customer_name = String(max_length=150, required=True)
    status = String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))


@domain.entity(part_of=Order)
class OrderItem:
    book_title = String(max_length=200, required=True)
    quantity = Integer(required=True)
    unit_price = ValueObject(Money)


@domain.event(part_of=Order)
class OrderPlaced:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)
    total_items = Integer(required=True)


@domain.event(part_of=Order)
class OrderConfirmed:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)


@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    quantity = Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount


@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f" [Inventory] Stocked 10 copies of '{event.title}'")


@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f" [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f" [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )


@domain.command(part_of=Book)
class AddBook:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price_amount = Float(required=True)
    description = Text()


@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id


domain.init(traverse=False)

if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event → inventory created
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f" Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")
Summary
In this chapter you learned:
- Event handlers react to domain events, decoupling side effects from the aggregate that raised the event.
- @domain.event_handler(part_of=...) registers a handler, and @handle(EventClass) marks which events it processes.
- A single handler class can process multiple events with separate @handle methods.
- Handlers can create or modify other aggregates, enabling cross-aggregate coordination through events.
- Sync processing dispatches immediately; async processing runs handlers in a background server.
We have covered all the fundamental building blocks: aggregates, entities, value objects, commands, events, and handlers. In the next part we will add services for complex use case coordination and projections for read-optimized views.