Chapter 6: Events and Reactions
In this chapter we will define domain events, raise them from aggregates, and build event handlers that react automatically — creating Inventory records whenever a book is added to the catalog.
Defining Events
When a book is added to the catalog, we want to record that fact as a domain event. Events are named in past tense — they describe something that already happened:
@domain.event(part_of=Book)
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price_amount: Float()
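Because an event describes something that has already happened, it helps to think of it as a read-only record. The following is a minimal plain-Python analogy using a frozen dataclass. It does not use Protean, and the field types are simplified stand-ins for the Identifier, String, and Float fields above.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class BookAdded:
    """Plain-Python stand-in for the BookAdded event (not a Protean class)."""

    book_id: str
    title: str
    author: str
    price_amount: float = 0.0


event = BookAdded(book_id="book-1", title="Dune", author="Frank Herbert")

# A frozen dataclass rejects attribute assignment after construction.
try:
    event.title = "Changed"
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Here the attempted assignment raises FrozenInstanceError, so the recorded title stays as it was when the event was created.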
Raising Events from Aggregates
Events are raised inside aggregate methods using self.raise_():
@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price = ValueObject(Money)
    description: Text()

    def add_to_catalog(self):
        """Raise a BookAdded event to notify the rest of the system."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )
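When add_to_catalog() is called, the BookAdded event is collected on
the aggregate rather than published immediately. It is dispatched only
when the aggregate is persisted, which ties each event to the transaction
that saves the change it describes and keeps events from being lost when
a transaction fails.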
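The collect-then-dispatch idea can be pictured with a small plain-Python sketch. It does not use Protean and is not a description of Protean's internals: the pending_events list, the InMemoryRepository class, and its dispatch callback are hypothetical names chosen only for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass(frozen=True)
class BookAdded:
    """Plain-Python stand-in for the BookAdded event."""

    book_id: str
    title: str


@dataclass
class Book:
    """Stand-in aggregate that buffers events instead of publishing them."""

    id: str
    title: str
    pending_events: list = field(default_factory=list)

    def raise_(self, event) -> None:
        # Only record the event; nothing is dispatched at this point.
        self.pending_events.append(event)

    def add_to_catalog(self) -> None:
        self.raise_(BookAdded(book_id=self.id, title=self.title))


class InMemoryRepository:
    """Hypothetical repository that dispatches events after storing the aggregate."""

    def __init__(self, dispatch: Callable[[object], None]) -> None:
        self.rows: dict = {}
        self.dispatch = dispatch

    def add(self, aggregate) -> None:
        self.rows[aggregate.id] = aggregate  # store the aggregate first
        events, aggregate.pending_events = aggregate.pending_events, []
        for event in events:
            self.dispatch(event)  # then hand the buffered events to listeners


delivered: list = []
repo = InMemoryRepository(dispatch=delivered.append)

book = Book(id="book-1", title="Dune")
book.add_to_catalog()
delivered_before_save = list(delivered)  # still empty: the event is only buffered

repo.add(book)  # saving the aggregate releases the buffered event
```

In this sketch nothing reaches the listener until repo.add() runs, which mirrors the order of operations described above: raise first, dispatch on persistence.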
Let's also raise events from the Order aggregate:
@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))
And define the Order events:
@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)
The Inventory Aggregate
Before we build the event handler, we need something for it to manage.
Let's create an Inventory aggregate:
@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount
Reacting with Event Handlers
An event handler listens for specific events and performs side effects. Let's create one that stocks inventory when a book is added:
@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f" [Inventory] Stocked 10 copies of '{event.title}'")
Notice that the handler is registered with part_of=Book — it listens
to events from the Book aggregate. When a BookAdded event is raised,
the on_book_added method runs automatically and creates an inventory
record.
We can also handle Order events to send notifications:
@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f" [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f" [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )
A single event handler class can handle multiple event types.
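To make the idea of one class handling several event types concrete, here is a minimal plain-Python sketch of routing by event type. It is an illustration only and makes no claim about how Protean's handle decorator works internally: the handle function below, the _handles attribute, and the dispatch helper are all hypothetical.

```python
from dataclasses import dataclass


def handle(event_cls):
    """Hypothetical decorator: tag a method with the event type it handles."""

    def decorator(method):
        method._handles = event_cls
        return method

    return decorator


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: str


@dataclass(frozen=True)
class OrderShipped:
    order_id: str


class OrderEventHandler:
    def __init__(self) -> None:
        self.log: list = []

    @handle(OrderConfirmed)
    def on_order_confirmed(self, event) -> None:
        self.log.append(f"confirmed {event.order_id}")

    @handle(OrderShipped)
    def on_order_shipped(self, event) -> None:
        self.log.append(f"shipped {event.order_id}")


def dispatch(handler, event) -> bool:
    """Call the method tagged for type(event); return False if none matches."""
    for name in dir(type(handler)):
        method = getattr(type(handler), name)
        if getattr(method, "_handles", None) is type(event):
            method(handler, event)
            return True
    return False


handler = OrderEventHandler()
confirmed_handled = dispatch(handler, OrderConfirmed(order_id="o-1"))
shipped_handled = dispatch(handler, OrderShipped(order_id="o-1"))
unknown_handled = dispatch(handler, object())  # no method is tagged for this type
```

Each event is routed to the method tagged with its type, and an event with no matching method is simply ignored by this sketch.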
End-to-End Flow
Let's see the complete flow — adding a book triggers inventory creation, and order lifecycle events trigger notifications:
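@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id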

domain.init(traverse=False)

if __name__ == "__main__":
    with domain.domain_context():
        # Add a book: the BookAdded event triggers inventory creation
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f" Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")
Run it:
$ python bookshelf.py
Adding book to catalog...
[Inventory] Stocked 10 copies of 'The Great Gatsby'
Inventory: The Great Gatsby, qty=10
Placing an order...
Confirming order...
[Notification] Order e5f6g7h8-... confirmed for Alice Johnson
Shipping order...
[Notification] Order e5f6g7h8-... shipped to Alice Johnson
All checks passed!
Notice that we never called the event handlers ourselves — they ran automatically when events were raised and the aggregate was persisted. The Book aggregate knows nothing about Inventory; the event handler provides the decoupling.
What We Built
- BookAdded, OrderConfirmed, OrderShipped events: immutable facts recorded when state changes.
- self.raise_(): raises events from aggregate methods.
- BookEventHandler: reacts to BookAdded by creating inventory.
- OrderEventHandler: reacts to order lifecycle events.
- A fully decoupled flow where adding a book automatically stocks inventory.
In the next chapter, we will build a read-optimized projection for browsing the book catalog.
Full Source
from enum import Enum

from protean import Domain, handle
from protean.fields import (
    Float,
    HasMany,
    Identifier,
    Integer,
    String,
    Text,
    ValueObject,
)
from protean.utils.globals import current_domain

domain = Domain()
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency: String(max_length=3, default="USD")
    amount: Float(required=True)


@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price = ValueObject(Money)
    description: Text()

    def add_to_catalog(self):
        """Raise a BookAdded event to notify the rest of the system."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )


@domain.event(part_of=Book)
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price_amount: Float()


class OrderStatus(Enum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"


@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))


@domain.entity(part_of=Order)
class OrderItem:
    book_title: String(max_length=200, required=True)
    quantity: Integer(required=True)
    unit_price = ValueObject(Money)


@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount


@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f" [Inventory] Stocked 10 copies of '{event.title}'")


@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f" [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f" [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )


@domain.command(part_of=Book)
class AddBook:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price_amount: Float(required=True)
    description: Text()


@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id


domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book: the BookAdded event triggers inventory creation
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f" Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")