Event Handlers

DDD CQRS ES

When an aggregate changes state, other parts of the system often need to react — updating a different aggregate, sending a notification, or triggering a downstream process. Putting that logic inside the originating aggregate would violate its boundary. Event handlers solve this: they listen for domain events and execute side effects in their own transaction, keeping aggregates decoupled.

Event handlers consume events raised in an aggregate and help sync the state of the aggregate with other aggregates and other systems. They are the preferred mechanism to update multiple aggregates.

Choosing the Right Consumer

Protean provides four elements that consume events or messages. Each serves a distinct purpose:

| Element | Consumes | Source | Association | Dispatch | Use case |
| --- | --- | --- | --- | --- | --- |
| Event Handler | Domain events | Internal event store | part_of (aggregate) | @handle(EventClass) | Side effects, cross-aggregate sync |
| Projector | Domain events | Internal event store | projector_for (projection) | @on(EventClass) | Maintain read models (projections) |
| Subscriber | Raw dict payloads | External message broker | stream (broker stream) | __call__(payload) | Anti-corruption layer for external systems |
| Query Handler | Queries | Synchronous dispatch | part_of (projection) | @read(QueryClass) | Structured read-side access |

Use event handlers when you need to react to internal domain events with side effects (syncing aggregates, sending notifications, triggering processes). Use projectors when the reaction is specifically maintaining a read model. Use subscribers when messages come from outside your bounded context. Use query handlers when you need structured, validated read access to projections.

Defining an Event Handler

Event Handlers are defined with the Domain.event_handler decorator. Below is a simplified example of an Event Handler that syncs stock levels in Inventory in response to changes in the Order aggregate.

from protean import Domain, handle
from protean.fields import Identifier, Integer, String

domain = Domain()
domain.config["event_processing"] = "sync"


@domain.event(part_of="Order")
class OrderShipped:
    order_id: Identifier(required=True)
    book_id: Identifier(required=True)
    quantity: Integer(required=True)
    total_amount: Integer(required=True)


@domain.aggregate
class Order:
    book_id: Identifier(required=True)
    quantity: Integer(required=True)
    total_amount: Integer(required=True)
    status: String(choices=["PENDING", "SHIPPED", "DELIVERED"], default="PENDING")

    def ship_order(self):
        self.status = "SHIPPED"

        self.raise_(  # (1)
            OrderShipped(
                order_id=self.id,
                book_id=self.book_id,
                quantity=self.quantity,
                total_amount=self.total_amount,
            )
        )


@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    in_stock: Integer(required=True)

    def reduce_stock(self, quantity: int) -> None:
        self.in_stock -= quantity


@domain.event_handler(part_of=Order)  # (2)
class ManageInventory:
    @handle(OrderShipped)
    def reduce_stock_level(self, event: OrderShipped):
        repo = domain.repository_for(Inventory)
        inventory = repo.find_by(book_id=event.book_id)

        inventory.reduce_stock(event.quantity)  # (3)

        repo.add(inventory)


domain.init()
with domain.domain_context():
    # Persist Order
    order = Order(book_id="book-1", quantity=10, total_amount=100)
    domain.repository_for(Order).add(order)

    # Persist Inventory
    inventory = Inventory(book_id="book-1", in_stock=100)
    domain.repository_for(Inventory).add(inventory)

    # Ship Order
    order.ship_order()
    domain.repository_for(Order).add(order)

    # Verify that Inventory Level has been reduced
    stock = domain.repository_for(Inventory).get(inventory.id)
    print(stock.to_dict())
    assert stock.in_stock == 90
  1. Order aggregate fires OrderShipped event on book being shipped.

  2. Event handler is registered with part_of=Order so it subscribes to the Order aggregate's event stream. In production with async processing, you would typically use part_of=Inventory, stream_category="order" to keep the handler associated with its owning aggregate while listening to another aggregate's stream.

  3. Event handler calls reduce_stock() on the Inventory aggregate — delegating to a domain method rather than mutating state directly.

Simulating the example, we can see that the stock levels were decreased in response to the OrderShipped event.

In [1]: order = Order(book_id="book-1", quantity=10, total_amount=100)

In [2]: domain.repository_for(Order).add(order)
Out[2]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [3]: inventory = Inventory(book_id="book-1", in_stock=100)

In [4]: domain.repository_for(Inventory).add(inventory)
Out[4]: <Inventory: Inventory object (id: 9272d70f-b796-417d-8f30-e01302d9f1a9)>

In [5]: order.ship_order()

In [6]: domain.repository_for(Order).add(order)
Out[6]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [7]: stock = domain.repository_for(Inventory).get(inventory.id)

In [8]: stock.to_dict()
Out[8]: {
 'book_id': 'book-1',
 'in_stock': 90,
 'id': '9272d70f-b796-417d-8f30-e01302d9f1a9'
 }
Internal workflow

Event handlers follow an asynchronous, fire-and-forget pattern. When an event is published, event handlers process it without returning any values to the caller.

sequenceDiagram
  autonumber
  Aggregate->>Domain: Publish Event
  Domain->>Event Store: Store Event
  Event Store-->>Domain: Done
  Domain-->>Aggregate: Done

  Note over Domain,Event Handler: Asynchronous Processing

  Event Store->>Event Handler: Deliver Event
  Event Handler->>Event Handler: Process Event
  Event Handler->>Repository: Load/Update Aggregates
  Repository-->>Event Handler: Done
  Event Handler->>Event Handler: Perform Side Effects
  Event Handler->>Repository: Persist Aggregates
  1. Aggregate Publishes Event: An action in an aggregate triggers an event to be published.
  2. Domain Stores Event: The domain stores the event in the event store.
  3. Event Store Confirms Storage: The event store confirms the event has been stored.
  4. Domain Returns to Aggregate: The domain returns control to the aggregate.
  5. Event Store Delivers Event: Asynchronously, the event store delivers the event to all subscribed event handlers.
  6. Event Handler Processes Event: The event handler receives and processes the event.
  7. Event Handler Loads/Updates Aggregates: If needed, the event handler loads and updates relevant aggregates.
  8. Repository Returns Data: The repository returns requested data to the event handler.
  9. Event Handler Performs Side Effects: The event handler may perform additional side effects (sending emails, updating other systems, etc.).
  10. Event Handler Persists Data and Optionally Raises Events: The event handler persists the mutated aggregate, which can also raise events.
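
The steps above can be modelled with a toy, in-memory sketch. This is not Protean's implementation: `ToyEventStore`, `publish`, and `deliver_pending` are hypothetical names, used only to show that publishing returns before any handler runs and that delivery happens in a separate step.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyEventStore:
    """Hypothetical in-memory stand-in for an event store (not Protean's API)."""

    events: list[dict] = field(default_factory=list)
    handlers: list[Callable[[dict], None]] = field(default_factory=list)
    position: int = 0  # index of the next event to deliver

    def publish(self, event: dict) -> None:
        # Steps 1-4: store the event and return control immediately.
        self.events.append(event)

    def deliver_pending(self) -> int:
        # Steps 5-10: later, hand each undelivered event to every handler.
        delivered = 0
        while self.position < len(self.events):
            event = self.events[self.position]
            for handler in self.handlers:
                handler(event)  # any return value is ignored
            self.position += 1
            delivered += 1
        return delivered


stock = {"book-1": 100}


def reduce_stock(event: dict) -> None:
    stock[event["book_id"]] -= event["quantity"]


store = ToyEventStore(handlers=[reduce_stock])
store.publish({"type": "OrderShipped", "book_id": "book-1", "quantity": 10})
stock_before_delivery = stock["book-1"]  # the handler has not run yet
delivered_count = store.deliver_pending()
stock_after_delivery = stock["book-1"]
```

In this sketch the stock level only changes once `deliver_pending()` runs, which mirrors the gap between storing an event and processing it.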

The @handle Decorator

The @handle decorator binds a handler method to a specific event type. Import it from protean:

from protean import handle

Each @handle decorator takes exactly one event class as its argument. When an event of that type arrives, the framework invokes the decorated method with the deserialized event object.

A single event handler class can contain multiple @handle methods, each processing a different event type:

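# Assumes current_domain is imported, and that OrderCancelled and
# Inventory.increase_stock() are defined alongside the earlier example.
@domain.event_handler(part_of=Inventory, stream_category="order")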
class ManageInventory:
    @handle(OrderShipped)
    def reduce_stock(self, event: OrderShipped):
        repo = current_domain.repository_for(Inventory)
        inventory = repo.find_by(book_id=event.book_id)
        inventory.reduce_stock(event.quantity)
        repo.add(inventory)

    @handle(OrderCancelled)
    def restore_stock(self, event: OrderCancelled):
        repo = current_domain.repository_for(Inventory)
        inventory = repo.find_by(book_id=event.book_id)
        inventory.increase_stock(event.quantity)
        repo.add(inventory)

Each @handle method runs within its own Unit of Work. If the handler modifies an aggregate and persists it, the changes are committed atomically. If an error occurs, the transaction is rolled back.
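
The commit-or-rollback behaviour can be pictured with a minimal sketch. `ToyUnitOfWork` is a hypothetical context manager written for illustration; it is not Protean's Unit of Work, and it assumes changes are staged in memory and applied only when the block exits cleanly.

```python
class ToyUnitOfWork:
    """Hypothetical sketch: stage changes, commit on success, discard on error."""

    def __init__(self, storage: dict):
        self.storage = storage
        self.staged: dict = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.storage.update(self.staged)  # commit everything at once
        self.staged.clear()  # on error, staged changes are simply dropped
        return False  # never swallow the exception


storage = {"book-1": 100}

# Successful handler run: the staged change is committed.
with ToyUnitOfWork(storage) as uow:
    uow.staged["book-1"] = storage["book-1"] - 10

# Failing handler run: the staged change is rolled back.
try:
    with ToyUnitOfWork(storage) as uow:
        uow.staged["book-1"] = 0
        raise RuntimeError("handler failed")
except RuntimeError:
    pass
```

After both runs, `storage` holds only the change from the successful block.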

One event class per method

Each @handle method accepts exactly one event class. To handle multiple event types, define multiple methods in the same handler class.
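
One way to picture the binding between an event class and a method is a decorator that tags the method, plus a dispatcher that looks the tag up by the event's type. The sketch below is an illustration only: `toy_handle`, `dispatch`, and the plain-Python event classes are hypothetical stand-ins, and Protean's actual dispatch mechanism may work differently.

```python
def toy_handle(event_cls):
    """Hypothetical stand-in for @handle: tag a method with one event class."""

    def decorator(method):
        method._handles = event_cls
        return method

    return decorator


class OrderShipped:
    def __init__(self, quantity: int):
        self.quantity = quantity


class OrderCancelled:
    def __init__(self, quantity: int):
        self.quantity = quantity


class ToyInventoryHandler:
    def __init__(self):
        self.in_stock = 100

    @toy_handle(OrderShipped)
    def reduce_stock(self, event):
        self.in_stock -= event.quantity

    @toy_handle(OrderCancelled)
    def restore_stock(self, event):
        self.in_stock += event.quantity


def dispatch(handler, event) -> bool:
    """Invoke the method registered for the event's class, if there is one."""
    for name in dir(handler):
        method = getattr(handler, name)
        if getattr(method, "_handles", None) is type(event):
            method(event)
            return True
    return False


handler = ToyInventoryHandler()
dispatch(handler, OrderShipped(quantity=10))
dispatch(handler, OrderCancelled(quantity=4))
```

Because each method carries a single tag, handling a second event type means adding a second method, which is the same constraint described above.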

Return Values from Event Handlers

Event handlers in Protean follow the standard CQRS pattern: they do not return values to the caller. This deliberate design choice ensures:

  1. Decoupling: The publisher of events remains completely decoupled from the consumers.
  2. Asynchronous Processing: Events can be processed in the background without blocking.
  3. Multiple Consumers: Multiple event handlers can process the same event independently.

If an event handler needs to communicate information as part of its processing, it should:

  • Emit new events
  • Update relevant aggregates that can be queried later
  • Log information for monitoring purposes
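
As a rough illustration of the first option, the sketch below uses a hypothetical in-memory queue (`queue`, `HANDLERS`, and `drain` are illustrative names, not Protean APIs). The first handler's return value is discarded, so it passes information along by emitting a follow-up event instead.

```python
from collections import deque

queue = deque()  # pending events, oldest first
log = []         # what each handler observed


def on_order_shipped(event: dict):
    log.append("stock reduced")
    # Communicate the outcome by emitting a new event ...
    queue.append({"type": "StockReduced", "book_id": event["book_id"]})
    return "ignored"  # ... because nothing reads a return value


def on_stock_reduced(event: dict):
    log.append(f"reorder check for {event['book_id']}")


HANDLERS = {
    "OrderShipped": [on_order_shipped],
    "StockReduced": [on_stock_reduced],
}


def drain(pending: deque) -> None:
    """Deliver events until the queue is empty, discarding handler results."""
    while pending:
        event = pending.popleft()
        for handler in HANDLERS.get(event["type"], []):
            handler(event)


queue.append({"type": "OrderShipped", "book_id": "book-1"})
drain(queue)
```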

Configuration Options

Handler Options

  • part_of: The aggregate to which the event handler is connected.
  • stream_category: The event handler listens to events on this stream category. The stream category defaults to the category of the aggregate associated with the handler.

    An Event Handler can be part of an aggregate, and have the stream category of a different aggregate. This is the mechanism for an aggregate to listen to another aggregate's events to sync its own state. Learn more in the Stream Categories guide.

  • source_stream: When specified, the event handler only consumes events whose origin_stream matches this value. This filters events based on what originally triggered them — useful when the same event type can be raised from different contexts and you only want to react to a specific trigger.

    @domain.event_handler(
        part_of=Notification,
        stream_category="order",
        source_stream="manage_order",
    )
    class EmailNotifications:
        @handle(OrderShipped)
        def send_shipping_email(self, event: OrderShipped):
            # Only invoked when OrderShipped was triggered by a command
            # in the manage_order stream — not by a bulk import or replay.
            ...
    

Required: part_of

Every event handler must specify part_of — the aggregate it belongs to. This association determines the default stream category. You can override the stream with stream_category to listen to a different aggregate's events, but part_of is always required.
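
The interplay of the three options can be sketched as a small model. `ToyHandlerOptions` is hypothetical and not part of Protean; it assumes the aggregate's category is already known as a string and that source_stream is compared by exact string equality, which may not match the framework's real matching rule.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyHandlerOptions:
    """Hypothetical model of the three handler options described above."""

    part_of_category: str                  # category of the part_of aggregate
    stream_category: Optional[str] = None  # explicit override, if any
    source_stream: Optional[str] = None    # origin filter, if any

    def resolved_category(self) -> str:
        # Fall back to the owning aggregate's category when no override is set.
        return self.stream_category or self.part_of_category

    def accepts(self, category: str, origin_stream: Optional[str]) -> bool:
        if category != self.resolved_category():
            return False
        # Without source_stream, every event in the category is accepted.
        return self.source_stream is None or origin_stream == self.source_stream


default_opts = ToyHandlerOptions(part_of_category="inventory")
email_opts = ToyHandlerOptions(
    part_of_category="notification",
    stream_category="order",
    source_stream="manage_order",
)
```

Here `email_opts` belongs to the notification aggregate but listens on the order category, and only for events whose origin is `manage_order`.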

Subscription Options

Event handlers can be configured with subscription options that control how messages are consumed when running the Protean server:

  • subscription_type: Type of subscription to use:

    • "stream": Uses Redis Streams with consumer groups (recommended for production)
    • "event_store": Reads directly from event store (for projections/replay)
  • subscription_profile: Pre-configured profile for common scenarios:

    • "production": High throughput with reliability guarantees
    • "fast": Low-latency processing
    • "batch": High-volume batch processing
    • "debug": Development and debugging
    • "projection": Building read models (uses event_store type)
  • subscription_config: Dictionary of specific configuration options:

    • messages_per_tick: Messages to process per batch
    • blocking_timeout_ms: Timeout for blocking reads (stream only)
    • max_retries: Retry attempts before DLQ (stream only)
    • enable_dlq: Enable dead letter queue (stream only)
    • position_update_interval: Position update frequency (event_store only)

Example with Subscription Configuration

@domain.event_handler(
    part_of=Order,
    subscription_profile="production",
    subscription_config={
        "messages_per_tick": 100,
        "enable_dlq": True,
    }
)
class OrderEventHandler:
    @handle(OrderCreated)
    def send_confirmation(self, event):
        ...
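
Because several options only apply to one subscription type, a pre-flight check can catch settings that would have no effect. The helper below is a hypothetical sketch, not a Protean function. It encodes the applicability notes from the list above and assumes messages_per_tick applies to both types, since the list does not restrict it.

```python
# Option applicability as listed above; messages_per_tick is assumed common.
STREAM_ONLY = {"blocking_timeout_ms", "max_retries", "enable_dlq"}
EVENT_STORE_ONLY = {"position_update_interval"}
COMMON = {"messages_per_tick"}


def misplaced_options(subscription_type: str, config: dict) -> list:
    """Return config keys that are not listed for the given subscription type."""
    if subscription_type == "stream":
        allowed = COMMON | STREAM_ONLY
    elif subscription_type == "event_store":
        allowed = COMMON | EVENT_STORE_ONLY
    else:
        raise ValueError(f"unknown subscription_type: {subscription_type!r}")
    return sorted(key for key in config if key not in allowed)


stream_issues = misplaced_options(
    "stream", {"messages_per_tick": 100, "enable_dlq": True}
)
event_store_issues = misplaced_options(
    "event_store", {"max_retries": 5, "enable_dlq": True}
)
```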

Retry and Dead Letter Queue (DLQ)

When using Redis Streams (subscription_type="stream"), Protean supports automatic retries and dead-letter queuing for failed messages:

  • max_retries (default: 3): The number of times a failed message is re-delivered to the consumer before it is moved to the dead letter queue. Each retry re-delivers the message from the pending entries list (PEL).

  • enable_dlq (default: False): When enabled, messages that exhaust their retry budget are moved to a separate dead-letter stream (<stream>:dlq) instead of being silently dropped. You can inspect and reprocess DLQ messages manually or with a scheduled job.

@domain.event_handler(
    part_of=Order,
    subscription_config={
        "max_retries": 5,
        "enable_dlq": True,
    }
)
class CriticalOrderHandler:
    @handle(OrderPlaced)
    def process_order(self, event: OrderPlaced):
        # If this fails 5 times, the message moves to the DLQ
        ...

Event store subscriptions

Retry and DLQ are only available with subscription_type="stream" (Redis Streams). Event store subscriptions track position and re-read on restart but do not have per-message retry semantics.
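
The retry budget and dead-letter behaviour can be simulated with a toy function. `process_with_retries` is hypothetical and does not use Redis. It assumes max_retries counts total delivery attempts, in line with the "fails 5 times" comment in the example above, and it models the dead-letter stream as a plain list.

```python
def process_with_retries(message, handler, max_retries=3, enable_dlq=False, dlq=None):
    """Toy model: attempt delivery up to max_retries times, then give up.

    Assumption: max_retries counts total delivery attempts. The real
    accounting in Protean may differ.
    """
    for attempt in range(1, max_retries + 1):
        try:
            handler(message)
            return f"processed on attempt {attempt}"
        except Exception:
            continue  # leave the message pending so it is delivered again
    if enable_dlq and dlq is not None:
        dlq.append(message)  # stands in for the "<stream>:dlq" stream
        return "moved to DLQ"
    return "dropped"


calls = {"count": 0}


def flaky(message):
    # Fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")


def always_fails(message):
    raise RuntimeError("permanent failure")


dead_letters = []
first = process_with_retries({"id": 1}, flaky)
second = process_with_retries(
    {"id": 2}, always_fails, max_retries=5, enable_dlq=True, dlq=dead_letters
)
third = process_with_retries({"id": 3}, always_fails)
```

With the defaults, the third message is dropped once its budget is spent, which is the case that enabling the DLQ is meant to prevent.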

See Server → Configuration for detailed configuration options and the priority hierarchy.

Error Handling

Event handlers can define an optional handle_error method for custom error handling. It lets a handler recover from exceptions without disrupting the rest of the event processing pipeline.

The handle_error Method

You can add a handle_error class method to your event handler to implement custom error handling:

@domain.event_handler(part_of=Inventory)
class InventoryEventHandler:
    @handle(OrderShipped)
    def update_inventory(self, event):
        # Event handling logic that might raise exceptions
        ...

    @classmethod
    def handle_error(cls, exc: Exception, message):
        """Custom error handling for event processing failures"""
        # Log the error
        logger.error(f"Failed to process event {message.type}: {exc}")

        # Perform recovery operations
        # Example: store failed events for retry, trigger compensating actions, etc.
        ...

How It Works

  1. When an exception occurs during event processing, the Protean Engine catches it.
  2. The engine logs detailed error information including stack traces.
  3. The engine calls the event handler's handle_error method, passing:
     • The original exception that was raised
     • The event message being processed when the exception occurred
  4. After handle_error completes, processing continues with subsequent events.

Error Handler Failures

If an exception occurs within the handle_error method itself, the Protean Engine will catch and log that exception as well, ensuring that the event processing pipeline continues to function. This provides an additional layer of resilience:

@classmethod
def handle_error(cls, exc: Exception, message):
    try:
        # Error handling logic that might itself fail
        ...
    except Exception as error_exc:
        # Optional: handle the secondary failure yourself. Any exception that
        # escapes handle_error is caught and logged by the engine instead.
        logger.error(f"Error handler failed: {error_exc}")
        # Processing continues regardless
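
The overall flow, including the guard around handle_error itself, can be sketched as a toy processing loop. `run_engine` and `ToyHandler` are hypothetical names and this is not the Protean Engine's code. The sketch only shows that neither a failing handler nor a failing error handler stops later messages.

```python
import logging

logger = logging.getLogger("toy_engine")
logger.addHandler(logging.NullHandler())  # keep the demo quiet


def run_engine(handler_cls, messages) -> list:
    """Toy processing loop: one bad message never stops the ones after it."""
    processed = []
    for message in messages:
        try:
            handler_cls().handle(message)
            processed.append(message)
        except Exception as exc:
            logger.exception("Error while handling %r", message)
            try:
                handler_cls.handle_error(exc, message)
            except Exception:
                # A failing error handler is logged too, then the loop moves on.
                logger.exception("handle_error failed for %r", message)
    return processed


class ToyHandler:
    failures = []

    def handle(self, message):
        if message == "bad":
            raise ValueError("cannot process")

    @classmethod
    def handle_error(cls, exc, message):
        cls.failures.append((message, str(exc)))
        if message == "bad":
            raise RuntimeError("error handler itself failed")


result = run_engine(ToyHandler, ["ok-1", "bad", "ok-2"])
```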

See also

Concept overview: Event Handlers — How event handlers consume and react to domain events.

Related guides:

  • Subscribers — For consuming messages from external brokers instead of the internal event store.
  • Projectors — For maintaining read-optimized projections from domain events.

Patterns: