Command Handlers
Command handlers are responsible for executing commands and persisting system state. They typically interact with aggregate roots to perform the required operations, ensuring that all business rules and invariants are upheld.
Key Facts
- Command Handlers extract relevant data from a command and invoke the appropriate aggregate method with the necessary parameters.
- Command Handlers are responsible for hydrating (loading) and persisting aggregates.
- A Command Handler method can hydrate more than one aggregate at a time, depending on the business process requirements, but it should persist only one aggregate root. Other aggregates should be synced eventually through domain events.
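The last point, persisting one aggregate and syncing the others through domain events, can be illustrated without any framework. The following is a minimal sketch in plain Python, not Protean code: `Order`, `Inventory`, the `outbox` list, and both handler functions are hypothetical names invented for this illustration, and dictionaries stand in for repositories.

```python
from dataclasses import dataclass, field


@dataclass
class Order:
    order_id: str
    sku: str
    quantity: int
    status: str = "PENDING"
    events: list = field(default_factory=list)

    def confirm(self) -> None:
        self.status = "CONFIRMED"
        # Record what happened; another aggregate reacts to this later.
        self.events.append(("OrderConfirmed", self.sku, self.quantity))


@dataclass
class Inventory:
    sku: str
    on_hand: int


# Dictionaries stand in for repositories (assumption for this sketch).
orders = {"o-1": Order("o-1", "sku-9", 2)}
inventories = {"sku-9": Inventory("sku-9", 10)}
outbox = []  # stand-in for a message broker


def handle_confirm_order(order_id: str) -> None:
    """Command handler: reads two aggregates, persists only Order."""
    order = orders[order_id]
    stock = inventories[order.sku]  # loaded for a read-only check
    if stock.on_hand < order.quantity:
        raise ValueError("insufficient stock")
    order.confirm()
    orders[order_id] = order  # the single aggregate persisted here
    outbox.extend(order.events)
    order.events.clear()


def handle_order_confirmed(event) -> None:
    """Event handler: updates Inventory in its own, later step."""
    _, sku, quantity = event
    inventories[sku].on_hand -= quantity


handle_confirm_order("o-1")
on_hand_before_sync = inventories["sku-9"].on_hand

# Later, events are delivered and the second aggregate catches up.
while outbox:
    handle_order_confirmed(outbox.pop(0))
on_hand_after_sync = inventories["sku-9"].on_hand
```

Between the command handler finishing and the event being delivered, the inventory still shows the old quantity. That window is what "synced eventually" means in practice.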
Defining a Command Handler
Command Handlers are defined with the Domain.command_handler decorator:
from datetime import datetime, timezone
from enum import Enum

from protean import Domain, handle
from protean.fields import DateTime, Identifier, String
from protean.utils.globals import current_domain

publishing = Domain(name="Publishing")


def utc_now():
    return datetime.now(timezone.utc)


class ArticleStatus(Enum):
    DRAFT = "DRAFT"
    PUBLISHED = "PUBLISHED"


@publishing.command(part_of="Article")
class PublishArticle:
    article_id = Identifier(required=True)
    published_at = DateTime(default=utc_now)


@publishing.event(part_of="Article")
class ArticlePublished:
    article_id = Identifier(required=True)
    published_at = DateTime()


@publishing.aggregate
class Article:
    article_id = Identifier(required=True)
    status = String(choices=ArticleStatus, default=ArticleStatus.DRAFT.value)
    published_at = DateTime(default=utc_now)

    def publish(self, published_at: datetime) -> None:
        # Mutate state, then record the change as a domain event
        self.status = ArticleStatus.PUBLISHED.value
        self.published_at = published_at

        self.raise_(
            ArticlePublished(article_id=self.article_id, published_at=published_at)
        )


@publishing.command_handler(part_of=Article)
class ArticleCommandHandler:
    @handle(PublishArticle)
    def publish_article(self, command):
        # Load the aggregate, invoke its method with data from the command, persist
        article = current_domain.repository_for(Article).get(command.article_id)
        article.publish(command.published_at)
        current_domain.repository_for(Article).add(article)
Workflow
sequenceDiagram
autonumber
App->>Command Handler: Command object
Command Handler->>Command Handler: Extract data and Load aggregate
Command Handler->>Aggregate: Invoke method
Aggregate->>Aggregate: Mutate
Aggregate-->>Command Handler:
Command Handler->>Command Handler: Persist aggregate
- Domain Sends Command Object to Command Handler: The application submits a command object to the domain, which sends it to the command handler. The command object encapsulates the intent to perform a specific operation within the domain.
- Command Handler Loads Aggregate: The command handler loads the required aggregate from its repository. This aggregate is the object the command acts upon.
- Command Handler Extracts Data and Invokes Aggregate Method: The command handler extracts the relevant data from the command object and passes it to the appropriate method on the aggregate.
- Aggregate Mutates: The invoked method applies the business logic and changes the aggregate's state, enforcing business rules and invariants along the way.
- Aggregate Responds to Command Handler: After mutating its state, the aggregate returns control to the command handler, optionally with data resulting from the mutation.
- Command Handler Persists Aggregate: Finally, the command handler persists the modified aggregate through the repository, so the change is reflected in the system's state.
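The load, invoke, and persist steps can also be traced in a framework-free sketch. Everything below is an assumption made for illustration and not Protean's API: the `InMemoryRepository` class is hypothetical, the command is a plain dictionary, and the repository hands out copies so that the persist step has a visible effect.

```python
import copy


class Article:
    def __init__(self, article_id):
        self.article_id = article_id
        self.status = "DRAFT"
        self.published_at = None

    def publish(self, published_at):
        # Invariant enforced by the aggregate: publish only once.
        if self.status == "PUBLISHED":
            raise ValueError("article is already published")
        self.status = "PUBLISHED"
        self.published_at = published_at


class InMemoryRepository:
    """Hands out copies, so changes are only visible after add()."""

    def __init__(self):
        self._items = {}

    def get(self, article_id):
        return copy.deepcopy(self._items[article_id])

    def add(self, article):
        self._items[article.article_id] = copy.deepcopy(article)


def publish_article(repo, command):
    article = repo.get(command["article_id"])  # load the aggregate
    article.publish(command["published_at"])   # extract data, invoke, mutate
    repo.add(article)                          # persist the result


repo = InMemoryRepository()
repo.add(Article("a-1"))
publish_article(repo, {"article_id": "a-1", "published_at": "2024-01-01T00:00:00Z"})
```

Note that the handler contains no business rule of its own. The "publish only once" check lives in the aggregate, so a second `publish_article` call for the same article fails inside `Article.publish`.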
Return Values from Command Handlers
Command handlers can optionally return values to the caller when processed synchronously. This behavior is determined by how the command is processed by the domain.
Synchronous Processing
When commands are processed synchronously, the command handler's return value is passed back to the caller. This is useful for:
- Returning newly created resource identifiers
- Providing validation or processing results
- Returning calculated values or status information
To process a command synchronously and receive its return value:
# Process command synchronously and get the return value
result = domain.process(command, asynchronous=False)
Example of a command handler that returns a value:
@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(RegisterCommand)
    def register(self, command: RegisterCommand):
        account = Account(
            email=command.email,
            name=command.name
        )
        self.repository_for(Account).add(account)

        # Return the account ID for immediate use
        return account.id
Asynchronous Processing
When commands are processed asynchronously (the default behavior), the command handler's return value is not passed back to the caller. Instead, the domain's process method returns the position of the command in the event store:
# Process command asynchronously (default)
position = domain.process(command) # or domain.process(command, asynchronous=True)
In asynchronous processing, commands are handled in the background by the Protean Engine, and any return values from the command handler are ignored.
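The difference between the two modes can be modeled with a toy dispatcher. This is a sketch under stated assumptions, not Protean's implementation: `ToyDomain`, its `stream` list (standing in for the event store), the `pending` queue, and `run_worker` (standing in for the background engine) are all hypothetical, and the zero-based position is a choice made only for this example.

```python
from collections import deque
from dataclasses import dataclass


class ToyDomain:
    """Toy model of command dispatch; not Protean's implementation."""

    def __init__(self):
        self.handlers = {}      # command type -> handler function
        self.stream = []        # stand-in for the event store
        self.pending = deque()  # commands awaiting the background worker

    def process(self, command, asynchronous=True):
        self.stream.append(command)
        position = len(self.stream) - 1
        if asynchronous:
            self.pending.append(command)
            return position  # the caller only learns the stream position
        return self.handlers[type(command)](command)

    def run_worker(self):
        """Stand-in for the background engine."""
        while self.pending:
            command = self.pending.popleft()
            self.handlers[type(command)](command)  # return value is discarded


@dataclass
class Register:
    email: str


registered = []


def register(command):
    registered.append(command.email)
    return f"acct-{len(registered)}"


domain = ToyDomain()
domain.handlers[Register] = register

sync_result = domain.process(Register("a@example.com"), asynchronous=False)
async_result = domain.process(Register("b@example.com"))
domain.run_worker()
```

In this model `sync_result` is the handler's own return value, while `async_result` is only a position. The second registration still happens, but only once the worker runs, and its return value never reaches the caller.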
Configuring Default Processing Behavior
The default command processing behavior can be configured in the domain's configuration:
# ...
command_processing = "sync" # or "async"
# ...
When set to "sync", all commands will be processed synchronously by default unless explicitly specified as asynchronous, and vice versa.
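The precedence rule described here, where an explicit argument wins and the configured value is the fallback, can be written as a small function. This is an illustrative sketch only: `resolve_asynchronous` is a hypothetical helper, the configuration is modeled as a plain dictionary, and how the framework resolves the setting internally is not shown in this document.

```python
def resolve_asynchronous(config: dict, asynchronous=None) -> bool:
    """Explicit argument wins; otherwise fall back to the configured default."""
    if asynchronous is not None:
        return asynchronous
    # Assumption: "async" applies when the key is absent, matching the
    # default behavior described in the text.
    mode = config.get("command_processing", "async")
    if mode not in ("sync", "async"):
        raise ValueError(f"unknown command_processing mode: {mode!r}")
    return mode == "async"


sync_config = {"command_processing": "sync"}
async_config = {"command_processing": "async"}
```

With `sync_config`, a call that passes no argument resolves to synchronous processing, and passing `asynchronous=True` overrides it for that one call.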
Unit of Work
Command handler methods execute within a UnitOfWork context by default. The UnitOfWork pattern ensures that a series of changes to an aggregate cluster is treated as a single, atomic transaction. If an error occurs, the UnitOfWork rolls back all changes, so no partial updates are applied.
Each command handler method is wrapped in a UnitOfWork context automatically, without you having to specify it. In this respect, both handler methods in AccountCommandHandler below are equivalent:
from protean import handle, UnitOfWork


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(RegisterCommand)
    def register(self, command: RegisterCommand):
        # Explicit UnitOfWork
        with UnitOfWork():
            ...  # code to register account

    @handle(ActivateCommand)
    def activate(self, command: ActivateCommand):
        # Implicit UnitOfWork, applied by the framework
        ...  # code to activate account
Note
A UnitOfWork context applies to the objects within a single aggregate cluster, not across multiple aggregates. A Command Handler method can load multiple aggregates to perform the business process, but should never persist more than one at a time. Other aggregates should be synced eventually through domain events.
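The all-or-nothing behavior described in this section can be modeled with an ordinary context manager. The sketch below is a toy over a dictionary-backed store and is not Protean's UnitOfWork: `ToyUnitOfWork` is a hypothetical class that takes a snapshot on entry and restores it if the block raises.

```python
import copy


class ToyUnitOfWork:
    """Toy transaction over a dict-backed store; not Protean's UnitOfWork."""

    def __init__(self, store: dict):
        self.store = store
        self._snapshot = None

    def __enter__(self):
        # Capture the state so it can be restored on failure.
        self._snapshot = copy.deepcopy(self.store)
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # Roll back: restore the state captured on entry.
            self.store.clear()
            self.store.update(self._snapshot)
        return False  # let the exception propagate to the caller


store = {"account": {"status": "PENDING", "emails_sent": 0}}

try:
    with ToyUnitOfWork(store):
        store["account"]["status"] = "ACTIVE"
        store["account"]["emails_sent"] += 1
        raise RuntimeError("activation failed midway")
except RuntimeError:
    pass

state_after_failure = copy.deepcopy(store)

with ToyUnitOfWork(store):
    store["account"]["status"] = "ACTIVE"
```

After the failing block, neither of the two changes survives, even though both were applied before the error. The second block completes normally, so its change is kept.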
Error Handling
Command handlers support custom error handling through the optional handle_error method. This method is called when an exception occurs during command processing, allowing you to implement specialized error handling strategies.
The handle_error Method
You can define a handle_error class method in your command handler to handle exceptions:
import logging

logger = logging.getLogger(__name__)


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(RegisterCommand)
    def register(self, command: RegisterCommand):
        # Command handling logic that might raise exceptions
        ...

    @classmethod
    def handle_error(cls, exc: Exception, message):
        """Custom error handling logic for command processing failures"""
        # Log the error
        logger.error(f"Failed to process command: {exc}")
        # Perform recovery operations
        # Example: notify monitoring systems, attempt retry, etc.
        ...
How It Works
- When an exception occurs in a command handler method, the Protean Engine catches it.
- The engine logs detailed error information including stack traces.
- The engine calls the command handler's handle_error method, passing:
    - The original exception that was raised
    - The command message being processed when the exception occurred
- After handle_error returns, processing continues with the next command.
Handling Errors in the Error Handler
If an exception occurs within the handle_error method itself, the Protean Engine will catch that exception too, log it, and continue processing. This ensures that even failures in error handling don't crash the system.
@classmethod
def handle_error(cls, exc: Exception, message):
    try:
        # Potentially risky error handling logic
        ...
    except Exception as error_exc:
        # Catch and log secondary failures here; any exception that still
        # escapes handle_error is caught and logged by the engine
        logger.error(f"Error in error handler: {error_exc}")
        # The engine will continue processing regardless
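The behavior described in the two sections above can be modeled as a processing loop with two layers of protection. This is a toy stand-in written for illustration and not the engine's actual code: the `run` function, the `FlakyHandler` class, and its `handle` method are hypothetical names, and messages are plain strings.

```python
import logging

logger = logging.getLogger("toy_engine")


def run(handler_cls, messages):
    """Toy processing loop; a stand-in for the engine, not its real code."""
    processed = []
    for message in messages:
        try:
            handler_cls.handle(message)
            processed.append(message)
        except Exception as exc:
            # Log the failure with its stack trace.
            logger.exception("Failed to process %r", message)
            try:
                handler_cls.handle_error(exc, message)
            except Exception:
                # A failing error handler is logged, never propagated.
                logger.exception("handle_error failed for %r", message)
    return processed


class FlakyHandler:
    errors_seen = []

    @classmethod
    def handle(cls, message):
        if message == "bad":
            raise ValueError("cannot process")

    @classmethod
    def handle_error(cls, exc, message):
        cls.errors_seen.append((type(exc).__name__, message))
        raise RuntimeError("error handler itself failed")


processed = run(FlakyHandler, ["first", "bad", "last"])
```

Here the middle message fails, its error handler fails as well, and the loop still reaches the last message. Both failures end up in the log rather than stopping the run.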
Best Practices
- Make error handlers robust and avoid complex logic that might fail.
- Use error handlers for logging, notification, and simple recovery.
- Don't raise exceptions from error handlers unless absolutely necessary.
- Consider implementing retry logic for transient failures.
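For the last point, one common shape for retry logic is a bounded loop with exponential backoff. The following is a generic sketch and not a Protean feature: `TransientError`, `retry`, and `flaky_save` are hypothetical names, and which exceptions count as transient is an assumption you would have to decide for your own system.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying."""


def retry(operation, attempts=3, base_delay=0.01, sleep=time.sleep):
    """Call operation, retrying on TransientError with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except TransientError:
            if attempt == attempts:
                raise  # out of attempts: surface the failure
            sleep(base_delay * 2 ** (attempt - 1))


calls = []


def flaky_save():
    """Fails twice, then succeeds, to exercise the retry path."""
    calls.append(1)
    if len(calls) < 3:
        raise TransientError("store temporarily unavailable")
    return "saved"


delays = []
# Pass a recording function instead of time.sleep so the example runs instantly.
result = retry(flaky_save, sleep=delays.append)
```

Only `TransientError` is retried. Any other exception propagates immediately, which keeps genuine bugs from being hidden behind repeated attempts.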