
Chapter 6: Events and Reactions

In this chapter we will define domain events, raise them from aggregates, and build event handlers that react automatically — creating Inventory records whenever a book is added to the catalog.

Defining Events

When a book is added to the catalog, we want to record that fact as a domain event. Events are named in past tense — they describe something that already happened:

@domain.event(part_of=Book)
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price_amount: Float()
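To make the "already happened" idea concrete, here is a minimal plain-Python sketch of an event as an immutable record. It does not use Protean: BookAddedFact and try_to_rewrite_history are hypothetical names invented for this illustration, and a frozen dataclass is only an assumed stand-in for how an event object might refuse changes after creation.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class BookAddedFact:
    """Hypothetical stand-in for BookAdded; not a Protean class."""

    book_id: str
    title: str
    author: str
    price_amount: float = 0.0


def try_to_rewrite_history(event: BookAddedFact) -> bool:
    """Return True if the event rejected the attempted mutation."""
    try:
        event.title = "A Different Title"
    except FrozenInstanceError:
        return True
    return False


event = BookAddedFact(
    book_id="book-1",
    title="The Great Gatsby",
    author="F. Scott Fitzgerald",
    price_amount=12.99,
)
rejected = try_to_rewrite_history(event)
```

Here rejected ends up True and the title is unchanged: a consumer can rely on the event describing exactly what happened at the time it was raised.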

Raising Events from Aggregates

Events are raised inside aggregate methods using self.raise_():

@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price = ValueObject(Money)
    description: Text()

    def add_to_catalog(self):
        """Raise a BookAdded event to notify the rest of the system."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )

When add_to_catalog() is called, the BookAdded event is collected on the aggregate rather than dispatched immediately. It is dispatched when the aggregate is persisted, so an event only goes out together with the state change it describes, and handlers are not triggered for a change that failed to save.
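The collect-then-dispatch behaviour can be sketched in a few lines of plain Python. This is an illustration of the general pattern only, not Protean's implementation: SketchAggregate, SketchRepository, and the fail_on_save flag are hypothetical names made up for this example.

```python
class SketchAggregate:
    """Hypothetical aggregate base: raise_() only collects events."""

    def __init__(self):
        self._events = []

    def raise_(self, event):
        self._events.append(event)


class SketchRepository:
    """Hypothetical repository: dispatches events only after a successful save."""

    def __init__(self, dispatch, fail_on_save=False):
        self._dispatch = dispatch
        self._fail_on_save = fail_on_save
        self.saved = []

    def add(self, aggregate):
        if self._fail_on_save:
            # Simulated failure: pending events stay on the aggregate, undelivered.
            raise RuntimeError("simulated persistence failure")
        self.saved.append(aggregate)
        # Take the pending events off the aggregate, then hand them out.
        pending, aggregate._events = aggregate._events, []
        for event in pending:
            self._dispatch(event)


delivered = []

book = SketchAggregate()
book.raise_({"type": "BookAdded", "title": "The Great Gatsby"})
delivered_before_save = list(delivered)  # snapshot taken before persisting

SketchRepository(delivered.append).add(book)
delivered_after_save = list(delivered)  # snapshot taken after persisting

failing = SketchAggregate()
failing.raise_({"type": "BookAdded", "title": "Unsaved Book"})
try:
    SketchRepository(delivered.append, fail_on_save=True).add(failing)
except RuntimeError:
    pass  # the save failed, so its event was never dispatched
```

In this sketch the first snapshot is empty, the second holds one event, and the failed save adds nothing to delivered.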

Let's also raise events from the Order aggregate:

@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))

And define the Order events:

@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)

The Inventory Aggregate

Before we build the event handler, we need something for it to manage. Let's create an Inventory aggregate:

@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount

Reacting with Event Handlers

An event handler listens for specific events and performs side effects. Let's create one that stocks inventory when a book is added:

@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f"  [Inventory] Stocked 10 copies of '{event.title}'")

Notice that the handler is registered with part_of=Book — it listens to events from the Book aggregate. When a BookAdded event is raised, the on_book_added method runs automatically and creates an inventory record.

We can also handle Order events to send notifications:

@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f"  [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f"  [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )

A single event handler class can handle multiple event types.
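One way to picture how a single class can serve several event types is a decorator that tags each method with the type it handles, plus a small dispatcher that routes by type. The sketch below is plain Python under that assumption; handles, dispatch, and the *Sketch classes are hypothetical names and do not describe how Protean's handle decorator works internally.

```python
def handles(event_type):
    """Hypothetical decorator: tag a method with the event type it handles."""

    def decorator(method):
        method._handles = event_type
        return method

    return decorator


class OrderConfirmedSketch:
    def __init__(self, order_id):
        self.order_id = order_id


class OrderShippedSketch:
    def __init__(self, order_id):
        self.order_id = order_id


class OrderHandlerSketch:
    """One handler class with one tagged method per event type."""

    def __init__(self):
        self.log = []

    @handles(OrderConfirmedSketch)
    def on_confirmed(self, event):
        self.log.append(f"confirmed {event.order_id}")

    @handles(OrderShippedSketch)
    def on_shipped(self, event):
        self.log.append(f"shipped {event.order_id}")


def dispatch(handler, event):
    """Call every method on the handler tagged for this event's exact type."""
    for name in dir(type(handler)):
        method = getattr(type(handler), name)
        if getattr(method, "_handles", None) is type(event):
            method(handler, event)


handler = OrderHandlerSketch()
dispatch(handler, OrderConfirmedSketch("o-1"))
dispatch(handler, OrderShippedSketch("o-1"))
```

Each event reaches only the method tagged for its type, so adding a new reaction means adding one more tagged method to the class.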

End-to-End Flow

Let's see the complete flow — adding a book triggers inventory creation, and order lifecycle events trigger notifications:

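@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,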
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id


domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event triggers inventory creation
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f"  Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")

Run it:

$ python bookshelf.py
Adding book to catalog...
  [Inventory] Stocked 10 copies of 'The Great Gatsby'
  Inventory: The Great Gatsby, qty=10

Placing an order...
Confirming order...
  [Notification] Order e5f6g7h8-... confirmed for Alice Johnson
Shipping order...
  [Notification] Order e5f6g7h8-... shipped to Alice Johnson

All checks passed!

Notice that we never called the event handlers ourselves — they ran automatically when events were raised and the aggregate was persisted. The Book aggregate knows nothing about Inventory; the event handler provides the decoupling.
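The decoupling described above can be illustrated with a tiny publish/subscribe sketch in plain Python. Everything here is hypothetical (subscribe, publish, add_book, and the dictionary-based events are invented for this example); it only shows the shape of the idea, where the publishing side never names its consumers.

```python
from collections import defaultdict

subscribers = defaultdict(list)  # event name -> list of callbacks


def subscribe(event_name, callback):
    subscribers[event_name].append(callback)


def publish(event_name, payload):
    for callback in subscribers[event_name]:
        callback(payload)


def add_book(title):
    """Stand-in for the Book side: publishes a fact, knows no consumers."""
    publish("BookAdded", {"title": title})


stock = {}
audit_log = []

# First reaction: stock ten copies whenever a book is added.
subscribe("BookAdded", lambda e: stock.update({e["title"]: 10}))
add_book("The Great Gatsby")

# A second reaction is added later without touching add_book().
subscribe("BookAdded", lambda e: audit_log.append(e["title"]))
add_book("Tender Is the Night")
```

Both books end up in stock, while only the second appears in audit_log, because that subscriber was registered after the first book was added. The add_book function itself never changed.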

What We Built

  • BookAdded, OrderConfirmed, OrderShipped events — immutable facts recorded when state changes.
  • self.raise_() — raises events from aggregate methods.
  • BookEventHandler — reacts to BookAdded by creating inventory.
  • OrderEventHandler — reacts to order lifecycle events.
  • A fully decoupled flow where adding a book automatically stocks inventory.

In the next chapter, we will build a read-optimized projection for browsing the book catalog.

Full Source

from enum import Enum

from protean import Domain, handle
from protean.fields import (
    Float,
    HasMany,
    Identifier,
    Integer,
    String,
    Text,
    ValueObject,
)
from protean.utils.globals import current_domain

domain = Domain()

domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency: String(max_length=3, default="USD")
    amount: Float(required=True)


@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price = ValueObject(Money)
    description: Text()

    def add_to_catalog(self):
        """Raise a BookAdded event to notify the rest of the system."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
            )
        )




@domain.event(part_of=Book)
class BookAdded:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    price_amount: Float()




class OrderStatus(Enum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"


@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(OrderShipped(order_id=self.id, customer_name=self.customer_name))




@domain.entity(part_of=Order)
class OrderItem:
    book_title: String(max_length=200, required=True)
    quantity: Integer(required=True)
    unit_price = ValueObject(Money)


@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)




@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def adjust_stock(self, amount: int):
        self.quantity += amount




@domain.event_handler(part_of=Book)
class BookEventHandler:
    @handle(BookAdded)
    def on_book_added(self, event: BookAdded):
        """When a book is added to the catalog, create an inventory record."""
        inventory = Inventory(
            book_id=event.book_id,
            title=event.title,
            quantity=10,  # Start with 10 copies
        )
        current_domain.repository_for(Inventory).add(inventory)
        print(f"  [Inventory] Stocked 10 copies of '{event.title}'")




@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderConfirmed)
    def on_order_confirmed(self, event: OrderConfirmed):
        print(
            f"  [Notification] Order {event.order_id} confirmed for {event.customer_name}"
        )

    @handle(OrderShipped)
    def on_order_shipped(self, event: OrderShipped):
        print(
            f"  [Notification] Order {event.order_id} shipped to {event.customer_name}"
        )




@domain.command(part_of=Book)
class AddBook:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price_amount: Float(required=True)
    description: Text()


@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount),
            description=command.description,
        )
        book.add_to_catalog()
        current_domain.repository_for(Book).add(book)
        return book.id


domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — BookAdded event triggers inventory creation
        print("Adding book to catalog...")
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
            )
        )

        # Verify inventory was created by the event handler
        inventories = current_domain.repository_for(Inventory)._dao.query.all()
        assert inventories.total == 1
        inv = inventories.items[0]
        print(f"  Inventory: {inv.title}, qty={inv.quantity}")

        # Place and confirm an order
        print("\nPlacing an order...")
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=2,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        current_domain.repository_for(Order).add(order)

        print("Confirming order...")
        order.confirm()
        current_domain.repository_for(Order).add(order)

        print("Shipping order...")
        order.ship()
        current_domain.repository_for(Order).add(order)

        print("\nAll checks passed!")
