Chapter 16: Following the Trail — Message Tracing

An auditor asks: "Show me every system action triggered by deposit DEP-9921." This deposit was over $10,000 — it triggered a compliance alert, which generated an investigation, which led to an account hold.

To answer this, we need to trace the complete causal chain from the original command through every downstream event and reaction.

Correlation and Causation IDs

Every message in Protean carries two tracing identifiers:

  • Correlation ID — shared by all messages in a causal chain. Set it on the initial command, and every event that command raises, every command issued by handlers reacting to those events, and every message further down the chain carries the same correlation ID.
  • Causation ID — points to the message that directly caused this one, creating a parent-child relationship.

correlation_id: "audit-trail-dep-9921" (same for all)

MakeDeposit (causation_id: null)
  └── DepositMade (causation_id: MakeDeposit.id)
        ├── AccountSummary updated (causation_id: DepositMade.id)
        └── ComplianceAlert created (causation_id: DepositMade.id)
              └── AccountHoldPlaced (causation_id: ComplianceAlert.id)
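
The relationship between the two identifiers can be modelled in a few lines of plain Python. This is a minimal sketch and not Protean's implementation: the Message class and the follow_up helper are hypothetical names used only for illustration, and the UUID-based message ID is an assumption.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Message:
    """Hypothetical stand-in for a command or event envelope."""

    name: str
    correlation_id: str
    causation_id: str | None = None
    # Assumption: each message gets a unique ID; the real format may differ
    id: str = field(default_factory=lambda: uuid.uuid4().hex)


def follow_up(parent: Message, name: str) -> Message:
    """Create a message caused by `parent`.

    The correlation ID is copied unchanged; the causation ID is the parent's ID.
    """
    return Message(
        name=name,
        correlation_id=parent.correlation_id,
        causation_id=parent.id,
    )


root = Message("MakeDeposit", correlation_id="audit-trail-dep-9921")
deposit_made = follow_up(root, "DepositMade")
alert = follow_up(deposit_made, "ComplianceAlert")

chain = [root, deposit_made, alert]
for message in chain:
    print(message.name, message.correlation_id, message.causation_id)
```

Every message in the chain prints the same correlation ID, while each causation ID differs and points one level up.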

Setting the Correlation ID

Set it explicitly when processing a command:

domain.process(
    MakeDeposit(
        account_id="acc-7742",
        amount=15_000.00,
        reference="DEP-9921",
    ),
    correlation_id="audit-trail-dep-9921",
)

If you do not set a correlation ID, Protean generates one automatically.
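
The fallback amounts to "use the caller's value, otherwise mint a fresh one". The sketch below illustrates that idea only. The helper name resolve_correlation_id and the use of a UUID hex string are assumptions, not Protean's actual ID format.

```python
import uuid
from typing import Optional


def resolve_correlation_id(explicit: Optional[str] = None) -> str:
    """Return the caller-supplied ID, or generate a new unique one."""
    return explicit if explicit else uuid.uuid4().hex


explicit_id = resolve_correlation_id("audit-trail-dep-9921")
generated_id = resolve_correlation_id()

print(explicit_id)
print(generated_id)
```

An explicit, human-readable ID is easier to hand to an auditor than a generated one, which is why this chapter sets it by hand.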

Tracing with the CLI

The protean events trace command visualizes the causal tree:

$ protean events trace audit-trail-dep-9921 --domain=fidelis
CMD MakeDeposit (...) @ 2025-06-15 10:30:00
├── EVT DepositMade (...) @ 2025-06-15 10:30:00
│   ├── EVT AccountFactEvent (...) @ 2025-06-15 10:30:00
│   └── CMD CreateComplianceAlert (...) @ 2025-06-15 10:30:01
│       └── EVT ComplianceAlertCreated (...) @ 2025-06-15 10:30:01
│           └── CMD PlaceAccountHold (...) @ 2025-06-15 10:30:02
│               └── EVT AccountHoldPlaced (...) @ 2025-06-15 10:30:02

Causation tree: 7 message(s) for correlation 'audit-trail-dep-9921'

For a flat, chronological view:

$ protean events trace audit-trail-dep-9921 --flat --domain=fidelis
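
Both views are renderings of the same data. As a rough illustration of the difference (this is not the CLI's code, and its exact output format may differ), the sketch below prints a nested structure first as an indented tree and then as a flat, time-ordered list. The tuple layout and the function names are hypothetical.

```python
from typing import Iterator, List, Tuple

# (kind, type, timestamp, children): a hypothetical shape for illustration only
Node = Tuple[str, str, str, list]

tree: Node = (
    "CMD", "MakeDeposit", "2025-06-15 10:30:00", [
        ("EVT", "DepositMade", "2025-06-15 10:30:00", [
            ("EVT", "AccountFactEvent", "2025-06-15 10:30:00", []),
            ("CMD", "CreateComplianceAlert", "2025-06-15 10:30:01", []),
        ]),
    ],
)


def render_tree(
    node: Node, prefix: str = "", is_last: bool = True, is_root: bool = True
) -> List[str]:
    """Return the lines of a box-drawing tree, one line per message."""
    kind, type_, timestamp, children = node
    connector = "" if is_root else ("└── " if is_last else "├── ")
    lines = [f"{prefix}{connector}{kind} {type_} @ {timestamp}"]
    # Children of a last sibling are indented with spaces, others with a rail
    child_prefix = prefix if is_root else prefix + ("    " if is_last else "│   ")
    for index, child in enumerate(children):
        last = index == len(children) - 1
        lines.extend(render_tree(child, child_prefix, last, False))
    return lines


def flatten(node: Node) -> Iterator[Tuple[str, str, str]]:
    """Yield (timestamp, kind, type) for every message, parents first."""
    kind, type_, timestamp, children = node
    yield (timestamp, kind, type_)
    for child in children:
        yield from flatten(child)


tree_lines = render_tree(tree)
# sorted() is stable, so messages with equal timestamps keep parent-first order
flat = sorted(flatten(tree), key=lambda row: row[0])

print("\n".join(tree_lines))
for timestamp, kind, type_ in flat:
    print(timestamp, kind, type_)
```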

Tracing Events with Metadata

Add --trace to event CLI commands to see correlation and causation IDs:

$ protean events read "fidelis::account-acc-7742" --trace --domain=fidelis
 Pos  Type          Time                 Correlation ID     Causation ID
 15   DepositMade   2025-06-15 10:30     audit-trail-d...   fidelis::ac...
 ...

$ protean events search --type=DepositMade --trace --domain=fidelis

Programmatic Tracing

Use the causation chain API for programmatic access:

with domain.domain_context():
    event_store = domain.event_store.store  # the underlying store, as used in the Full Source listing

    # Walk UP from a message to the root command
    chain = event_store.trace_causation(message_id)

    # Walk DOWN from a message to all its effects
    effects = event_store.trace_effects(message_id, recursive=True)

    # Build the full tree for a correlation ID
    tree = event_store.build_causation_tree("audit-trail-dep-9921")
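
Conceptually, walking up follows causation IDs toward the root, and walking down collects every message whose causation ID points at the current one. The sketch below models both directions over an in-memory dictionary. It illustrates the idea only and is not the event store's implementation: walk_up and walk_down are hypothetical names, message type names stand in for real message IDs, and the real methods may return richer objects.

```python
from typing import Dict, List, Optional

# message_id -> causation_id (None marks the root command)
causes: Dict[str, Optional[str]] = {
    "MakeDeposit": None,
    "DepositMade": "MakeDeposit",
    "AccountFactEvent": "DepositMade",
    "CreateComplianceAlert": "DepositMade",
    "ComplianceAlertCreated": "CreateComplianceAlert",
}


def walk_up(message_id: str) -> List[str]:
    """Return the chain from the root command down to `message_id`."""
    chain: List[str] = []
    current: Optional[str] = message_id
    while current is not None:
        chain.append(current)
        current = causes[current]
    return list(reversed(chain))


def walk_down(message_id: str, recursive: bool = True) -> List[str]:
    """Return the messages caused by `message_id`, depth-first."""
    effects: List[str] = []
    for child, parent in causes.items():
        if parent == message_id:
            effects.append(child)
            if recursive:
                effects.extend(walk_down(child, recursive=True))
    return effects


print(walk_up("ComplianceAlertCreated"))
print(walk_down("DepositMade"))
print(walk_down("DepositMade", recursive=False))
```

With recursive=False only the direct children are returned, which is useful when you want to inspect one hop at a time.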

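The CausationNode tree structure contains the fields below. The Full Source listing also reads a kind attribute on each node, presumably marking it as a command or an event: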

  • message_id — the message's unique identifier
  • message_type — the event or command type string
  • stream — the stream the message was written to
  • timestamp — when it was created
  • children — list of CausationNode children
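
To make the shape concrete, here is a hypothetical dataclass mirroring the core fields listed above, together with one way a tree could be assembled from flat records. The class Node, the function build_tree, the short message IDs, and the command stream name are illustrative assumptions. Protean's own CausationNode and build_causation_tree() may differ in detail.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Node:
    """Hypothetical mirror of the node fields described in the text."""

    message_id: str
    message_type: str
    stream: str
    timestamp: str
    children: List["Node"] = field(default_factory=list)


# (message_id, message_type, stream, timestamp, causation_id)
Record = Tuple[str, str, str, str, Optional[str]]


def build_tree(records: List[Record]) -> Optional[Node]:
    """Link records by causation ID and return the first record without a known parent."""
    nodes: Dict[str, Node] = {r[0]: Node(r[0], r[1], r[2], r[3]) for r in records}
    root: Optional[Node] = None
    for message_id, _, _, _, causation_id in records:
        if causation_id in nodes:
            nodes[causation_id].children.append(nodes[message_id])
        elif root is None:
            root = nodes[message_id]
    return root


def count(node: Node) -> int:
    """Count the messages in a tree, including the root."""
    return 1 + sum(count(child) for child in node.children)


records: List[Record] = [
    # The command stream name below is a placeholder, not a real Protean stream
    ("m1", "MakeDeposit", "placeholder-command-stream", "2025-06-15 10:30:00", None),
    ("m2", "DepositMade", "fidelis::account-acc-7742", "2025-06-15 10:30:00", "m1"),
    ("m3", "AccountFactEvent", "fidelis::account-acc-7742", "2025-06-15 10:30:00", "m2"),
]

tree = build_tree(records)
print(tree.message_type, count(tree))
```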

Automatic Propagation

Correlation IDs propagate automatically through the system:

  1. You set correlation_id on domain.process()
  2. The command handler runs — events raised by the aggregate inherit the correlation ID
  3. Event handlers receive events with the correlation ID
  4. When a handler issues a new command via domain.process(), the correlation ID is forwarded automatically
  5. The chain continues through the entire causal flow

You never need to manually pass correlation IDs between handlers.

What We Built

  • Correlation IDs that tie together entire causal chains.
  • Causation IDs that establish parent-child relationships.
  • protean events trace CLI for tree and flat visualization.
  • --trace flag on event CLI commands.
  • Programmatic API with trace_causation(), trace_effects(), and build_causation_tree().
  • Automatic propagation through handlers and commands.

Message tracing is essential for debugging, auditing, and understanding the behavior of event-driven systems. Next, we face a production incident — a handler bug that fills the dead-letter queue.

Full Source

"""Chapter 16: Message Tracing

Demonstrates how to use correlation IDs and causation trees to trace the flow
of commands and events through the system.  Every command processed by
``domain.process()`` can carry an explicit ``correlation_id`` that propagates
to all downstream events and commands in the causal chain.  The event store
provides ``build_causation_tree(correlation_id)`` to reconstruct this chain
programmatically.
"""

from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


@domain.event_handler(part_of=Account)
class ComplianceAlertHandler:
    @handle(DepositMade)
    def on_large_deposit(self, event: DepositMade):
        if event.amount >= 10000:
            print(
                f"  [COMPLIANCE] Large deposit alert: "
                f"${event.amount:.2f} into account {event.account_id}"
            )

    @handle(WithdrawalMade)
    def on_large_withdrawal(self, event: WithdrawalMade):
        if event.amount >= 5000:
            print(
                f"  [COMPLIANCE] Large withdrawal alert: "
                f"${event.amount:.2f} from account {event.account_id}"
            )


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Open an account with an explicit correlation_id for tracing
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=5000.00,
            ),
            correlation_id="audit-trail-dep-9921",
        )
        print(f"Account opened: {account_id}")

        # Make a large deposit using the same correlation_id
        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=15000.00,
                reference="wire-transfer-001",
            ),
            correlation_id="audit-trail-dep-9921",
        )
        print("Deposit of $15,000.00 made")

        # Build the causation tree for the correlation ID
        tree = domain.event_store.store.build_causation_tree("audit-trail-dep-9921")

        if tree:
            print("\n=== Causation Tree ===")
            print(f"Root: {tree.message_type} ({tree.kind})")
            print(f"  Stream: {tree.stream}")

            def print_children(node, indent=1):
                for child in node.children:
                    prefix = "  " * indent
                    print(f"{prefix}-> {child.message_type} ({child.kind})")
                    print(f"{prefix}   Stream: {child.stream}")
                    print_children(child, indent + 1)

            print_children(tree)

        # Verify final balance
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nFinal balance: ${account.balance:.2f}")

        assert account.balance == 20000.00  # 5000 + 15000
        print("\nAll checks passed!")
