Domain Services
Domain services act as orchestrators, centralizing complex domain logic that doesn't neatly fit within an entity or aggregate. They encapsulate business rules and domain decisions that need multiple aggregates as input.
Domain services free us from having to shoehorn business logic into aggregate clusters. This keeps domain objects focused on their core state and behavior, while domain services handle the broader workflows and complex interactions within the domain.
Even though Domain services can access multiple aggregates, they are not meant to propagate state changes in more than one aggregate. A combination of Application Services, Events, and eventual consistency sync aggregates when a transaction spans beyond an aggregate's boundary. We will discuss these aspects more thoroughly in the Application Layer section.
Key Facts
- Stateless: Domain services are stateless - they don’t hold any internal state between method calls. They operate on the state provided to them through their method parameters.
- Encapsulate Business Logic: Domain Services encapsulate complex business logic or operations that involve multiple aggregate, specifically, logic that doesn't naturally fit within any single aggregate.
- Pure Domain Concepts: Domain services focus purely on domain logic and cannot handle technical aspects like persistence or messaging, though they mutate aggregates to a state that they is ready to be persisted. Technical concerns are typically handled by invoking services like application services or command/event handlers.
Defining a Domain Service
A Domain Service is defined with the Domain.domain_service
decorator, and
associated with at least two aggregates with the part_of
option.
The service methods in a Domain Service can be structured in three flavors:
1. Class with class methods
If you don't have any invariants to be managed by the Domain Service, each method in the Domain Service can simply be a class method, that receives all the input necessary for performing the business function.
@domain.domain_service(part_of=[Order, Inventory])
class OrderPlacementService:
@classmethod
def place_order(self, order, inventories):
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
inventory.reserve_stock(item.quantity)
self.order.confirm()
Invoking it is straight forward:
OrderPlacementService.place_order(order, inventories)
2. Class with instance methods
In this flavor, the Domain Service is instantiated with the aggregates and each method performs a distinct business function.
@domain.domain_service(part_of=[Order, Inventory])
class OrderPlacementService:
def __init__(self, order, inventories):
super().__init__(*(order, inventories))
self.order = order
self.inventories = inventories
def place_order(self):
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
inventory.reserve_stock(item.quantity)
self.order.confirm()
@invariant.pre
def inventory_should_have_sufficient_stock(self):
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
if inventory is None or inventory.quantity < item.quantity:
raise ValidationError({"_service": ["Product is out of stock"]})
You would then instantiate the Domain Service, passing the relevant aggregates and invoke the methods on the instance.
service = OrderPlacementService(order, inventories)
service.place_order()
3. Callable class
If you have a single business function, you can simply model it as a callable class:
@domain.domain_service(part_of=[Order, Inventory])
class place_order:
def __init__(self, order, inventories):
super().__init__(*(order, inventories))
self.order = order
self.inventories = inventories
def __call__(self):
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
inventory.reserve_stock(item.quantity)
self.order.confirm()
@invariant.pre
def inventory_should_have_sufficient_stock(self):
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
if inventory is None or inventory.quantity < item.quantity:
raise ValidationError({"_service": ["Product is out of stock"]})
service = OrderPlacementService(order, inventories)
service()
Deciding between different flavors
The decision between a class with instance methods and a callable class boils
down to two factors:
1. How many business functions does the Domain Service have? If it has only one,
then a callable class is more elegant.
2. Do you have pre
invariants that only apply to specific methods? Then it
makes sense to construct each method as a separate callable class. If your
invariant methods apply to all methods, then a class with instance methods is
preferable.
As usual, you will probably have not enough insight to take this decision upfront. As your domain model matures, review regularly and decide on the best way to model the Domain Service.
Note
You can include private methods in a Domain Service class by prefixing the
method name with an underscore. If you encounter a RecursionError:
maximum recursion depth exceeded
error, it is likely that a domain method
is calling a private method, but the private method name is not prefixed
with an underscore.
Typical Workflow
Let us consider an example OrderPlacementService
that places an order and
updates inventory stocks simultaneously. The typical workflow of a Domain
Service is below:
sequenceDiagram
autonumber
Application Service->>Repository: Fetch order and product invehtories
Repository-->>Application Service: order and inventories
Application Service->>Domain Service: Invoke operation
Domain Service->>Domain Service: Mutate aggregates
Domain Service-->>Application Service:
Application Service->>Repository: Persist aggregates
The application service loads the necessary aggregates through repositories, and invokes the service method to place order. The service method executes the business logic, mutates the aggregates, and returns them to the application service, which then persists them again with the help of repositories.
Invariants
Just like Aggregates and Entities, Domain Services can also have invariants. These invariants are used to validate the state of the aggregates passed to the service method. Unlike in Aggregates though, invariants in Domain Services typically deal with validations that span across multiple aggregates.
pre
invariants check the state of the aggregates before they are mutated,
while post
invariants check the state after the mutation.
Note
It is a good practice to step back and review the business logic placed in the Domain Service now and then. If an invariant does not use multiple aggregates, it is likely that it belongs within an aggregate and not in the service.
A full-blown example
from datetime import datetime, timezone
from enum import Enum
from protean import Domain, invariant
from protean.exceptions import ValidationError
from protean.fields import (
DateTime,
Float,
HasMany,
Identifier,
Integer,
String,
ValueObject,
)
domain = Domain(__file__, load_toml=False)
class OrderStatus(Enum):
PENDING = "PENDING"
CONFIRMED = "CONFIRMED"
SHIPPED = "SHIPPED"
DELIVERED = "DELIVERED"
@domain.event(part_of="Order")
class OrderConfirmed:
order_id = Identifier(required=True)
confirmed_at = DateTime(required=True)
@domain.aggregate
class Order:
customer_id = Identifier(required=True)
items = HasMany("OrderItem")
status = String(choices=OrderStatus, default=OrderStatus.PENDING.value)
payment_id = Identifier()
@invariant.post
def order_should_contain_items(self):
if not self.items or len(self.items) == 0:
raise ValidationError({"_entity": ["Order must contain at least one item"]})
def confirm(self):
self.status = OrderStatus.CONFIRMED.value
self.raise_(
OrderConfirmed(order_id=self.id, confirmed_at=datetime.now(timezone.utc))
)
@domain.entity(part_of=Order)
class OrderItem:
product_id = Identifier(required=True)
quantity = Integer()
price = Float()
@domain.value_object(part_of="Inventory")
class Warehouse:
location = String()
contact = String()
@domain.event(part_of="Inventory")
class StockReserved:
product_id = Identifier(required=True)
quantity = Integer(required=True)
reserved_at = DateTime(required=True)
@domain.aggregate
class Inventory:
product_id = Identifier(required=True)
quantity = Integer()
warehouse = ValueObject(Warehouse)
def reserve_stock(self, quantity: int):
self.quantity -= quantity
self.raise_(
StockReserved(
product_id=self.product_id,
quantity=quantity,
reserved_at=datetime.now(timezone.utc),
)
)
@domain.domain_service(part_of=[Order, Inventory])
class place_order:
def __init__(self, order, inventories):
super().__init__(*(order, inventories))
self.order = order
self.inventories = inventories
def __call__(self):
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
inventory.reserve_stock(item.quantity)
self.order.confirm()
@invariant.pre
def inventory_should_have_sufficient_stock(self):
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
if inventory is None or inventory.quantity < item.quantity:
raise ValidationError({"_service": ["Product is out of stock"]})
@invariant.pre
def order_payment_method_should_be_valid(self):
if not self.order.payment_id:
raise ValidationError(
{"_service": ["Order must have a valid payment method"]}
)
@invariant.post
def total_reserved_value_should_match_order_value(self):
order_total = sum(item.quantity * item.price for item in self.order.items)
reserved_total = 0
for item in self.order.items:
inventory = next(
(i for i in self.inventories if i.product_id == item.product_id), None
)
if inventory:
reserved_total += inventory._events[0].quantity * item.price
if order_total != reserved_total:
raise ValidationError(
{"_service": ["Total reserved value does not match order value"]}
)
@invariant.post
def total_quantity_reserved_should_match_order_quantity(self):
order_quantity = sum(item.quantity for item in self.order.items)
reserved_quantity = sum(
inventory._events[0].quantity
for inventory in self.inventories
if inventory._events
)
if order_quantity != reserved_quantity:
raise ValidationError(
{"_service": ["Total reserved quantity does not match order quantity"]}
)
When an order is placed, Order
status has to be CONFIRMED
and the stock
record of each product in Inventory
has to be reduced.
This change could be performed with events, with Order
generating an event
and Inventory
aggregate consuming the event and updating its records. But
there is a possibility of encountering stock depleted issues if multiple
orders are placed at the same time.
So a Domain Service works best here because it updates the states of both
the Order
aggregate as well as the Inventory
aggregate in a single
transaction.
IMPORTANT: Even though the inventory aggregate is mutated here to ensure
all invariants are satisified, the Command Handler method invoking the Domain
Service should only persist the Order
aggregate. The Inventory
aggregate
will eventually be updated through the domain event OrderConfirmed
.