
Chapter 19: The Great Migration — Priority Lanes

Fidelis acquires another bank and must migrate 2 million historical accounts. Running all 2 million OpenAccount commands through the normal pipeline would starve real-time production traffic: deposits and withdrawals from existing customers would queue behind millions of migration commands.

Priority lanes solve this by separating production traffic from bulk operations.

The Problem

Without priority lanes, the migration floods the event stream:

Redis Stream (fidelis::account):
  [migration] [migration] [migration] [deposit] [migration] [migration] ...

A customer trying to deposit money waits behind thousands of migration commands. Response times spike. The support team gets angry calls.

Configuring Priority Lanes

Add to domain.toml:

[server.priority_lanes]
enabled = true
threshold = 0          # Events with priority < 0 go to backfill
backfill_suffix = "backfill"

With this configuration, Protean creates two streams per category:

  • Primary: fidelis::account — production traffic
  • Backfill: fidelis::account:backfill — bulk/migration traffic

The StreamSubscription always drains the primary stream first. Backfill events are processed only when the primary is empty.

The Priority Enum

from protean.utils.processing import Priority

# Priority.BULK     = -100
# Priority.LOW      = -50
# Priority.NORMAL   =  0   (default)
# Priority.HIGH     =  50
# Priority.CRITICAL =  100

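Events with priority below the threshold (default: 0) are routed to the backfill stream. Everything else goes to the primary stream. With the default threshold, BULK and LOW land in backfill, while NORMAL, HIGH, and CRITICAL stay on the primary stream.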
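The routing rule can be sketched in a few lines of plain Python. This is an illustration only, not Protean's implementation: the `Priority` class below is a local stand-in that copies the values listed above, and `target_stream` is a hypothetical helper whose parameters mirror the `threshold` and `backfill_suffix` settings from domain.toml.

```python
from enum import IntEnum


class Priority(IntEnum):
    """Local stand-in mirroring the priority values listed above."""

    BULK = -100
    LOW = -50
    NORMAL = 0
    HIGH = 50
    CRITICAL = 100


def target_stream(
    category: str,
    priority: int,
    threshold: int = 0,
    backfill_suffix: str = "backfill",
) -> str:
    """Return the stream name a message would land in (hypothetical helper)."""
    # Strictly below the threshold means backfill; equal to it stays primary.
    if priority < threshold:
        return f"{category}:{backfill_suffix}"
    return category


for level in Priority:
    print(f"{level.name:<8} -> {target_stream('fidelis::account', level)}")
```

Note that the comparison is strict, so a command at exactly the threshold (NORMAL by default) stays on the primary stream.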

Running the Migration

Wrap bulk operations in the processing_priority() context manager:

from protean.utils.processing import processing_priority, Priority


def migrate_legacy_accounts(legacy_accounts: list[dict]) -> None:
    with processing_priority(Priority.BULK):
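        for i, legacy in enumerate(legacy_accounts, start=1):
            domain.process(OpenAccount(
                account_number=legacy["number"],
                holder_name=legacy["name"],
                opening_deposit=legacy["balance"],
            ))
            # Count from 1 so the first progress line appears after a full
            # batch of 10,000 rather than before anything has been queued.
            if i % 10_000 == 0:
                print(f"Queued {i:,} accounts...")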

    print(f"Migration complete: {len(legacy_accounts):,} accounts queued.")

All commands within the processing_priority(Priority.BULK) context are routed to the backfill stream. Production traffic continues flowing through the primary stream at full speed.

Nested Priorities

Contexts nest — the innermost wins:

with processing_priority(Priority.BULK):
    # These go to backfill stream
    domain.process(OpenAccount(account_number="MIGR-001", ...))

    # But a critical real-time deposit still goes to primary
    with processing_priority(Priority.CRITICAL):
        domain.process(MakeDeposit(account_id="acc-vip", amount=1_000_000))

    # Back to BULK for remaining migration
    domain.process(OpenAccount(account_number="MIGR-002", ...))
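One common way to get "innermost wins" behavior is a context variable that is set on entry and restored on exit. The sketch below shows that general pattern with the standard library's `contextvars`; it is an assumption about how such a context manager could work, not a description of Protean's internals. The names `priority_scope` and `current_priority` are hypothetical.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator

# Plain integers standing in for the Priority values listed earlier.
NORMAL, BULK, CRITICAL = 0, -100, 100

_current_priority: ContextVar[int] = ContextVar("current_priority", default=NORMAL)


@contextmanager
def priority_scope(priority: int) -> Iterator[None]:
    """Set the ambient priority for the duration of the block."""
    token = _current_priority.set(priority)
    try:
        yield
    finally:
        # Restore whatever was active before this scope, even on error.
        _current_priority.reset(token)


def current_priority() -> int:
    """Return the priority of the innermost active scope."""
    return _current_priority.get()


seen = []
with priority_scope(BULK):
    seen.append(current_priority())      # inside the outer scope
    with priority_scope(CRITICAL):
        seen.append(current_priority())  # the inner scope overrides it
    seen.append(current_priority())      # the outer scope is restored
seen.append(current_priority())          # back to the default

print(seen)  # [-100, 100, -100, 0]
```

Because the previous value is restored through a token rather than reset to a fixed default, scopes can nest to any depth without losing track of the enclosing priority.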

Per-Command Priority

You can also set priority on individual commands:

domain.process(
    OpenAccount(account_number="MIGR-001", ...),
    priority=Priority.BULK,
)

Monitoring the Migration

Track backfill progress with the CLI:

$ protean subscriptions status --domain=fidelis
 Handler                     Type    Stream                Lag     Pending  DLQ  Status
 AccountCommandHandler       stream  fidelis::account:cmd      0        0    -  ok
 AccountCommandHandler       stream  fidelis::...:backfill   450K       10    -  ok
 AccountSummaryProjector     stream  fidelis::account          0        0    -  ok
 AccountSummaryProjector     stream  fidelis::...:backfill   450K        0    -  ok

The backfill lag decreases over hours as the server processes migration events during quiet periods.
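To turn two lag readings into a rough completion estimate, divide the remaining lag by the observed drain rate. The helper below is a hypothetical back-of-the-envelope calculation, not part of the Protean CLI, and the second reading in the example is an invented figure used only to show the arithmetic.

```python
from typing import Optional


def estimate_drain_seconds(
    lag_before: int, lag_after: int, elapsed_seconds: float
) -> Optional[float]:
    """Estimate seconds until the backfill lag reaches zero.

    Returns None when the lag did not shrink between readings, since no
    meaningful drain rate can be derived in that case.
    """
    drained = lag_before - lag_after
    if drained <= 0 or elapsed_seconds <= 0:
        return None
    rate = drained / elapsed_seconds  # messages per second
    return lag_after / rate


# Hypothetical readings: 450,000 lag, then 420,000 ten minutes later.
eta = estimate_drain_seconds(450_000, 420_000, elapsed_seconds=600)
print(f"Estimated time remaining: {eta / 3600:.1f} h")  # 2.3 h
```

The estimate assumes a steady drain rate. Since backfill is only processed while the primary stream is idle, the real rate will drop during busy periods and rise during quiet ones.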

How the Engine Processes

The StreamSubscription polling cycle:

  1. Read from primary stream (non-blocking)
  2. If messages found → process them, go to step 1
  3. If primary is empty → read from backfill stream (blocking, 1s max)
  4. If backfill messages found → process them, go to step 1
  5. If both empty → wait and repeat

Production traffic always takes precedence. The backfill stream is only touched when production is idle.
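The polling cycle above can be simulated with two in-memory queues. This is a simplified sketch, not the StreamSubscription code: it handles one message per tick instead of reading batches, and it replaces the blocking read with a plain loop. The function name `drain` and the `arrivals` parameter are hypothetical.

```python
from collections import deque


def drain(primary: deque, backfill: deque, arrivals: dict[int, list[str]]) -> list[str]:
    """Process one message per tick, always preferring the primary lane.

    `arrivals` maps a tick number to production messages that show up at
    the start of that tick, simulating live traffic during a migration.
    """
    processed = []
    tick = 0
    # Keep going while anything is queued or production traffic is still due.
    while primary or backfill or any(t >= tick for t in arrivals):
        primary.extend(arrivals.get(tick, []))
        if primary:
            processed.append(primary.popleft())   # steps 1-2: primary first
        elif backfill:
            processed.append(backfill.popleft())  # steps 3-4: primary was empty
        # Step 5: both lanes empty, so nothing happens this tick.
        tick += 1
    return processed


order = drain(
    primary=deque(["deposit-1"]),
    backfill=deque(["migr-1", "migr-2", "migr-3"]),
    arrivals={2: ["withdraw-1"]},
)
print(order)
# ['deposit-1', 'migr-1', 'withdraw-1', 'migr-2', 'migr-3']
```

The withdrawal that arrives at tick 2 is handled immediately, ahead of the two migration messages still waiting in backfill.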

What We Built

  • Priority lanes with primary and backfill streams.
  • processing_priority(Priority.BULK) context manager for bulk operations.
  • Per-command priority with domain.process(..., priority=...).
  • Nested priority contexts for mixed workloads.
  • Production traffic isolation during large migrations.

Part IV is complete. We have a production-grade platform with fact events, message tracing, DLQ management, monitoring, and priority lanes. In Part V, we achieve mastery over the complete system.

Full Source

"""Chapter 19: Priority Lanes

Demonstrates how to use priority lanes to separate production traffic from
bulk/migration operations.  When priority lanes are enabled in the server
configuration, events tagged with low priority are routed to a separate
"backfill" Redis Stream and processed only when the primary stream is empty.

The ``processing_priority`` context manager tags all commands processed within
its scope with the specified priority level.
"""

from protean import Domain, apply, handle, invariant
from protean.exceptions import ValidationError
from protean.fields import Float, Identifier, String
from protean.utils.globals import current_domain
from protean.utils.processing import Priority, processing_priority

domain = Domain("fidelis")


@domain.event(part_of="Account")
class AccountOpened:
    account_id: Identifier(required=True)
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.event(part_of="Account")
class DepositMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.event(part_of="Account")
class WithdrawalMade:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.aggregate(is_event_sourced=True)
class Account:
    account_number: String(max_length=20, required=True)
    holder_name: String(max_length=100, required=True)
    balance: Float(default=0.0)
    status: String(max_length=20, default="ACTIVE")

    @invariant.post
    def balance_must_not_be_negative(self):
        if self.balance is not None and self.balance < 0:
            raise ValidationError(
                {"balance": ["Insufficient funds: balance cannot be negative"]}
            )

    @classmethod
    def open(cls, account_number: str, holder_name: str, opening_deposit: float):
        account = cls._create_new()
        account.raise_(
            AccountOpened(
                account_id=str(account.id),
                account_number=account_number,
                holder_name=holder_name,
                opening_deposit=opening_deposit,
            )
        )
        return account

    def deposit(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Deposit amount must be positive"]})
        self.raise_(
            DepositMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    def withdraw(self, amount: float, reference: str = None) -> None:
        if amount <= 0:
            raise ValidationError({"amount": ["Withdrawal amount must be positive"]})
        self.raise_(
            WithdrawalMade(
                account_id=str(self.id),
                amount=amount,
                reference=reference,
            )
        )

    @apply
    def on_account_opened(self, event: AccountOpened):
        self.id = event.account_id
        self.account_number = event.account_number
        self.holder_name = event.holder_name
        self.balance = event.opening_deposit
        self.status = "ACTIVE"

    @apply
    def on_deposit_made(self, event: DepositMade):
        self.balance += event.amount

    @apply
    def on_withdrawal_made(self, event: WithdrawalMade):
        self.balance -= event.amount


@domain.command(part_of=Account)
class OpenAccount:
    account_number: String(required=True)
    holder_name: String(required=True)
    opening_deposit: Float(required=True)


@domain.command(part_of=Account)
class MakeDeposit:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command(part_of=Account)
class MakeWithdrawal:
    account_id: Identifier(required=True)
    amount: Float(required=True)
    reference: String()


@domain.command_handler(part_of=Account)
class AccountCommandHandler:
    @handle(OpenAccount)
    def handle_open_account(self, command: OpenAccount):
        account = Account.open(
            account_number=command.account_number,
            holder_name=command.holder_name,
            opening_deposit=command.opening_deposit,
        )
        current_domain.repository_for(Account).add(account)
        return str(account.id)

    @handle(MakeDeposit)
    def handle_make_deposit(self, command: MakeDeposit):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.deposit(command.amount, reference=command.reference)
        repo.add(account)

    @handle(MakeWithdrawal)
    def handle_make_withdrawal(self, command: MakeWithdrawal):
        repo = current_domain.repository_for(Account)
        account = repo.get(command.account_id)
        account.withdraw(command.amount, reference=command.reference)
        repo.add(account)


domain.init(traverse=False)
domain.config["event_processing"] = "sync"
domain.config["command_processing"] = "sync"


if __name__ == "__main__":
    with domain.domain_context():
        # Normal production traffic — uses default NORMAL priority
        account_id = domain.process(
            OpenAccount(
                account_number="ACC-001",
                holder_name="Alice Johnson",
                opening_deposit=5000.00,
            )
        )
        print(f"[NORMAL] Account opened: {account_id}")

        domain.process(
            MakeDeposit(
                account_id=account_id,
                amount=1000.00,
                reference="paycheck",
            )
        )
        print("[NORMAL] Deposit of $1,000.00 made")

        # Bulk migration — uses BULK priority so events are routed to the
        # backfill lane and processed only when the primary lane is empty
        migration_deposits = [
            {"amount": 100.00, "reference": "migration-batch-001"},
            {"amount": 200.00, "reference": "migration-batch-002"},
            {"amount": 150.00, "reference": "migration-batch-003"},
        ]

        with processing_priority(Priority.BULK):
            for item in migration_deposits:
                domain.process(
                    MakeDeposit(
                        account_id=account_id,
                        amount=item["amount"],
                        reference=item["reference"],
                    )
                )
                print(f"[BULK] Deposit of ${item['amount']:.2f} made")

        # Verify final balance
        repo = current_domain.repository_for(Account)
        account = repo.get(account_id)
        print(f"\nFinal balance: ${account.balance:.2f}")

        # 5000 + 1000 + 100 + 200 + 150 = 6450
        assert account.balance == 6450.00
        print("\nAll checks passed!")

Next

Chapter 20: Rebuilding the World →