# Running Data Migrations with Priority Lanes
## The Problem
Every growing system eventually needs to backfill data: adding a new field to existing records, enriching records from an external source, or recalculating derived values after a business rule change. In a CQRS system with asynchronous event processing, these migrations are not simple database updates — each record change produces domain events that flow through the same pipeline as production traffic.
Consider this scenario: your e-commerce platform has been running for months. You need to backfill a `loyalty_tier` field onto 500,000 existing customer records. The migration script loads each customer, sets the tier, and saves — producing 500,000 `CustomerUpdated` events.
Those events flood the same Redis Stream that handles real-time customer registrations, order placements, and payment confirmations. The Engine processes them in FIFO order, so production events are now stuck behind half a million migration events. A customer who just placed an order waits minutes (or longer) for their confirmation because the projector is busy processing backfill events.
Without priority lanes:
```text
Production event ──┐
Migration event  ──┤
Migration event  ──┼──► Single Redis Stream ──► [Processed in FIFO order]
Production event ──┤        "customer"            Migration events block
Migration event  ──┘                              production traffic
```
You could pause the migration, wait for production to catch up, and resume in small batches — but this is manual, error-prone, and slow. You could run the migration at 3 AM, but that only works if your traffic has a quiet window.
The root issue is that migration events and production events are treated identically, even though they have fundamentally different urgency profiles.
## The Pattern
Route migration events to a separate backfill lane that the Engine processes only when production work is idle. Migrations then proceed continuously in the background, consuming whatever capacity production leaves spare, without ever delaying a production event.
The principle is simple: tag migration commands with a low priority, and let the infrastructure route them accordingly.
With priority lanes:
```text
Production events ──► Outbox (priority=0)   ──► "customer"          ──► [Drained first]
                                                                         (non-blocking read)
                                                     │
Migration events  ──► Outbox (priority=-50) ──► "customer:backfill" ──► [Drained when idle]
                                                                         (only when primary empty)
```
The migration script does not need to know about streams, consumer groups, or the Engine's scheduling strategy. It only needs to mark its work as low priority. The framework handles the rest.
## How Protean Supports It
Protean provides this pattern through three mechanisms:
### 1. Processing Priority Context

The `processing_priority()` context manager tags all commands processed within a block with a specific priority. Events produced by those commands inherit the priority in their outbox records:
```python
from protean.utils.processing import processing_priority, Priority

with processing_priority(Priority.LOW):
    domain.process(SomeCommand(...))  # Events get priority=-50
```
### 2. OutboxProcessor Routing

The OutboxProcessor checks each message's priority against a configurable threshold (default: 0). Messages below the threshold are published to the backfill stream (e.g., `customer:backfill`) instead of the primary stream (e.g., `customer`).
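The routing rule can be pictured as a single comparison. The sketch below is illustrative only — the function and constant names are hypothetical, not Protean's actual internals:

```python
# Hypothetical sketch of the routing rule described above.
PRIORITY_THRESHOLD = 0  # assumed default threshold


def route_stream(primary_stream: str, message_priority: int) -> str:
    """Return the stream a message should be published to."""
    if message_priority < PRIORITY_THRESHOLD:
        return f"{primary_stream}:backfill"  # low-priority lane
    return primary_stream  # normal production lane


print(route_stream("customer", 0))    # -> customer
print(route_stream("customer", -50))  # -> customer:backfill
```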
### 3. StreamSubscription Draining
The Engine's StreamSubscription reads from the primary stream first (non-blocking). Only when the primary stream is empty does it fall back to the backfill stream (blocking read capped at 1 second). Production events are never blocked by backfill events.
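The drain policy described above can be sketched with in-memory queues standing in for the two Redis Streams. This is a toy model of the scheduling behavior, not Protean's actual code:

```python
from collections import deque


def next_message(primary: deque, backfill: deque):
    """Toy drain policy: primary first, backfill only when primary is empty.

    The real subscription would do a non-blocking read on the primary
    stream and a blocking read (capped at ~1 second) on the backfill.
    """
    if primary:
        return primary.popleft()
    if backfill:
        return backfill.popleft()
    return None


primary = deque(["order-placed"])
backfill = deque(["tier-backfill-1", "tier-backfill-2"])

assert next_message(primary, backfill) == "order-placed"    # production first
assert next_message(primary, backfill) == "tier-backfill-1"  # only when primary is empty
```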
No handler changes are needed. The same projector processes events identically regardless of which lane they arrived on.
For the full configuration and priority level reference, see Priority Lanes.
## Applying the Pattern

### A Complete Migration Script

Here is a migration script that backfills a `loyalty_tier` field on existing customer records. The key is wrapping all `domain.process()` calls inside a `processing_priority(Priority.LOW)` context manager.
```python
#!/usr/bin/env python
"""migrate_loyalty_tiers.py

Backfill loyalty_tier on all existing customers based on their
total lifetime spend. Uses Priority.LOW so that events produced
by this script are routed to the backfill lane.
"""
import logging
import sys
import time

from my_app.identity.domain import identity as domain
from my_app.identity.customer import Customer, UpdateCustomerTier
from protean.utils.processing import processing_priority, Priority

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)

TIER_THRESHOLDS = [
    (10000, "platinum"),
    (5000, "gold"),
    (1000, "silver"),
    (0, "bronze"),
]


def determine_tier(lifetime_spend: float) -> str:
    for threshold, tier in TIER_THRESHOLDS:
        if lifetime_spend >= threshold:
            return tier
    return "bronze"


def run_migration(batch_size: int = 100, dry_run: bool = False):
    domain.init()

    with domain.domain_context():
        repo = domain.repository_for(Customer)
        customers = repo.query.all()
        total = len(customers)
        logger.info(f"Found {total} customers to migrate")

        if dry_run:
            logger.info("Dry run mode -- no changes will be made")
            return

        migrated = 0
        errors = 0
        start_time = time.time()

        # Wrap the entire migration in a LOW priority context.
        # Every command processed inside this block will produce
        # events tagged with priority=-50, routed to backfill.
        with processing_priority(Priority.LOW):
            for i, customer in enumerate(customers):
                try:
                    tier = determine_tier(customer.lifetime_spend)
                    domain.process(UpdateCustomerTier(
                        customer_id=customer.id,
                        loyalty_tier=tier,
                    ))
                    migrated += 1
                except Exception as exc:
                    logger.error(
                        f"Failed to migrate customer {customer.id}: {exc}"
                    )
                    errors += 1

                if (i + 1) % batch_size == 0:
                    elapsed = time.time() - start_time
                    rate = (i + 1) / elapsed if elapsed > 0 else 0
                    logger.info(
                        f"Progress: {i + 1}/{total} "
                        f"({rate:.0f} records/sec, "
                        f"{migrated} migrated, {errors} errors)"
                    )

        elapsed = time.time() - start_time
        logger.info(
            f"Migration complete: {migrated}/{total} migrated, "
            f"{errors} errors, {elapsed:.1f}s elapsed"
        )


if __name__ == "__main__":
    dry_run = "--dry-run" in sys.argv
    run_migration(dry_run=dry_run)
```
### Running the Script

Run a dry run first to verify the migration logic:

```shell
python migrate_loyalty_tiers.py --dry-run
```

Then run the migration while production is running:

```shell
python migrate_loyalty_tiers.py
```
### Key Points

- `processing_priority(Priority.LOW)` wraps all `domain.process()` calls. Every event produced inside the block is tagged with `priority=-50` in the outbox record.
- The OutboxProcessor reads these outbox records and checks `message.priority < threshold` (default threshold is 0). Since -50 < 0, the events are published to `customer:backfill` instead of `customer`.
- The StreamSubscription first does a non-blocking read on `customer` (primary). Only when the primary stream is empty does it fall back to `customer:backfill`. Production events are always processed first.
- No handler changes needed. The `CustomerProjector` processes events identically regardless of which lane they arrived on.
### Monitoring During the Migration

Monitor the backfill stream length to track progress:

```shell
# Primary -- should stay near 0 during migration
redis-cli XLEN customer

# Backfill -- will grow during migration, drain after
redis-cli XLEN customer:backfill
```

When the backfill stream length returns to 0, all migration events have been processed.
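If you would rather track drain progress from a script than re-run `redis-cli` by hand, a polling loop like the following works. It is a minimal sketch: `get_backfill_len` is parameterized so you could supply, for example, redis-py's `r.xlen("customer:backfill")` in practice, and the `sleep` parameter exists only to make the loop testable:

```python
import time


def wait_for_drain(get_backfill_len, poll_interval=5.0, sleep=time.sleep):
    """Poll the backfill stream length until it reaches 0.

    get_backfill_len: callable returning the current stream length,
    e.g. lambda: r.xlen("customer:backfill") with a redis-py client.
    """
    while True:
        remaining = get_backfill_len()
        print(f"backfill remaining: {remaining}")
        if remaining == 0:
            return
        sleep(poll_interval)
```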
If you are running the Observatory, look for `outbox.published` traces with `stream=customer:backfill` to confirm migration events are being routed correctly.
## Anti-Patterns

### Running migrations directly against the database

```python
# BAD: Bypasses the domain model entirely
db.execute("UPDATE customers SET loyalty_tier = 'gold' WHERE ...")
```
This skips domain events, projections, and any downstream side effects. The read model will not be updated. Event-sourced aggregates will have no record of the change. Direct database writes work for truly schema-only changes, but any migration that changes domain state should go through `domain.process()`.
### Running migrations during a maintenance window "because it's simpler"
Taking the system offline to run a migration avoids the priority lane problem but creates a different one: downtime. For systems that need to stay available, priority lanes let you run migrations during normal operation. The added complexity is minimal — a single `with processing_priority(Priority.LOW):` wrapper.
### Parallelizing migration scripts without coordination
Running multiple migration script processes simultaneously can cause contention on the same aggregates. If two processes try to update the same customer concurrently, one will fail with an optimistic concurrency error. Run migration scripts sequentially, or partition the data so each process handles a disjoint subset.
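If you do partition the data, a stable hash keeps each worker's subset disjoint. The helper below is an illustrative sketch, not part of Protean — each worker `k` of `N` would process only the customers whose IDs hash into its partition:

```python
import hashlib


def partition_of(customer_id: str, num_workers: int) -> int:
    """Stable assignment of a customer to one of num_workers disjoint
    partitions, so parallel migration processes never touch the same
    aggregate. Uses a cryptographic hash for an even spread."""
    digest = hashlib.sha256(customer_id.encode()).hexdigest()
    return int(digest, 16) % num_workers


# Worker k processes only customers where partition_of(id, N) == k
worker_0_customers = [
    cid for cid in ["cust-1", "cust-2", "cust-3", "cust-4"]
    if partition_of(cid, 2) == 0
]
```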
## When Not to Use

- **Small datasets.** If the migration processes fewer events than the Engine can handle in a few seconds, priority lanes add no value. Just run the script and let the events flow through the normal pipeline.
- **Schema-only migrations.** If you are adding a column, changing an index, or altering table structure without changing domain state, use database migration tools directly. Priority lanes are for domain-level data changes that produce events.
- **Migrations requiring strict ordering with production.** If migration events must be interleaved with production events in a specific order, priority lanes will break that ordering. The two-lane system intentionally allows production events to jump ahead of migration events.
## Summary
| Aspect | Without Priority Lanes | With Priority Lanes |
|---|---|---|
| Migration events | Share the production stream | Routed to a separate backfill stream |
| Production latency | Degrades proportionally to migration volume | Unaffected |
| Handler changes | None | None |
| Script changes | None | Wrap in `processing_priority(Priority.LOW)` |
| Monitoring | One stream to watch | Two streams to watch |
| Ordering | Global FIFO | Per-lane FIFO; production events processed first |
## Related reading

- Guide: Using Priority Lanes -- How to enable and configure priority lanes.
- Concept: Priority Lanes -- How priority lanes work, configuration options, and ordering guarantees.

Related patterns:

- Event Versioning and Evolution -- If the migration produces events with an outdated schema, upcasters can transform them.
- One Aggregate Per Transaction -- Each migration command modifies one aggregate per transaction, consistent with this pattern.