
Event Handlers

Event handlers consume events raised in an aggregate and keep the state of other aggregates and external systems in sync with it. They are the preferred mechanism for updating multiple aggregates.

Defining an Event Handler

Event Handlers are defined with the Domain.event_handler decorator. Below is a simplified example of an Event Handler connected to the Inventory aggregate that syncs stock levels in response to changes in the Order aggregate.

from protean import Domain, handle
from protean.fields import Identifier, Integer, String

domain = Domain()
domain.config["event_processing"] = "sync"


@domain.event(part_of="Order")
class OrderShipped:
    order_id = Identifier(required=True)
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)


@domain.aggregate
class Order:
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)
    status = String(choices=["PENDING", "SHIPPED", "DELIVERED"], default="PENDING")

    def ship_order(self):
        self.status = "SHIPPED"

        self.raise_(  # (1)
            OrderShipped(
                order_id=self.id,
                book_id=self.book_id,
                quantity=self.quantity,
                total_amount=self.total_amount,
            )
        )


@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    in_stock = Integer(required=True)


@domain.event_handler(part_of=Inventory, stream_category="order")
class ManageInventory:
    @handle(OrderShipped)
    def reduce_stock_level(self, event: OrderShipped):
        repo = domain.repository_for(Inventory)
        inventory = repo._dao.find_by(book_id=event.book_id)

        inventory.in_stock -= event.quantity  # (2)

        repo.add(inventory)


domain.init()
with domain.domain_context():
    # Persist Order
    order = Order(book_id=1, quantity=10, total_amount=100)
    domain.repository_for(Order).add(order)

    # Persist Inventory
    inventory = Inventory(book_id=1, in_stock=100)
    domain.repository_for(Inventory).add(inventory)

    # Ship Order
    order.ship_order()
    domain.repository_for(Order).add(order)

    # Verify that Inventory Level has been reduced
    stock = domain.repository_for(Inventory).get(inventory.id)
    print(stock.to_dict())
    assert stock.in_stock == 90
  1. Order aggregate raises the OrderShipped event when the order is shipped.

  2. Event handler picks up the event and updates stock levels in Inventory aggregate.

Walking through the same example in a console session, we can see that the stock levels were decreased in response to the OrderShipped event.

In [1]: order = Order(book_id=1, quantity=10, total_amount=100)

In [2]: domain.repository_for(Order).add(order)
Out[2]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [3]: inventory = Inventory(book_id=1, in_stock=100)

In [4]: domain.repository_for(Inventory).add(inventory)
Out[4]: <Inventory: Inventory object (id: 9272d70f-b796-417d-8f30-e01302d9f1a9)>

In [5]: order.ship_order()

In [6]: domain.repository_for(Order).add(order)
Out[6]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [7]: stock = domain.repository_for(Inventory).get(inventory.id)

In [8]: stock.to_dict()
Out[8]: {
 'book_id': '1',
 'in_stock': 90,
 'id': '9272d70f-b796-417d-8f30-e01302d9f1a9'
 }

Event Handler Workflow

Event handlers follow an asynchronous, fire-and-forget pattern. When an event is published, event handlers process it without returning any values to the caller.

sequenceDiagram
  autonumber
  Aggregate->>Domain: Publish Event
  Domain->>Event Store: Store Event
  Event Store-->>Domain: 
  Domain-->>Aggregate: 

  Note over Domain,Event Handler: Asynchronous Processing

  Event Store->>Event Handler: Deliver Event
  Event Handler->>Event Handler: Process Event
  Event Handler->>Repository: Load/Update Aggregates
  Repository-->>Event Handler: 
  Event Handler->>Event Handler: Perform Side Effects
  Event Handler->>Repository: Persist Aggregates
  1. Aggregate Publishes Event: An action in an aggregate triggers an event to be published.
  2. Domain Stores Event: The domain stores the event in the event store.
  3. Event Store Confirms Storage: The event store confirms the event has been stored.
  4. Domain Returns to Aggregate: The domain returns control to the aggregate.
  5. Event Store Delivers Event: Asynchronously, the event store delivers the event to all subscribed event handlers.
  6. Event Handler Processes Event: The event handler receives and processes the event.
  7. Event Handler Loads/Updates Aggregates: If needed, the event handler loads and updates relevant aggregates.
  8. Repository Returns Data: The repository returns requested data to the event handler.
  9. Event Handler Performs Side Effects: The event handler may perform additional side effects (sending emails, updating other systems, etc.).
  10. Event Handler Persists Data and Optionally Raises Events: The event handler persists the mutated aggregate, which can also raise events.

Return Values from Event Handlers

Event handlers in Protean follow the standard CQRS pattern where event handlers do not return values to the caller. This deliberate design choice ensures:

  1. Decoupling: The publisher of events remains completely decoupled from the consumers.
  2. Asynchronous Processing: Events can be processed in the background without blocking.
  3. Multiple Consumers: Multiple event handlers can process the same event independently.

If an event handler needs to communicate information as part of its processing, it should:

  • Emit new events
  • Update relevant aggregates that can be queried later
  • Log information for monitoring purposes
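The "emit new events" option can be sketched framework-agnostically. The snippet below uses a hypothetical in-memory event bus (EventBus, reduce_stock, and the event names are illustrative, not Protean APIs) to show a handler that returns nothing to the publisher and instead communicates by publishing a follow-up event:

```python
from collections import defaultdict

# Hypothetical in-memory event bus; illustrative only, not a Protean API.
class EventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    def publish(self, event_type, payload):
        # Fire-and-forget: handlers' return values are ignored.
        for handler in self._subscribers[event_type]:
            handler(payload)

bus = EventBus()
audit_log = []

def reduce_stock(payload):
    # ... mutate the Inventory aggregate here ...
    # Communicate onward by emitting a new event, not by returning a value.
    bus.publish("StockLevelReduced", {"book_id": payload["book_id"]})

def record_stock_change(payload):
    audit_log.append(payload)

bus.subscribe("OrderShipped", reduce_stock)
bus.subscribe("StockLevelReduced", record_stock_change)

bus.publish("OrderShipped", {"book_id": 1, "quantity": 10})
print(audit_log)  # [{'book_id': 1}]
```

Because the publisher of OrderShipped never inspects a return value, handlers can later be moved to background processing without changing the calling code.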

Configuration Options

  • part_of: The aggregate to which the event handler is connected.
  • stream_category: The event handler listens to events on this stream category. The stream category defaults to the category of the aggregate associated with the handler.

An Event Handler can be part of one aggregate while listening to the stream category of a different aggregate. This is the mechanism by which an aggregate listens to another aggregate's events to sync its own state.

  • source_stream: When specified, the event handler only consumes events generated in response to events or commands from this original stream. For example, an EmailNotifications event handler listening to OrderShipped events can be configured to generate a NotificationSent event only when the OrderShipped event (in stream orders) was raised in response to a ShipOrder command (in stream manage_order).
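Putting these options together, a cross-aggregate handler might be wired up as below. This is a sketch: the Notification aggregate, the EmailNotifications handler, and the stream names are illustrative stand-ins, not part of the example above.

```python
from protean import Domain, handle
from protean.fields import Identifier, String

domain = Domain()


@domain.event(part_of="Order")
class OrderShipped:
    order_id = Identifier(required=True)


@domain.aggregate
class Order:
    status = String(default="PENDING")


@domain.aggregate
class Notification:
    order_id = Identifier(required=True)


@domain.event_handler(
    part_of=Notification,          # the handler belongs to Notification...
    stream_category="order",       # ...but listens to the Order stream category
    source_stream="manage_order",  # only events caused by this original stream
)
class EmailNotifications:
    @handle(OrderShipped)
    def notify(self, event: OrderShipped):
        ...  # send the email / raise a NotificationSent event here


domain.init()
```

Here Notification tracks state from Order's events without Order ever referencing Notification.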

Error Handling

Protean provides a robust error handling mechanism for event handlers through the optional handle_error method. This method allows event handlers to gracefully recover from exceptions without disrupting the overall event processing pipeline.

The handle_error Method

You can add a handle_error class method to your event handler to implement custom error handling:

@domain.event_handler(part_of=Inventory)
class InventoryEventHandler:
    @handle(OrderShipped)
    def update_inventory(self, event):
        # Event handling logic that might raise exceptions
        ...

    @classmethod
    def handle_error(cls, exc: Exception, message):
        """Custom error handling for event processing failures"""
        # Log the error
        logger.error(f"Failed to process event {message.type}: {exc}")

        # Perform recovery operations
        # Example: store failed events for retry, trigger compensating actions, etc.
        ...

How It Works

  1. When an exception occurs during event processing, the Protean Engine catches it.

  2. The engine logs detailed error information, including stack traces.

  3. The engine calls the event handler's handle_error method, passing:

     • The original exception that was raised
     • The event message being processed when the exception occurred

  4. After handle_error completes, processing continues with subsequent events.
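The recovery flow above can be approximated in plain Python. This is a sketch of the mechanism, not the Engine's actual implementation; process, handler, and the message values are illustrative:

```python
def process(messages, handler, handle_error):
    """Approximates the engine loop: catch, delegate to handle_error, continue."""
    for message in messages:
        try:
            handler(message)
        except Exception as exc:
            # The engine logs the failure, then delegates to handle_error...
            try:
                handle_error(exc, message)
            except Exception:
                # ...and shields the pipeline even if handle_error itself fails.
                pass

processed, failed = [], []

def handler(message):
    if message == "bad":
        raise ValueError("cannot process")
    processed.append(message)

process(["ok-1", "bad", "ok-2"], handler, lambda exc, msg: failed.append(msg))
print(processed, failed)  # ['ok-1', 'ok-2'] ['bad']
```

Note that the failing message does not halt the loop: subsequent events are still processed, which is exactly the resilience the Engine guarantees.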

Error Handler Failures

If an exception occurs within the handle_error method itself, the Protean Engine will catch and log that exception as well, ensuring that the event processing pipeline continues to function. This provides an additional layer of resilience:

@classmethod
def handle_error(cls, exc: Exception, message):
    try:
        # Error handling logic that might itself fail
        ...
    except Exception as error_exc:
        # The engine will catch and log this secondary exception
        logger.error(f"Error handler failed: {error_exc}")
        # Processing continues regardless