Process Managers
CQRS ES
Some business processes span multiple aggregates and take multiple steps to complete — an order fulfillment flow that moves through payment, inventory reservation, and shipping, for example. Event handlers work for simple one-step reactions, but when you need to track where you are in a multi-step workflow and decide what to do next, you need a process manager.
Process managers coordinate multi-step business processes that span multiple aggregates. They react to domain events from different streams, maintain their own state, and issue commands to drive other aggregates forward.
The Event Chain: How Steps Connect
Before looking at the API, it's important to understand how a process manager chains steps together. The mechanism is a command-event loop:
- An event arrives at the PM from an aggregate's stream.
- The PM handler processes the event and issues a command to another aggregate.
- That aggregate's command handler processes the command and raises a new event.
- The new event arrives at the PM (because the PM subscribes to that aggregate's stream).
- The next PM handler runs, potentially issuing another command.
- The cycle repeats until the PM reaches a terminal state.
Here is the full loop for a two-step order fulfillment (order → payment → shipping):
Order aggregate Payment aggregate
────────────── ─────────────────
raises OrderPlaced ──┐ ┌──► processes RequestPayment
│ │ raises PaymentConfirmed ──┐
▼ │ │
┌────────────────────┐ │ │
│ OrderFulfillmentPM │ │ │
│ │ │ │
│ on_order_placed() │─────────┘ │
│ issues RequestPayment │
│ │◄───────────────────────────────────────┘
│ on_payment_confirmed()
│ issues CreateShipment ─────────┐
│ │ │ Shipping aggregate
│ on_shipment_delivered()◄──────┐ │ ─────────────────
│ mark_as_complete() │ │ └──► processes CreateShipment
└────────────────────┘ │ raises ShipmentDelivered
└────────────────────────┘
Each arrow crosses an aggregate boundary through the event store. The PM never calls aggregate methods directly — it issues commands, and the aggregates' command handlers decide how to execute them.
Why stream_categories Must Match the Aggregates You Command
The PM issues commands to Payment and Shipping aggregates. Those
aggregates' command handlers raise events on their own streams
(ecommerce::payment and ecommerce::shipping). For the PM to see
PaymentConfirmed and ShipmentDelivered, it must subscribe to those
streams.
Rule of thumb: if a PM handler issues a command to aggregate X, the PM
must include aggregate X's stream in its stream_categories (or aggregates
list). Otherwise, the PM will never see the response event and the workflow
will stall.
# The PM issues commands to Order, Payment, and Shipping aggregates.
# Therefore, it subscribes to all three streams.
@domain.process_manager(
stream_categories=["ecommerce::order", "ecommerce::payment", "ecommerce::shipping"]
)
class OrderFulfillmentPM:
...
Defining a Process Manager
Process managers are defined with the Domain.process_manager decorator. Each
handler method uses the @handle decorator with additional parameters for
lifecycle management and event correlation.
from protean import Domain, handle
from protean.fields import Float, Identifier, String
domain = Domain(__file__, "ecommerce")
@domain.event(part_of="Order")
class OrderPlaced:
order_id: Identifier(required=True)
customer_id: Identifier(required=True)
total: Float(required=True)
@domain.event(part_of="Payment")
class PaymentConfirmed:
payment_id: Identifier(required=True)
order_id: Identifier(required=True)
amount: Float(required=True)
@domain.event(part_of="Payment")
class PaymentFailed:
payment_id: Identifier(required=True)
order_id: Identifier(required=True)
reason: String(required=True)
@domain.event(part_of="Shipping")
class ShipmentDelivered:
order_id: Identifier(required=True)
@domain.aggregate
class Order:
customer_id: Identifier(required=True)
total: Float(required=True)
status: String(default="new")
@domain.aggregate
class Payment:
order_id: Identifier(required=True)
amount: Float(required=True)
@domain.aggregate
class Shipping:
order_id: Identifier(required=True)
@domain.process_manager(
stream_categories=["ecommerce::order", "ecommerce::payment", "ecommerce::shipping"]
)
class OrderFulfillmentPM:
order_id: Identifier()
payment_id: Identifier()
status: String(default="new")
@handle(OrderPlaced, start=True, correlate="order_id") # (1) (2)
def on_order_placed(self, event: OrderPlaced) -> None:
self.order_id = event.order_id
self.status = "awaiting_payment"
@handle(PaymentConfirmed, correlate="order_id")
def on_payment_confirmed(self, event: PaymentConfirmed) -> None:
self.payment_id = event.payment_id
self.status = "awaiting_shipment"
@handle(PaymentFailed, correlate="order_id", end=True) # (3)
def on_payment_failed(self, event: PaymentFailed) -> None:
self.status = "cancelled"
@handle(ShipmentDelivered, correlate="order_id")
def on_shipment_delivered(self, event: ShipmentDelivered) -> None:
self.status = "completed"
self.mark_as_complete() # (4)
-
start=Truemarks this handler as the entry point — it creates a new PM instance when the event arrives. -
correlate="order_id"extractsevent.order_idto identify which PM instance should handle each event. -
end=Trueautomatically marks the PM as complete after the handler runs. -
mark_as_complete()explicitly marks the PM as complete from within a handler.
Issuing Commands
Process managers drive other aggregates forward by issuing commands:
from protean import Domain, current_domain, handle
from protean.fields import Float, Identifier, String
domain = Domain(__file__, "ecommerce")
@domain.command(part_of="Payment")
class RequestPayment:
order_id: Identifier(required=True)
amount: Float(required=True)
@domain.command(part_of="Order")
class CancelOrder:
order_id: Identifier(required=True)
@domain.event(part_of="Order")
class OrderPlaced:
order_id: Identifier(required=True)
customer_id: Identifier(required=True)
total: Float(required=True)
@domain.event(part_of="Payment")
class PaymentFailed:
payment_id: Identifier(required=True)
order_id: Identifier(required=True)
reason: String(required=True)
@domain.aggregate
class Order:
customer_id: Identifier(required=True)
total: Float(required=True)
@domain.aggregate
class Payment:
order_id: Identifier(required=True)
amount: Float(required=True)
@domain.process_manager(stream_categories=["ecommerce::order", "ecommerce::payment"])
class OrderPaymentPM:
order_id: Identifier()
status: String(default="new")
@handle(OrderPlaced, start=True, correlate="order_id")
def on_order_placed(self, event: OrderPlaced) -> None:
self.order_id = event.order_id
self.status = "awaiting_payment"
current_domain.process( # (1)
RequestPayment(order_id=event.order_id, amount=event.total)
)
@handle(PaymentFailed, correlate="order_id", end=True)
def on_payment_failed(self, event: PaymentFailed) -> None:
self.status = "cancelled"
current_domain.process( # (2)
CancelOrder(order_id=self.order_id)
)
-
The handler issues
RequestPaymentto thePaymentaggregate. This command is processed byPayment's command handler, which will raise eitherPaymentConfirmedorPaymentFailed— and the PM will see that response event because it subscribes to theecommerce::paymentstream. -
On payment failure, the handler issues
CancelOrderto compensate, andend=Truemarks the PM as complete.
Commands issued inside a handler are committed atomically as part of the same Unit of Work.
Process Manager Workflow
Under the hood, this is what happens each time an event is delivered to a PM:
sequenceDiagram
autonumber
participant ES as Event Store
participant PM as Process Manager
participant PMStream as PM Stream
participant D as Domain
ES->>PM: Deliver event
PM->>PMStream: Load PM instance (replay transitions)
PMStream-->>PM: Reconstituted state
PM->>PM: Run handler method
PM->>D: Issue command(s) via domain.process()
PM->>PMStream: Persist transition event
- Event arrives: The event store delivers an event from a subscribed stream.
- Load instance: The framework extracts the correlation value from the event and loads the PM instance by replaying its transition events from the PM's own stream.
- Run handler: The matched handler method executes with the PM's current state available.
- Issue commands: The handler can issue commands to drive other aggregates.
- Persist transition: After the handler completes, the framework captures the PM's field state as a transition event and appends it to the PM's stream.
Correlation
Correlation determines which PM instance handles each event. Every handler must
declare a correlate parameter.
String Correlation
The simplest form — the PM field name matches the event field name:
@handle(OrderPlaced, start=True, correlate="order_id")
def on_order_placed(self, event: OrderPlaced) -> None:
self.order_id = event.order_id
Here, event.order_id is extracted and used to find or create the PM instance.
Dictionary Correlation
When the PM field name differs from the event field name, use a dictionary:
@handle(ExternalPaymentReceived, correlate={"order_id": "ext_order_ref"})
def on_payment_received(self, event: ExternalPaymentReceived) -> None:
...
This extracts event.ext_order_ref and maps it to the PM's order_id field.
Lifecycle Management
Starting a Process
Exactly one handler must be marked with start=True. When a start event
arrives and no PM instance exists for that correlation value, a new instance
is created. If a non-start event arrives with no existing PM, it is silently
skipped.
Completing a Process
There are two ways to mark a PM as complete:
Using end=True — the PM is automatically marked complete after the handler
runs:
@handle(PaymentFailed, correlate="order_id", end=True)
def on_payment_failed(self, event: PaymentFailed) -> None:
self.status = "cancelled"
Using mark_as_complete() — call explicitly within a handler for
conditional completion:
@handle(ShipmentDelivered, correlate="order_id")
def on_shipment_delivered(self, event: ShipmentDelivered) -> None:
self.status = "completed"
self.mark_as_complete()
Completed Process Managers Skip Events
Once a PM is marked complete, any subsequent events for that correlation value are silently skipped. No new transition is persisted and no handler runs.
Configuration Options
Stream Sources
-
stream_categories: List of stream categories the PM subscribes to. Include every aggregate stream that the PM needs to see events from — both the stream that triggers the workflow and the streams of aggregates the PM issues commands to.@domain.process_manager( stream_categories=["ecommerce::order", "ecommerce::payment", "ecommerce::shipping"] ) class OrderFulfillmentPM: ... -
aggregates: Alternative tostream_categories— specify aggregates and Protean infers the stream categories from their stream configurations.@domain.process_manager(aggregates=[Order, Payment, Shipping]) class OrderFulfillmentPM: ...
Subscription Options
Process managers support the same subscription configuration as event handlers:
subscription_type:"stream"or"event_store"subscription_profile:"production","fast","batch","debug","projection"subscription_config: Dictionary of specific configuration options
See Server → Configuration for details.
Handling Events from Other Domains
Process managers often coordinate workflows that span multiple bounded contexts — for example, an order fulfillment PM that reacts to events from Billing and Inventory domains in addition to its own Order domain.
When multiple domains are co-located in the same repository and share
the same event store, use register_external_event() to give the PM typed
access to external events:
from protean.core.event import BaseEvent
from protean.fields import Float, Identifier
# Define external event classes in YOUR domain (no imports from other packages)
class PaymentReceived(BaseEvent):
payment_id = Identifier(required=True)
order_id = Identifier(required=True)
amount = Float()
class InventoryReserved(BaseEvent):
order_id = Identifier(required=True)
product_id = Identifier(required=True)
# Register them with the type strings used by the publishing domains
domain.register_external_event(PaymentReceived, "Billing.PaymentReceived.v1")
domain.register_external_event(InventoryReserved, "Inventory.InventoryReserved.v1")
Then reference these events in the PM's handlers and include the external
streams in stream_categories:
@domain.process_manager(
stream_categories=[
"ecommerce::order", # Own domain
"billing::payment", # External domain
"inventory::inventory_item", # External domain
]
)
class OrderFulfillmentPM:
order_id = Identifier()
status = String(default="new")
@handle(OrderPlaced, start=True, correlate="order_id")
def on_order_placed(self, event: OrderPlaced) -> None:
self.order_id = event.order_id
self.status = "awaiting_payment"
@handle(PaymentReceived, correlate="order_id")
def on_payment_received(self, event: PaymentReceived) -> None:
if self.status != "awaiting_payment":
return
self.status = "awaiting_inventory"
current_domain.process(ReserveInventory(order_id=self.order_id))
@handle(InventoryReserved, correlate="order_id")
def on_inventory_reserved(self, event: InventoryReserved) -> None:
if self.status != "awaiting_inventory":
return
self.status = "completed"
self.mark_as_complete()
When domains are distributed as independent services, use subscribers instead. The subscriber acts as an anti-corruption layer, translating raw broker payloads into internal commands or events that your PM can react to. See Multi-Domain Applications — Cross-domain communication for guidance on choosing between the two approaches.
Error Handling
Process managers can define a handle_error class method for custom error
handling, following the same pattern as
event handlers:
@domain.process_manager(stream_categories=["ecommerce::order", "ecommerce::payment"])
class OrderFulfillmentPM:
...
@classmethod
def handle_error(cls, exc: Exception, message):
logger.error(f"PM failed to process {message.type}: {exc}")
Transition Events
After each handler runs, Protean auto-generates a transition event that captures:
state: A dictionary snapshot of all PM field valueshandler_name: The name of the handler method that ranis_complete: Whether the PM is marked complete
These transition events are stored in the PM's own stream
(<pm_stream_category>-<correlation_value>) and are used to reconstitute the
PM's state when loading.
See also
Concept overview: Process Managers — Why process managers exist, when to use them vs. event handlers, and how the event chain works.
Patterns:
- Coordinating Long-Running Processes — Building resilient process managers with idempotency, compensation, and timeout handling.
Related guide: Message Tracing — Correlation and causation IDs that thread through process manager workflows, plus the programmatic causation chain API.