Event Handlers

Event handlers consume events raised in an aggregate and help sync the state of the aggregate with other aggregates and other systems. They are the preferred mechanism to update multiple aggregates.

Defining an Event Handler

Event Handlers are defined with the Domain.event_handler decorator. Below is a simplified example of an Event Handler, connected to the Inventory aggregate, that keeps stock levels in sync with changes in the Order aggregate.

from protean import Domain, handle
from protean.fields import Identifier, Integer, String

domain = Domain(__file__, load_toml=False)
domain.config["event_processing"] = "sync"


@domain.event(part_of="Order")
class OrderShipped:
    order_id = Identifier(required=True)
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)


@domain.aggregate
class Order:
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)
    status = String(choices=["PENDING", "SHIPPED", "DELIVERED"], default="PENDING")

    def ship_order(self):
        self.status = "SHIPPED"

        self.raise_(  # (1)
            OrderShipped(
                order_id=self.id,
                book_id=self.book_id,
                quantity=self.quantity,
                total_amount=self.total_amount,
            )
        )


@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    in_stock = Integer(required=True)


@domain.event_handler(part_of=Inventory, stream_category="order")
class ManageInventory:
    @handle(OrderShipped)
    def reduce_stock_level(self, event: OrderShipped):
        repo = domain.repository_for(Inventory)
        inventory = repo._dao.find_by(book_id=event.book_id)

        inventory.in_stock -= event.quantity  # (2)

        repo.add(inventory)


domain.init()
with domain.domain_context():
    # Persist Order
    order = Order(book_id=1, quantity=10, total_amount=100)
    domain.repository_for(Order).add(order)

    # Persist Inventory
    inventory = Inventory(book_id=1, in_stock=100)
    domain.repository_for(Inventory).add(inventory)

    # Ship Order
    order.ship_order()
    domain.repository_for(Order).add(order)

    # Verify that Inventory Level has been reduced
    stock = domain.repository_for(Inventory).get(inventory.id)
    print(stock.to_dict())
    assert stock.in_stock == 90

  1. The Order aggregate raises an OrderShipped event when the order is shipped.

  2. The event handler picks up the event and reduces the stock level in the Inventory aggregate.

Running the same steps in an interactive session shows the stock level dropping from 100 to 90 in response to the OrderShipped event.

In [1]: order = Order(book_id=1, quantity=10, total_amount=100)

In [2]: domain.repository_for(Order).add(order)
Out[2]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [3]: inventory = Inventory(book_id=1, in_stock=100)

In [4]: domain.repository_for(Inventory).add(inventory)
Out[4]: <Inventory: Inventory object (id: 9272d70f-b796-417d-8f30-e01302d9f1a9)>

In [5]: order.ship_order()

In [6]: domain.repository_for(Order).add(order)
Out[6]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [7]: stock = domain.repository_for(Inventory).get(inventory.id)

In [8]: stock.to_dict()
Out[8]: {
 'book_id': '1',
 'in_stock': 90,
 'id': '9272d70f-b796-417d-8f30-e01302d9f1a9'
 }

Configuration Options

  • part_of: The aggregate to which the event handler is connected.
  • stream_category: The event handler listens to events on this stream category. The stream category defaults to the category of the aggregate associated with the handler. An Event Handler can be part of an aggregate, and have the stream category of a different aggregate. This is the mechanism for an aggregate to listen to another aggregate's events to sync its own state.
  • source_stream: When specified, the event handler only consumes events generated in response to events or commands from this original stream. For example, EmailNotifications event handler listening to OrderShipped events can be configured to generate a NotificationSent event only when the OrderShipped event (in stream orders) is generated in response to a ShipOrder (in stream manage_order) command.
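
The way stream_category and source_stream narrow down what a handler receives can be pictured with a small, framework-free sketch. This is not Protean's implementation: the Message, Subscription, and dispatch names, the "<category>-<id>" stream naming, the origin_stream field, and the "bulk_import" stream are all assumptions made for illustration only.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Message:
    """An event plus the routing metadata this sketch relies on (hypothetical shape)."""

    type: str
    stream: str  # assumed format "<category>-<aggregate id>", e.g. "order-42"
    origin_stream: Optional[str] = None  # category of the message that caused this one


@dataclass
class Subscription:
    """A handler together with its stream_category and optional source_stream."""

    handler: Callable[[Message], None]
    stream_category: str
    source_stream: Optional[str] = None

    def accepts(self, message: Message) -> bool:
        # stream_category: only events written to this category are considered.
        category = message.stream.split("-", 1)[0]
        if category != self.stream_category:
            return False
        # source_stream: when set, additionally require a matching origin.
        if self.source_stream is None:
            return True
        return message.origin_stream == self.source_stream


def dispatch(message: Message, subscriptions: List[Subscription]) -> int:
    """Deliver the message to every accepting subscription; return the delivery count."""
    delivered = 0
    for subscription in subscriptions:
        if subscription.accepts(message):
            subscription.handler(message)
            delivered += 1
    return delivered


inventory_log: List[str] = []
email_log: List[str] = []

subscriptions = [
    # Inventory-style handler: listens to every event in the "order" category.
    Subscription(lambda m: inventory_log.append(m.type), stream_category="order"),
    # Email-style handler: same category, but only events caused by "manage_order".
    Subscription(
        lambda m: email_log.append(m.type),
        stream_category="order",
        source_stream="manage_order",
    ),
]

from_command = Message("OrderShipped", "order-42", origin_stream="manage_order")
from_elsewhere = Message("OrderShipped", "order-43", origin_stream="bulk_import")

dispatch(from_command, subscriptions)
dispatch(from_elsewhere, subscriptions)
```

In this sketch the first subscription receives both OrderShipped events, while the second receives only the one whose origin is manage_order. That mirrors the idea that stream_category chooses which events a handler listens to and source_stream further restricts them by what caused them.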