Skip to content

Chapter 3: Commands and the Processing Pipeline

Calling account.deposit() directly works for in-process code, but external APIs need a clear contract. In this chapter we will introduce commands — typed data objects that represent the intent to do something — and command handlers that receive those commands, load the aggregate from the event store, call the appropriate domain method, and persist the result.

Defining Commands

Commands are imperative: they describe what we want to happen.

@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()

Notice the naming convention:

  • Commands use imperative verbs: OpenAccount, MakeDeposit, MakeWithdrawal.
  • Events (from the previous chapter) use past tense: AccountOpened, DepositMade, WithdrawalMade.

Commands express intent ("I want to make a deposit"). Events express facts ("A deposit was made"). This distinction matters — a command can be rejected, but an event is an immutable record of something that already happened.

Each command is linked to an aggregate via part_of=Account. This tells Protean which aggregate the command targets.

The Command Handler

A command handler receives commands and orchestrates the work:

@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)

The command handler follows a consistent pattern:

  1. Load the aggregate from the repository (or create a new one)
  2. Call the domain method with data from the command
  3. Save the aggregate back to the repository

The handler is a thin coordinator — it does not contain business logic. Business logic lives in the aggregate (the deposit() and withdraw() methods we wrote in Chapter 2).

current_domain

current_domain is a thread-local reference to the active domain. Inside handlers, it gives you access to repositories and other domain services without passing the domain around explicitly.

Processing Commands

The domain.process() method ties everything together:

if __name__ == "__main__":
    with domain.domain_context():
        # Open an account via a command
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make a deposit via a command
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )

        # Make a withdrawal via a command
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=150.00,
                reference="groceries",
            )
        )

        # Verify the final state
        repo = domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"Account: {account.holder_name}")
        print(f"Balance: ${account.balance:.2f}")  # 1000 + 500 - 150 = 1350

        assert account.balance == 1350.00
        print("\nAll checks passed!")

Run it:

$ python fidelis.py
Account opened: 5eb04301-f191-4bca-9e49-8e5a948f07f6
Account: Alice Johnson
Balance: $1350.00

All checks passed!

How It Works

When you call domain.process(OpenAccount(...)), Protean:

  1. Looks up the registered command handler for OpenAccount's part_of aggregate (Account)
  2. Calls the matching @handle(OpenAccount) method
  3. The handler creates the aggregate, raises events, and persists it
  4. Events are written to the event store
  5. The command handler's return value is returned to the caller

For now we configure synchronous processing:

domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"

In Chapter 8, we will switch to asynchronous processing where commands flow through a message broker.

Stream Categories

Behind the scenes, events are organized into streams. Each aggregate instance has its own stream:

  • fidelis::account-{id} — events for a specific account
  • fidelis::account — the category stream containing all account events

Commands are dispatched through a command stream:

  • fidelis::account:command-{id} — commands for a specific account

Stream categories are derived from the aggregate's class name. You rarely need to think about them, but they become important when configuring subscriptions in Chapter 8.

What We Built

  • Commands (OpenAccount, MakeDeposit, MakeWithdrawal) that express intent with typed fields.
  • A Command Handler that follows the load-mutate-save pattern.
  • The domain.process() pipeline that routes commands to handlers.
  • Synchronous processing for development.

The domain now has a clean external contract: submit a command, and the system handles the rest. Next, we will add business rules that protect the aggregate from invalid state transitions.

Full Source

from protean import Domain, apply, handle
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        if amount > self.balance:
            raise ValidationError({"amount": ["Insufficient funds"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account via a command
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make a deposit via a command
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )

        # Make a withdrawal via a command
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=150.00,
                reference="groceries",
            )
        )

        # Verify the final state
        repo = domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"Account: {account.holder_name}")
        print(f"Balance: ${account.balance:.2f}")  # 1000 + 500 - 150 = 1350

        assert account.balance == 1350.00
        print("\nAll checks passed!")

Next

Chapter 4: Business Rules That Never Break →