Skip to content

Chapter 13: Check Before You Ship — Domain Services

A customer just complained: they ordered five copies of a book, the order was confirmed, but only three were in stock. The current system confirms orders blindly — it never checks inventory. We need business logic that spans two aggregates (Order and Inventory), and that logic does not belong in either aggregate. It belongs in a domain service.

What Is a Domain Service?

A domain service is a stateless object that encapsulates business logic spanning two or more aggregates. Unlike aggregates, domain services:

  • Have no identity and no lifecycle.
  • Are invoked from command handlers or application services.
  • Can run pre-invariants and post-invariants for validation.
  • Are always associated with the aggregates they coordinate.

When to Use a Domain Service (vs. an Event Handler)

You might wonder: couldn't we just use an event handler to check inventory when an order is confirmed? The key difference is transactional consistency:

Approach Guarantees
Domain service Synchronous, single transaction — inventory is checked before the order is confirmed. If stock is insufficient, the entire operation rolls back.
Event handler Eventually consistent — the order is confirmed first, then the handler runs. If stock is insufficient, you need compensating actions.

Use a domain service when the business rule says "this must not happen" — like confirming an order without sufficient stock. Use event handlers when the reaction can happen after the fact.

The Fulfillment Service

@domain.domain_service(part_of=[Order, Inventory])
class OrderFulfillmentService:
    """Validates inventory availability before confirming an order."""

    def __init__(self, order, inventories):
        super().__init__(order, *inventories)
        self.order = order
        self.inventories = inventories

    @invariant.pre
    def all_items_in_stock(self):
        """Check that every order item has sufficient inventory."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}

        for item in self.order.items:
            inv = inventory_by_title.get(item.book_title)
            if inv is None:
                raise ValidationError(
                    {"_entity": [f"No inventory record for '{item.book_title}'"]}
                )
            if inv.quantity < item.quantity:
                raise ValidationError(
                    {
                        "_entity": [
                            f"Insufficient stock for '{item.book_title}': "
                            f"{inv.quantity} available, {item.quantity} requested"
                        ]
                    }
                )

    def confirm_order(self):
        """Reserve inventory and confirm the order."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}

        for item in self.order.items:
            inv = inventory_by_title[item.book_title]
            inv.reserve(item.quantity)

        self.order.confirm()
        return self.order

Let's break down how this works:

  1. part_of=[Order, Inventory] — The service is associated with both aggregates. Protean requires domain services to declare which aggregates they coordinate.

  2. __init__ — The constructor receives the aggregates and calls super().__init__() with all of them. This is required for Protean to track the aggregates and run invariants.

  3. @invariant.pre — The all_items_in_stock invariant runs before confirm_order() executes. If any item is out of stock, a ValidationError is raised and the order is never confirmed. Pre-invariants are the domain service's main value — they enforce cross-aggregate business rules atomically.

  4. confirm_order() — The domain method that performs the actual mutation: reserving inventory for each item and confirming the order.

Updating the Command Handler

The ConfirmOrder handler now loads inventory records and delegates to the domain service:

@domain.command_handler(part_of=Order)
class OrderCommandHandler:
    @handle(ConfirmOrder)
    def confirm_order(self, command: ConfirmOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)

        # Load inventory records for all items in the order
        inv_repo = current_domain.repository_for(Inventory)
        inventories = []
        for item in order.items:
            inv_results = inv_repo.query.filter(title=item.book_title).all()
            if inv_results.items:
                inventories.append(inv_results.items[0])

        # Delegate to the domain service
        service = OrderFulfillmentService(order, inventories)
        service.confirm_order()

        # Persist changes
        repo.add(order)
        for inv in inventories:
            inv_repo.add(inv)

The handler's job is orchestration, not business logic:

  1. Load the order from the repository.
  2. Load the relevant inventory records.
  3. Pass everything to the domain service.
  4. Persist the mutated aggregates.

If the inventory check fails, the ValidationError propagates to the API layer and returns a 400 Bad Request automatically (thanks to the exception handlers we registered in Chapter 10).

Testing the Domain Service

Test both the happy path and the out-of-stock scenario:

# tests/test_domain_services.py (example tests)


def test_confirm_order_with_stock():
    """Order is confirmed when inventory is sufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=10)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=2, unit_price=Money(amount=15.99))
        ],
    )

    service = OrderFulfillmentService(order, [inv])
    service.confirm_order()

    assert order.status == "CONFIRMED"
    assert inv.quantity == 8  # 10 - 2


def test_confirm_order_out_of_stock():
    """Order fails when inventory is insufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=1)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=5, unit_price=Money(amount=15.99))
        ],
    )

    try:
        service = OrderFulfillmentService(order, [inv])
        service.confirm_order()
        assert False, "Should have raised ValidationError"
    except ValidationError as e:
        assert "Insufficient stock" in str(e.messages)

Notice that we test the domain service directly — no need for a full domain context or command processing. The service is a plain Python object that takes aggregates as input. This makes domain services easy to unit test.

What We Built

  • An OrderFulfillmentService domain service that validates inventory across aggregates before confirming an order.
  • A @invariant.pre that enforces "all items must be in stock" as a cross-aggregate business rule.
  • An updated ConfirmOrder handler that delegates to the service.
  • Tests verifying both success and failure paths.

In the next chapter, we will integrate with an external book supplier using a subscriber.

Full Source

from protean import Domain, handle, invariant
from protean.fields import Float, HasMany, Identifier, Integer, String, ValueObject
from protean.utils.globals import current_domain
from protean.exceptions import ValidationError

domain = Domain("bookshelf")
domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency: String(max_length=3, default="USD")
    amount: Float(required=True)


@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def reserve(self, amount: int):
        if self.quantity < amount:
            raise ValidationError(
                {
                    "quantity": [
                        f"Insufficient stock: {self.quantity} available, {amount} requested"
                    ]
                }
            )
        self.quantity -= amount


@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(max_length=20, default="PENDING")
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = "CONFIRMED"


@domain.entity(part_of=Order)
class OrderItem:
    book_title: String(max_length=200, required=True)
    quantity: Integer(required=True)
    unit_price = ValueObject(Money)


@domain.domain_service(part_of=[Order, Inventory])
class OrderFulfillmentService:
    """Validates inventory availability before confirming an order."""

    def __init__(self, order, inventories):
        super().__init__(order, *inventories)
        self.order = order
        self.inventories = inventories

    @invariant.pre
    def all_items_in_stock(self):
        """Check that every order item has sufficient inventory."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}

        for item in self.order.items:
            inv = inventory_by_title.get(item.book_title)
            if inv is None:
                raise ValidationError(
                    {"_entity": [f"No inventory record for '{item.book_title}'"]}
                )
            if inv.quantity < item.quantity:
                raise ValidationError(
                    {
                        "_entity": [
                            f"Insufficient stock for '{item.book_title}': "
                            f"{inv.quantity} available, {item.quantity} requested"
                        ]
                    }
                )

    def confirm_order(self):
        """Reserve inventory and confirm the order."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}

        for item in self.order.items:
            inv = inventory_by_title[item.book_title]
            inv.reserve(item.quantity)

        self.order.confirm()
        return self.order




@domain.command(part_of=Order)
class ConfirmOrder:
    order_id: Identifier(required=True)


@domain.command_handler(part_of=Order)
class OrderCommandHandler:
    @handle(ConfirmOrder)
    def confirm_order(self, command: ConfirmOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)

        # Load inventory records for all items in the order
        inv_repo = current_domain.repository_for(Inventory)
        inventories = []
        for item in order.items:
            inv_results = inv_repo.query.filter(title=item.book_title).all()
            if inv_results.items:
                inventories.append(inv_results.items[0])

        # Delegate to the domain service
        service = OrderFulfillmentService(order, inventories)
        service.confirm_order()

        # Persist changes
        repo.add(order)
        for inv in inventories:
            inv_repo.add(inv)




domain.init(traverse=False)


# tests/test_domain_services.py (example tests)


def test_confirm_order_with_stock():
    """Order is confirmed when inventory is sufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=10)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=2, unit_price=Money(amount=15.99))
        ],
    )

    service = OrderFulfillmentService(order, [inv])
    service.confirm_order()

    assert order.status == "CONFIRMED"
    assert inv.quantity == 8  # 10 - 2


def test_confirm_order_out_of_stock():
    """Order fails when inventory is insufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=1)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=5, unit_price=Money(amount=15.99))
        ],
    )

    try:
        service = OrderFulfillmentService(order, [inv])
        service.confirm_order()
        assert False, "Should have raised ValidationError"
    except ValidationError as e:
        assert "Insufficient stock" in str(e.messages)

Next

Chapter 14: Connecting to the Outside World →