Chapter 3: Commands and the Processing Pipeline
Calling account.deposit() directly works for in-process code, but
external APIs need a clear contract. In this chapter we will introduce
commands — typed data objects that represent the intent to do
something — and command handlers that receive those commands, load
the aggregate from the event store, call the appropriate domain method,
and persist the result.
Defining Commands
Commands are imperative: they describe what we want to happen.
@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()
Notice the naming convention:
- Commands use imperative verbs: OpenAccount, MakeDeposit, MakeWithdrawal.
- Events (from the previous chapter) use past tense: AccountOpened, DepositMade, WithdrawalMade.
Commands express intent ("I want to make a deposit"). Events express facts ("A deposit was made"). This distinction matters — a command can be rejected, but an event is an immutable record of something that already happened.
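The difference between a rejectable command and an immutable event can be sketched in plain Python, without Protean. The names MakeDepositRequest, DepositRecorded, and decide are hypothetical and exist only for this illustration; frozen dataclasses stand in for the framework's command and event classes.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class MakeDepositRequest:  # hypothetical stand-in for a command
    account_id: str
    amount: float


@dataclass(frozen=True)
class DepositRecorded:  # hypothetical stand-in for an event
    account_id: str
    amount: float


def decide(command: MakeDepositRequest) -> DepositRecorded:
    """Accept or reject the command; only an accepted command yields an event."""
    if command.amount <= 0:
        raise ValueError("Deposit amount must be positive")
    return DepositRecorded(account_id=command.account_id, amount=command.amount)


# An accepted command produces a fact.
event = decide(MakeDepositRequest(account_id="acc-1", amount=500.0))

# An invalid command is rejected and produces nothing.
try:
    decide(MakeDepositRequest(account_id="acc-1", amount=-5.0))
    rejected = False
except ValueError:
    rejected = True

# The recorded fact cannot be altered afterwards.
try:
    event.amount = 0.0
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Here the rejection happens before any event exists, which is why a failed command leaves no trace in the event history.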
Each command is linked to an aggregate via part_of=Account. This tells
Protean which aggregate the command targets.
The Command Handler
A command handler receives commands and orchestrates the work:
@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)
The command handler follows a consistent pattern:
- Load the aggregate from the repository (or create a new one)
- Call the domain method with data from the command
- Save the aggregate back to the repository
The handler is a thin coordinator — it does not contain business logic.
Business logic lives in the aggregate (the deposit() and withdraw()
methods we wrote in Chapter 2).
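To see why keeping the handler thin matters, here is a framework-free sketch of the load, call, save pattern. InMemoryRepository and SimpleAccount are hypothetical stand-ins invented for this example, not Protean classes; the point is only that the validation rule sits in the aggregate and the handler never inspects the amount.

```python
class InMemoryRepository:
    """Hypothetical stand-in for a repository: stores objects by id."""

    def __init__(self):
        self._items = {}

    def add(self, item):
        self._items[item.id] = item

    def get(self, item_id):
        return self._items[item_id]


class SimpleAccount:
    """Hypothetical aggregate: the business rule lives here."""

    def __init__(self, id, balance=0.0):
        self.id = id
        self.balance = balance

    def deposit(self, amount):
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        self.balance += amount


def handle_make_deposit(repo, account_id, amount):
    account = repo.get(account_id)  # 1. load the aggregate
    account.deposit(amount)         # 2. call the domain method
    repo.add(account)               # 3. save the aggregate


repo = InMemoryRepository()
repo.add(SimpleAccount("acc-1", balance=1000.0))
handle_make_deposit(repo, "acc-1", 500.0)

# An invalid amount is rejected by the aggregate, not by the handler.
try:
    handle_make_deposit(repo, "acc-1", -1.0)
    invalid_rejected = False
except ValueError:
    invalid_rejected = True
```

Because the rule is enforced inside deposit(), any other caller of the aggregate gets the same protection without duplicating checks.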
current_domain
current_domain is a thread-local reference to the active domain.
Inside handlers, it gives you access to repositories and other
domain services without passing the domain around explicitly.
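The idea of a thread-local "active domain" can be illustrated with the standard library alone. This is a sketch of the general technique, not Protean's implementation: SketchDomain, get_current_domain, and handler_body are hypothetical names, and threading.local is assumed here purely to mirror the description above.

```python
import threading
from contextlib import contextmanager

_local = threading.local()  # each thread sees its own attributes


class SketchDomain:
    """Hypothetical domain object; not Protean's Domain class."""

    def __init__(self, name):
        self.name = name

    @contextmanager
    def domain_context(self):
        previous = getattr(_local, "domain", None)
        _local.domain = self  # activate for the current thread
        try:
            yield self
        finally:
            _local.domain = previous  # restore whatever was active before


def get_current_domain():
    """Return the domain active in this thread, or raise if there is none."""
    domain = getattr(_local, "domain", None)
    if domain is None:
        raise RuntimeError("No active domain in this thread")
    return domain


def handler_body():
    # No domain argument: the handler reaches the active domain implicitly.
    return get_current_domain().name


seen_in_other_thread = []


def probe():
    # Runs in a second thread, which has its own (empty) local state.
    seen_in_other_thread.append(getattr(_local, "domain", None))


fidelis = SketchDomain("fidelis")
with fidelis.domain_context():
    seen_inside = handler_body()
    worker = threading.Thread(target=probe)
    worker.start()
    worker.join()
```

Inside the with block the handler finds the domain without receiving it as a parameter, while the second thread sees nothing, which is the isolation a thread-local reference provides.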
Processing Commands
The domain.process() method ties everything together:
if __name__ == "__main__":
    with domain.domain_context():
        # Open an account via a command
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make a deposit via a command
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )

        # Make a withdrawal via a command
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=150.00,
                reference="groceries",
            )
        )

        # Verify the final state
        repo = domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"Account: {account.holder_name}")
        print(f"Balance: ${account.balance:.2f}")  # 1000 + 500 - 150 = 1350
        assert account.balance == 1350.00
        print("\nAll checks passed!")
Run it:
$ python fidelis.py
Account opened: 5eb04301-f191-4bca-9e49-8e5a948f07f6
Account: Alice Johnson
Balance: $1350.00

All checks passed!
How It Works
When you call domain.process(OpenAccount(...)), Protean:
- Looks up the command handler registered for OpenAccount's part_of aggregate (Account).
- Calls the matching @handle(OpenAccount) method.
- Inside that method, the handler creates the aggregate, raises events, and persists it.
- Writes the events to the event store.
- Returns the command handler's return value to the caller.
For now we configure synchronous processing:
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"
In Chapter 8, we will switch to asynchronous processing where commands flow through a message broker.
Stream Categories
Behind the scenes, events are organized into streams. Each aggregate instance has its own stream:
- fidelis::account-{id} holds the events for a specific account
- fidelis::account is the category stream containing all account events
Commands are dispatched through a command stream:
- fidelis::account:command-{id} carries the commands for a specific account
Stream categories are derived from the aggregate's class name. You rarely need to think about them, but they become important when configuring subscriptions in Chapter 8.
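As a quick illustration of the naming scheme, the helpers below rebuild the three stream names listed above from a domain name, an aggregate class name, and an id. The function names are hypothetical, and lowercasing the class name is an assumption that matches the single-word Account example; how multi-word class names are converted is not covered here.

```python
def category_stream(domain_name: str, aggregate_name: str) -> str:
    """Category stream for all instances, e.g. fidelis::account."""
    # Assumption: the category is the lowercased class name.
    return f"{domain_name}::{aggregate_name.lower()}"


def instance_stream(domain_name: str, aggregate_name: str, aggregate_id: str) -> str:
    """Event stream for one aggregate instance."""
    return f"{category_stream(domain_name, aggregate_name)}-{aggregate_id}"


def command_stream(domain_name: str, aggregate_name: str, aggregate_id: str) -> str:
    """Command stream for one aggregate instance."""
    return f"{category_stream(domain_name, aggregate_name)}:command-{aggregate_id}"


account_id = "5eb04301-f191-4bca-9e49-8e5a948f07f6"
events_for_account = instance_stream("fidelis", "Account", account_id)
commands_for_account = command_stream("fidelis", "Account", account_id)
```

With the id from the sample run, events_for_account is fidelis::account-5eb04301-f191-4bca-9e49-8e5a948f07f6.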
What We Built
- Commands (OpenAccount, MakeDeposit, MakeWithdrawal) that express intent with typed fields.
- A Command Handler that follows the load-mutate-save pattern.
- The domain.process() pipeline that routes commands to handlers.
- Synchronous processing for development.
The domain now has a clean external contract: submit a command, and the system handles the rest. Next, we will add business rules that protect the aggregate from invalid state transitions.
Full Source
from protean import Domain, apply, handle
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        if amount > self.balance:
            raise ValidationError({"amount": ["Insufficient funds"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account via a command
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}")

        # Make a deposit via a command
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=500.00,
                reference="paycheck",
            )
        )

        # Make a withdrawal via a command
        domain.process(
            MakeWithdrawal(
                account_id=account_id,
                amount=150.00,
                reference="groceries",
            )
        )

        # Verify the final state
        repo = domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"Account: {account.holder_name}")
        print(f"Balance: ${account.balance:.2f}")  # 1000 + 500 - 150 = 1350
        assert account.balance == 1350.00
        print("\nAll checks passed!")