Chapter 6: Events and Reactions
In this chapter we will define domain events, raise them from aggregates, and build event handlers that react automatically — creating Inventory records whenever a book is added to the catalog. We will wire everything through the command pipeline introduced in Chapter 5.
Events in DDD and CQRS
Domain events and event handlers are used in both the DDD and CQRS approaches — they're a core DDD concept. The difference is in how events get triggered: in CQRS, commands flow through handlers that invoke aggregate methods; in pure DDD, Application Services do the same job directly.
Defining Events
When a book is added to the catalog, we want to record that fact as a domain event. Events are named in past tense — they describe something that already happened:
@domain.event(part_of=Book)
class BookAdded:
book_id: Identifier(required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price_amount: Float()
Raising Events from Aggregates
Events are raised inside aggregate methods using self.raise_():
@domain.aggregate
class Book:
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
isbn: String(max_length=13)
price = ValueObject(Money)
description: Text()
def add_to_catalog(self):
"""Raise a BookAdded event to notify the rest of the system."""
self.raise_(
BookAdded(
book_id=self.id,
title=self.title,
author=self.author,
price_amount=self.price.amount if self.price else 0,
)
)
When add_to_catalog() is called, the BookAdded event is collected on
the aggregate. It will be dispatched when the aggregate is persisted —
this ensures events are never lost due to transaction failures.
Let's also raise events from the Order aggregate:
@domain.aggregate
class Order:
customer_name: String(max_length=150, required=True)
status: String(
max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
)
items = HasMany("OrderItem")
def confirm(self):
self.status = OrderStatus.CONFIRMED.value
self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))
def ship(self):
self.status = OrderStatus.SHIPPED.value
self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))
And define the Order events:
@domain.event(part_of=Order)
class OrderConfirmed:
order_id: Identifier(required=True)
customer_name: String(max_length=150, required=True)
@domain.event(part_of=Order)
class OrderShipped:
order_id: Identifier(required=True)
customer_name: String(max_length=150, required=True)
The Inventory Aggregate
Before we build the event handler, we need something for it to manage.
Let's create an Inventory aggregate:
@domain.aggregate
class Inventory:
book_id: Identifier(required=True)
title: String(max_length=200, required=True)
quantity: Integer(default=0)
def adjust_stock(self, amount: int):
self.quantity += amount
Reacting with Event Handlers
An event handler listens for specific events and performs side effects. Let's create one that stocks inventory when a book is added:
@domain.event_handler(part_of=Book)
class BookEventHandler:
@handle(BookAdded)
def on_book_added(self, event: BookAdded):
"""When a book is added to the catalog, create an inventory record."""
inventory = Inventory(
book_id=event.book_id,
title=event.title,
quantity=10, # Start with 10 copies
)
current_domain.repository_for(Inventory).add(inventory)
print(f" [Inventory] Stocked 10 copies of '{event.title}'")
Notice that the handler is registered with part_of=Book — it listens
to events from the Book aggregate. When a BookAdded event is raised,
the on_book_added method runs automatically and creates an inventory
record.
We can also handle Order events to send notifications:
@domain.event_handler(part_of=Order)
class OrderEventHandler:
@handle(OrderConfirmed)
def on_order_confirmed(self, event: OrderConfirmed):
print(
f" [Notification] Order {event.order_id} confirmed for {event.customer_name}"
)
@handle(OrderShipped)
def on_order_shipped(self, event: OrderShipped):
print(
f" [Notification] Order {event.order_id} shipped to {event.customer_name}"
)
A single event handler class can handle multiple event types.
Commands for the Full Flow
Just like Chapter 5, we define commands and handlers for every state change. Here are the commands for placing, confirming, and shipping orders:
@domain.command(part_of=Book)
class AddBook:
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
isbn: String(max_length=13)
price_amount: Float(required=True)
description: Text()
@domain.command(part_of=Order)
class PlaceOrder:
customer_name: String(max_length=150, required=True)
book_title: String(max_length=200, required=True)
quantity: Integer(required=True)
unit_price_amount: Float(required=True)
@domain.command(part_of=Order)
class ConfirmOrder:
order_id: Identifier(required=True)
@domain.command(part_of=Order)
class ShipOrder:
order_id: Identifier(required=True)
And the command handlers that orchestrate the flow:
@domain.command_handler(part_of=Book)
class BookCommandHandler:
@handle(AddBook)
def add_book(self, command: AddBook) -> Identifier:
book = Book(
title=command.title,
author=command.author,
isbn=command.isbn,
price=Money(amount=command.price_amount),
description=command.description,
)
book.add_to_catalog()
current_domain.repository_for(Book).add(book)
return book.id
@domain.command_handler(part_of=Order)
class OrderCommandHandler:
@handle(PlaceOrder)
def place_order(self, command: PlaceOrder) -> Identifier:
order = Order(
customer_name=command.customer_name,
items=[
OrderItem(
book_title=command.book_title,
quantity=command.quantity,
unit_price=Money(amount=command.unit_price_amount),
),
],
)
current_domain.repository_for(Order).add(order)
return order.id
@handle(ConfirmOrder)
def confirm_order(self, command: ConfirmOrder) -> None:
repo = current_domain.repository_for(Order)
order = repo.get(command.order_id)
order.confirm()
repo.add(order)
@handle(ShipOrder)
def ship_order(self, command: ShipOrder) -> None:
repo = current_domain.repository_for(Order)
order = repo.get(command.order_id)
order.ship()
repo.add(order)
The BookCommandHandler calls book.add_to_catalog() which raises
the BookAdded event. The OrderCommandHandler loads the order from
the repository, calls the domain method, and persists the result — the
events are dispatched automatically on save.
End-to-End Flow
Let's see the complete flow — adding a book triggers inventory creation,
and order lifecycle events trigger notifications. Everything flows
through domain.process():
if __name__ == "__main__":
with domain.domain_context():
# Add a book — BookAdded event triggers inventory creation
print("Adding book to catalog...")
book_id = domain.process(
AddBook(
title="The Great Gatsby",
author="F. Scott Fitzgerald",
isbn="9780743273565",
price_amount=12.99,
)
)
# Verify inventory was created by the event handler
inventories = current_domain.repository_for(Inventory).query.all()
assert inventories.total == 1
inv = inventories.items[0]
print(f" Inventory: {inv.title}, qty={inv.quantity}")
# Place an order — through the command pipeline
print("\nPlacing an order...")
order_id = domain.process(
PlaceOrder(
customer_name="Alice Johnson",
book_title="The Great Gatsby",
quantity=2,
unit_price_amount=12.99,
)
)
# Confirm the order — triggers OrderConfirmed event
print("Confirming order...")
domain.process(ConfirmOrder(order_id=order_id))
# Ship the order — triggers OrderShipped event
print("Shipping order...")
domain.process(ShipOrder(order_id=order_id))
# Verify final state
order = current_domain.repository_for(Order).get(order_id)
assert order.status == "SHIPPED"
assert len(order.items) == 1
print("\nAll checks passed!")
Run it:
$ python bookshelf.py
Adding book to catalog...
[Inventory] Stocked 10 copies of 'The Great Gatsby'
Inventory: The Great Gatsby, qty=10
Placing an order...
Confirming order...
[Notification] Order e5f6g7h8-... confirmed for Alice Johnson
Shipping order...
[Notification] Order e5f6g7h8-... shipped to Alice Johnson
All checks passed!
Notice that we never called the event handlers ourselves — they ran automatically when events were raised and the aggregate was persisted. The Book aggregate knows nothing about Inventory; the event handler provides the decoupling. And every operation flows through explicit commands, just like Chapter 5.
What We Built
BookAdded,OrderConfirmed,OrderShippedevents — immutable facts recorded when state changes.self.raise_()— raises events from aggregate methods.BookEventHandler— reacts toBookAddedby creating inventory.OrderEventHandler— reacts to order lifecycle events.PlaceOrder,ConfirmOrder,ShipOrdercommands — every operation flows throughdomain.process().- A fully decoupled flow where adding a book automatically stocks inventory.
In the next chapter, we will build a read-optimized projection for browsing the book catalog — the read side of CQRS.
Full Source
from enum import Enum
from protean import Domain, handle
from protean.fields import (
Float,
HasMany,
Identifier,
Integer,
String,
Text,
ValueObject,
)
from protean.utils.globals import current_domain
domain = Domain()
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"
@domain.value_object
class Money:
currency: String(max_length=3, default="USD")
amount: Float(required=True)
@domain.aggregate
class Book:
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
isbn: String(max_length=13)
price = ValueObject(Money)
description: Text()
def add_to_catalog(self):
"""Raise a BookAdded event to notify the rest of the system."""
self.raise_(
BookAdded(
book_id=self.id,
title=self.title,
author=self.author,
price_amount=self.price.amount if self.price else 0,
)
)
@domain.event(part_of=Book)
class BookAdded:
book_id: Identifier(required=True)
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
price_amount: Float()
class OrderStatus(Enum):
PENDING = "PENDING"
CONFIRMED = "CONFIRMED"
SHIPPED = "SHIPPED"
@domain.aggregate
class Order:
customer_name: String(max_length=150, required=True)
status: String(
max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
)
items = HasMany("OrderItem")
def confirm(self):
self.status = OrderStatus.CONFIRMED.value
self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))
def ship(self):
self.status = OrderStatus.SHIPPED.value
self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))
@domain.entity(part_of=Order)
class OrderItem:
book_title: String(max_length=200, required=True)
quantity: Integer(required=True)
unit_price = ValueObject(Money)
@domain.event(part_of=Order)
class OrderConfirmed:
order_id: Identifier(required=True)
customer_name: String(max_length=150, required=True)
@domain.event(part_of=Order)
class OrderShipped:
order_id: Identifier(required=True)
customer_name: String(max_length=150, required=True)
@domain.aggregate
class Inventory:
book_id: Identifier(required=True)
title: String(max_length=200, required=True)
quantity: Integer(default=0)
def adjust_stock(self, amount: int):
self.quantity += amount
@domain.event_handler(part_of=Book)
class BookEventHandler:
@handle(BookAdded)
def on_book_added(self, event: BookAdded):
"""When a book is added to the catalog, create an inventory record."""
inventory = Inventory(
book_id=event.book_id,
title=event.title,
quantity=10, # Start with 10 copies
)
current_domain.repository_for(Inventory).add(inventory)
print(f" [Inventory] Stocked 10 copies of '{event.title}'")
@domain.event_handler(part_of=Order)
class OrderEventHandler:
@handle(OrderConfirmed)
def on_order_confirmed(self, event: OrderConfirmed):
print(
f" [Notification] Order {event.order_id} confirmed for {event.customer_name}"
)
@handle(OrderShipped)
def on_order_shipped(self, event: OrderShipped):
print(
f" [Notification] Order {event.order_id} shipped to {event.customer_name}"
)
@domain.command(part_of=Book)
class AddBook:
title: String(max_length=200, required=True)
author: String(max_length=150, required=True)
isbn: String(max_length=13)
price_amount: Float(required=True)
description: Text()
@domain.command(part_of=Order)
class PlaceOrder:
customer_name: String(max_length=150, required=True)
book_title: String(max_length=200, required=True)
quantity: Integer(required=True)
unit_price_amount: Float(required=True)
@domain.command(part_of=Order)
class ConfirmOrder:
order_id: Identifier(required=True)
@domain.command(part_of=Order)
class ShipOrder:
order_id: Identifier(required=True)
@domain.command_handler(part_of=Book)
class BookCommandHandler:
@handle(AddBook)
def add_book(self, command: AddBook) -> Identifier:
book = Book(
title=command.title,
author=command.author,
isbn=command.isbn,
price=Money(amount=command.price_amount),
description=command.description,
)
book.add_to_catalog()
current_domain.repository_for(Book).add(book)
return book.id
@domain.command_handler(part_of=Order)
class OrderCommandHandler:
@handle(PlaceOrder)
def place_order(self, command: PlaceOrder) -> Identifier:
order = Order(
customer_name=command.customer_name,
items=[
OrderItem(
book_title=command.book_title,
quantity=command.quantity,
unit_price=Money(amount=command.unit_price_amount),
),
],
)
current_domain.repository_for(Order).add(order)
return order.id
@handle(ConfirmOrder)
def confirm_order(self, command: ConfirmOrder) -> None:
repo = current_domain.repository_for(Order)
order = repo.get(command.order_id)
order.confirm()
repo.add(order)
@handle(ShipOrder)
def ship_order(self, command: ShipOrder) -> None:
repo = current_domain.repository_for(Order)
order = repo.get(command.order_id)
order.ship()
repo.add(order)
domain.init(traverse=False)
if __name__ == "__main__":
with domain.domain_context():
# Add a book — BookAdded event triggers inventory creation
print("Adding book to catalog...")
book_id = domain.process(
AddBook(
title="The Great Gatsby",
author="F. Scott Fitzgerald",
isbn="9780743273565",
price_amount=12.99,
)
)
# Verify inventory was created by the event handler
inventories = current_domain.repository_for(Inventory).query.all()
assert inventories.total == 1
inv = inventories.items[0]
print(f" Inventory: {inv.title}, qty={inv.quantity}")
# Place an order — through the command pipeline
print("\nPlacing an order...")
order_id = domain.process(
PlaceOrder(
customer_name="Alice Johnson",
book_title="The Great Gatsby",
quantity=2,
unit_price_amount=12.99,
)
)
# Confirm the order — triggers OrderConfirmed event
print("Confirming order...")
domain.process(ConfirmOrder(order_id=order_id))
# Ship the order — triggers OrderShipped event
print("Shipping order...")
domain.process(ShipOrder(order_id=order_id))
# Verify final state
order = current_domain.repository_for(Order).get(order_id)
assert order.status == "SHIPPED"
assert len(order.items) == 1
print("\nAll checks passed!")