Chapter 7: Domain Events — Things That Happened

Commands express intent — "add this book." Events record facts — "a book was added." In this chapter we define domain events and raise them from our aggregates.

What Are Domain Events?

A domain event is an immutable record of something significant that happened in the domain. Events are:

  • Named in past tense: BookAdded, OrderConfirmed, OrderShipped
  • Immutable: once created, they cannot be changed
  • Facts: they represent things that already happened
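
The immutability property can be illustrated without any framework. The sketch below uses a frozen standard-library dataclass as a stand-in for an event class; it is not how Protean implements events, and the field names are only borrowed from this chapter for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class BookAdded:
    """Stand-in event: past-tense name, fields fixed at creation."""

    book_id: str
    title: str
    author: str


event = BookAdded(
    book_id="book-1",
    title="The Great Gatsby",
    author="F. Scott Fitzgerald",
)

try:
    event.title = "Changed"  # Attempt to rewrite a recorded fact
    mutated = True
except FrozenInstanceError:
    mutated = False  # Frozen dataclasses reject attribute assignment

print(mutated)
print(event.title)
```

If a fact needs correcting, the usual approach is to record a new event rather than edit an old one.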

Events vs Commands:

Commands                   | Events
"Add this book" (intent)   | "Book was added" (fact)
Imperative: AddBook        | Past tense: BookAdded
One handler processes it   | Many handlers can react
Can be rejected            | Already happened
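
To make the routing difference concrete, here is a framework-free sketch. The registry functions (register_command_handler, subscribe, send, publish) are hypothetical names invented for this example and are not part of Protean; the point is only that a command maps to exactly one handler and may be rejected, while an event fans out to any number of subscribers.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class AddBook:  # Command: imperative name, expresses intent
    title: str


@dataclass(frozen=True)
class BookAdded:  # Event: past-tense name, records a fact
    title: str


command_handlers = {}  # command type -> exactly one handler
event_subscribers = defaultdict(list)  # event type -> any number of handlers
log = []


def register_command_handler(command_type, handler):
    # A second handler for the same command is an error.
    if command_type in command_handlers:
        raise ValueError(f"{command_type.__name__} already has a handler")
    command_handlers[command_type] = handler


def subscribe(event_type, handler):
    event_subscribers[event_type].append(handler)


def publish(event):
    # Every subscriber gets to react to the same fact.
    for handler in event_subscribers[type(event)]:
        handler(event)


def send(command):
    return command_handlers[type(command)](command)


def handle_add_book(command):
    if not command.title:
        raise ValueError("title is required")  # A command can be rejected
    publish(BookAdded(title=command.title))


register_command_handler(AddBook, handle_add_book)
subscribe(BookAdded, lambda e: log.append(f"notify: {e.title}"))
subscribe(BookAdded, lambda e: log.append(f"index: {e.title}"))

send(AddBook(title="The Great Gatsby"))
print(log)
```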

Defining Events

Events are defined just like commands — fields describe the relevant data at the moment the event occurred:

@domain.event(part_of=Book)
class BookAdded:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price_amount = Float()
    price_currency = String(max_length=3, default="USD")

The part_of=Book argument associates this event with the Book aggregate cluster. An event's fields should capture enough data for any handler to process it without needing to query the aggregate.
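
As a rough illustration of a self-sufficient event, the sketch below uses a plain dataclass in place of the Protean event and a hypothetical consumer, format_catalog_announcement, that builds its output from the event's fields alone. Nothing here is Protean API; it only shows why carrying title, author, and price on the event spares the consumer a lookup.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BookAdded:
    """Plain-Python stand-in mirroring the fields defined above."""

    book_id: str
    title: str
    author: str
    price_amount: float = 0.0
    price_currency: str = "USD"


def format_catalog_announcement(event: BookAdded) -> str:
    """Hypothetical consumer: needs no repository or aggregate access."""
    price = f"{event.price_currency} {event.price_amount:.2f}"
    return f"New in catalog: {event.title} by {event.author} ({price})"


announcement = format_catalog_announcement(
    BookAdded(
        book_id="book-1",
        title="The Great Gatsby",
        author="F. Scott Fitzgerald",
        price_amount=12.99,
    )
)
print(announcement)
```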

Order Events

Let's also define events for our Order aggregate:

@domain.entity(part_of=Order)
class OrderItem:
    book_title = String(max_length=200, required=True)
    quantity = Integer(required=True)
    unit_price = ValueObject(Money)


@domain.event(part_of=Order)
class OrderPlaced:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)
    total_items = Integer(required=True)


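@domain.event(part_of=Order)
class OrderConfirmed:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)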

Raising Events from Aggregates

Events are raised inside aggregate methods using self.raise_():

@domain.aggregate
class Book:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price = ValueObject(Money)
    description = Text()

    def add_to_catalog(self):
        """Mark this book as added to the catalog and raise an event."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
                price_currency=self.price.currency if self.price else "USD",
            )
        )

When you call book.add_to_catalog(), the event is not immediately dispatched. It is collected on the aggregate. The event is dispatched later, when the aggregate is persisted through the repository.

Similarly for orders:

@domain.aggregate
class Order:
    customer_name = String(max_length=150, required=True)
    status = String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(
            OrderConfirmed(
                order_id=self.id,
                customer_name=self.customer_name,
            )
        )

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(
            OrderShipped(
                order_id=self.id,
                customer_name=self.customer_name,
            )
        )

The confirm() and ship() methods each change state and raise an event recording what happened.

The Event Lifecycle

Events go through a clear lifecycle:

Raised → Collected → Persisted → Dispatched

  1. Raised: self.raise_(BookAdded(...)) — the event is created
  2. Collected: The event is stored on the aggregate's internal _events list
  3. Persisted: When repo.add(book) is called, the aggregate and its events are saved
  4. Dispatched: Events are delivered to handlers (synchronously or asynchronously)

Because events are dispatched only after the aggregate has been successfully persisted, handlers never react to a change that failed to save.
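
The four steps can be sketched in plain Python. This is a simplified model under stated assumptions, not Protean's implementation: the Aggregate base class, the InMemoryRepository, and its dispatch callback are hypothetical stand-ins, and events are plain dictionaries to keep the example short.

```python
class Aggregate:
    def __init__(self):
        self._events = []  # Pending events live on the aggregate

    def raise_(self, event):
        self._events.append(event)  # 2. Collected (nothing is delivered yet)


class Book(Aggregate):
    def __init__(self, book_id, title):
        super().__init__()
        self.id = book_id
        self.title = title

    def add_to_catalog(self):
        # 1. Raised: the event object is created and handed to raise_()
        self.raise_({"type": "BookAdded", "book_id": self.id, "title": self.title})


class InMemoryRepository:
    def __init__(self, dispatch):
        self._store = {}
        self._dispatch = dispatch

    def add(self, aggregate):
        # 3. Persisted: if this step raised, the loop below would never run
        self._store[aggregate.id] = aggregate
        pending, aggregate._events = aggregate._events, []
        for event in pending:
            self._dispatch(event)  # 4. Dispatched


delivered = []
repo = InMemoryRepository(dispatch=delivered.append)

book = Book("book-1", "The Great Gatsby")
book.add_to_catalog()
before_save = len(delivered)  # Raised and collected, but not delivered

repo.add(book)
after_save = len(delivered)  # Delivered only once the save succeeded

print(before_save, after_save)
```

Clearing the pending list inside add() keeps a second save of the same aggregate from delivering the same event twice.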

Event Metadata

Every event carries metadata beyond its declared fields:

>>> event._metadata.id         # Unique event ID
>>> event._metadata.timestamp  # When the event was created
>>> event._metadata.type       # Fully qualified event type

Metadata is auto-generated and read-only. It is useful for logging, tracing, and ordering events.
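
For intuition, auto-generated, read-only metadata could be modeled as below. This is a standard-library sketch, not Protean's metadata implementation: the Metadata class and the module-qualified format used for type are assumptions made for this example.

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class Metadata:
    id: str
    timestamp: datetime
    type: str


@dataclass(frozen=True)
class BookAdded:
    book_id: str
    title: str
    _metadata: Metadata = field(init=False)  # Never supplied by the caller

    def __post_init__(self):
        # Frozen dataclasses block normal assignment, so set it once here.
        object.__setattr__(
            self,
            "_metadata",
            Metadata(
                id=str(uuid.uuid4()),
                timestamp=datetime.now(timezone.utc),
                type=f"{type(self).__module__}.{type(self).__qualname__}",
            ),
        )


first = BookAdded(book_id="book-1", title="The Great Gatsby")
second = BookAdded(book_id="book-1", title="The Great Gatsby")

try:
    first._metadata.id = "overwritten"
    writable = True
except FrozenInstanceError:
    writable = False

print(first._metadata.id != second._metadata.id)
print(first._metadata.type)
print(writable)
```

Two events with identical payloads still get distinct ids, which is what lets logs and traces tell them apart.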

Enriching Our Aggregates

Let's update the command handler to raise events when adding books:

@domain.command(part_of=Book)
class AddBook:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price_amount = Float(required=True)
    price_currency = String(max_length=3, default="USD")
    description = Text()


@domain.command_handler(part_of=Book)
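class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(amount=command.price_amount, currency=command.price_currency),
            description=command.description,
        )
        book.add_to_catalog()  # Raises BookAdded event
        current_domain.repository_for(Book).add(book)
        return book.id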

The handler now calls book.add_to_catalog() before persisting. This raises the BookAdded event, which will be dispatched after the repository commit.

Putting It Together

domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — triggers BookAdded event
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
                description="A story of the mysteriously wealthy Jay Gatsby.",
            )
        )
        print(f"Book added: {book_id}")

        # Create and confirm an order — triggers OrderConfirmed event
        repo = current_domain.repository_for(Order)
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=1,
                    unit_price=Money(amount=12.99),
                ),
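            ],
        )
        repo.add(order)

        # Confirm the order
        order.confirm()
        repo.add(order)
        print(f"Order confirmed: {order.id}")

        # Ship the order
        order.ship()
        repo.add(order)
        print(f"Order shipped: {order.id}")

        # Verify
        saved_order = repo.get(order.id)
        assert saved_order.status == OrderStatus.SHIPPED.value
        print("\nAll checks passed!")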

Run it:

$ python bookshelf.py
Book added: a3b2c1d0-...
Order confirmed: e5f6g7h8-...
Order shipped: e5f6g7h8-...

All checks passed!

Events are being raised and collected. But nothing is reacting to them yet. In the next chapter we will add event handlers that process these events to trigger side effects.

Full Source

from enum import Enum

from protean import Domain, handle
from protean.fields import (
    Float,
    HasMany,
    Identifier,
    Integer,
    String,
    Text,
    ValueObject,
)
from protean.utils.globals import current_domain

domain = Domain()

domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency = String(max_length=3, default="USD")
    amount = Float(required=True)


@domain.aggregate
class Book:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price = ValueObject(Money)
    description = Text()

    def add_to_catalog(self):
        """Mark this book as added to the catalog and raise an event."""
        self.raise_(
            BookAdded(
                book_id=self.id,
                title=self.title,
                author=self.author,
                price_amount=self.price.amount if self.price else 0,
                price_currency=self.price.currency if self.price else "USD",
            )
        )




@domain.event(part_of=Book)
class BookAdded:
    book_id = Identifier(required=True)
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    price_amount = Float()
    price_currency = String(max_length=3, default="USD")




class OrderStatus(Enum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"


@domain.aggregate
class Order:
    customer_name = String(max_length=150, required=True)
    status = String(
        max_length=20, choices=OrderStatus, default=OrderStatus.PENDING.value
    )
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(
            OrderConfirmed(
                order_id=self.id,
                customer_name=self.customer_name,
            )
        )

    def ship(self):
        self.status = OrderStatus.SHIPPED.value
        self.raise_(
            OrderShipped(
                order_id=self.id,
                customer_name=self.customer_name,
            )
        )




@domain.entity(part_of=Order)
class OrderItem:
    book_title = String(max_length=200, required=True)
    quantity = Integer(required=True)
    unit_price = ValueObject(Money)


@domain.event(part_of=Order)
class OrderPlaced:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)
    total_items = Integer(required=True)


@domain.event(part_of=Order)
class OrderConfirmed:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)


@domain.event(part_of=Order)
class OrderShipped:
    order_id = Identifier(required=True)
    customer_name = String(max_length=150, required=True)




@domain.command(part_of=Book)
class AddBook:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price_amount = Float(required=True)
    price_currency = String(max_length=3, default="USD")
    description = Text()


@domain.command_handler(part_of=Book)
class BookCommandHandler:
    @handle(AddBook)
    def add_book(self, command: AddBook) -> Identifier:
        book = Book(
            title=command.title,
            author=command.author,
            isbn=command.isbn,
            price=Money(
                amount=command.price_amount,
                currency=command.price_currency,
            ),
            description=command.description,
        )
        book.add_to_catalog()  # Raises BookAdded event
        current_domain.repository_for(Book).add(book)
        return book.id




domain.init(traverse=False)


if __name__ == "__main__":
    with domain.domain_context():
        # Add a book — triggers BookAdded event
        book_id = domain.process(
            AddBook(
                title="The Great Gatsby",
                author="F. Scott Fitzgerald",
                isbn="9780743273565",
                price_amount=12.99,
                description="A story of the mysteriously wealthy Jay Gatsby.",
            )
        )
        print(f"Book added: {book_id}")

        # Create and confirm an order — triggers OrderConfirmed event
        repo = current_domain.repository_for(Order)
        order = Order(
            customer_name="Alice Johnson",
            items=[
                OrderItem(
                    book_title="The Great Gatsby",
                    quantity=1,
                    unit_price=Money(amount=12.99),
                ),
            ],
        )
        repo.add(order)

        # Confirm the order
        order.confirm()
        repo.add(order)
        print(f"Order confirmed: {order.id}")

        # Ship the order
        order.ship()
        repo.add(order)
        print(f"Order shipped: {order.id}")

        # Verify
        saved_order = repo.get(order.id)
        assert saved_order.status == OrderStatus.SHIPPED.value
        print("\nAll checks passed!")

Summary

In this chapter you learned:

  • Domain events are immutable records of things that happened, named in past tense.
  • @domain.event(part_of=...) defines an event and associates it with an aggregate.
  • self.raise_(...) raises an event inside an aggregate method.
  • Events are collected on the aggregate and dispatched when the aggregate is persisted.
  • Event metadata (id, timestamp, type) is auto-generated.

Events are raised but not yet consumed. In the next chapter we will build event handlers that react to events — for notifications, inventory tracking, and more.

Next

Chapter 8: Event Handlers →