
Chapter 6: Events and Reactions

In this chapter we will define domain events, raise them from aggregates, and build event handlers that react automatically — creating Inventory records whenever a book is added to the catalog. We will wire everything through the command pipeline introduced in Chapter 5.

Events in DDD and CQRS

Domain events and event handlers are a core DDD concept, so they appear in both the DDD and CQRS approaches. The difference lies in how events get triggered: in CQRS, commands flow through command handlers that invoke aggregate methods; in pure DDD, Application Services invoke those same methods directly.
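
To illustrate the two entry points, here is a framework-free sketch in plain Python. None of these names are Protean APIs: `BookSketch`, `AddBookSketch`, `handle_add_book`, and `CatalogServiceSketch` are hypothetical stand-ins. The point is only that both paths end up calling the same aggregate method, which is where the event is raised.

```python
from dataclasses import dataclass


class BookSketch:
    """Hypothetical aggregate that records the events it raises."""

    def __init__(self, title):
        self.title = title
        self.events = []

    def add_to_catalog(self):
        self.events.append(("BookAdded", self.title))


# CQRS style: a command object is routed to a command handler.
@dataclass(frozen=True)
class AddBookSketch:
    title: str


def handle_add_book(command):
    book = BookSketch(command.title)
    book.add_to_catalog()
    return book


# Pure DDD style: an application service is called directly.
class CatalogServiceSketch:
    def add_book(self, title):
        book = BookSketch(title)
        book.add_to_catalog()
        return book


via_command = handle_add_book(AddBookSketch(title="The Great Gatsby"))
via_service = CatalogServiceSketch().add_book("The Great Gatsby")
```

Either way, the aggregate ends up holding the same event; only the route into the aggregate differs.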

Defining Events

When a book is added to the catalog, we want to record that fact as a domain event. Events are named in past tense — they describe something that already happened:

@domain.event(part_of=Book)
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price_amount: Float()
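
Because an event describes something that already happened, it is treated as a read-only fact. The following is a minimal sketch of that idea using a frozen dataclass from the standard library. It is not how Protean implements events, and `BookAddedSketch` is a hypothetical stand-in for the `BookAdded` class above.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class BookAddedSketch:
    """Hypothetical stand-in for BookAdded; not a Protean class."""

    book_id: str
    title: str
    author: str
    price_amount: float = 0.0


event = BookAddedSketch(
    book_id="book-1",
    title="The Great Gatsby",
    author="F. Scott Fitzgerald",
)

try:
    event.title = "Another Title"  # Attempting to rewrite history
    mutated = True
except FrozenInstanceError:
    mutated = False  # The frozen dataclass rejected the assignment
```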

Raising Events from Aggregates

Events are raised inside aggregate methods using self.raise_():

@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price = ValueObject(Money)
    description: Text()

    def add_to_catalog(self):
        """Raise a BookAdded event to notify the rest of the system."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )

When add_to_catalog() is called, the BookAdded event is collected on the aggregate rather than delivered immediately. It is dispatched only when the aggregate is persisted, which keeps events and state consistent: a failed transaction does not leave behind an event for a change that was never saved.
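
The collect-then-dispatch behaviour can be sketched in plain Python. This is a simplified illustration under stated assumptions, not Protean's implementation: `AggregateSketch`, `RepositorySketch`, `pending_events`, and `fail_next_save` are all hypothetical names invented for this example.

```python
class AggregateSketch:
    """Hypothetical base class: collects events until it is saved."""

    def __init__(self):
        self.pending_events = []

    def raise_(self, event):
        # Only record the event; nothing is delivered yet.
        self.pending_events.append(event)


class RepositorySketch:
    """Hypothetical in-memory repository that dispatches after a save."""

    def __init__(self, dispatch):
        self.saved = []
        self.dispatch = dispatch
        self.fail_next_save = False

    def add(self, aggregate):
        if self.fail_next_save:
            self.fail_next_save = False
            raise RuntimeError("simulated persistence failure")
        self.saved.append(aggregate)
        # Persistence succeeded, so the collected events can go out.
        events, aggregate.pending_events = aggregate.pending_events, []
        for event in events:
            self.dispatch(event)


delivered = []
repo = RepositorySketch(dispatch=delivered.append)

book = AggregateSketch()
book.raise_("BookAdded")
delivered_before_save = list(delivered)  # Nothing delivered yet

repo.fail_next_save = True
try:
    repo.add(book)
except RuntimeError:
    pass
delivered_after_failed_save = list(delivered)  # Failed save: still nothing

repo.add(book)  # A successful save releases the event
```

In this sketch the event only reaches `delivered` after the second, successful call to `add()`.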

Let's also raise events from the Order aggregate:

@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))

And define the Order events:

@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)

The Inventory Aggregate

Before we build the event handler, we need something for it to manage. Let's create an Inventory aggregate:

@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount

Reacting with Event Handlers

An event handler listens for specific events and performs side effects. Let's create one that stocks inventory when a book is added:

@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f"  [Inventory] Stocked 10 copies of '{event.title}'")

Notice that the handler is registered with part_of=Book — it listens to events from the Book aggregate. When a BookAdded event is raised, the on_book_added method runs automatically and creates an inventory record.

We can also handle Order events to send notifications:

@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f"  [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f"  [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )

A single event handler class can handle multiple event types.

Commands for the Full Flow

Just like Chapter 5, we define commands and handlers for every state change. Here are the commands for adding a book and for placing, confirming, and shipping orders:

@domain.command(part_of=Book)
class AddBook:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price_amount: Float(required=True)
    description: Text()


@domain.command(part_of=Order)
class PlaceOrder:
    customer_name: String(max_length=150, required=True)
    book_title: String(max_length=200, required=True)
    quantity: Integer(required=True)
    unit_price_amount: Float(required=True)


@domain.command(part_of=Order)
class ConfirmOrder:
    order_id: Identifier(required=True)


@domain.command(part_of=Order)
class ShipOrder:
    order_id: Identifier(required=True)

And the command handlers that orchestrate the flow:

@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id




@domain.command_handler(part_of=Order)
class OrderCommandHandler:
    @handle(PlaceOrder)
    def place_order(self, command: PlaceOrder) -> Identifier:
        order = Order(
            customer_name=command.customer_name,
            items=[
                OrderItem(
                    book_title=command.book_title,
                    quantity=command.quantity,
                    unit_price=Money(amount=command.unit_price_amount),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)
        return order.id

    @handle(ConfirmOrder)
    def confirm_order(self, command: ConfirmOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)
        order.confirm()
        repo.add(order)

    @handle(ShipOrder)
    def ship_order(self, command: ShipOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)
        order.ship()
        repo.add(order)

The BookCommandHandler calls book.add_to_catalog(), which raises the BookAdded event, and then persists the book. For ConfirmOrder and ShipOrder, the OrderCommandHandler loads the order from the repository, calls the domain method, and persists the result. In each case the events are dispatched automatically on save.
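
The load, call, persist pattern and the routing of a command to its handler can be sketched without the framework. Everything here is a hypothetical stand-in (`OrderSketch`, `ConfirmOrderSketch`, `HANDLERS`, and this `process` function); it is meant to show the shape of the flow, not Protean's actual `domain.process()` machinery.

```python
from dataclasses import dataclass


class OrderSketch:
    """Hypothetical aggregate with a single state transition."""

    def __init__(self, order_id, customer_name):
        self.id = order_id
        self.customer_name = customer_name
        self.status = "PENDING"

    def confirm(self):
        self.status = "CONFIRMED"


@dataclass(frozen=True)
class ConfirmOrderSketch:
    order_id: str


orders = {}  # Stand-in repository: order id -> order


def confirm_order(command):
    order = orders[command.order_id]  # 1. Load the aggregate
    order.confirm()                   # 2. Call the domain method
    orders[order.id] = order          # 3. Persist the result


# Hypothetical routing table: command class -> handler function.
HANDLERS = {ConfirmOrderSketch: confirm_order}


def process(command):
    """Look up the handler for this command's class and run it."""
    return HANDLERS[type(command)](command)


orders["order-1"] = OrderSketch("order-1", "Alice Johnson")
process(ConfirmOrderSketch(order_id="order-1"))
```

The handler itself contains no business rules; the state change lives in `confirm()`, which mirrors how the chapter keeps logic on the aggregate.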

End-to-End Flow

Let's see the complete flow — adding a book triggers inventory creation, and order lifecycle events trigger notifications. Everything flows through domain.process():

if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event triggers inventory creation
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory).query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f"  Inventory: {inv.title}, qty={inv.quantity}")

        # Place an order — through the command pipeline
        print("\nPlacing an order...")
        order_id = domain.process(
            PlaceOrder(
                customer_name="Alice Johnson",
                book_title="The Great Gatsby",
                quantity=2,
                unit_price_amount=12.99,
            )
        )

        # Confirm the order — triggers OrderConfirmed event
        print("Confirming order...")
        domain.process(ConfirmOrder(order_id=order_id))

        # Ship the order — triggers OrderShipped event
        print("Shipping order...")
        domain.process(ShipOrder(order_id=order_id))

        # Verify final state
        order = current_domain.repository_for(Order).get(order_id)
        assert order.status == "SHIPPED"
        assert len(order.items) == 1

        print("\nAll checks passed!")

Run it:

$ python bookshelf.py
Adding book to catalog...
  [Inventory] Stocked 10 copies of 'The Great Gatsby'
  Inventory: The Great Gatsby, qty=10

Placing an order...
Confirming order...
  [Notification] Order e5f6g7h8-... confirmed for Alice Johnson
Shipping order...
  [Notification] Order e5f6g7h8-... shipped to Alice Johnson

All checks passed!

Notice that we never called the event handlers ourselves — they ran automatically when events were raised and the aggregate was persisted. The Book aggregate knows nothing about Inventory; the event handler provides the decoupling. And every operation flows through explicit commands, just like Chapter 5.

What We Built

  • BookAdded, OrderConfirmed, OrderShipped events: immutable facts recorded when state changes.
  • self.raise_(): raises events from aggregate methods.
  • BookEventHandler: reacts to BookAdded by creating inventory.
  • OrderEventHandler: reacts to order lifecycle events.
  • AddBook, PlaceOrder, ConfirmOrder, ShipOrder commands: every operation flows through domain.process().
  • A fully decoupled flow where adding a book automatically stocks inventory.

In the next chapter, we will build a read-optimized projection for browsing the book catalog — the read side of CQRS.

Full Source

from enum import Enum

from protean import Domain, handle
from protean.fields import (
    Float,
    HasMany,
    Identifier,
    Integer,
    String,
    Text,
    ValueObject,
)
from protean.utils.globals import current_domain

domain = Domain()

domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency: String(max_length=3, default="USD")
    amount: Float(required=True)


@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price = ValueObject(Money)
    description: Text()

    def add_to_catalog(self):
        """Raise a BookAdded event to notify the rest of the system."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )




@domain.event(part_of=Book)
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price_amount: Float()




class OrderStatus(Enum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"


@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))




@domain.entity(part_of=Order)
class OrderItem:
    book_title: String(max_length=200, required=True)
    quantity: Integer(required=True)
    unit_price = ValueObject(Money)


@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)




@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount




@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f"  [Inventory] Stocked 10 copies of '{event.title}'")




@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f"  [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f"  [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )




@domain.command(part_of=Book)
class AddBook:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price_amount: Float(required=True)
    description: Text()


@domain.command(part_of=Order)
class PlaceOrder:
    customer_name: String(max_length=150, required=True)
    book_title: String(max_length=200, required=True)
    quantity: Integer(required=True)
    unit_price_amount: Float(required=True)


@domain.command(part_of=Order)
class ConfirmOrder:
    order_id: Identifier(required=True)


@domain.command(part_of=Order)
class ShipOrder:
    order_id: Identifier(required=True)




@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id




@domain.command_handler(part_of=Order)
class OrderCommandHandler:
    @handle(PlaceOrder)
    def place_order(self, command: PlaceOrder) -> Identifier:
        order = Order(
            customer_name=command.customer_name,
            items=[
                OrderItem(
                    book_title=command.book_title,
                    quantity=command.quantity,
                    unit_price=Money(amount=command.unit_price_amount),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)
        return order.id

    @handle(ConfirmOrder)
    def confirm_order(self, command: ConfirmOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)
        order.confirm()
        repo.add(order)

    @handle(ShipOrder)
    def ship_order(self, command: ShipOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)
        order.ship()
        repo.add(order)




domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event triggers inventory creation
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory).query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f"  Inventory: {inv.title}, qty={inv.quantity}")

        # Place an order — through the command pipeline
        print("\nPlacing an order...")
        order_id = domain.process(
            PlaceOrder(
                customer_name="Alice Johnson",
                book_title="The Great Gatsby",
                quantity=2,
                unit_price_amount=12.99,
            )
        )

        # Confirm the order — triggers OrderConfirmed event
        print("Confirming order...")
        domain.process(ConfirmOrder(order_id=order_id))

        # Ship the order — triggers OrderShipped event
        print("Shipping order...")
        domain.process(ShipOrder(order_id=order_id))

        # Verify final state
        order = current_domain.repository_for(Order).get(order_id)
        assert order.status == "SHIPPED"
        assert len(order.items) == 1

        print("\nAll checks passed!")
