Handlers
Command handlers, event handlers, and query handlers process messages. Command
and event handlers are associated with aggregates and use the @handle
decorator. Query handlers are associated with projections and use the @read
decorator.
Guides: Command Handlers · Event Handlers · Query Handlers
BaseCommandHandler
Bases: Element, HandlerMixin, OptionsMixin
Base Command Handler class that should be implemented by all Domain CommandHandlers.
Command handlers process domain commands asynchronously. They can be configured with subscription settings to control message consumption behavior.
Meta Options
| Option | Type | Description |
|---|---|---|
part_of |
type |
The aggregate this handler is associated with. Required. |
stream_category |
str |
Read-only. Derived from the associated aggregate's stream category. |
subscription_type |
str |
The subscription type (STREAM or EVENT_STORE). |
subscription_profile |
str |
A predefined profile (PRODUCTION, FAST, BATCH, DEBUG, PROJECTION). |
subscription_config |
dict |
Custom configuration overrides that take precedence over profile defaults. |
Note
Unlike event handlers, command handlers cannot have their stream_category explicitly set. It is always derived from the aggregate specified in part_of.
Configuration Priority (highest to lowest): 1. Handler Meta subscription_config 2. Handler Meta subscription_profile 3. Handler Meta subscription_type 4. Server-level handler-specific config 5. Server-level defaults 6. Profile defaults 7. Hardcoded defaults
Example::
@domain.command_handler(
part_of=Order,
subscription_profile=SubscriptionProfile.PRODUCTION,
subscription_config={"messages_per_tick": 50},
)
class OrderCommandHandler(BaseCommandHandler):
@handle(PlaceOrder)
def handle_place_order(self, command):
pass
BaseEventHandler
Bases: Element, HandlerMixin, OptionsMixin
Base Event Handler to be inherited by all event handlers.
Event handlers process domain events asynchronously. They can be configured with subscription settings to control message consumption behavior.
Meta Options
| Option | Type | Description |
|---|---|---|
part_of |
type |
The aggregate this handler is associated with. |
source_stream |
str |
Optional source stream filter for origin filtering. |
stream_category |
str |
The stream category to subscribe to. Defaults to aggregate's category. |
subscription_type |
str |
The subscription type (STREAM or EVENT_STORE). |
subscription_profile |
str |
A predefined profile (PRODUCTION, FAST, BATCH, DEBUG, PROJECTION). |
subscription_config |
dict |
Custom configuration overrides that take precedence over profile defaults. |
Configuration Priority (highest to lowest): 1. Handler Meta subscription_config 2. Handler Meta subscription_profile 3. Handler Meta subscription_type 4. Server-level handler-specific config 5. Server-level defaults 6. Profile defaults 7. Hardcoded defaults
Example::
@domain.event_handler(
part_of=Order,
subscription_profile=SubscriptionProfile.PRODUCTION,
subscription_config={"messages_per_tick": 50},
)
class OrderEventHandler(BaseEventHandler):
@handle(OrderPlaced)
def handle_order_placed(self, event):
pass
BaseQueryHandler
Bases: Element, HandlerMixin, OptionsMixin
Base Query Handler class that should be implemented by all Domain QueryHandlers.
Query handlers process domain queries synchronously and return results.
They are always associated with a Projection via part_of.
Meta Options
| Option | Type | Description |
|---|---|---|
part_of |
type |
The projection this handler is associated with. Required. |
Note
Query handlers are synchronous only. They have no stream category, no subscription configuration, and no UnitOfWork wrapping.
Example::
@domain.query_handler(part_of=OrderSummary)
class OrderSummaryQueryHandler(BaseQueryHandler):
@read(GetOrdersByCustomer)
def get_by_customer(self, query):
view = current_domain.view_for(OrderSummary)
return view.query.filter(
customer_id=query.customer_id
).all()