# Event Handlers
When an aggregate changes state, other parts of the system often need to react — updating a different aggregate, sending a notification, or triggering a downstream process. Putting that logic inside the originating aggregate would violate its boundary. Event handlers solve this: they listen for domain events and execute side effects in their own transaction, keeping aggregates decoupled.
Event handlers consume events raised by an aggregate and synchronize its state with other aggregates and external systems. They are the preferred mechanism for updating multiple aggregates.
## Choosing the Right Consumer
Protean provides four elements that consume events or messages. Each serves a distinct purpose:
| Element | Consumes | Source | Association | Dispatch | Use case |
|---|---|---|---|---|---|
| Event Handler | Domain events | Internal event store | `part_of` (aggregate) | `@handle(EventClass)` | Side effects, cross-aggregate sync |
| Projector | Domain events | Internal event store | `projector_for` (projection) | `@on(EventClass)` | Maintain read models (projections) |
| Subscriber | Raw dict payloads | External message broker | `stream` (broker stream) | `__call__(payload)` | Anti-corruption layer for external systems |
| Query Handler | Queries | Synchronous dispatch | `part_of` (projection) | `@read(QueryClass)` | Structured read-side access |
Use event handlers when you need to react to internal domain events with side effects (syncing aggregates, sending notifications, triggering processes). Use projectors when the reaction is specifically maintaining a read model. Use subscribers when messages come from outside your bounded context. Use query handlers when you need structured, validated read access to projections.
## Defining an Event Handler

Event handlers are defined with the `Domain.event_handler` decorator. Below is a simplified example of an event handler that syncs stock levels in `Inventory` in response to changes in the `Order` aggregate.
```python
from protean import Domain, handle
from protean.fields import Identifier, Integer, String

domain = Domain()
domain.config["event_processing"] = "sync"


@domain.event(part_of="Order")
class OrderShipped:
    order_id = Identifier(required=True)
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)


@domain.aggregate
class Order:
    book_id = Identifier(required=True)
    quantity = Integer(required=True)
    total_amount = Integer(required=True)
    status = String(choices=["PENDING", "SHIPPED", "DELIVERED"], default="PENDING")

    def ship_order(self):
        self.status = "SHIPPED"
        self.raise_(  # (1)
            OrderShipped(
                order_id=self.id,
                book_id=self.book_id,
                quantity=self.quantity,
                total_amount=self.total_amount,
            )
        )


@domain.aggregate
class Inventory:
    book_id = Identifier(required=True)
    in_stock = Integer(required=True)

    def reduce_stock(self, quantity: int) -> None:
        self.in_stock -= quantity


@domain.event_handler(part_of=Order)  # (2)
class ManageInventory:
    @handle(OrderShipped)
    def reduce_stock_level(self, event: OrderShipped):
        repo = domain.repository_for(Inventory)
        inventory = repo.find_by(book_id=event.book_id)
        inventory.reduce_stock(event.quantity)  # (3)
        repo.add(inventory)


domain.init()

with domain.domain_context():
    # Persist Order
    order = Order(book_id="book-1", quantity=10, total_amount=100)
    domain.repository_for(Order).add(order)

    # Persist Inventory
    inventory = Inventory(book_id="book-1", in_stock=100)
    domain.repository_for(Inventory).add(inventory)

    # Ship Order
    order.ship_order()
    domain.repository_for(Order).add(order)

    # Verify that Inventory Level has been reduced
    stock = domain.repository_for(Inventory).get(inventory.id)
    print(stock.to_dict())
    assert stock.in_stock == 90
```
1. The `Order` aggregate raises the `OrderShipped` event when a book is shipped.
2. The event handler is registered with `part_of=Order`, so it subscribes to the `Order` aggregate's event stream. In production with async processing, you would typically use `part_of=Inventory, stream_category="order"` to keep the handler associated with its owning aggregate while listening to another aggregate's stream.
3. The event handler calls `reduce_stock()` on the `Inventory` aggregate, delegating to a domain method rather than mutating state directly.
Simulating the example, we can see that stock levels were decreased in response to the `OrderShipped` event.
```python
In [1]: order = Order(book_id="book-1", quantity=10, total_amount=100)

In [2]: domain.repository_for(Order).add(order)
Out[2]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [3]: inventory = Inventory(book_id="book-1", in_stock=100)

In [4]: domain.repository_for(Inventory).add(inventory)
Out[4]: <Inventory: Inventory object (id: 9272d70f-b796-417d-8f30-e01302d9f1a9)>

In [5]: order.ship_order()

In [6]: domain.repository_for(Order).add(order)
Out[6]: <Order: Order object (id: 62f8fa8d-2963-4539-bd21-860d3bab639e)>

In [7]: stock = domain.repository_for(Inventory).get(inventory.id)

In [8]: stock.to_dict()
Out[8]: {
    'book_id': 'book-1',
    'in_stock': 90,
    'id': '9272d70f-b796-417d-8f30-e01302d9f1a9'
}
```
## Internal workflow
Event handlers follow an asynchronous, fire-and-forget pattern. When an event is published, event handlers process it without returning any values to the caller.
```mermaid
sequenceDiagram
    autonumber
    Aggregate->>Domain: Publish Event
    Domain->>Event Store: Store Event
    Event Store-->>Domain: Done
    Domain-->>Aggregate: Done
    Note over Domain,Event Handler: Asynchronous Processing
    Event Store->>Event Handler: Deliver Event
    Event Handler->>Event Handler: Process Event
    Event Handler->>Repository: Load/Update Aggregates
    Repository-->>Event Handler: Done
    Event Handler->>Event Handler: Perform Side Effects
    Event Handler->>Repository: Persist Aggregates
```
1. Aggregate Publishes Event: An action in an aggregate triggers an event to be published.
2. Domain Stores Event: The domain stores the event in the event store.
3. Event Store Confirms Storage: The event store confirms the event has been stored.
4. Domain Returns to Aggregate: The domain returns control to the aggregate.
5. Event Store Delivers Event: Asynchronously, the event store delivers the event to all subscribed event handlers.
6. Event Handler Processes Event: The event handler receives and processes the event.
7. Event Handler Loads/Updates Aggregates: If needed, the event handler loads and updates relevant aggregates.
8. Repository Returns Data: The repository returns requested data to the event handler.
9. Event Handler Performs Side Effects: The event handler may perform additional side effects (sending emails, updating other systems, etc.).
10. Event Handler Persists Data and Optionally Raises Events: The event handler persists the mutated aggregate, which can also raise events.
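The fire-and-forget delivery described above can be modeled in plain Python. This is a toy sketch of the pattern, not Protean's actual engine: an in-memory event store appends events immediately and delivers them to subscribed handlers in a separate step, returning nothing to the publisher.

```python
from collections import defaultdict


class ToyEventStore:
    """Minimal in-memory store: append now, deliver to handlers later."""

    def __init__(self):
        self.events = []
        self.handlers = defaultdict(list)  # event type name -> handler callables

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    def publish(self, event_type, payload):
        # Publishing only stores the event; the caller gets nothing back.
        self.events.append((event_type, payload))

    def deliver_pending(self):
        # Simulates the asynchronous delivery step of the engine.
        while self.events:
            event_type, payload = self.events.pop(0)
            for handler in self.handlers[event_type]:
                handler(payload)


stock = {"book-1": 100}


def reduce_stock(payload):
    stock[payload["book_id"]] -= payload["quantity"]


store = ToyEventStore()
store.subscribe("OrderShipped", reduce_stock)
store.publish("OrderShipped", {"book_id": "book-1", "quantity": 10})

assert stock["book-1"] == 100  # nothing happens until delivery
store.deliver_pending()
assert stock["book-1"] == 90   # handler ran in the (simulated) async step
```

The key property the sketch demonstrates is the gap between publishing and processing: the aggregate's transaction completes before any handler runs.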
## The `@handle` Decorator

The `@handle` decorator binds a handler method to a specific event type. Import it from `protean`:

```python
from protean import handle
```
Each `@handle`-decorated method receives exactly one event class as its argument. When that event type arrives, the framework invokes the corresponding method with the deserialized event object.

A single event handler class can contain multiple `@handle` methods, each processing a different event type:
```python
@domain.event_handler(part_of=Inventory, stream_category="order")
class ManageInventory:
    @handle(OrderShipped)
    def reduce_stock(self, event: OrderShipped):
        repo = current_domain.repository_for(Inventory)
        inventory = repo.find_by(book_id=event.book_id)
        inventory.reduce_stock(event.quantity)
        repo.add(inventory)

    @handle(OrderCancelled)
    def restore_stock(self, event: OrderCancelled):
        repo = current_domain.repository_for(Inventory)
        inventory = repo.find_by(book_id=event.book_id)
        inventory.increase_stock(event.quantity)
        repo.add(inventory)
```
Each `@handle` method runs within its own Unit of Work. If the handler modifies an aggregate and persists it, the changes are committed atomically. If an error occurs, the transaction is rolled back.
> **One event class per method**
>
> Each `@handle` method accepts exactly one event class. To handle multiple event types, define multiple methods in the same handler class.
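The binding that `@handle` performs can be pictured as a registry from event class to method. The following is a simplified, framework-free illustration of that idea, not Protean's implementation: the decorator tags each method with the event class it handles, and the class builds a dispatch table from those tags.

```python
def handle(event_cls):
    """Toy version of @handle: tags the method with the event class it handles."""
    def decorator(method):
        method._handles = event_cls
        return method
    return decorator


class OrderShipped:
    def __init__(self, quantity):
        self.quantity = quantity


class OrderCancelled:
    def __init__(self, quantity):
        self.quantity = quantity


class ManageInventory:
    def __init__(self):
        self.in_stock = 100
        # Build the event-type -> method registry from the tagged methods
        self._registry = {
            m._handles: m
            for m in (getattr(self, name) for name in dir(self))
            if callable(m) and hasattr(m, "_handles")
        }

    def dispatch(self, event):
        # Invoke the single method registered for this event's type
        self._registry[type(event)](event)

    @handle(OrderShipped)
    def reduce_stock(self, event):
        self.in_stock -= event.quantity

    @handle(OrderCancelled)
    def restore_stock(self, event):
        self.in_stock += event.quantity


handler = ManageInventory()
handler.dispatch(OrderShipped(quantity=10))
assert handler.in_stock == 90
handler.dispatch(OrderCancelled(quantity=10))
assert handler.in_stock == 100
```

Because each method is tagged with exactly one event class, the registry enforces the one-event-per-method rule by construction.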
## Return Values from Event Handlers
Event handlers in Protean follow the standard CQRS pattern where event handlers do not return values to the caller. This deliberate design choice ensures:
- Decoupling: The publisher of events remains completely decoupled from the consumers.
- Asynchronous Processing: Events can be processed in the background without blocking.
- Multiple Consumers: Multiple event handlers can process the same event independently.
If an event handler needs to communicate information as part of its processing, it should:
- Emit new events
- Update relevant aggregates that can be queried later
- Log information for monitoring purposes
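Since a handler cannot return anything to its caller, information flows forward instead. Below is a plain-Python sketch of the "emit new events" option, using a hypothetical in-memory `outbox` list as a stand-in for the event store (the names and the `LowStockDetected` event are illustrative, not part of Protean):

```python
outbox = []  # hypothetical stand-in for the event store


def handle_order_shipped(event):
    # The handler returns nothing; its observable outcomes are new events
    # and state changes that can be queried later.
    if event["in_stock_after"] < 20:
        outbox.append({"type": "LowStockDetected", "book_id": event["book_id"]})


result = handle_order_shipped({"book_id": "book-1", "in_stock_after": 5})
assert result is None                        # nothing returned to the caller
assert outbox[0]["type"] == "LowStockDetected"
```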
## Configuration Options

### Handler Options

- `part_of`: The aggregate to which the event handler is connected.
- `stream_category`: The event handler listens to events on this stream category. The stream category defaults to the category of the aggregate associated with the handler. An event handler can be part of one aggregate while using the stream category of a different aggregate. This is the mechanism for an aggregate to listen to another aggregate's events to sync its own state. Learn more in the Stream Categories guide.
- `source_stream`: When specified, the event handler only consumes events whose `origin_stream` matches this value. This filters events based on what originally triggered them, which is useful when the same event type can be raised from different contexts and you only want to react to a specific trigger.

```python
@domain.event_handler(
    part_of=Notification,
    stream_category="order",
    source_stream="manage_order",
)
class EmailNotifications:
    @handle(OrderShipped)
    def send_shipping_email(self, event: OrderShipped):
        # Only invoked when OrderShipped was triggered by a command
        # in the manage_order stream, not by a bulk import or replay.
        ...
```
> **Required: `part_of`**
>
> Every event handler must specify `part_of`, the aggregate it belongs to. This association determines the default stream category. You can override the stream with `stream_category` to listen to a different aggregate's events, but `part_of` is always required.
### Subscription Options

Event handlers can be configured with subscription options that control how messages are consumed when running the Protean server:

- `subscription_type`: Type of subscription to use:
    - `"stream"`: Uses Redis Streams with consumer groups (recommended for production)
    - `"event_store"`: Reads directly from the event store (for projections/replay)
- `subscription_profile`: Pre-configured profile for common scenarios:
    - `"production"`: High throughput with reliability guarantees
    - `"fast"`: Low-latency processing
    - `"batch"`: High-volume batch processing
    - `"debug"`: Development and debugging
    - `"projection"`: Building read models (uses the `event_store` type)
- `subscription_config`: Dictionary of specific configuration options:
    - `messages_per_tick`: Messages to process per batch
    - `blocking_timeout_ms`: Timeout for blocking reads (stream only)
    - `max_retries`: Retry attempts before DLQ (stream only)
    - `enable_dlq`: Enable dead letter queue (stream only)
    - `position_update_interval`: Position update frequency (event_store only)
### Example with Subscription Configuration

```python
@domain.event_handler(
    part_of=Order,
    subscription_profile="production",
    subscription_config={
        "messages_per_tick": 100,
        "enable_dlq": True,
    },
)
class OrderEventHandler:
    @handle(OrderCreated)
    def send_confirmation(self, event):
        ...
```
### Retry and Dead Letter Queue (DLQ)

When using Redis Streams (`subscription_type="stream"`), Protean supports automatic retries and dead-letter queuing for failed messages:

- `max_retries` (default: `3`): The number of times a failed message is re-delivered to the consumer before it is moved to the dead letter queue. Each retry re-delivers the message from the pending entries list (PEL).
- `enable_dlq` (default: `False`): When enabled, messages that exhaust their retry budget are moved to a separate dead-letter stream (`<stream>:dlq`) instead of being silently dropped. You can inspect and reprocess DLQ messages manually or with a scheduled job.
```python
@domain.event_handler(
    part_of=Order,
    subscription_config={
        "max_retries": 5,
        "enable_dlq": True,
    },
)
class CriticalOrderHandler:
    @handle(OrderPlaced)
    def process_order(self, event: OrderPlaced):
        # If this fails 5 times, the message moves to the DLQ
        ...
```
> **Event store subscriptions**
>
> Retry and DLQ are only available with `subscription_type="stream"` (Redis Streams). Event store subscriptions track position and re-read on restart but do not have per-message retry semantics.
See Server → Configuration for detailed configuration options and the priority hierarchy.
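Because a retried message is re-delivered as-is, a handler may see the same event more than once. A common safeguard is to deduplicate by event id before applying the side effect. Here is a framework-free sketch of the idea; the `processed_ids` set is a stand-in for whatever durable storage you would use in production:

```python
processed_ids = set()    # in production this would live in durable storage
stock = {"book-1": 100}


def reduce_stock_idempotent(event):
    if event["id"] in processed_ids:
        return           # duplicate delivery: skip the side effect
    stock[event["book_id"]] -= event["quantity"]
    processed_ids.add(event["id"])


event = {"id": "evt-42", "book_id": "book-1", "quantity": 10}
reduce_stock_idempotent(event)
reduce_stock_idempotent(event)  # simulated retry re-delivery
assert stock["book-1"] == 90    # effect applied exactly once
```

The Idempotent Event Handlers pattern linked below covers this technique in more depth.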
## Error Handling

Protean provides a robust error handling mechanism for event handlers through the optional `handle_error` method. This method allows event handlers to gracefully recover from exceptions without disrupting the overall event processing pipeline.

### The `handle_error` Method

You can add a `handle_error` class method to your event handler to implement custom error handling:
```python
import logging

logger = logging.getLogger(__name__)


@domain.event_handler(part_of=Inventory)
class InventoryEventHandler:
    @handle(OrderShipped)
    def update_inventory(self, event):
        # Event handling logic that might raise exceptions
        ...

    @classmethod
    def handle_error(cls, exc: Exception, message):
        """Custom error handling for event processing failures"""
        # Log the error
        logger.error(f"Failed to process event {message.type}: {exc}")
        # Perform recovery operations
        # Example: store failed events for retry, trigger compensating actions, etc.
        ...
```
### How It Works

1. When an exception occurs during event processing, the Protean Engine catches it.
2. The engine logs detailed error information, including stack traces.
3. The engine calls the event handler's `handle_error` method, passing:
    - The original exception that was raised
    - The event message being processed when the exception occurred
4. After `handle_error` completes, processing continues with subsequent events.
### Error Handler Failures
If an exception occurs within the handle_error method itself, the Protean Engine will catch and log that exception as well, ensuring that the event processing pipeline continues to function. This provides an additional layer of resilience:
```python
@classmethod
def handle_error(cls, exc: Exception, message):
    try:
        # Error handling logic that might itself fail
        ...
    except Exception as error_exc:
        # The engine will catch and log this secondary exception
        logger.error(f"Error handler failed: {error_exc}")
        # Processing continues regardless
```
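This containment behavior can be pictured as nested try/except blocks in the engine's processing loop. The following is a toy sketch of that flow, not Protean's actual engine code: handler failures are routed to `handle_error`, failures inside `handle_error` are logged, and the loop always moves on to the next message.

```python
def process(handler, messages, log):
    """Toy engine loop: handler errors go to handle_error; errors raised
    by handle_error itself are logged, and processing always continues."""
    for message in messages:
        try:
            handler.handle(message)
        except Exception as exc:
            log.append(f"handler failed on {message}: {exc}")
            try:
                handler.handle_error(exc, message)
            except Exception as error_exc:
                log.append(f"error handler failed: {error_exc}")


class FlakyHandler:
    def __init__(self):
        self.processed = []

    def handle(self, message):
        if message == "bad":
            raise ValueError("boom")
        self.processed.append(message)

    def handle_error(self, exc, message):
        # Simulate an error handler that itself fails
        raise RuntimeError("error handler also failed")


log = []
handler = FlakyHandler()
process(handler, ["ok-1", "bad", "ok-2"], log)
assert handler.processed == ["ok-1", "ok-2"]  # pipeline kept going
assert len(log) == 2                          # both failures were logged
```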
## See also
Concept overview: Event Handlers — How event handlers consume and react to domain events.
Related guides:
- Subscribers — For consuming messages from external brokers instead of the internal event store.
- Projectors — For maintaining read-optimized projections from domain events.
Patterns:
- Idempotent Event Handlers — Ensuring handlers produce correct results even with duplicate delivery.
- Thin Handlers, Rich Domain — Keeping handlers thin by delegating to domain logic.
- Testing Event-Driven Flows — Strategies for testing event handlers with pytest.