Chapter 20: Orchestrating Multi-Step Workflows — Process Managers

Order fulfillment in a real bookstore is a multi-step process: confirm the order, reserve inventory, arrange shipping. If shipping fails (invalid address), the inventory reservation must be reversed. This is a long-running workflow that spans multiple aggregates and must handle failures gracefully.

A domain service (Chapter 13) handles synchronous, single-step cross-aggregate validation. A process manager handles asynchronous, multi-step workflows with compensation logic.

Domain Service vs. Process Manager

Aspect      Domain Service                         Process Manager
When        Synchronous, same transaction          Asynchronous, across multiple transactions
State       Stateless                              Stateful: remembers what happened so far
Failure     Rolls back the transaction             Issues compensating commands
Use case    "Check inventory before confirming"    "Confirm → reserve → ship → complete"

The Shipment Aggregate

First, we need a Shipment aggregate to represent shipments:

@domain.aggregate
class Shipment:
    order_id: Identifier(required=True)
    status: String(max_length=20, default="PENDING")
    tracking_number: String(max_length=50)

    def create_shipment(self, address: str):
        """Attempt to create a shipment. May fail if address is invalid."""
        if not address or address.strip() == "":
            self.status = "FAILED"
            self.raise_(
                ShipmentFailed(order_id=self.order_id, reason="Invalid address")
            )
        else:
            self.status = "CREATED"
            self.tracking_number = f"TRK-{self.order_id[:8]}"
            self.raise_(
                ShipmentCreated(
                    order_id=self.order_id, tracking_number=self.tracking_number
                )
            )

The create_shipment method validates the address and raises exactly one of two events: ShipmentCreated on success, or ShipmentFailed when the address is empty or blank. The process manager reacts to whichever event is raised to decide the next step.

The Process Manager

The OrderFulfillmentPM coordinates the fulfillment workflow:

@domain.process_manager(stream_categories=["order", "inventory", "shipment"])
class OrderFulfillmentPM:
    """Coordinates the order fulfillment workflow across aggregates."""

    order_id: Identifier(required=True)

    @handle(OrderConfirmed, start=True, correlate="order_id")
    def on_order_confirmed(self, event: OrderConfirmed):
        """Step 1: Order confirmed — reserve inventory."""
        current_domain.process(
            ReserveInventory(
                order_id=event.order_id,
                book_id="placeholder",  # In a real system, load order items
                quantity=1,
            )
        )

    @handle(InventoryReserved, correlate={"order_id": "book_id"})
    def on_inventory_reserved(self, event: InventoryReserved):
        """Step 2: Inventory reserved — create shipment."""
        order = current_domain.repository_for(Order).get(self.order_id)
        current_domain.process(
            CreateShipment(
                order_id=self.order_id,
                address=order.shipping_address or "",
            )
        )

    @handle(ShipmentCreated, correlate="order_id")
    def on_shipment_created(self, event: ShipmentCreated):
        """Step 3: Shipment created — complete the order."""
        current_domain.process(CompleteOrder(order_id=event.order_id))
        self.mark_as_complete()

    @handle(ShipmentFailed, correlate="order_id")
    def on_shipment_failed(self, event: ShipmentFailed):
        """Compensation: Shipment failed — release inventory and cancel order."""
        current_domain.process(ReleaseInventory(book_id="placeholder", quantity=1))
        current_domain.process(CancelOrder(order_id=event.order_id))
        self.mark_as_complete()

Key Concepts

stream_categories — The process manager subscribes to events from the order, inventory, and shipment streams. It sees all events from these three aggregates.

order_id field — Process managers are stateful. The order_id field is persisted between events, so the PM remembers which order it is tracking across the entire workflow.

start=True — The on_order_confirmed handler is marked as the start event. When an OrderConfirmed event arrives, Protean creates a new PM instance and stores the correlated order_id.

correlate — Each handler declares how to match incoming events to existing PM instances:

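  • correlate="order_id": the event's order_id field is matched against the PM's order_id field directly.
  • correlate={"order_id": "book_id"}: the PM's order_id is matched against the event's book_id field (for events that don't carry order_id). Because the match is by value, it only succeeds when the event's book_id holds the same value as the PM's order_id. In a production system the event would more likely carry order_id itself.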

mark_as_complete() — Marks the PM instance as finished. Subsequent events for this order_id are skipped.
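
The lifecycle described above (start, correlate, complete) can be sketched without any framework. The following is a minimal, framework-free illustration and not Protean's implementation: PMInstance, PMRegistry, and route are hypothetical names, state lives in a plain dict rather than a persistent store, and dropping a non-start event that matches no instance is an assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass
class PMInstance:
    """Hypothetical stand-in for one persisted process manager instance."""

    order_id: str
    is_complete: bool = False


class PMRegistry:
    """Toy in-memory router; a real framework would persist this state."""

    def __init__(self):
        self.instances = {}

    def route(self, event: dict, correlate, start: bool = False):
        # A string means the event field has the same name as the PM field;
        # a dict maps the PM field name to the event field name.
        mapping = {correlate: correlate} if isinstance(correlate, str) else correlate
        key = event[mapping["order_id"]]

        instance = self.instances.get(key)
        if instance is None:
            if not start:
                return None  # assumption: no running workflow, so drop the event
            instance = self.instances[key] = PMInstance(order_id=key)
        if instance.is_complete:
            return None  # finished workflows skip further events
        return instance


registry = PMRegistry()
pm = registry.route({"order_id": "o-1"}, correlate="order_id", start=True)
same = registry.route({"order_id": "o-1"}, correlate="order_id")
orphan = registry.route({"order_id": "o-2"}, correlate="order_id")
mapped = registry.route({"book_id": "o-3"}, correlate={"order_id": "book_id"}, start=True)

pm.is_complete = True  # what mark_as_complete() would record
skipped = registry.route({"order_id": "o-1"}, correlate="order_id")
```

Here pm and same are the same object, orphan and skipped are None, and mapped is keyed by the value found in the event's book_id field.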

How It Works

  1. OrderConfirmed fires → PM starts (via start=True), issues ReserveInventory command.
  2. InventoryReserved fires → PM issues CreateShipment command.
  3. If ShipmentCreated fires → PM issues CompleteOrder command and marks itself complete.
  4. If ShipmentFailed fires instead → PM issues ReleaseInventory and CancelOrder commands (compensation) and marks itself complete.
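
The four steps above amount to a small lookup from incoming event to outgoing commands. The sketch below records that lookup as plain data so both paths can be traced; next_commands, trace, and TERMINAL_EVENTS are illustrative names, and events are reduced to their class names rather than real event objects.

```python
# Events after which the workflow marks itself complete.
TERMINAL_EVENTS = {"ShipmentCreated", "ShipmentFailed"}


def next_commands(event_name: str) -> list:
    """Return the commands the workflow issues in response to an event."""
    transitions = {
        "OrderConfirmed": ["ReserveInventory"],
        "InventoryReserved": ["CreateShipment"],
        "ShipmentCreated": ["CompleteOrder"],
        "ShipmentFailed": ["ReleaseInventory", "CancelOrder"],
    }
    return transitions.get(event_name, [])


def trace(events) -> list:
    """Collect the commands issued for a sequence of events, stopping at completion."""
    commands = []
    for name in events:
        commands.extend(next_commands(name))
        if name in TERMINAL_EVENTS:
            break  # later events for this workflow would be skipped
    return commands


happy = trace(["OrderConfirmed", "InventoryReserved", "ShipmentCreated"])
failed = trace(["OrderConfirmed", "InventoryReserved", "ShipmentFailed"])
```

The happy path yields ReserveInventory, CreateShipment, CompleteOrder; the failure path ends with ReleaseInventory and CancelOrder instead.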

The Compensation Pattern

When ShipmentFailed fires, the PM must undo the reservation:

OrderConfirmed ──► ReserveInventory ──► CreateShipment
                                              │
                                       ShipmentFailed
                                              │
                                  ┌───────────┴───────────┐
                                  ▼                       ▼
                          ReleaseInventory          CancelOrder

This is the saga pattern — each step has a compensating action that undoes it if a subsequent step fails. The process manager is the coordinator that decides when to compensate.
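
To make the idea of a compensating action concrete, here is a simplified, in-process sketch of a saga runner. It is synchronous, unlike the event-driven process manager in this chapter, and run_saga, reserve, release, and ship are hypothetical helpers written only for this illustration.

```python
def run_saga(steps):
    """Run (name, action, compensation) steps in order.

    If a step raises, undo the steps that already completed, newest first.
    """
    log = []
    completed = []
    for name, action, compensation in steps:
        try:
            action()
        except Exception as exc:
            log.append(f"{name} failed: {exc}")
            for done_name, undo in reversed(completed):
                undo()
                log.append(f"compensated {done_name}")
            return log
        log.append(f"{name} ok")
        completed.append((name, compensation))
    return log


stock = {"quantity": 5}


def reserve():
    stock["quantity"] -= 1


def release():
    stock["quantity"] += 1


def ship():
    raise ValueError("Invalid address")


log = run_saga(
    [
        ("reserve_inventory", reserve, release),
        ("create_shipment", ship, lambda: None),
    ]
)
```

After the run, stock is back at its starting quantity because the reservation was compensated when shipping failed.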

Design compensating actions to be idempotent — if ReleaseInventory is processed twice (e.g., due to a retry), the second invocation should be harmless. This is critical for reliability in asynchronous systems.
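
One common way to get idempotency is to remember which releases have already been applied. The sketch below assumes each release carries a unique release_id (for example, the order ID); that field, and the InventoryLedger class itself, are assumptions for illustration and are not part of the ReleaseInventory command defined in this chapter.

```python
class InventoryLedger:
    """Hypothetical inventory record that remembers which releases it applied."""

    def __init__(self, quantity: int):
        self.quantity = quantity
        self._applied_release_ids = set()

    def release(self, release_id: str, amount: int) -> bool:
        """Apply a release once; return False if release_id was already applied."""
        if release_id in self._applied_release_ids:
            return False  # duplicate delivery: leave the quantity untouched
        self.quantity += amount
        self._applied_release_ids.add(release_id)
        return True


ledger = InventoryLedger(quantity=4)
first = ledger.release("order-42", amount=1)
retry = ledger.release("order-42", amount=1)  # simulated retry of the same command
```

The retry returns False and the quantity is incremented only once.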

Testing the Process Manager

# tests/test_process_managers.py (example)


def test_fulfillment_happy_path():
    """Order confirmed → inventory reserved → shipment created → order completed."""
    # This test would use sync processing to verify the full chain
    pass


def test_fulfillment_compensation():
    """Shipment failed → inventory released → order cancelled."""
    # This test would verify compensation logic
    pass

Process managers are best tested through integration tests that verify the full event chain. In sync processing mode (domain.config["event_processing"] = "sync"), events are processed immediately, making it straightforward to assert the final state after triggering the initial event.
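
The shape of such a test can be shown with a framework-free stand-in. The run_sync function below is a hypothetical toy that mimics sync processing by handling every follow-up message immediately; it does not use Protean's API, and the starting stock of 5 is an arbitrary value chosen for the example.

```python
from collections import deque


def run_sync(initial_event: str, shipping_address: str) -> dict:
    """Process an event and every follow-up message immediately."""
    state = {"order": "CONFIRMED", "stock": 5}
    queue = deque([initial_event])
    while queue:
        message = queue.popleft()
        if message == "OrderConfirmed":
            state["stock"] -= 1  # effect of ReserveInventory
            queue.append("InventoryReserved")
        elif message == "InventoryReserved":
            # Effect of CreateShipment: a blank address fails.
            ok = bool(shipping_address.strip())
            queue.append("ShipmentCreated" if ok else "ShipmentFailed")
        elif message == "ShipmentCreated":
            state["order"] = "COMPLETED"  # effect of CompleteOrder
        elif message == "ShipmentFailed":
            state["stock"] += 1  # effect of ReleaseInventory
            state["order"] = "CANCELLED"  # effect of CancelOrder
    return state


def test_fulfillment_happy_path():
    state = run_sync("OrderConfirmed", shipping_address="12 Book Lane")
    assert state == {"order": "COMPLETED", "stock": 4}


def test_fulfillment_compensation():
    state = run_sync("OrderConfirmed", shipping_address="   ")
    assert state == {"order": "CANCELLED", "stock": 5}
```

Each test triggers only the initial event and then asserts on the final state, which is the same structure an integration test against the real domain would follow.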

What We Built

  • A Shipment aggregate representing shipments with success and failure outcomes.
  • An OrderFulfillmentPM process manager that coordinates fulfillment across Order, Inventory, and Shipment.
  • Event correlation using correlate to route events to the correct PM instance.
  • Compensation logic that reverses reservations when shipping fails.
  • The saga pattern for long-running, multi-aggregate workflows.

In the next chapter, we will explore advanced query patterns for building a rich storefront.

Full Source

from protean import Domain, handle
from protean.fields import Float, Identifier, Integer, String
from protean.utils.globals import current_domain

domain = Domain()
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency: String(max_length=3, default="USD")
    amount: Float(required=True)


@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(max_length=20, default="PENDING")
    shipping_address: String(max_length=500)

    def confirm(self):
        self.status = "CONFIRMED"
        self.raise_(OrderConfirmed(order_id=self.id, customer_name=self.customer_name))

    def complete(self):
        self.status = "COMPLETED"

    def cancel(self):
        self.status = "CANCELLED"


@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def reserve(self, amount: int):
        self.quantity -= amount
        self.raise_(InventoryReserved(book_id=self.book_id, quantity=amount))

    def release(self, amount: int):
        self.quantity += amount


@domain.aggregate
class Shipment:
    order_id: Identifier(required=True)
    status: String(max_length=20, default="PENDING")
    tracking_number: String(max_length=50)

    def create_shipment(self, address: str):
        """Attempt to create a shipment. May fail if address is invalid."""
        if not address or address.strip() == "":
            self.status = "FAILED"
            self.raise_(
                ShipmentFailed(order_id=self.order_id, reason="Invalid address")
            )
        else:
            self.status = "CREATED"
            self.tracking_number = f"TRK-{self.order_id[:8]}"
            self.raise_(
                ShipmentCreated(
                    order_id=self.order_id, tracking_number=self.tracking_number
                )
            )




# Events
@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    customer_name: String(max_length=150, required=True)


@domain.event(part_of=Inventory)
class InventoryReserved:
    book_id: Identifier(required=True)
    quantity: Integer(required=True)


@domain.event(part_of=Shipment)
class ShipmentCreated:
    order_id: Identifier(required=True)
    tracking_number: String(max_length=50)


@domain.event(part_of=Shipment)
class ShipmentFailed:
    order_id: Identifier(required=True)
    reason: String(max_length=500)


# Commands
@domain.command(part_of=Inventory)
class ReserveInventory:
    order_id: Identifier(required=True)
    book_id: Identifier(required=True)
    quantity: Integer(required=True)


@domain.command(part_of=Shipment)
class CreateShipment:
    order_id: Identifier(required=True)
    address: String(max_length=500)


@domain.command(part_of=Order)
class CompleteOrder:
    order_id: Identifier(required=True)


@domain.command(part_of=Order)
class CancelOrder:
    order_id: Identifier(required=True)


@domain.command(part_of=Inventory)
class ReleaseInventory:
    book_id: Identifier(required=True)
    quantity: Integer(required=True)


@domain.process_manager(stream_categories=["order", "inventory", "shipment"])
class OrderFulfillmentPM:
    """Coordinates the order fulfillment workflow across aggregates."""

    order_id: Identifier(required=True)

    @handle(OrderConfirmed, start=True, correlate="order_id")
    def on_order_confirmed(self, event: OrderConfirmed):
        """Step 1: Order confirmed — reserve inventory."""
        current_domain.process(
            ReserveInventory(
                order_id=event.order_id,
                book_id="placeholder",  # In a real system, load order items
                quantity=1,
            )
        )

    @handle(InventoryReserved, correlate={"order_id": "book_id"})
    def on_inventory_reserved(self, event: InventoryReserved):
        """Step 2: Inventory reserved — create shipment."""
        order = current_domain.repository_for(Order).get(self.order_id)
        current_domain.process(
            CreateShipment(
                order_id=self.order_id,
                address=order.shipping_address or "",
            )
        )

    @handle(ShipmentCreated, correlate="order_id")
    def on_shipment_created(self, event: ShipmentCreated):
        """Step 3: Shipment created — complete the order."""
        current_domain.process(CompleteOrder(order_id=event.order_id))
        self.mark_as_complete()

    @handle(ShipmentFailed, correlate="order_id")
    def on_shipment_failed(self, event: ShipmentFailed):
        """Compensation: Shipment failed — release inventory and cancel order."""
        current_domain.process(ReleaseInventory(book_id="placeholder", quantity=1))
        current_domain.process(CancelOrder(order_id=event.order_id))
        self.mark_as_complete()




domain.init(traverse=False)


# tests/test_process_managers.py (example)


def test_fulfillment_happy_path():
    """Order confirmed → inventory reserved → shipment created → order completed."""
    # This test would use sync processing to verify the full chain
    pass


def test_fulfillment_compensation():
    """Shipment failed → inventory released → order cancelled."""
    # This test would verify compensation logic
    pass
