Chapter 9: Transferring Funds
The product team wants account-to-account transfers. This is not a single-aggregate operation — it involves debiting one account and crediting another. If the debit succeeds but the credit fails, we have inconsistent state.
In this chapter we will introduce a process manager — a stateful coordinator that orchestrates multi-step workflows across aggregates using events and commands.
Why Not a Single Transaction?
In DDD, each aggregate is a transaction boundary. You cannot modify two aggregates in the same transaction. A funds transfer must:
- Debit the source account
- Credit the destination account
- Handle failures (reverse the debit if the credit fails)
A process manager coordinates these steps through events and commands, maintaining eventual consistency.
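To make the failure case concrete, here is a minimal plain-Python sketch of the three steps above. It does not use Protean: `ToyAccount`, `AccountClosed`, and `transfer_with_compensation` are hypothetical names invented for illustration, and a closed destination account stands in for "the credit fails".

```python
from dataclasses import dataclass


class InsufficientFunds(Exception):
    """Raised when a withdrawal would overdraw the account."""


class AccountClosed(Exception):
    """Raised when a deposit targets a closed account."""


@dataclass
class ToyAccount:
    balance: float
    closed: bool = False

    def withdraw(self, amount: float) -> None:
        if amount > self.balance:
            raise InsufficientFunds
        self.balance -= amount

    def deposit(self, amount: float) -> None:
        if self.closed:
            raise AccountClosed
        self.balance += amount


def transfer_with_compensation(source: ToyAccount, dest: ToyAccount, amount: float) -> str:
    """Debit, then credit; refund the source if the credit step fails."""
    try:
        source.withdraw(amount)
    except InsufficientFunds:
        return "FAILED"  # nothing to undo: no money has moved yet
    try:
        dest.deposit(amount)
    except AccountClosed:
        source.deposit(amount)  # compensating action: reverse the debit
        return "FAILED"
    return "COMPLETED"


alice = ToyAccount(balance=10000.0)
bob = ToyAccount(balance=5000.0)
closed = ToyAccount(balance=0.0, closed=True)

ok = transfer_with_compensation(alice, bob, 3000.0)
failed = transfer_with_compensation(alice, closed, 1000.0)
```

In this toy version the two steps run in one function call. In the event-driven design each step is a separate command handled in its own transaction, which is why a coordinator has to remember where the transfer stands between steps.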
The Transfer Aggregate
First, we define a Transfer aggregate to represent the transfer itself:
@domain.event(part_of="Transfer")
class TransferInitiated:
    transfer_id: Identifier(required=True)
    source_account_id: String(required=True)
    destination_account_id: String(required=True)
    amount: Float(required=True)


@domain.event(part_of="Transfer")
class TransferCompleted:
    transfer_id: Identifier(required=True)


@domain.event(part_of="Transfer")
class TransferFailed:
    transfer_id: Identifier(required=True)
    reason: String(required=True)


@domain.aggregate(is_event_sourced=True)
class Transfer:
    source_account_id: String(max_length=50, required=True)
    destination_account_id: String(max_length=50, required=True)
    amount: Float(required=True)
    status: String(max_length=20, default="INITIATED")

    @classmethod
    def initiate(
        cls,
        source_account_id: str,
        destination_account_id: str,
        amount: float,
    ):
        transfer = cls._create_new()
        transfer.raise_(
            TransferInitiated(
                transfer_id=str(transfer.id),
                source_account_id=source_account_id,
                destination_account_id=destination_account_id,
                amount=amount,
            )
        )
        return transfer

    def complete(self) -> None:
        self.raise_(TransferCompleted(transfer_id=str(self.id)))

    def fail(self, reason: str) -> None:
        self.raise_(TransferFailed(transfer_id=str(self.id), reason=reason))

    @apply
    def on_transfer_initiated(self, event: TransferInitiated):
        self.id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "INITIATED"

    @apply
    def on_transfer_completed(self, event: TransferCompleted):
        self.status = "COMPLETED"

    @apply
    def on_transfer_failed(self, event: TransferFailed):
        self.status = "FAILED"
The transfer tracks its own lifecycle: INITIATED → COMPLETED or FAILED.
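The lifecycle can be written down as a small transition table. The sketch below is plain Python and is not part of the Transfer aggregate shown above, which does not guard its transitions; `ALLOWED` and `next_status` are hypothetical names used only to illustrate that COMPLETED and FAILED are terminal states.

```python
# Each status maps to the set of statuses it may move to.
ALLOWED = {
    "INITIATED": {"COMPLETED", "FAILED"},
    "COMPLETED": set(),  # terminal
    "FAILED": set(),     # terminal
}


def next_status(current: str, target: str) -> str:
    """Return the target status, or raise if the move is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"Illegal transition: {current} -> {target}")
    return target


status = next_status("INITIATED", "COMPLETED")
```

A guard like this could live inside complete() and fail() if the domain needs to reject, for example, completing a transfer that has already failed.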
The Process Manager
The FundsTransferPM process manager listens to events from both the Transfer and Account streams and coordinates the flow:
@domain.process_manager(stream_categories=["fidelis::transfer", "fidelis::account"])
class FundsTransferPM:
    transfer_id: Identifier()
    source_account_id: String()
    destination_account_id: String()
    amount: Float()
    status: String(default="new")

    @handle(TransferInitiated, start=True, correlate="transfer_id")
    def on_transfer_initiated(self, event: TransferInitiated) -> None:
        self.transfer_id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "withdrawing"
        # Withdraw from source account
        current_domain.process(
            MakeWithdrawal(
                account_id=event.source_account_id,
                amount=event.amount,
                reference=f"transfer:{event.transfer_id}",
            )
        )

    @handle(WithdrawalMade, correlate="account_id")
    def on_withdrawal_made(self, event: WithdrawalMade) -> None:
        self.status = "depositing"
        # Deposit into destination account
        current_domain.process(
            MakeDeposit(
                account_id=self.destination_account_id,
                amount=self.amount,
                reference=f"transfer:{self.transfer_id}",
            )
        )

    @handle(DepositMade, correlate="account_id")
    def on_deposit_made(self, event: DepositMade) -> None:
        self.status = "completing"
        # Mark transfer as complete
        current_domain.process(CompleteTransfer(transfer_id=self.transfer_id))

    @handle(TransferCompleted, correlate="transfer_id")
    def on_transfer_completed(self, event: TransferCompleted) -> None:
        self.status = "completed"
        self.mark_as_complete()

    @handle(TransferFailed, correlate="transfer_id", end=True)
    def on_transfer_failed(self, event: TransferFailed) -> None:
        self.status = "failed"
Key concepts:
- @handle(TransferInitiated, start=True, correlate="transfer_id"): start=True means "create a new PM instance when this event arrives." The correlate parameter maps the event to the PM instance.
- correlate="transfer_id" on subsequent events routes them to the correct PM instance based on the transfer ID.
- Each handler can issue commands via current_domain.process() to trigger work in other aggregates.
- mark_as_complete() ends the process: no further events will be processed for this instance.
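The routing idea behind start and correlate can be sketched with a dictionary keyed by the correlation value. This is a toy model in plain Python, not Protean's implementation: `ToyProcess`, `ToyRouter`, and the choice to silently drop unmatched events are assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyProcess:
    correlation_id: str
    seen: list = field(default_factory=list)
    complete: bool = False


class ToyRouter:
    def __init__(self) -> None:
        self.instances = {}  # correlation value -> ToyProcess

    def dispatch(self, event: dict, correlate: str, start: bool = False, end: bool = False):
        key = event[correlate]  # read the correlation value off the event
        process = self.instances.get(key)
        if process is None:
            if not start:
                return None  # no instance and not a starting event: dropped
            process = self.instances[key] = ToyProcess(correlation_id=key)
        if process.complete:
            return None  # completed instances receive nothing further
        process.seen.append(event["type"])
        if end:
            process.complete = True
        return process


router = ToyRouter()
router.dispatch({"type": "TransferInitiated", "transfer_id": "t-1"}, correlate="transfer_id", start=True)
router.dispatch({"type": "TransferInitiated", "transfer_id": "t-2"}, correlate="transfer_id", start=True)
router.dispatch({"type": "TransferCompleted", "transfer_id": "t-1"}, correlate="transfer_id", end=True)
late = router.dispatch({"type": "TransferFailed", "transfer_id": "t-1"}, correlate="transfer_id")
orphan = router.dispatch({"type": "TransferCompleted", "transfer_id": "t-9"}, correlate="transfer_id")
```

Two transfers in flight get two independent instances, and an event that arrives after an instance has completed is not delivered to it.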
The Flow
InitiateTransfer (command)
└── TransferInitiated (event)
    └── PM: on_transfer_initiated()
        └── MakeWithdrawal (command to source account)
            └── WithdrawalMade (event)
                └── PM: on_withdrawal_made()
                    └── MakeDeposit (command to dest)
                        └── DepositMade (event)
                            └── PM: on_deposit_made()
                                └── CompleteTransfer (command)
                                    └── TransferCompleted (event)
If the withdrawal fails (insufficient funds):
MakeWithdrawal (command)
└── ValidationError
    └── FailTransfer (command)
        └── TransferFailed (event)
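Both paths can be traced with a small in-memory simulation. The sketch below is plain Python with a single message queue; it does not use Protean, and `REACTIONS` and `simulate` are hypothetical names. An insufficient balance stands in for the ValidationError branch.

```python
from collections import deque

# Event -> command the process manager issues in response (None ends the chain).
REACTIONS = {
    "TransferInitiated": "MakeWithdrawal",
    "WithdrawalMade": "MakeDeposit",
    "DepositMade": "CompleteTransfer",
    "TransferCompleted": None,
    "TransferFailed": None,
}


def simulate(balances: dict, source: str, dest: str, amount: float):
    """Run one transfer and return (message trace, final transfer status)."""
    trace = []
    status = "INITIATED"
    queue = deque(["TransferInitiated"])
    while queue:
        message = queue.popleft()
        trace.append(message)
        if message in REACTIONS:  # an event: the PM reacts with a command
            if REACTIONS[message] is not None:
                queue.append(REACTIONS[message])
        elif message == "MakeWithdrawal":  # commands: mutate state, emit an event
            if balances[source] < amount:
                queue.append("FailTransfer")
            else:
                balances[source] -= amount
                queue.append("WithdrawalMade")
        elif message == "MakeDeposit":
            balances[dest] += amount
            queue.append("DepositMade")
        elif message == "CompleteTransfer":
            status = "COMPLETED"
            queue.append("TransferCompleted")
        elif message == "FailTransfer":
            status = "FAILED"
            queue.append("TransferFailed")
    return trace, status


balances = {"alice": 10000.0, "bob": 5000.0}
trace, status = simulate(balances, "alice", "bob", 3000.0)

poor = {"alice": 100.0, "bob": 0.0}
fail_trace, fail_status = simulate(poor, "alice", "bob", 3000.0)
```

The happy-path trace alternates events and commands in the same order as the first diagram, and the failing run stops after TransferFailed without touching either balance.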
Process Manager State
Process managers are themselves event-sourced. Their state changes are persisted as transition events in the event store. When the server restarts, PM instances are reconstituted from their transition events, just like aggregates.
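Reconstitution amounts to replaying recorded state changes in order. The chapter does not show what Protean's transition events look like, so the sketch below assumes a hypothetical shape, a list of dictionaries holding the fields that changed, and uses plain Python names (`ToyPMState`, `rehydrate`) that are not part of the framework.

```python
from dataclasses import dataclass


@dataclass
class ToyPMState:
    status: str = "new"
    amount: float = 0.0
    is_complete: bool = False


def rehydrate(transitions: list) -> ToyPMState:
    """Rebuild state by applying each recorded transition in order."""
    state = ToyPMState()
    for transition in transitions:
        for name, value in transition.items():
            setattr(state, name, value)
    return state


# Hypothetical transition log for one completed transfer.
log = [
    {"status": "withdrawing", "amount": 3000.0},
    {"status": "depositing"},
    {"status": "completing"},
    {"status": "completed", "is_complete": True},
]

restored = rehydrate(log)
mid_flight = rehydrate(log[:2])  # as if the server restarted after the withdrawal
```

Replaying only the first two transitions yields a process that knows it is waiting for the deposit, which is what lets a restarted server pick up a transfer where it left off.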
What We Built
- A Transfer aggregate with its own event stream.
- A FundsTransferPM process manager that coordinates the multi-step transfer workflow.
- Cross-aggregate coordination through events and commands.
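- start=True and correlate= for PM lifecycle management.
- mark_as_complete() for process termination.
- A failure path: TransferFailed ends the process. Reversing a debit after a failed credit would need an additional compensating step beyond the handlers shown here.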
Process managers are the event-sourcing answer to distributed transactions. They maintain eventual consistency without distributed locks.
Full Source
from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


@domain.event(part_of="Transfer")
class TransferInitiated:
    transfer_id: Identifier(required=True)
    source_account_id: String(required=True)
    destination_account_id: String(required=True)
    amount: Float(required=True)


@domain.event(part_of="Transfer")
class TransferCompleted:
    transfer_id: Identifier(required=True)


@domain.event(part_of="Transfer")
class TransferFailed:
    transfer_id: Identifier(required=True)
    reason: String(required=True)


@domain.aggregate(is_event_sourced=True)
class Transfer:
    source_account_id: String(max_length=50, required=True)
    destination_account_id: String(max_length=50, required=True)
    amount: Float(required=True)
    status: String(max_length=20, default="INITIATED")

    @classmethod
    def initiate(
        cls,
        source_account_id: str,
        destination_account_id: str,
        amount: float,
    ):
        transfer = cls._create_new()
        transfer.raise_(
            TransferInitiated(
                transfer_id=str(transfer.id),
                source_account_id=source_account_id,
                destination_account_id=destination_account_id,
                amount=amount,
            )
        )
        return transfer

    def complete(self) -> None:
        self.raise_(TransferCompleted(transfer_id=str(self.id)))

    def fail(self, reason: str) -> None:
        self.raise_(TransferFailed(transfer_id=str(self.id), reason=reason))

    @apply
    def on_transfer_initiated(self, event: TransferInitiated):
        self.id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "INITIATED"

    @apply
    def on_transfer_completed(self, event: TransferCompleted):
        self.status = "COMPLETED"

    @apply
    def on_transfer_failed(self, event: TransferFailed):
        self.status = "FAILED"


@domain.command(part_of=Transfer)
class InitiateTransfer:
    source_account_id: String(required=True)
    destination_account_id: String(required=True)
    amount: Float(required=True)


@domain.command(part_of=Transfer)
class CompleteTransfer:
    transfer_id: Identifier(required=True)


@domain.command(part_of=Transfer)
class FailTransfer:
    transfer_id: Identifier(required=True)
    reason: String(required=True)


@domain.command_handler(part_of=Transfer)
class TransferCommandHandler:
    @handle(InitiateTransfer)
    def handle_initiate_transfer(self, command: InitiateTransfer):
        transfer = Transfer.initiate(
            source_account_id=command.source_account_id,
            destination_account_id=command.destination_account_id,
            amount=command.amount,
        )
        current_domain.repository_for(Transfer).add(transfer)
        return str(transfer.id)

    @handle(CompleteTransfer)
    def handle_complete_transfer(self, command: CompleteTransfer):
        repo = current_domain.repository_for(Transfer)
        transfer = repo.get(command.transfer_id)
        transfer.complete()
        repo.add(transfer)

    @handle(FailTransfer)
    def handle_fail_transfer(self, command: FailTransfer):
        repo = current_domain.repository_for(Transfer)
        transfer = repo.get(command.transfer_id)
        transfer.fail(reason=command.reason)
        repo.add(transfer)


@domain.process_manager(stream_categories=["fidelis::transfer", "fidelis::account"])
class FundsTransferPM:
    transfer_id: Identifier()
    source_account_id: String()
    destination_account_id: String()
    amount: Float()
    status: String(default="new")

    @handle(TransferInitiated, start=True, correlate="transfer_id")
    def on_transfer_initiated(self, event: TransferInitiated) -> None:
        self.transfer_id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "withdrawing"
        # Withdraw from source account
        current_domain.process(
            MakeWithdrawal(
                account_id=event.source_account_id,
                amount=event.amount,
                reference=f"transfer:{event.transfer_id}",
            )
        )

    @handle(WithdrawalMade, correlate="account_id")
    def on_withdrawal_made(self, event: WithdrawalMade) -> None:
        self.status = "depositing"
        # Deposit into destination account
        current_domain.process(
            MakeDeposit(
                account_id=self.destination_account_id,
                amount=self.amount,
                reference=f"transfer:{self.transfer_id}",
            )
        )

    @handle(DepositMade, correlate="account_id")
    def on_deposit_made(self, event: DepositMade) -> None:
        self.status = "completing"
        # Mark transfer as complete
        current_domain.process(CompleteTransfer(transfer_id=self.transfer_id))

    @handle(TransferCompleted, correlate="transfer_id")
    def on_transfer_completed(self, event: TransferCompleted) -> None:
        self.status = "completed"
        self.mark_as_complete()

    @handle(TransferFailed, correlate="transfer_id", end=True)
    def on_transfer_failed(self, event: TransferFailed) -> None:
        self.status = "failed"


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open two accounts
        alice_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=10000.00,
            )
        )
        bob_id = domain.process(
            OpenAccount(
                account_number="ACC-002",
                holder_name="Bob Smith",
                opening_deposit=5000.00,
            )
        )
        print(f"Alice's account: {alice_id}")
        print(f"Bob's account: {bob_id}")

        # Initiate a transfer of $3000 from Alice to Bob
        transfer_id = domain.process(
            InitiateTransfer(
                source_account_id=alice_id,
                destination_account_id=bob_id,
                amount=3000.00,
            )
        )
        print(f"\nTransfer initiated: {transfer_id}")

        # The transfer aggregate is created with INITIATED status
        transfer_repo = current_domain.repository_for(Transfer)
        transfer = transfer_repo.get(transfer_id)
        print(f"Transfer status: {transfer.status}")
        assert transfer.status == "INITIATED"

        # The process manager coordinates the remaining steps
        # (withdrawal, deposit, completion) when the server runs:
        # $ protean server --domain=fidelis
        #
        # With the server running, the PM would:
        # 1. React to TransferInitiated → withdraw from Alice
        # 2. React to WithdrawalMade → deposit into Bob
        # 3. React to DepositMade → complete the transfer
        #
        # Final state after PM completes:
        # Alice: $7,000 (10000 - 3000)
        # Bob: $8,000 (5000 + 3000)
        # Transfer: COMPLETED
        print("\nAll checks passed!")