Skip to content

Chapter 9: Transferring Funds

The product team wants account-to-account transfers. This is not a single-aggregate operation — it involves debiting one account and crediting another. If the debit succeeds but the credit fails, we have inconsistent state.

In this chapter we will introduce a process manager — a stateful coordinator that orchestrates multi-step workflows across aggregates using events and commands.

Why Not a Single Transaction?

In DDD, each aggregate is a transaction boundary. You cannot modify two aggregates in the same transaction. A funds transfer must:

  1. Debit the source account
  2. Credit the destination account
  3. Handle failures (reverse the debit if the credit fails)

A process manager coordinates these steps through events and commands, maintaining eventual consistency.

The Transfer Aggregate

First, we define a Transfer aggregate to represent the transfer itself:

@domain.event(part_of="Transfer")
class TransferInitiated:
    """Recorded when a new transfer is created by ``Transfer.initiate``."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate
    source_account_id: String(required=True)  # account to be debited
    destination_account_id: String(required=True)  # account to be credited
    amount: Float(required=True)


@domain.event(part_of="Transfer")
class TransferCompleted:
    """Recorded by ``Transfer.complete``; applying it sets the status to COMPLETED."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate


@domain.event(part_of="Transfer")
class TransferFailed:
    """Recorded by ``Transfer.fail``; applying it sets the status to FAILED."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate
    reason: String(required=True)  # caller-supplied explanation of the failure
@domain.aggregate(is_event_sourced=True)
class Transfer:
    """Event-sourced record of one account-to-account transfer.

    The status moves from ``INITIATED`` to either ``COMPLETED`` or ``FAILED``;
    both end states are terminal.
    """

    source_account_id: String(max_length=50, required=True)
    destination_account_id: String(max_length=50, required=True)
    amount: Float(required=True)
    status: String(max_length=20, default="INITIATED")

    @classmethod
    def initiate(
        cls,
        source_account_id: str,
        destination_account_id: str,
        amount: float,
    ):
        """Create a new transfer and record a ``TransferInitiated`` event.

        Args:
            source_account_id: Account the money is taken from.
            destination_account_id: Account the money is paid into.
            amount: Amount to move; must be positive.

        Returns:
            The new ``Transfer`` with the event raised on it.

        Raises:
            ValidationError: If ``amount`` is not positive or both account
                IDs are the same.
        """
        # Reject requests that can never succeed before any event is recorded.
        if amount <= 0:
            raise ValidationError({"amount": ["Transfer amount must be positive"]})
        if source_account_id == destination_account_id:
            raise ValidationError(
                {
                    "destination_account_id": [
                        "Source and destination accounts must differ"
                    ]
                }
            )

        transfer = cls._create_new()
        transfer.raise_(
            TransferInitiated(
                transfer_id=str(transfer.id),
                source_account_id=source_account_id,
                destination_account_id=destination_account_id,
                amount=amount,
            )
        )
        return transfer

    def complete(self) -> None:
        """Record that the transfer finished successfully.

        Raises:
            ValidationError: If the transfer is already COMPLETED or FAILED.
        """
        self._ensure_in_progress("complete")
        self.raise_(TransferCompleted(transfer_id=str(self.id)))

    def fail(self, reason: str) -> None:
        """Record that the transfer failed, with a human-readable ``reason``.

        Raises:
            ValidationError: If the transfer is already COMPLETED or FAILED.
        """
        self._ensure_in_progress("fail")
        self.raise_(TransferFailed(transfer_id=str(self.id), reason=reason))

    def _ensure_in_progress(self, action: str) -> None:
        """Raise unless the transfer is still in its INITIATED state."""
        if self.status != "INITIATED":
            raise ValidationError(
                {"status": [f"Cannot {action} a transfer that is {self.status}"]}
            )

    @apply
    def on_transfer_initiated(self, event: TransferInitiated):
        """Populate the aggregate from the initiating event."""
        self.id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "INITIATED"

    @apply
    def on_transfer_completed(self, event: TransferCompleted):
        """Mark the transfer as completed."""
        self.status = "COMPLETED"

    @apply
    def on_transfer_failed(self, event: TransferFailed):
        """Mark the transfer as failed."""
        self.status = "FAILED"

The transfer tracks its own lifecycle: INITIATED → COMPLETED or FAILED.

The Process Manager

The FundsTransferPM process manager listens to events from both the Transfer and Account streams and coordinates the flow:

@domain.process_manager(stream_categories=["fidelis::transfer", "fidelis::account"])
class FundsTransferPM:
    """Coordinate a transfer: debit the source, credit the destination, complete.

    Each instance is identified by ``transfer_id``. Account events have no
    ``transfer_id`` field, so every account command issued here carries the
    transfer ID in ``reference`` and the resulting account events are
    correlated on that field. Correlating them on ``account_id`` would look
    for an instance keyed by the account ID, which is never started.
    """

    transfer_id: Identifier()
    source_account_id: String()
    destination_account_id: String()
    amount: Float()
    status: String(default="new")

    @handle(TransferInitiated, start=True, correlate="transfer_id")
    def on_transfer_initiated(self, event: TransferInitiated) -> None:
        """Start the process and request the withdrawal from the source account."""
        self.transfer_id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "withdrawing"

        try:
            # Withdraw from source account
            current_domain.process(
                MakeWithdrawal(
                    account_id=event.source_account_id,
                    amount=event.amount,
                    reference=str(event.transfer_id),
                )
            )
        except ValidationError as exc:
            # Only observable when commands are processed synchronously.
            # Nothing has moved yet, so failing the transfer is enough.
            self._fail_transfer(f"Withdrawal rejected: {exc}")

    # NOTE(review): the mapping form is assumed to be {PM field: event field};
    # confirm against the Protean version in use.
    @handle(WithdrawalMade, correlate={"transfer_id": "reference"})
    def on_withdrawal_made(self, event: WithdrawalMade) -> None:
        """Source debited: request the deposit into the destination account."""
        self.status = "depositing"

        try:
            # Deposit into destination account
            current_domain.process(
                MakeDeposit(
                    account_id=self.destination_account_id,
                    amount=self.amount,
                    reference=str(self.transfer_id),
                )
            )
        except ValidationError as exc:
            # The source has already been debited, so give the money back.
            # The reversal uses a different reference so that its DepositMade
            # event is not mistaken for the credit to the destination.
            current_domain.process(
                MakeDeposit(
                    account_id=self.source_account_id,
                    amount=self.amount,
                    reference=f"transfer-reversal:{self.transfer_id}",
                )
            )
            self._fail_transfer(f"Deposit rejected: {exc}")

    @handle(DepositMade, correlate={"transfer_id": "reference"})
    def on_deposit_made(self, event: DepositMade) -> None:
        """Destination credited: ask the Transfer aggregate to complete."""
        self.status = "completing"

        # Mark transfer as complete
        current_domain.process(CompleteTransfer(transfer_id=self.transfer_id))

    @handle(TransferCompleted, correlate="transfer_id")
    def on_transfer_completed(self, event: TransferCompleted) -> None:
        """Finish the process after the transfer has been marked completed."""
        self.status = "completed"
        self.mark_as_complete()

    @handle(TransferFailed, correlate="transfer_id", end=True)
    def on_transfer_failed(self, event: TransferFailed) -> None:
        """Record the failure; ``end=True`` closes the process."""
        self.status = "failed"

    def _fail_transfer(self, reason: str) -> None:
        """Ask the Transfer aggregate to record the failure."""
        self.status = "failing"
        current_domain.process(
            FailTransfer(transfer_id=self.transfer_id, reason=reason)
        )

Key concepts:

  • @handle(TransferInitiated, start=True, correlate="transfer_id") — start=True means "create a new PM instance when this event arrives." The correlate parameter names the event field whose value identifies the PM instance.
  • correlate="transfer_id" on subsequent events routes them to the correct PM instance based on the transfer ID.
  • Each handler can issue commands via current_domain.process() to trigger work in other aggregates.
  • mark_as_complete() ends the process — no further events will be processed for this instance.

The Flow

InitiateTransfer (command)
    └── TransferInitiated (event)
            └── PM: on_transfer_initiated()
                    └── MakeWithdrawal (command to source account)
                            └── WithdrawalMade (event)
                                    └── PM: on_withdrawal_made()
                                            └── MakeDeposit (command to dest)
                                                    └── DepositMade (event)
                                                            └── PM: on_deposit_made()
                                                                    └── CompleteTransfer
                                                                            └── TransferCompleted

If the withdrawal fails (insufficient funds):

MakeWithdrawal (command)
    └── ValidationError
            └── FailTransfer (command)
                    └── TransferFailed (event)

Process Manager State

Process managers are themselves event-sourced. Their state changes are persisted as transition events in the event store. When the server restarts, PM instances are reconstituted from their transition events, just like aggregates.

What We Built

  • A Transfer aggregate with its own event stream.
  • A FundsTransferPM process manager that coordinates the multi-step transfer workflow.
  • Cross-aggregate coordination through events and commands.
  • start=True and correlate= for PM lifecycle management.
  • mark_as_complete() for process termination.
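  • Failure handling: a TransferFailed event ends the process, and the process manager is the place where compensating commands (such as reversing a debit) are issued.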

Process managers are the event-sourcing answer to distributed transactions. They maintain eventual consistency without distributed locks.

Full Source

from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    """Recorded by ``Account.open`` when a new account is created."""

    account_id: Identifier(required=True)  # ID of the Account aggregate
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)  # becomes the initial balance


@domain.event(part_of="Account")
class DepositMade:
    """Recorded by ``Account.deposit``; applying it adds ``amount`` to the balance."""

    account_id: Identifier(required=True)  # ID of the Account aggregate
    amount: Float(required=True)
    # Optional caller-supplied text; the transfer process manager fills it in
    # for deposits it requests.
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    """Recorded by ``Account.withdraw``; applying it subtracts ``amount`` from the balance."""

    account_id: Identifier(required=True)  # ID of the Account aggregate
    amount: Float(required=True)
    # Optional caller-supplied text; the transfer process manager fills it in
    # for withdrawals it requests.
    reference: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    """Event-sourced bank account whose balance is built up from its events."""

    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        """Reject any change that leaves the account with a negative balance."""
        overdrawn = self.balance is not None and self.balance < 0
        if overdrawn:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        """Create an account and record the ``AccountOpened`` event on it."""
        new_account = cls._create_new()
        opened = AccountOpened(
            account_id=str(new_account.id),
            account_number=account_number,
            holder_name=holder_name,
            opening_deposit=opening_deposit,
        )
        new_account.raise_(opened)
        return new_account

    def deposit(self, amount: float, reference: str = None) -> None:
        """Record a deposit of a positive ``amount``.

        Raises:
            ValidationError: If ``amount`` is zero or negative.
        """
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        deposited = DepositMade(
            account_id=str(self.id),
            amount=amount,
            reference=reference,
        )
        self.raise_(deposited)

    def withdraw(self, amount: float, reference: str = None) -> None:
        """Record a withdrawal of a positive ``amount``.

        Raises:
            ValidationError: If ``amount`` is zero or negative.
        """
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        withdrawn = WithdrawalMade(
            account_id=str(self.id),
            amount=amount,
            reference=reference,
        )
        self.raise_(withdrawn)

    @apply
    def on_account_opened(self, event: AccountOpened):
        """Initialise the account from its opening event."""
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        """Add the deposited amount to the balance."""
        self.balance = self.balance + event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        """Subtract the withdrawn amount from the balance."""
        self.balance = self.balance - event.amount


@domain.command(part_of=Account)
class OpenAccount:
    """Request to open a new account; handled by ``AccountCommandHandler.handle_open_account``."""

    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    """Request to deposit into an account; handled by ``AccountCommandHandler.handle_make_deposit``."""

    account_id: Identifier(required=True)  # account to credit
    amount: Float(required=True)
    reference: String()  # optional; passed through to the DepositMade event


@domain.command(part_of=Account)
class MakeWithdrawal:
    """Request to withdraw from an account; handled by ``AccountCommandHandler.handle_make_withdrawal``."""

    account_id: Identifier(required=True)  # account to debit
    amount: Float(required=True)
    reference: String()  # optional; passed through to the WithdrawalMade event


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    """Applies account commands to the ``Account`` aggregate and saves the result."""

    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        """Open a new account and return its ID as a string."""
        opened = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        accounts = current_domain.repository_for(Account)
        accounts.add(opened)
        return str(opened.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        """Load the account, record the deposit, and save it."""
        accounts = current_domain.repository_for(Account)
        target = accounts.get(command.account_id)
        target.deposit(command.amount, reference=command.reference)
        accounts.add(target)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        """Load the account, record the withdrawal, and save it."""
        accounts = current_domain.repository_for(Account)
        target = accounts.get(command.account_id)
        target.withdraw(command.amount, reference=command.reference)
        accounts.add(target)


@domain.event(part_of="Transfer")
class TransferInitiated:
    """Recorded when a new transfer is created by ``Transfer.initiate``."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate
    source_account_id: String(required=True)  # account to be debited
    destination_account_id: String(required=True)  # account to be credited
    amount: Float(required=True)


@domain.event(part_of="Transfer")
class TransferCompleted:
    """Recorded by ``Transfer.complete``; applying it sets the status to COMPLETED."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate


@domain.event(part_of="Transfer")
class TransferFailed:
    """Recorded by ``Transfer.fail``; applying it sets the status to FAILED."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate
    reason: String(required=True)  # caller-supplied explanation of the failure


@domain.aggregate(is_event_sourced=True)
class Transfer:
    """Event-sourced record of one account-to-account transfer.

    The status moves from ``INITIATED`` to either ``COMPLETED`` or ``FAILED``;
    both end states are terminal.
    """

    source_account_id: String(max_length=50, required=True)
    destination_account_id: String(max_length=50, required=True)
    amount: Float(required=True)
    status: String(max_length=20, default="INITIATED")

    @classmethod
    def initiate(
        cls,
        source_account_id: str,
        destination_account_id: str,
        amount: float,
    ):
        """Create a new transfer and record a ``TransferInitiated`` event.

        Args:
            source_account_id: Account the money is taken from.
            destination_account_id: Account the money is paid into.
            amount: Amount to move; must be positive.

        Returns:
            The new ``Transfer`` with the event raised on it.

        Raises:
            ValidationError: If ``amount`` is not positive or both account
                IDs are the same.
        """
        # Reject requests that can never succeed before any event is recorded.
        if amount <= 0:
            raise ValidationError({"amount": ["Transfer amount must be positive"]})
        if source_account_id == destination_account_id:
            raise ValidationError(
                {
                    "destination_account_id": [
                        "Source and destination accounts must differ"
                    ]
                }
            )

        transfer = cls._create_new()
        transfer.raise_(
            TransferInitiated(
                transfer_id=str(transfer.id),
                source_account_id=source_account_id,
                destination_account_id=destination_account_id,
                amount=amount,
            )
        )
        return transfer

    def complete(self) -> None:
        """Record that the transfer finished successfully.

        Raises:
            ValidationError: If the transfer is already COMPLETED or FAILED.
        """
        self._ensure_in_progress("complete")
        self.raise_(TransferCompleted(transfer_id=str(self.id)))

    def fail(self, reason: str) -> None:
        """Record that the transfer failed, with a human-readable ``reason``.

        Raises:
            ValidationError: If the transfer is already COMPLETED or FAILED.
        """
        self._ensure_in_progress("fail")
        self.raise_(TransferFailed(transfer_id=str(self.id), reason=reason))

    def _ensure_in_progress(self, action: str) -> None:
        """Raise unless the transfer is still in its INITIATED state."""
        if self.status != "INITIATED":
            raise ValidationError(
                {"status": [f"Cannot {action} a transfer that is {self.status}"]}
            )

    @apply
    def on_transfer_initiated(self, event: TransferInitiated):
        """Populate the aggregate from the initiating event."""
        self.id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "INITIATED"

    @apply
    def on_transfer_completed(self, event: TransferCompleted):
        """Mark the transfer as completed."""
        self.status = "COMPLETED"

    @apply
    def on_transfer_failed(self, event: TransferFailed):
        """Mark the transfer as failed."""
        self.status = "FAILED"


@domain.command(part_of=Transfer)
class InitiateTransfer:
    """Request to start a transfer; handled by ``TransferCommandHandler.handle_initiate_transfer``."""

    source_account_id: String(required=True)  # account to be debited
    destination_account_id: String(required=True)  # account to be credited
    amount: Float(required=True)


@domain.command(part_of=Transfer)
class CompleteTransfer:
    """Request to mark a transfer completed; issued by ``FundsTransferPM`` after the deposit."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate


@domain.command(part_of=Transfer)
class FailTransfer:
    """Request to mark a transfer failed; handled by ``TransferCommandHandler.handle_fail_transfer``."""

    transfer_id: Identifier(required=True)  # ID of the Transfer aggregate
    reason: String(required=True)  # explanation passed on to the TransferFailed event


@domain.command_handler(part_of=Transfer)
class TransferCommandHandler:
    """Applies transfer commands to the ``Transfer`` aggregate and saves the result."""

    @handle(InitiateTransfer)
    def handle_initiate_transfer(self, command: InitiateTransfer):
        """Create the transfer, save it, and return its ID as a string."""
        started = Transfer.initiate(
            source_account_id=command.source_account_id,
            destination_account_id=command.destination_account_id,
            amount=command.amount,
        )
        transfers = current_domain.repository_for(Transfer)
        transfers.add(started)
        return str(started.id)

    @handle(CompleteTransfer)
    def handle_complete_transfer(self, command: CompleteTransfer):
        """Load the transfer, mark it completed, and save it."""
        transfers = current_domain.repository_for(Transfer)
        target = transfers.get(command.transfer_id)
        target.complete()
        transfers.add(target)

    @handle(FailTransfer)
    def handle_fail_transfer(self, command: FailTransfer):
        """Load the transfer, mark it failed with the given reason, and save it."""
        transfers = current_domain.repository_for(Transfer)
        target = transfers.get(command.transfer_id)
        target.fail(reason=command.reason)
        transfers.add(target)


@domain.process_manager(stream_categories=["fidelis::transfer", "fidelis::account"])
class FundsTransferPM:
    """Coordinate a transfer: debit the source, credit the destination, complete.

    Each instance is identified by ``transfer_id``. Account events have no
    ``transfer_id`` field, so every account command issued here carries the
    transfer ID in ``reference`` and the resulting account events are
    correlated on that field. Correlating them on ``account_id`` would look
    for an instance keyed by the account ID, which is never started.
    """

    transfer_id: Identifier()
    source_account_id: String()
    destination_account_id: String()
    amount: Float()
    status: String(default="new")

    @handle(TransferInitiated, start=True, correlate="transfer_id")
    def on_transfer_initiated(self, event: TransferInitiated) -> None:
        """Start the process and request the withdrawal from the source account."""
        self.transfer_id = event.transfer_id
        self.source_account_id = event.source_account_id
        self.destination_account_id = event.destination_account_id
        self.amount = event.amount
        self.status = "withdrawing"

        try:
            # Withdraw from source account
            current_domain.process(
                MakeWithdrawal(
                    account_id=event.source_account_id,
                    amount=event.amount,
                    reference=str(event.transfer_id),
                )
            )
        except ValidationError as exc:
            # Only observable when commands are processed synchronously.
            # Nothing has moved yet, so failing the transfer is enough.
            self._fail_transfer(f"Withdrawal rejected: {exc}")

    # NOTE(review): the mapping form is assumed to be {PM field: event field};
    # confirm against the Protean version in use.
    @handle(WithdrawalMade, correlate={"transfer_id": "reference"})
    def on_withdrawal_made(self, event: WithdrawalMade) -> None:
        """Source debited: request the deposit into the destination account."""
        self.status = "depositing"

        try:
            # Deposit into destination account
            current_domain.process(
                MakeDeposit(
                    account_id=self.destination_account_id,
                    amount=self.amount,
                    reference=str(self.transfer_id),
                )
            )
        except ValidationError as exc:
            # The source has already been debited, so give the money back.
            # The reversal uses a different reference so that its DepositMade
            # event is not mistaken for the credit to the destination.
            current_domain.process(
                MakeDeposit(
                    account_id=self.source_account_id,
                    amount=self.amount,
                    reference=f"transfer-reversal:{self.transfer_id}",
                )
            )
            self._fail_transfer(f"Deposit rejected: {exc}")

    @handle(DepositMade, correlate={"transfer_id": "reference"})
    def on_deposit_made(self, event: DepositMade) -> None:
        """Destination credited: ask the Transfer aggregate to complete."""
        self.status = "completing"

        # Mark transfer as complete
        current_domain.process(CompleteTransfer(transfer_id=self.transfer_id))

    @handle(TransferCompleted, correlate="transfer_id")
    def on_transfer_completed(self, event: TransferCompleted) -> None:
        """Finish the process after the transfer has been marked completed."""
        self.status = "completed"
        self.mark_as_complete()

    @handle(TransferFailed, correlate="transfer_id", end=True)
    def on_transfer_failed(self, event: TransferFailed) -> None:
        """Record the failure; ``end=True`` closes the process."""
        self.status = "failed"

    def _fail_transfer(self, reason: str) -> None:
        """Ask the Transfer aggregate to record the failure."""
        self.status = "failing"
        current_domain.process(
            FailTransfer(transfer_id=self.transfer_id, reason=reason)
        )


# Initialise the domain with the elements registered above.
# NOTE(review): traverse=False presumably disables auto-discovery of other
# modules — confirm against the Protean docs.
domain.init(traverse=False)
# Handle events and commands synchronously, so domain.process() returns the
# command handler's result directly (the demo below relies on this).
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Set up the two accounts used in the demo
        open_alice = OpenAccount(
            account_number="ACC-001",
            holder_name="Alice Johnson",
            opening_deposit=10_000.00,
        )
        alice_id = domain.process(open_alice)

        open_bob = OpenAccount(
            account_number="ACC-002",
            holder_name="Bob Smith",
            opening_deposit=5_000.00,
        )
        bob_id = domain.process(open_bob)

        print(f"Alice's account: {alice_id}")
        print(f"Bob's account: {bob_id}")

        # Ask for $3000 to be moved from Alice to Bob
        request = InitiateTransfer(
            source_account_id=alice_id,
            destination_account_id=bob_id,
            amount=3_000.00,
        )
        transfer_id = domain.process(request)
        print(f"\nTransfer initiated: {transfer_id}")

        # Right after the command, the stored transfer is still INITIATED
        stored_transfer = current_domain.repository_for(Transfer).get(transfer_id)
        print(f"Transfer status: {stored_transfer.status}")
        assert stored_transfer.status == "INITIATED"

        # The remaining steps (withdrawal, deposit, completion) are driven by
        # the process manager once the server is running:
        #   $ protean server --domain=fidelis
        #
        # With the server running, the PM would:
        # 1. React to TransferInitiated → withdraw from Alice
        # 2. React to WithdrawalMade → deposit into Bob
        # 3. React to DepositMade → complete the transfer
        #
        # Expected final state once the PM has finished:
        #   Alice: $7,000 (10000 - 3000)
        #   Bob:   $8,000 (5000 + 3000)
        #   Transfer: COMPLETED

        print("\nAll checks passed!")

Next

Chapter 10: Entities Inside Aggregates →