Chapter 13: Check Before You Ship — Domain Services
A customer just complained: they ordered five copies of a book, the order was confirmed, but only three were in stock. The current system confirms orders blindly — it never checks inventory. We need business logic that spans two aggregates (Order and Inventory), and that logic does not belong in either aggregate. It belongs in a domain service.
What Is a Domain Service?
A domain service is a stateless object that encapsulates business logic spanning two or more aggregates. Unlike aggregates, domain services:
- Have no identity and no lifecycle.
- Are invoked from command handlers or application services.
- Can run pre-invariants and post-invariants for validation.
- Are always associated with the aggregates they coordinate.
When to Use a Domain Service (vs. an Event Handler)
You might wonder: couldn't we just use an event handler to check inventory when an order is confirmed? The key difference is transactional consistency:
| Approach | Guarantees |
|---|---|
| Domain service | Synchronous, single transaction — inventory is checked before the order is confirmed. If stock is insufficient, the entire operation rolls back. |
| Event handler | Eventually consistent — the order is confirmed first, then the handler runs. If stock is insufficient, you need compensating actions. |
Use a domain service when the business rule says "this must not happen" — like confirming an order without sufficient stock. Use event handlers when the reaction can happen after the fact.
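The difference in guarantees can be sketched in plain Python, without any framework. Everything here (`Stock`, `SimpleOrder`, the two functions, and the `history` list) is a hypothetical stand-in rather than part of the bookshelf domain or Protean's API. The first function checks before it mutates; the second confirms first and compensates afterwards.

```python
from dataclasses import dataclass, field


@dataclass
class Stock:
    quantity: int


@dataclass
class SimpleOrder:
    quantity: int
    status: str = "PENDING"
    history: list = field(default_factory=list)  # every status the order was set to

    def set_status(self, status: str) -> None:
        self.status = status
        self.history.append(status)


def confirm_with_service(order: SimpleOrder, stock: Stock) -> None:
    """Domain-service style: check first, then mutate both objects."""
    if stock.quantity < order.quantity:
        raise ValueError("Insufficient stock")  # nothing has changed yet
    stock.quantity -= order.quantity
    order.set_status("CONFIRMED")


def confirm_then_react(order: SimpleOrder, stock: Stock) -> None:
    """Event-handler style: confirm first, react after the fact."""
    order.set_status("CONFIRMED")  # the confirmation has already happened
    if stock.quantity < order.quantity:
        order.set_status("CANCELLED")  # compensating action
    else:
        stock.quantity -= order.quantity


order_a, stock_a = SimpleOrder(quantity=5), Stock(quantity=3)
try:
    confirm_with_service(order_a, stock_a)
except ValueError:
    pass

order_b, stock_b = SimpleOrder(quantity=5), Stock(quantity=3)
confirm_then_react(order_b, stock_b)

print(order_a.history)  # []
print(order_b.history)  # ['CONFIRMED', 'CANCELLED']
```

In the second variant the order was visibly confirmed for a moment, which is exactly the state the "this must not happen" rule forbids.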
The Fulfillment Service
@domain.domain_service(part_of=[Order, Inventory])
class OrderFulfillmentService:
    """Validates inventory availability before confirming an order."""

    def __init__(self, order, inventories):
        super().__init__(order, *inventories)
        self.order = order
        self.inventories = inventories

    @invariant.pre
    def all_items_in_stock(self):
        """Check that every order item has sufficient inventory."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}
        for item in self.order.items:
            inv = inventory_by_title.get(item.book_title)
            if inv is None:
                raise ValidationError(
                    {"_entity": [f"No inventory record for '{item.book_title}'"]}
                )
            if inv.quantity < item.quantity:
                raise ValidationError(
                    {
                        "_entity": [
                            f"Insufficient stock for '{item.book_title}': "
                            f"{inv.quantity} available, {item.quantity} requested"
                        ]
                    }
                )

    def confirm_order(self):
        """Reserve inventory and confirm the order."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}
        for item in self.order.items:
            inv = inventory_by_title[item.book_title]
            inv.reserve(item.quantity)
        self.order.confirm()
        return self.order

Let's break down how this works:
- part_of=[Order, Inventory]: The service is associated with both aggregates. Protean requires domain services to declare which aggregates they coordinate.
- __init__: The constructor receives the aggregates and calls super().__init__() with all of them. This is required for Protean to track the aggregates and run invariants.
- @invariant.pre: The all_items_in_stock invariant runs before confirm_order() executes. If any item is out of stock, a ValidationError is raised and the order is never confirmed. Pre-invariants are the domain service's main value: they enforce cross-aggregate business rules atomically.
- confirm_order(): The domain method that performs the actual mutation, reserving inventory for each item and confirming the order.
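To build intuition for "the check runs before the method", here is a framework-free sketch of one way such a mechanism could be wired up. It is an assumption for illustration only and not how Protean implements `@invariant.pre`; the names `pre_invariant`, `guarded`, `RuleViolation`, and `ToyFulfillment` are all hypothetical.

```python
import functools


class RuleViolation(Exception):
    """Hypothetical stand-in for a validation error."""


def pre_invariant(check):
    """Mark a method as a check that must pass before guarded methods run."""
    check._is_pre_invariant = True
    return check


def guarded(method):
    """Run every marked check defined on the class, then the method itself."""
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        for attr in vars(type(self)).values():
            if getattr(attr, "_is_pre_invariant", False):
                attr(self)  # raises before any state is mutated
        return method(self, *args, **kwargs)
    return wrapper


class ToyFulfillment:
    def __init__(self, requested, stock):
        self.requested = requested  # {title: quantity wanted}
        self.stock = stock          # {title: quantity on hand}
        self.confirmed = False

    @pre_invariant
    def all_items_in_stock(self):
        for title, wanted in self.requested.items():
            if self.stock.get(title, 0) < wanted:
                raise RuleViolation(f"Insufficient stock for '{title}'")

    @guarded
    def confirm(self):
        for title, wanted in self.requested.items():
            self.stock[title] -= wanted
        self.confirmed = True


service = ToyFulfillment({"Dune": 5}, {"Dune": 3})
try:
    service.confirm()
except RuleViolation as exc:
    message = str(exc)

print(message)            # Insufficient stock for 'Dune'
print(service.confirmed)  # False
```

Because the check raises before `confirm` touches anything, the stock count stays at 3 and the toy order is never marked as confirmed.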
Updating the Command Handler
The ConfirmOrder handler now loads inventory records and delegates to
the domain service:
@domain.command_handler(part_of=Order)
class OrderCommandHandler:
    @handle(ConfirmOrder)
    def confirm_order(self, command: ConfirmOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)

        # Load inventory records for all items in the order
        inv_repo = current_domain.repository_for(Inventory)
        inventories = []
        for item in order.items:
            inv_results = inv_repo.query.filter(title=item.book_title).all()
            if inv_results.items:
                inventories.append(inv_results.items[0])

        # Delegate to the domain service
        service = OrderFulfillmentService(order, inventories)
        service.confirm_order()

        # Persist changes
        repo.add(order)
        for inv in inventories:
            inv_repo.add(inv)

The handler's job is orchestration, not business logic:
- Load the order from the repository.
- Load the relevant inventory records.
- Pass everything to the domain service.
- Persist the mutated aggregates.
If the inventory check fails, the ValidationError propagates to the
API layer and returns a 400 Bad Request automatically (thanks to the
exception handlers we registered in Chapter 10).
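The ordering of those four steps matters: persistence comes last, so a failed check leaves the stores untouched. The sketch below shows this with plain dictionaries and a hypothetical `InMemoryRepo`; neither it nor the `fulfil` and `confirm_order` functions are Protean APIs, and the order and inventory are assumed to be loaded already.

```python
class InMemoryRepo:
    """Hypothetical repository that keeps saved objects in a dict."""

    def __init__(self):
        self.saved = {}

    def add(self, key, obj):
        self.saved[key] = obj


def fulfil(order, inventories):
    """Stand-in for the domain service: check everything, then mutate."""
    stock = {inv["title"]: inv for inv in inventories}
    for item in order["items"]:
        inv = stock.get(item["title"])
        if inv is None or inv["quantity"] < item["quantity"]:
            raise ValueError(f"Insufficient stock for '{item['title']}'")
    for item in order["items"]:
        stock[item["title"]]["quantity"] -= item["quantity"]
    order["status"] = "CONFIRMED"


def confirm_order(order, inventories, order_repo, inv_repo):
    """Handler-style orchestration: delegate first, persist afterwards."""
    fulfil(order, inventories)  # an exception here skips everything below
    order_repo.add(order["id"], order)
    for inv in inventories:
        inv_repo.add(inv["title"], inv)


order_repo, inv_repo = InMemoryRepo(), InMemoryRepo()
order = {"id": "o-1", "status": "PENDING", "items": [{"title": "Dune", "quantity": 5}]}
inventories = [{"title": "Dune", "quantity": 3}]
try:
    confirm_order(order, inventories, order_repo, inv_repo)
except ValueError as exc:
    error = str(exc)

print(error)             # Insufficient stock for 'Dune'
print(order_repo.saved)  # {}
```

The caller sees the error, and neither repository has recorded anything.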
Testing the Domain Service
Test both the happy path and the out-of-stock scenario:
# tests/test_domain_services.py (example tests)
def test_confirm_order_with_stock():
    """Order is confirmed when inventory is sufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=10)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=2, unit_price=Money(amount=15.99))
        ],
    )

    service = OrderFulfillmentService(order, [inv])
    service.confirm_order()

    assert order.status == "CONFIRMED"
    assert inv.quantity == 8  # 10 - 2


def test_confirm_order_out_of_stock():
    """Order fails when inventory is insufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=1)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=5, unit_price=Money(amount=15.99))
        ],
    )

    try:
        service = OrderFulfillmentService(order, [inv])
        service.confirm_order()
        assert False, "Should have raised ValidationError"
    except ValidationError as e:
        assert "Insufficient stock" in str(e.messages)

Notice that we test the domain service directly — no need for a full domain context or command processing. The service is a plain Python object that takes aggregates as input. This makes domain services easy to unit test.
What We Built
- An OrderFulfillmentService domain service that validates inventory across aggregates before confirming an order.
- A @invariant.pre that enforces "all items must be in stock" as a cross-aggregate business rule.
- An updated ConfirmOrder handler that delegates to the service.
- Tests verifying both success and failure paths.
In the next chapter, we will integrate with an external book supplier using a subscriber.
Full Source
from protean import Domain, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, HasMany, Identifier, Integer, String, ValueObject
from protean.utils.globals import current_domain

domain = Domain("bookshelf")

domain.config["command_processing"] = "sync"
domain.config["event_processing"] = "sync"


@domain.value_object
class Money:
    currency: String(max_length=3, default="USD")
    amount: Float(required=True)


@domain.aggregate
class Inventory:
    book_id: Identifier(required=True)
    title: String(max_length=200, required=True)
    quantity: Integer(default=0)

    def reserve(self, amount: int):
        if self.quantity < amount:
            raise ValidationError(
                {
                    "quantity": [
                        f"Insufficient stock: {self.quantity} available, {amount} requested"
                    ]
                }
            )
        self.quantity -= amount


@domain.aggregate
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(max_length=20, default="PENDING")
    items = HasMany("OrderItem")

    def confirm(self):
        self.status = "CONFIRMED"


@domain.entity(part_of=Order)
class OrderItem:
    book_title: String(max_length=200, required=True)
    quantity: Integer(required=True)
    unit_price = ValueObject(Money)


@domain.domain_service(part_of=[Order, Inventory])
class OrderFulfillmentService:
    """Validates inventory availability before confirming an order."""

    def __init__(self, order, inventories):
        super().__init__(order, *inventories)
        self.order = order
        self.inventories = inventories

    @invariant.pre
    def all_items_in_stock(self):
        """Check that every order item has sufficient inventory."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}
        for item in self.order.items:
            inv = inventory_by_title.get(item.book_title)
            if inv is None:
                raise ValidationError(
                    {"_entity": [f"No inventory record for '{item.book_title}'"]}
                )
            if inv.quantity < item.quantity:
                raise ValidationError(
                    {
                        "_entity": [
                            f"Insufficient stock for '{item.book_title}': "
                            f"{inv.quantity} available, {item.quantity} requested"
                        ]
                    }
                )

    def confirm_order(self):
        """Reserve inventory and confirm the order."""
        inventory_by_title = {inv.title: inv for inv in self.inventories}
        for item in self.order.items:
            inv = inventory_by_title[item.book_title]
            inv.reserve(item.quantity)
        self.order.confirm()
        return self.order


@domain.command(part_of=Order)
class ConfirmOrder:
    order_id: Identifier(required=True)


@domain.command_handler(part_of=Order)
class OrderCommandHandler:
    @handle(ConfirmOrder)
    def confirm_order(self, command: ConfirmOrder) -> None:
        repo = current_domain.repository_for(Order)
        order = repo.get(command.order_id)

        # Load inventory records for all items in the order
        inv_repo = current_domain.repository_for(Inventory)
        inventories = []
        for item in order.items:
            inv_results = inv_repo.query.filter(title=item.book_title).all()
            if inv_results.items:
                inventories.append(inv_results.items[0])

        # Delegate to the domain service
        service = OrderFulfillmentService(order, inventories)
        service.confirm_order()

        # Persist changes
        repo.add(order)
        for inv in inventories:
            inv_repo.add(inv)


domain.init(traverse=False)


# tests/test_domain_services.py (example tests)
def test_confirm_order_with_stock():
    """Order is confirmed when inventory is sufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=10)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=2, unit_price=Money(amount=15.99))
        ],
    )

    service = OrderFulfillmentService(order, [inv])
    service.confirm_order()

    assert order.status == "CONFIRMED"
    assert inv.quantity == 8  # 10 - 2


def test_confirm_order_out_of_stock():
    """Order fails when inventory is insufficient."""
    inv = Inventory(book_id="book-1", title="Dune", quantity=1)
    order = Order(
        customer_name="Alice",
        items=[
            OrderItem(book_title="Dune", quantity=5, unit_price=Money(amount=15.99))
        ],
    )

    try:
        service = OrderFulfillmentService(order, [inv])
        service.confirm_order()
        assert False, "Should have raised ValidationError"
    except ValidationError as e:
        assert "Insufficient stock" in str(e.messages)