
Chapter 8: Event Handlers — Reacting to Change

In the previous chapter we raised events from our aggregates, but nothing reacted to them. In this chapter we add event handlers — decoupled components that process events to trigger side effects like sending notifications or updating other aggregates.

Why Event Handlers?

When an order is confirmed, several things should happen: send a confirmation email, reserve inventory, update analytics. But the Order aggregate should not know about any of these. Its job is to manage order state, not send emails.

Event handlers solve this by reacting to events, so the aggregate stays unaware of whatever consumes them:

graph LR
    O[Order.confirm] -->|raises| E[OrderConfirmed]
    E --> H1[Send Email]
    E --> H2[Reserve Inventory]
    E --> H3[Update Analytics]

    style E fill:#fce4ec

The aggregate raises the event. Handlers independently decide what to do with it.
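To make the decoupling concrete, here is a minimal plain-Python sketch of the idea. It is not Protean's API: `EventBus`, `subscribe`, and `publish` are hypothetical names used only for illustration, and the three lambdas stand in for the email, inventory, and analytics reactions from the diagram.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: str
    customer_name: str


class EventBus:
    """Hypothetical in-memory bus: maps event types to subscriber callables."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    def publish(self, event):
        # The publisher never names its consumers; it only looks up by type.
        for handler in self._subscribers[type(event)]:
            handler(event)


log = []
bus = EventBus()
bus.subscribe(OrderConfirmed, lambda e: log.append(f"email:{e.order_id}"))
bus.subscribe(OrderConfirmed, lambda e: log.append(f"inventory:{e.order_id}"))
bus.subscribe(OrderConfirmed, lambda e: log.append(f"analytics:{e.order_id}"))

# The "aggregate" side only publishes the event.
bus.publish(OrderConfirmed(order_id="o-1", customer_name="Alice Johnson"))
```

Adding a fourth reaction means one more `subscribe` call; the code that publishes `OrderConfirmed` does not change.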

Introducing the Inventory Aggregate

Before we build handlers, let's add a simple Inventory aggregate that tracks stock for each book:

@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    quantity = Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount

We will use event handlers to keep inventory in sync automatically.

Defining Event Handlers

An event handler is a class that processes one or more events:

@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
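            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f"  [Inventory] Stocked 10 copies of '{event.title}'")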

Key points:

  • @domain.event_handler(part_of=Book) registers the handler within the Book aggregate's context.
  • @handle(BookAdded) marks the method that processes BookAdded events.
  • Inside the handler, we can create or update other aggregates — here we create an Inventory record for the new book.

Multiple Handlers in One Class

A single handler class can process multiple events:

@domain.event_handler(part_of=Order)
class OrderEventHandler:
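    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(f"  [Notification] Order {event.order_id} confirmed for {event.customer_name}")

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(f"  [Notification] Order {event.order_id} shipped to {event.customer_name}")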

Each @handle method processes a different event type. The handler class acts as a logical grouping of related reactions.
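If you are curious how one class can route several event types, the following plain-Python sketch shows one way a decorator could tag methods and a dispatcher could pick the right one. This is an assumption for illustration only, not how Protean implements `@handle`; the `handle` and `dispatch` functions and the `_handles` attribute below are hypothetical stand-ins.

```python
import inspect
from dataclasses import dataclass


def handle(event_cls):
    """Hypothetical stand-in for a @handle decorator: tags the method."""

    def decorator(method):
        method._handles = event_cls
        return method

    return decorator


def dispatch(handler_cls, event):
    """Call every method on handler_cls that is tagged for this event's type."""
    instance = handler_cls()
    results = []
    for _, method in inspect.getmembers(instance, predicate=inspect.ismethod):
        if getattr(method, "_handles", None) is type(event):
            results.append(method(event))
    return results


@dataclass(frozen=True)
class OrderConfirmed:
    order_id: str


@dataclass(frozen=True)
class OrderShipped:
    order_id: str


class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event):
        return f"confirmed {event.order_id}"

    @handle(OrderShipped)
    def on_order_shipped(self, event):
        return f"shipped {event.order_id}"


confirmed = dispatch(OrderEventHandler, OrderConfirmed(order_id="o-1"))
shipped = dispatch(OrderEventHandler, OrderShipped(order_id="o-1"))
```

Each event reaches only the method tagged with its type, which is why grouping related reactions in one class does not cause them to interfere with each other.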

How Events Flow

When event processing is set to "sync", events are dispatched immediately after the aggregate is persisted:

sequenceDiagram
    participant App
    participant Repo as Repository
    participant EH as Event Handler

    App->>Repo: repo.add(book) [with BookAdded event]
    Repo->>Repo: Persist aggregate
    Repo->>EH: Dispatch BookAdded
    EH->>EH: Create Inventory record
    EH-->>Repo: Persist Inventory

The flow is:

  1. You call repo.add(book); the book carries a pending BookAdded event
  2. The repository persists the book
  3. The repository dispatches pending events to handlers
  4. BookEventHandler.on_book_added creates an Inventory record and persists it
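The steps above can be sketched in plain Python. The `Repository` class, its `trace` list, and the `pending_events` attribute are hypothetical and exist only to show the ordering (persist first, then dispatch); Protean's real repository does considerably more.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class BookAdded:
    book_id: str
    title: str


@dataclass
class Book:
    id: str
    title: str
    pending_events: list = field(default_factory=list)

    def add_to_catalog(self):
        # Raising an event only records it; nothing is dispatched yet.
        self.pending_events.append(BookAdded(book_id=self.id, title=self.title))


class Repository:
    """Hypothetical repository: persist first, then dispatch pending events."""

    def __init__(self, handlers):
        self.store = {}
        self.handlers = handlers  # maps event type -> list of callables
        self.trace = []

    def add(self, aggregate):
        self.store[aggregate.id] = aggregate  # step 2: persist
        self.trace.append("persisted")
        # Take the pending events and clear them so they are not re-sent.
        events, aggregate.pending_events = aggregate.pending_events, []
        for event in events:  # step 3: dispatch
            for handler in self.handlers.get(type(event), []):
                handler(event)  # step 4: the handler reacts
                self.trace.append(f"handled {type(event).__name__}")


inventory = {}


def on_book_added(event):
    inventory[event.book_id] = {"title": event.title, "quantity": 10}


repo = Repository({BookAdded: [on_book_added]})
book = Book(id="b-1", title="The Great Gatsby")
book.add_to_catalog()
repo.add(book)  # step 1
```

After `repo.add(book)` returns, `repo.trace` records the persist step before the handler step, and the book has no pending events left.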

Sync vs Async Processing

We have been using synchronous event processing:

domain.config["event_processing"] = "sync"

In production, you will switch to asynchronous processing:

domain.config["event_processing"] = "async"

With async processing, events are written to a message store (event store or broker) and processed by the Protean server — a background process that subscribes to event streams and dispatches to handlers.

This gives you:

  • Scalability: handlers run in separate processes
  • Resilience: failed handlers can retry
  • Decoupling: event producers and consumers are independent
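As a rough mental model of the asynchronous mode, the sketch below uses only the Python standard library: a `queue.Queue` stands in for the message store or broker, and a worker thread stands in for the background server. None of this is Protean code, and the names `events`, `handled`, and `worker` are illustrative assumptions.

```python
import queue
import threading

events = queue.Queue()  # stand-in for a message store or broker
handled = []


def worker():
    """Stand-in for a background server: pull events and run the handler."""
    while True:
        event = events.get()
        if event is None:  # sentinel: stop the worker
            events.task_done()
            break
        handled.append(f"handled {event}")
        events.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# The producer only enqueues; it does not wait for handlers to run.
events.put("BookAdded")
events.put("OrderConfirmed")

events.join()  # for the demo only: wait until both events are processed
events.put(None)
thread.join(timeout=5)
```

The producer returns as soon as `put` completes, which is the key difference from sync mode, where the caller waits for every handler to finish.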

We will set this up in Chapter 13.

End-to-End Flow

Let's trace the complete flow from adding a book to seeing inventory created automatically:

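@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,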
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id


domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event → inventory created
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f"  Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
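                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")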

Run it:

$ python bookshelf.py
Adding book to catalog...
  [Inventory] Stocked 10 copies of 'The Great Gatsby'
  Inventory: The Great Gatsby, qty=10

Placing an order...
Confirming order...
  [Notification] Order e5f6... confirmed for Alice Johnson
Shipping order...
  [Notification] Order e5f6... shipped to Alice Johnson

All checks passed!

Notice how the inventory was created automatically when the book was added, and notifications were triggered when the order was confirmed and shipped — all through event handlers.

Full Source

from enum import Enum

from protean import Domain, handle
from protean.fields import (
    Float,
    HasMany,
    Identifier,
    Integer,
    String,
    Text,
    ValueObject,
)
from protean.utils.globals import current_domain

domain = Domain()

domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency = String(max_length=3, default="USD")
    amount = Float(required=True)


@domain.aggregate
class Book:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price = ValueObject(Money)
    description = Text()

    def add_to_catalog(self):
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )


@domain.event(part_of=Book)
class BookAdded:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price_amount = Float()


class OrderStatus(Enum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"


@domain.aggregate
class Order:
    customer_name = String(max_length=150, required=True)
    status = String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))


@domain.entity(part_of=Order)
class OrderItem:
    book_title = String(max_length=200, required=True)
    quantity = Integer(required=True)
    unit_price = ValueObject(Money)


@domain.event(part_of=Order)
class OrderPlaced:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)
    total_items = Integer(required=True)


@domain.event(part_of=Order)
class OrderConfirmed:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)


@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    quantity = Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount


@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f"  [Inventory] Stocked 10 copies of '{event.title}'")


@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f"  [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f"  [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )


@domain.command(part_of=Book)
class AddBook:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price_amount = Float(required=True)
    description = Text()


@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id


domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event → inventory created
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f"  Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")

Summary

In this chapter you learned:

  • Event handlers react to domain events, decoupling side effects from the aggregate that raised the event.
  • @domain.event_handler(part_of=...) registers a handler, and @handle(EventClass) marks which events it processes.
  • A single handler class can process multiple events with separate @handle methods.
  • Handlers can create or modify other aggregates — enabling cross-aggregate coordination through events.
  • Sync processing dispatches immediately; async processing runs handlers in a background server.

We have covered all the fundamental building blocks: aggregates, entities, value objects, commands, events, and handlers. In the next part we will add services for complex use case coordination and projections for read-optimized views.

Next

Chapter 9: Application Services →