Skip to content

Chapter 14: Connecting to the Outside World

Fidelis integrates with "PayFlow," an external payment gateway that sends webhook notifications when payments are processed. These external messages are raw JSON payloads — not typed domain events.

In this chapter we will build a subscriber that acts as an anti-corruption layer, and add enrichers to attach cross-cutting metadata to every message flowing through the system.

Subscribers: The Anti-Corruption Layer

A subscriber consumes messages from an external system and translates them into domain operations:

@domain.subscriber(stream="external::payflow")
class PayFlowWebhookSubscriber:
    """Translates PayFlow payment gateway webhooks into domain commands.

    Subscribers act as an anti-corruption layer: they receive raw dict
    payloads from an external broker stream and translate them into
    domain operations, keeping the outside world's data model out of
    the core domain.
    """

    def __call__(self, payload: dict) -> None:
        event_type = payload.get("type")

        if event_type == "payment.completed":
            account_id = payload["account_id"]
            amount = payload["amount"]
            reference = payload.get("reference", f"payflow-{payload.get('id', 'N/A')}")

            current_domain.process(
                MakeDeposit(
                    account_id=account_id,
                    amount=amount,
                    reference=reference,
                )
            )
            print(
                f"[PayFlow] Processed payment.completed: ${amount:.2f} -> {account_id}"
            )

        elif event_type == "payment.failed":
            account_id = payload.get("account_id", "unknown")
            reason = payload.get("reason", "no reason provided")
            print(f"[PayFlow] Payment failed for {account_id}: {reason}")

        else:
            print(f"[PayFlow] Ignoring unknown event type: {event_type}")

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:
        """Custom error handler for PayFlow webhook processing failures."""
        print(f"[PayFlow] Error processing webhook: {exc} | Payload: {message}")

Key differences from event handlers:

Event Handlers Subscribers
Input Typed domain events Raw dict payloads
Source Internal event streams External broker streams
Interface @handle(EventType) methods __call__(self, payload)
Purpose React to domain events Translate external messages

The subscriber is the translation layer: it converts external concepts (PayFlow webhook schema) into domain concepts (Fidelis commands). The domain never sees the raw webhook format.

Error Handling

Subscribers can override handle_error for custom error handling:

@classmethod
def handle_error(cls, exc: Exception, message: dict) -> None:
    print(f"[PayFlow Error] Failed to process webhook: {exc}")

This is called when __call__ raises an exception, giving you a chance to log, alert, or transform the error before it propagates to the subscription's retry/DLQ mechanism.

Message Enrichment

Event and command enrichers add metadata that does not belong in the event payload itself — infrastructure concerns like tenant IDs, request IDs, and user context:

@domain.event_enricher
def add_tenant_context(event, aggregate) -> dict:
    """Attach tenant context to every domain event.

    Event enrichers are called during raise_() and their return values
    are merged into the event's metadata.extensions dict.  This is
    useful for multi-tenant systems where every event needs to carry
    the tenant identifier without polluting the event's payload fields.
    """
    return {"tenant_id": "fidelis-main"}
@domain.command_enricher
def add_request_context(command) -> dict:
    """Attach request context to every command.

    Command enrichers are called during domain.process() and their
    return values are merged into the command's metadata.extensions dict.
    This is ideal for tracing: attaching request IDs, user IDs, or
    other cross-cutting concerns without adding fields to every command.
    """
    return {"request_id": "req-demo-001"}

Key concepts:

  • Event enrichers receive (event, aggregate) and return a dict of metadata extensions.
  • Command enrichers receive (command,) and return a dict.
  • Enrichment data appears in metadata.extensions, not in the event fields. This keeps domain events clean.
  • Multiple enrichers execute in FIFO order, with results merged.
  • Enrichers run automatically — no code changes needed in aggregates or handlers.

When to Use Enrichers

Enrichers are for cross-cutting concerns that apply to all messages:

  • Tenant ID (multi-tenancy)
  • User ID / actor context (audit trails)
  • Request ID (request tracing)
  • Feature flags or experiment IDs

Do not put domain-specific data in enrichers. If a field is part of the business event (like amount or reference), it belongs in the event definition.

Accessing Enrichment Data

In event handlers and projectors, access enrichment data through the event's metadata:

@handle(DepositMade)
def on_deposit(self, event: DepositMade):
    tenant_id = event._metadata.extensions.get("tenant_id")
    request_id = event._metadata.extensions.get("request_id")

What We Built

  • A PayFlowWebhookSubscriber that translates external webhooks into domain commands.
  • @domain.event_enricher for attaching metadata to events.
  • @domain.command_enricher for attaching metadata to commands.
  • The anti-corruption layer pattern for external integrations.
  • metadata.extensions for cross-cutting metadata.

Part III is complete. We have evolved the system with schema changes (upcasting), performance optimization (snapshots), historical queries (temporal), and external integration (subscribers and enrichers). In Part IV, we tackle production operations — the daily reality of running an event-sourced system.

Full Source

"""Chapter 14: Connecting to the Outside World

Demonstrates how to integrate external systems with your domain using
subscribers, event enrichers, and command enrichers.  Subscribers act as
an anti-corruption layer, translating external webhook payloads into
domain commands.  Enrichers attach cross-cutting metadata to every event
or command automatically.
"""

from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class AccountClosed:
    account_id: Identifier(required=True)
    reason: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @invariant.post
    def closed_account_must_have_zero_balance(self):
        if self.status == "CLOSED" and self.balance != 0:
            raise ValidationError(
                {"status": ["Cannot close account with non-zero balance"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def close(self, reason: str = None) -> None:
        self.raise_(
            AccountClosed(
                account_id=str(self.id),
                reason=reason,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount

    @apply
    def on_account_closed(self, event: AccountClosed):
        self.status = "CLOSED"


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class CloseAccount:
    account_id: Identifier(required=True)
    reason: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)

    @handle(CloseAccount)
    def handle_close_account(self, command: CloseAccount):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.close(reason=command.reason)
        repo.add(account)


@domain.subscriber(stream="external::payflow")
class PayFlowWebhookSubscriber:
    """Translates PayFlow payment gateway webhooks into domain commands.

    Subscribers act as an anti-corruption layer: they receive raw dict
    payloads from an external broker stream and translate them into
    domain operations, keeping the outside world's data model out of
    the core domain.
    """

    def __call__(self, payload: dict) -> None:
        event_type = payload.get("type")

        if event_type == "payment.completed":
            account_id = payload["account_id"]
            amount = payload["amount"]
            reference = payload.get("reference", f"payflow-{payload.get('id', 'N/A')}")

            current_domain.process(
                MakeDeposit(
                    account_id=account_id,
                    amount=amount,
                    reference=reference,
                )
            )
            print(
                f"[PayFlow] Processed payment.completed: ${amount:.2f} -> {account_id}"
            )

        elif event_type == "payment.failed":
            account_id = payload.get("account_id", "unknown")
            reason = payload.get("reason", "no reason provided")
            print(f"[PayFlow] Payment failed for {account_id}: {reason}")

        else:
            print(f"[PayFlow] Ignoring unknown event type: {event_type}")

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:
        """Custom error handler for PayFlow webhook processing failures."""
        print(f"[PayFlow] Error processing webhook: {exc} | Payload: {message}")


@domain.event_enricher
def add_tenant_context(event, aggregate) -> dict:
    """Attach tenant context to every domain event.

    Event enrichers are called during raise_() and their return values
    are merged into the event's metadata.extensions dict.  This is
    useful for multi-tenant systems where every event needs to carry
    the tenant identifier without polluting the event's payload fields.
    """
    return {"tenant_id": "fidelis-main"}


@domain.command_enricher
def add_request_context(command) -> dict:
    """Attach request context to every command.

    Command enrichers are called during domain.process() and their
    return values are merged into the command's metadata.extensions dict.
    This is ideal for tracing: attaching request IDs, user IDs, or
    other cross-cutting concerns without adding fields to every command.
    """
    return {"request_id": "req-demo-001"}


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # First, open an account so we have something to deposit into
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=1000.00,
            )
        )
        print(f"Account opened: {account_id}\n")

        # Simulate a PayFlow webhook payload for a completed payment
        webhook_payload = {
            "id": "pf-txn-42",
            "type": "payment.completed",
            "account_id": account_id,
            "amount": 250.00,
            "reference": "payflow-pf-txn-42",
        }

        # In production, the broker delivers this payload to the subscriber.
        # Here we invoke it directly to demonstrate the translation logic.
        subscriber = PayFlowWebhookSubscriber()
        subscriber(webhook_payload)

        # Simulate a failed payment webhook
        failed_payload = {
            "id": "pf-txn-43",
            "type": "payment.failed",
            "account_id": account_id,
            "reason": "insufficient funds at source",
        }
        subscriber(failed_payload)

        # Verify the deposit was processed
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nAccount balance: ${account.balance:.2f}")
        assert account.balance == 1250.00  # 1000 + 250

        # Verify enrichers attached metadata to events
        # (In a real system you'd inspect the event store messages)
        print("Event enricher registered: add_tenant_context")
        print("Command enricher registered: add_request_context")
        assert add_tenant_context in domain._event_enrichers
        assert add_request_context in domain._command_enrichers

        print("\nAll checks passed!")

Next

Chapter 15: Fact Events and the Reporting Pipeline →