Skip to content

Command Handlers

Command handlers are responsible for executing commands and persisting system state. They typically interact with aggregate roots to perform the required operations, ensuring that all business rules and invariants are upheld.

Key Facts

  • Command Handlers extract relevant data from a command and invoke the appropriate aggregate method with the necessary parameters.
  • Command Handlers are responsible for hydrating (loading) and persisting aggregates.
  • A Command Handler method can hydrate more than one aggregate at a time, depending on the business process requirements, but it should always persist one aggregate root. Other aggregates should be synced eventually through domain events.

Defining a Command Handler

Command Handlers are defined with the Domain.command_handler decorator:

from datetime import datetime, timezone
from enum import Enum

from protean import Domain, handle
from protean.fields import DateTime, Identifier, String
from protean.utils.globals import current_domain

publishing = Domain(__file__, "Publishing", load_toml=False)


def utc_now():
    return datetime.now(timezone.utc)


class ArticleStatus(Enum):
    DRAFT = "DRAFT"
    PUBLISHED = "PUBLISHED"


@publishing.command(part_of="Article")
class PublishArticle:
    article_id = Identifier(required=True)
    published_at = DateTime(default=utc_now)


@publishing.event(part_of="Article")
class ArticlePublished:
    article_id = Identifier(required=True)
    published_at = DateTime()


@publishing.aggregate
class Article:
    article_id = Identifier(required=True)
    status = String(choices=ArticleStatus, default=ArticleStatus.DRAFT.value)
    published_at = DateTime(default=utc_now)

    def publish(self, published_at: DateTime) -> None:
        self.status = ArticleStatus.PUBLISHED.value
        self.published_at = published_at

        self.raise_(
            ArticlePublished(article_id=self.article_id, published_at=published_at)
        )


@publishing.command_handler(part_of=Article)
class ArticleCommandHandler:
    @handle(PublishArticle)
    def publish_article(self, command):
        article = current_domain.repository_for(Article).get(command.article_id)
        article.publish()
        current_domain.repository_for(Article).add(article)

Workflow

sequenceDiagram
  autonumber
  App->>Command Handler: Command object
  Command Handler->>Command Handler: Extract data and Load aggregate
  Command Handler->>Aggregate: Invoke method
  Aggregate->>Aggregate: Mutate
  Aggregate-->>Command Handler: 
  Command Handler->>Command Handler: Persist aggregate
  1. Domain Sends Command Object to Command Handler: The domain layer initiates the process by sending a command object to the command handler. This command object encapsulates the intent to perform a specific action or operation within the domain.

  2. Command Handler Loads Aggregate: Upon receiving the command object, the command handler begins by loading the necessary aggregate from the repository or data store. The aggregate is the key entity that will be acted upon based on the command.

  3. Command Handler Extracts Data and Invokes Aggregate Method: The command handler extracts the relevant data from the command object and invokes the appropriate method on the aggregate. This method call triggers the aggregate to perform the specified operation.

  4. Aggregate Mutates: Within the aggregate, the invoked method processes the data and performs the necessary business logic, resulting in a change (mutation) of the aggregate's state. This ensures that the operation adheres to the business rules and maintains consistency.

  5. Aggregate Responds to Command Handler: After mutating its state, the aggregate completes its operation and returns control to the command handler. The response may include confirmation of the successful operation or any relevant data resulting from the mutation.

  6. Command Handler Persists Aggregate: Finally, the command handler persists the modified aggregate back to the repository or data store. This ensures that the changes made to the aggregate's state are saved and reflected in the system's state.

Unit of Work

Command handler methods always execute within a UnitOfWork context by default. The UnitOfWork pattern ensures that the series of changes to an aggregate cluster are treated as a single, atomic transaction. If an error occurs, the UnitOfWork rolls back all changes, ensuring no partial updates are applied.

Each command handler method is wrapped in a UnitOfWork context, without having to explicitly specify it. Both handler methods in AccountCommandHandler below are equivalent:

from protean import handle, UnitOfWork


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(RegisterCommand)
    def register(self, command: RegisterCommand):
        with UnitOfWork():
            ...  # code to register account

    @handle(ActivateCommand)
    def activate(self, command: ActivateCommand):
        ...  # code to activate account

Note

A UnitOfWork context applies to objects in the aggregate cluster, and not multiple aggregates. A Command Handler method can load multiple aggregates to perform the business process, but should never persist more than one at a time. Other aggregates should be synced eventually through domain events.