Skip to content

Domain Services

DDD CQRS ES

Some business operations naturally span two or more aggregates. For example, placing an order requires confirming the order and reserving inventory — two aggregates that must be validated together. If you put this logic in the Order aggregate, it needs to know about Inventory; if you put it in the command handler, business rules leak into the application layer. Domain services solve this: they encapsulate cross-aggregate business logic in the domain layer, where it belongs.

For background on when and why to use domain services, see Domain Services concept.

Persist one aggregate, let events handle the rest

Even though a domain service mutates multiple aggregates to validate invariants, the command handler invoking the service should only persist one aggregate. The other aggregate's state will be updated eventually through domain events. This preserves the one aggregate per transaction rule.

Defining a Domain Service

A Domain Service is defined with the Domain.domain_service decorator, and associated with at least two aggregates with the part_of option. The part_of option is required and must be a list of two or more aggregates — a domain service that operates on a single aggregate should be logic on the aggregate itself.

The service methods in a Domain Service can be structured in three flavors:

1. Class with class methods

If you don't have any invariants to be managed by the Domain Service, each method in the Domain Service can simply be a class method, that receives all the input necessary for performing the business function.

@domain.domain_service(part_of=[Order, Inventory])
class OrderPlacementService:
    @classmethod
    def place_order(cls, order, inventories):
        for item in order.items:
            inventory = next(
                (i for i in inventories if i.product_id == item.product_id), None
            )
            inventory.reserve_stock(item.quantity)

        order.confirm()

Invoking it is straightforward:

OrderPlacementService.place_order(order, inventories)

Note

The class-method flavor cannot have invariants, because invariants require an instance with stored state to validate against. If you need pre/post invariants, use the instance-method or callable-class flavor.

2. Class with instance methods

In this flavor, the Domain Service is instantiated with the aggregates and each method performs a distinct business function.

@domain.domain_service(part_of=[Order, Inventory])
class OrderPlacementService:
    def __init__(self, order, inventories):
        super().__init__(order, inventories)

        self.order = order
        self.inventories = inventories

    def place_order(self):
        for item in self.order.items:
            inventory = next(
                (i for i in self.inventories if i.product_id == item.product_id), None
            )
            inventory.reserve_stock(item.quantity)

        self.order.confirm()

    @invariant.pre
    def inventory_should_have_sufficient_stock(self):
        for item in self.order.items:
            inventory = next(
                (i for i in self.inventories if i.product_id == item.product_id), None
            )
            if inventory is None or inventory.quantity < item.quantity:
                raise ValidationError({"_service": ["Product is out of stock"]})

You would then instantiate the Domain Service, passing the relevant aggregates and invoke the methods on the instance.

service = OrderPlacementService(order, inventories)
service.place_order()

3. Callable class

If you have a single business function, you can simply model it as a callable class:

@domain.domain_service(part_of=[Order, Inventory])
class place_order:
    def __init__(self, order, inventories):
        super().__init__(order, inventories)

        self.order = order
        self.inventories = inventories

    def __call__(self):
        for item in self.order.items:
            inventory = next(
                (i for i in self.inventories if i.product_id == item.product_id), None
            )
            inventory.reserve_stock(item.quantity)

        self.order.confirm()

    @invariant.pre
    def inventory_should_have_sufficient_stock(self):
        for item in self.order.items:
            inventory = next(
                (i for i in self.inventories if i.product_id == item.product_id), None
            )
            if inventory is None or inventory.quantity < item.quantity:
service = place_order(order, inventories)
service()

Deciding between flavors

Criterion Class methods Instance methods Callable class
Invariants needed No Yes Yes
Multiple operations Yes Yes No (single __call__)
Simplest for single operation Yes

The decision between instance methods and a callable class boils down to:

  1. How many business functions does the Domain Service have? If only one, a callable class is more elegant.
  2. Do you have pre invariants that only apply to specific methods? Then construct each method as a separate callable class. If invariants apply to all methods, a class with instance methods is preferable.

As your domain model matures, review regularly and decide on the best way to model the Domain Service.

Note

Invariants only wrap public methods and __call__ — they skip dunder methods (other than __call__) and private methods (prefixed with _). If you encounter a RecursionError: maximum recursion depth exceeded, it is likely that a public method is calling another public method on the same instance. Extract the shared logic into a private method (prefixed with _) to break the cycle.

Domain Service vs Application Service vs Command Handler

These three constructs coordinate behavior, but at different levels:

Aspect Domain Service Application Service Command Handler
Contains business logic Yes — cross-aggregate rules No — orchestration only No — orchestration only
Operates on 2+ aggregates 1 aggregate 1 aggregate
Has invariants Yes (@invariant.pre/.post) No No
Invoked by Command handlers, app services External callers (API, CLI) domain.process(command)
Returns values Optional Yes (synchronous) No (fire-and-forget)

For detailed guidance, see the Application Service vs Command Handler pattern.

Typical Workflow

Let us consider an example OrderPlacementService that places an order and updates inventory stocks simultaneously. The typical workflow of a Domain Service is below:

sequenceDiagram
  autonumber
  Command Handler->>Repository: Fetch order and product inventories
  Repository-->>Command Handler: order and inventories
  Command Handler->>Domain Service: Invoke operation
  Domain Service->>Domain Service: Validate (pre-invariants)
  Domain Service->>Domain Service: Mutate aggregates
  Domain Service->>Domain Service: Validate (post-invariants)
  Domain Service-->>Command Handler: Done
  Command Handler->>Repository: Persist order (one aggregate)

The handler loads the necessary aggregates through repositories, invokes the domain service to perform the cross-aggregate operation, and then persists only the primary aggregate. The other aggregate's state changes propagate through domain events.

Handler Integration Example

@domain.command_handler(part_of=Order)
class OrderCommandHandler:
    @handle(PlaceOrder)
    def handle_place_order(self, command: PlaceOrder):
        order_repo = current_domain.repository_for(Order)
        inventory_repo = current_domain.repository_for(Inventory)

        # Load aggregates
        order = order_repo.get(command.order_id)
        inventories = [
            inventory_repo.get(item.product_id)
            for item in order.items
        ]

        # Invoke domain service — invariants run here
        service = place_order(order, inventories)
        service()

        # Persist only the primary aggregate
        order_repo.add(order)

Invariants

Just like Aggregates and Entities, Domain Services can also have invariants. These invariants validate the state of the aggregates passed to the service method. Unlike in Aggregates, invariants in Domain Services typically deal with validations that span across multiple aggregates.

pre invariants check the state of the aggregates before they are mutated, while post invariants check the state after the mutation. When a pre-invariant fails, a ValidationError is raised and the mutation never happens.

Note

It is a good practice to step back and review the business logic placed in the Domain Service now and then. If an invariant does not use multiple aggregates, it is likely that it belongs within an aggregate and not in the service.

A Full-Blown Example

from datetime import datetime, timezone
from enum import Enum

from protean import Domain, invariant
from protean.exceptions import ValidationError
from protean.fields import (
    DateTime,
    Float,
    HasMany,
    Identifier,
    Integer,
    String,
    ValueObject,
)

domain = Domain()


class OrderStatus(Enum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"
    DELIVERED = "DELIVERED"


@domain.aggregate
class Order:
    customer_id: Identifier(required=True)
    items = HasMany("OrderItem")
    status: String(choices=OrderStatus, default=OrderStatus.PENDING.value)
    payment_id: Identifier()

    @invariant.post
    def order_should_contain_items(self):
        if not self.items or len(self.items) == 0:
            raise ValidationError({"_entity": ["Order must contain at least one item"]})

    def confirm(self):
        self.status = OrderStatus.CONFIRMED.value
        self.raise_(
            OrderConfirmed(order_id=self.id, confirmed_at=datetime.now(timezone.utc))
        )


@domain.event(part_of=Order)
class OrderConfirmed:
    order_id: Identifier(required=True)
    confirmed_at: DateTime(required=True)


@domain.entity(part_of=Order)
class OrderItem:
    product_id: Identifier(required=True)
    quantity: Integer()
    price: Float()


@domain.value_object(part_of="Inventory")
class Warehouse:
    location: String()
    contact: String()


@domain.event(part_of="Inventory")
class StockReserved:
    product_id: Identifier(required=True)
    quantity: Integer(required=True)
    reserved_at: DateTime(required=True)


@domain.aggregate
class Inventory:
    product_id: Identifier(required=True)
    quantity: Integer()
    warehouse = ValueObject(Warehouse)

    def reserve_stock(self, quantity: int):
        self.quantity -= quantity
        self.raise_(
            StockReserved(
                product_id=self.product_id,
                quantity=quantity,
                reserved_at=datetime.now(timezone.utc),
            )
        )


@domain.domain_service(part_of=[Order, Inventory])
class place_order:
    def __init__(self, order, inventories):
        super().__init__(order, inventories)

        self.order = order
        self.inventories = inventories

    def __call__(self):
        for item in self.order.items:
            inventory = next(
                (i for i in self.inventories if i.product_id == item.product_id), None
            )
            inventory.reserve_stock(item.quantity)

        self.order.confirm()

    @invariant.pre
    def inventory_should_have_sufficient_stock(self):
        for item in self.order.items:
            inventory = next(
                (i for i in self.inventories if i.product_id == item.product_id), None
            )
            if inventory is None or inventory.quantity < item.quantity:
                raise ValidationError({"_service": ["Product is out of stock"]})

    @invariant.pre
    def order_payment_method_should_be_valid(self):
        if not self.order.payment_id:
            raise ValidationError(
                {"_service": ["Order must have a valid payment method"]}
            )

    @invariant.post
    def total_reserved_value_should_match_order_value(self):
        order_total = sum(item.quantity * item.price for item in self.order.items)
        reserved_total = 0
        for item in self.order.items:
            inventory = next(
                (i for i in self.inventories if i.product_id == item.product_id), None
            )
            if inventory:
                reserved_total += inventory._events[0].quantity * item.price

        if order_total != reserved_total:
            raise ValidationError(
                {"_service": ["Total reserved value does not match order value"]}
            )

    @invariant.post
    def total_quantity_reserved_should_match_order_quantity(self):
        order_quantity = sum(item.quantity for item in self.order.items)
        reserved_quantity = sum(
            inventory._events[0].quantity
            for inventory in self.inventories
            if inventory._events
        )

        if order_quantity != reserved_quantity:
            raise ValidationError(
                {"_service": ["Total reserved quantity does not match order quantity"]}
            )

When an order is placed, Order status has to be CONFIRMED and the stock record of each product in Inventory has to be reduced.

This change could be performed with events, with Order generating an event and Inventory aggregate consuming the event and updating its records. But there is a possibility of encountering stock-depleted issues if multiple orders are placed at the same time.

So a Domain Service works best here because it validates the states of both the Order aggregate and the Inventory aggregate together, enforcing cross-aggregate invariants before any mutation occurs.


See also

Concept overview: Domain Services — When and why to use domain services for cross-aggregate business logic.

Related guides:

Patterns: