Event Handlers
Event handlers consume events raised in an aggregate and help sync the state of the aggregate with other aggregates and other systems. They are the preferred mechanism to update multiple aggregates.
Defining an Event Handler
Event handlers are defined with the Domain.event_handler decorator. Below is a simplified example of an event handler connected to the Inventory aggregate that syncs stock levels in response to changes in the Order aggregate.
from protean import Domain, handle
from protean.fields import Identifier, Integer, String

domain = Domain(__file__, load_toml=False)
domain.config["event_processing"] = "sync"


@domain.event(part_of="Order")
class OrderShipped:
    order_id = Identifier(required=True)
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)


@domain.aggregate
class Order:
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)
    status = String(choices=["PENDING", "SHIPPED", "DELIVERED"], default="PENDING")

    def ship_order(self):
        self.status = "SHIPPED"

        self.raise_(  # (1)
            OrderShipped(
                order_id=self.id,
                book_id=self.book_id,
                quantity=self.quantity,
                total_amount=self.total_amount,
            )
        )


@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    in_stock = Integer(required=True)


@domain.event_handler(part_of=Inventory, stream_category="order")
class ManageInventory:
    @handle(OrderShipped)
    def reduce_stock_level(self, event: OrderShipped):
        repo = domain.repository_for(Inventory)
        inventory = repo._dao.find_by(book_id=event.book_id)

        inventory.in_stock -= event.quantity  # (2)

        repo.add(inventory)


domain.init()
with domain.domain_context():
    # Persist Order
    order = Order(book_id=1, quantity=10, total_amount=100)
    domain.repository_for(Order).add(order)

    # Persist Inventory
    inventory = Inventory(book_id=1, in_stock=100)
    domain.repository_for(Inventory).add(inventory)

    # Ship Order
    order.ship_order()
    domain.repository_for(Order).add(order)

    # Verify that Inventory Level has been reduced
    stock = domain.repository_for(Inventory).get(inventory.id)
    print(stock.to_dict())
    assert stock.in_stock == 90
1. The Order aggregate fires the OrderShipped event when the book is shipped.
2. The event handler picks up the event and updates stock levels in the Inventory aggregate.
Running the example in an interactive session, we can see that the stock level decreases in response to the OrderShipped event.
In [1]: order = Order(book_id=1, quantity=10, total_amount=100)
In [2]: domain.repository_for(Order).add(order)
Out[2]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>
In [3]: inventory = Inventory(book_id=1, in_stock=100)
In [4]: domain.repository_for(Inventory).add(inventory)
Out[4]: <Inventory: Inventory object (id: 9272d70f-b796-417d-8f30-e01302d9f1a9)>
In [5]: order.ship_order()
In [6]: domain.repository_for(Order).add(order)
Out[6]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>
In [7]: stock = domain.repository_for(Inventory).get(inventory.id)
In [8]: stock.to_dict()
Out[8]: {
    'book_id': '1',
    'in_stock': 90,
    'id': '9272d70f-b796-417d-8f30-e01302d9f1a9'
}
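The same sequence (raise an event inside the aggregate, persist the aggregate, let the handler react) can be modelled without the framework. The following is a minimal, framework-free sketch and not Protean's implementation: `ToyOrderRepository`, `HANDLERS`, `handles`, and `pending_events` are hypothetical names, and it assumes that events are collected on the aggregate and dispatched synchronously when the aggregate is persisted.

```python
from dataclasses import dataclass, field


@dataclass
class OrderShipped:
    order_id: str
    book_id: str
    quantity: int


@dataclass
class Inventory:
    book_id: str
    in_stock: int


@dataclass
class Order:
    id: str
    book_id: str
    quantity: int
    status: str = "PENDING"
    pending_events: list = field(default_factory=list)

    def ship_order(self) -> None:
        self.status = "SHIPPED"
        # Collect the event on the aggregate; nothing is dispatched yet.
        self.pending_events.append(
            OrderShipped(self.id, self.book_id, self.quantity)
        )


HANDLERS = {}   # hypothetical registry: event type -> list of handler functions
INVENTORY = {}  # stand-in for the Inventory repository, keyed by book_id


def handles(event_type):
    """Register the decorated function as a handler for event_type."""
    def register(func):
        HANDLERS.setdefault(event_type, []).append(func)
        return func
    return register


@handles(OrderShipped)
def reduce_stock_level(event: OrderShipped) -> None:
    INVENTORY[event.book_id].in_stock -= event.quantity


class ToyOrderRepository:
    def __init__(self):
        self.store = {}

    def add(self, order: Order) -> Order:
        self.store[order.id] = order
        # Assumed "sync" behaviour: run handlers right after persisting.
        events, order.pending_events = order.pending_events, []
        for event in events:
            for handler in HANDLERS.get(type(event), []):
                handler(event)
        return order


INVENTORY["1"] = Inventory(book_id="1", in_stock=100)
repo = ToyOrderRepository()
order = Order(id="o-1", book_id="1", quantity=10)
repo.add(order)

order.ship_order()
stock_before_persist = INVENTORY["1"].in_stock  # handler has not run yet
repo.add(order)
stock_after_persist = INVENTORY["1"].in_stock   # handler ran during add()
print(stock_before_persist, stock_after_persist)
```

In this toy model the Inventory record changes only once the Order is persisted again, which is why the second `add` call matters in the sketch. Whether the framework defers dispatch in exactly this way is an assumption here.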
Configuration Options
- part_of: The aggregate to which the event handler is connected.
- stream_category: The event handler listens to events on this stream category. The stream category defaults to the category of the aggregate associated with the handler.
  An Event Handler can be part of an aggregate, and have the stream category of a different aggregate. This is the mechanism for an aggregate to listen to another aggregate's events to sync its own state.
- source_stream: When specified, the event handler only consumes events generated in response to events or commands from this original stream. For example, an EmailNotifications event handler listening to OrderShipped events can be configured to generate a NotificationSent event only when the OrderShipped event (in stream orders) is generated in response to a ShipOrder command (in stream manage_order).
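One way to picture stream_category and source_stream is as two successive filters applied to each incoming message. The sketch below is a framework-free toy model, not Protean's implementation. `Message`, `origin_stream`, `HandlerConfig`, `listens_to`, and `should_consume` are hypothetical names. Two further assumptions are made purely for illustration: that each message records the stream that originally caused it, and that the default category is the lowercased aggregate name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Message:
    name: str
    stream_category: str                 # category the message was written to
    origin_stream: Optional[str] = None  # hypothetical: stream that caused it


@dataclass(frozen=True)
class HandlerConfig:
    part_of: str
    stream_category: Optional[str] = None
    source_stream: Optional[str] = None

    @property
    def listens_to(self) -> str:
        # Assumed default: derive the category from the owning aggregate.
        return self.stream_category or self.part_of.lower()


def should_consume(config: HandlerConfig, message: Message) -> bool:
    # Filter 1: the message must be on the category the handler listens to.
    if message.stream_category != config.listens_to:
        return False
    # Filter 2: if source_stream is set, the message must originate from it.
    if config.source_stream is not None:
        return message.origin_stream == config.source_stream
    return True


# Handler owned by Inventory, but listening to the Order category.
manage_inventory = HandlerConfig(part_of="Inventory", stream_category="order")

# Handler that reacts only to events caused by the manage_order stream.
# The owning aggregate name "Notification" is hypothetical.
email_notifications = HandlerConfig(
    part_of="Notification",
    stream_category="orders",
    source_stream="manage_order",
)

shipped_by_command = Message("OrderShipped", "orders", origin_stream="manage_order")
shipped_otherwise = Message("OrderShipped", "orders", origin_stream="returns")

default_category = HandlerConfig(part_of="Inventory").listens_to
cross_aggregate = should_consume(manage_inventory, Message("OrderShipped", "order"))
from_command = should_consume(email_notifications, shipped_by_command)
from_elsewhere = should_consume(email_notifications, shipped_otherwise)
print(default_category, cross_aggregate, from_command, from_elsewhere)
```

The first filter is what lets a handler owned by one aggregate react to another aggregate's events. The second narrows consumption to events with a particular origin.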