Skip to content

Commands

Commands represent actions or operations that change the state of the system. They encapsulate the intent to perform a specific task, often containing data necessary for the action, and are (typically) processed by command handlers to ensure business rules and invariants are upheld.

In Protean, command objects are essentially DTOs (Data Transfer Objects) that carry intent and information necessary to perform a specific action.

Key Facts

  • Commands are unique throughout the domain.
  • Commands are typically named using imperative verbs that clearly describe the intended action or change. E.g. CreateOrder, UpdateCustomerAddress, ShipProduct, and CancelReservation.
  • Commands are typically related to an aggregate, because aggregates are the entry point for all modifications, ensuring consistency and enforcing business rules.
  • When commands represent a domain concept that spans across aggregates, one aggregate takes the responsibility of processing the command and raising events to eventually make the rest of the system consistent.

Defining Commands

A command is defined with the Domain.command decorator:

from datetime import datetime, timezone

from protean import Domain
from protean.fields import DateTime, Identifier

publishing = Domain(name="Publishing")


def utc_now():
    return datetime.now(timezone.utc)


@publishing.command(part_of="Article")
class PublishArticle:
    article_id = Identifier(required=True)
    published_at = DateTime(default=utc_now)


@publishing.aggregate
class Article:
    article_id = Identifier(required=True)
    published_at = DateTime(default=utc_now)

A command is always associated with an aggregate class with the part_of option, as seen in the example above.

Submitting Commands

Since a Protean domain constructs and manages the object graph of all domain elements, you don't need to identify the appropriate Command Handler for your commands.

You can simply submit the command to the domain for processing with domain.process.

In [1]: command = PublishArticle(article_id="1")

In [2]: publishing.process(command)

Synchronous vs Asynchronous Processing

Commands in Protean can be processed either synchronously or asynchronously:

  • Synchronous processing: The command is processed immediately by the command handler when domain.process() is called. The execution flow is blocked until the command is fully processed.
  • Asynchronous processing: The command is stored in the event store and later processed by a background worker. The domain.process() call returns immediately without waiting for the command to be fully processed.

You can control the processing mode in two ways:

1. Using the asynchronous parameter

When submitting a command, you can explicitly specify whether it should be processed synchronously or asynchronously:

# Process synchronously (default is based on domain configuration)
domain.process(command, asynchronous=False)

# Process asynchronously
domain.process(command, asynchronous=True)

2. Domain Configuration

You can set the default processing mode for all commands in the domain configuration:

In domain.toml:

command_processing = "sync"  # or "async"

In code:

# Configure default command processing as synchronous
domain.config["command_processing"] = "sync"  # or "async"

By default, Protean sets command_processing to async in the domain configuration.

When to use each mode

  • Synchronous processing is useful when:
  • You need immediate feedback from the command execution
  • You want to ensure the command was processed successfully before continuing
  • The operation is part of a transaction that needs to be completed atomically

  • Asynchronous processing is beneficial when:

  • You want to improve UI responsiveness by not blocking the execution flow
  • The command processing might take a long time
  • You want to distribute load across background workers
  • You're implementing CQRS with event sourcing patterns

How Asynchronous Processing Works

Asynchronous commands processing in Protean uses a server/engine component that:

  1. Creates subscriptions for command handlers to listen to their respective command streams
  2. Polls the event store for new commands that haven't been processed yet
  3. Dispatches those commands to the appropriate handlers

To run the Protean server for processing asynchronous commands, use the CLI:

protean server --domain path/to/domain.py

See CLI documentation for more details about the server command and other available CLI options.

The server continually polls the event store for new commands that have the asynchronous flag set to True in their metadata. When found, it dispatches them to the appropriate handlers, keeping track of processed commands to avoid duplicate processing.

Workflow

Command objects are often instantiated by the API controller, which acts as the entry point for external requests into the system. When a client makes a request to the API, the controller receives this request and translates the incoming data into the appropriate command object.

In Protean, the API controller submits the command to the domain object, which then dispatches the command to the appropriate command handler. We will explore how the domain identifies the command handler in the Command Handlers section.

The workflow differs slightly depending on whether synchronous or asynchronous processing is used:

Synchronous Command Flow

sequenceDiagram
  autonumber
  API Controller->>Domain: command object (asynchronous=False)
  Domain->>Event Store: Store command
  Domain->>Command Handler: Process command immediately
  Command Handler->>Command Handler: Process command
  Command Handler-->>Domain: Return result (if any)
  Domain-->>API Controller: Return result

Asynchronous Command Flow

sequenceDiagram
  autonumber
  API Controller->>Domain: command object (asynchronous=True)
  Domain->>Event Store: Store command with asynchronous=True
  Domain-->>API Controller: Acknowledge receipt (return immediately)

  Note over Protean Server: Later, asynchronously...

  Protean Server->>Event Store: Poll for unprocessed commands
  Event Store-->>Protean Server: Return command
  Protean Server->>Command Handler: Process command
  Command Handler->>Command Handler: Process command
  Protean Server->>Event Store: Update processed position

Immutability

Like Events, Commands in Protean are immutable. This means that once a command is created, it cannot be changed.

In [1]: from datetime import datetime, timedelta

In [2]: publish_article_command = PublishArticle(article_id="1")

In [3]: publish_article_command
Out[3]: <PublishArticle: PublishArticle object ({'article_id': '1', 'published_at': '2024-05-28 17:47:35.570857+00:00'})>

In [4]: publish_article_command.published_at = datetime.now() - timedelta(hours=24)
...
IncorrectUsageError: 'Command Objects are immutable and cannot be modified once created'
}

Relationship with Event Processing

Protean offers similar configuration options for events through: - The event_processing domain configuration setting - The ability to raise events with specific asynchronous flags

Events and commands in Protean follow the same processing patterns, enabling you to build consistent, predictable workflows. You can configure both to suit your specific domain needs:

# Domain-wide configuration
domain.config["command_processing"] = "sync"  # or "async"
domain.config["event_processing"] = "async"   # or "sync"

# Per-instance control
domain.process(command, asynchronous=False)   # Override domain setting for a specific command
aggregate.raise_(event, asynchronous=True)    # Override domain setting for a specific event

This flexibility allows you to implement various architectural patterns like CQRS, Event Sourcing, and Workflow-driven architectures within your Protean applications.