# Chapter 19: The Great Catalog Import — Priority Lanes
Bookshelf has acquired the catalog of Vintage Press — 50,000 books that need to be imported. Running 50,000 `AddBook` commands through the normal pipeline would flood the Redis streams and starve real-time orders and browsing. We need a way to run the import without affecting production traffic.
## Priority Lanes
Protean supports priority lanes — two separate processing streams:
| Lane | Purpose | Priority Levels |
|---|---|---|
| Primary | Normal production traffic | NORMAL, HIGH, CRITICAL |
| Backfill | Bulk operations, migrations | BULK, LOW |
The server always drains the primary lane first. Backfill messages are only processed when the primary lane is empty. This guarantees that production traffic is never delayed by bulk operations.
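The drain rule can be sketched as a two-queue loop (a simplified model with hypothetical names, not the server's actual implementation — a real server consumes messages as they arrive):

```python
from collections import deque

def drain(primary: deque, backfill: deque) -> list:
    """Process one message per iteration, always preferring the primary lane.

    A backfill message is only picked up when the primary lane is empty,
    so production traffic is never queued behind bulk work.
    """
    processed = []
    while primary or backfill:
        lane = primary if primary else backfill
        processed.append(lane.popleft())
    return processed

# Primary messages come out first, even though backfill was enqueued alongside them.
order = drain(deque(["order-1", "order-2"]), deque(["bulk-1", "bulk-2"]))
assert order == ["order-1", "order-2", "bulk-1", "bulk-2"]
```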
## Configuration
Enable priority lanes in `domain.toml`:

```toml
[server]
priority_lanes = true
```
## The Import Script
```python
# import_vintage_press.py
def import_catalog(csv_path: str):
    """Import the Vintage Press catalog with BULK priority."""
    with domain.domain_context():
        with processing_priority(Priority.BULK):
            with open(csv_path) as f:
                reader = csv.DictReader(f)
                for i, row in enumerate(reader, 1):
                    domain.process(
                        AddBook(
                            title=row["title"],
                            author=row["author"],
                            isbn=row.get("isbn", ""),
                            price_amount=float(row["price"]),
                        )
                    )
                    if i % 10000 == 0:
                        print(f"Progress: {i:,} books imported...")
    print(f"Import complete: {i:,} books imported with BULK priority.")


if __name__ == "__main__":
    import sys

    import_catalog(sys.argv[1])
```
The `processing_priority(Priority.BULK)` context manager tags all commands dispatched within it as bulk priority. These flow to the backfill stream instead of the primary stream.
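A context manager like this is typically built on `contextvars`, so any dispatch inside the `with` block sees the ambient priority. A minimal sketch of the idea (illustrative names, not Protean's actual implementation):

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Ambient priority for the current execution context; NORMAL (0) by default.
_priority: ContextVar[int] = ContextVar("processing_priority", default=0)

@contextmanager
def processing_priority(priority: int):
    """Tag every command dispatched inside the block with `priority`."""
    token = _priority.set(priority)
    try:
        yield
    finally:
        _priority.reset(token)  # restore the previous priority on exit

def lane_for_dispatch() -> str:
    """Negative priorities route to the backfill stream."""
    return "backfill" if _priority.get() < 0 else "primary"

with processing_priority(-100):              # BULK
    assert lane_for_dispatch() == "backfill"
assert lane_for_dispatch() == "primary"      # back to NORMAL outside the block
```

Because the priority lives in a `ContextVar` rather than a global, nested blocks and concurrent tasks each see their own value.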
You can also set priority per command:
```python
domain.process(
    AddBook(title="...", author="...", price_amount=9.99),
    priority=Priority.BULK,
)
```
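Bulk CSVs are rarely clean. Before dispatching 50,000 commands, a small pre-flight check keeps invalid rows out of the pipeline (a sketch with hypothetical validation rules; `validate_catalog` is not part of Protean — adapt the rules to your schema):

```python
import csv

REQUIRED = ("title", "author", "price")

def validate_catalog(csv_path: str):
    """Split the CSV into importable rows and (line, reason) errors."""
    valid, errors = [], []
    with open(csv_path, newline="") as f:
        for i, row in enumerate(csv.DictReader(f), 1):
            # Reject rows with blank or absent required columns.
            missing = [c for c in REQUIRED if not (row.get(c) or "").strip()]
            if missing:
                errors.append((i, f"missing: {', '.join(missing)}"))
                continue
            # Reject rows whose price will not parse as a float.
            try:
                float(row["price"])
            except (TypeError, ValueError):
                errors.append((i, f"unparseable price: {row['price']!r}"))
                continue
            valid.append(row)
    return valid, errors
```

Running this first means a single bad row surfaces as a report line instead of a failed command deep in the backfill lane.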
## Running the Import
Start the import in one terminal while the server processes both lanes:
```shell
# Terminal 1 — server (processes both lanes)
$ protean server --domain bookshelf

# Terminal 2 — run the bulk import
$ python import_vintage_press.py vintage_press.csv
Progress: 10,000 books imported...
Progress: 20,000 books imported...
...

# Terminal 3 — production traffic continues normally
$ curl -X POST http://localhost:8000/orders ...   # instant response
```
## Monitoring the Import
Check the backfill lane progress:
```shell
$ protean subscriptions status --domain bookshelf

Subscription          Stream                     Lag (primary)  Lag (backfill)
BookCommandHandler    bookshelf::book:command    0              32,451
BookCatalogProjector  bookshelf::book            0              28,109
...
```
The primary lane stays at zero lag — production traffic is unaffected. The backfill lane processes the import at its own pace.
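If a follow-up job should wait until the backfill lane is fully drained, you can poll the status output (a sketch; it assumes the tabular layout shown above, with the backfill lag in the last column of each data row):

```python
import subprocess
import time

def parse_backfill_lag(status_output: str) -> int:
    """Sum the last column (backfill lag) across data rows.

    Assumes the layout above: a header row first, backfill lag in the
    final column, possibly comma-grouped (e.g. "32,451").
    """
    total = 0
    for line in status_output.splitlines()[1:]:  # skip the header row
        parts = line.split()
        if parts and parts[-1].replace(",", "").isdigit():
            total += int(parts[-1].replace(",", ""))
    return total

def wait_for_backfill(domain: str, poll_seconds: int = 30) -> None:
    """Block until the backfill lane reports zero lag everywhere."""
    while True:
        out = subprocess.run(
            ["protean", "subscriptions", "status", "--domain", domain],
            capture_output=True, text=True, check=True,
        ).stdout
        lag = parse_backfill_lag(out)
        if lag == 0:
            return
        print(f"backfill lag: {lag:,}")
        time.sleep(poll_seconds)
```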
## Priority Levels
| Level | Value | Use Case |
|---|---|---|
| BULK | -100 | Data migrations, large imports |
| LOW | -50 | Background tasks, non-urgent updates |
| NORMAL | 0 | Standard production traffic (default) |
| HIGH | 50 | Time-sensitive operations |
| CRITICAL | 100 | System-critical operations |
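The levels behave like an `IntEnum`: ordering comparisons work, and the sign of the value determines the lane. A sketch of that shape (illustrative, assuming the numeric values from the table; not Protean's actual class):

```python
from enum import IntEnum

class Priority(IntEnum):
    BULK = -100
    LOW = -50
    NORMAL = 0
    HIGH = 50
    CRITICAL = 100

def lane(priority: Priority) -> str:
    # Negative levels belong to the backfill lane; the rest to primary.
    return "backfill" if priority < Priority.NORMAL else "primary"

assert lane(Priority.BULK) == "backfill"
assert lane(Priority.HIGH) == "primary"
assert Priority.CRITICAL > Priority.HIGH  # IntEnum keeps ordering comparisons
```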
## What We Built
- Priority lanes for isolating bulk operations from production traffic.
- A bulk import script using `processing_priority(Priority.BULK)`.
- An understanding of how the server prioritizes the primary lane over the backfill lane.
Part IV is complete! The bookstore now has full production operations: message tracing, dead-letter queue management, monitoring, and bulk import support. In the next chapter, we will tackle advanced patterns starting with process managers.
## Full Source
```python
import csv
import sys

from protean import Domain
from protean.fields import Float, String, Text
from protean.utils.processing import Priority, processing_priority

domain = Domain("bookshelf")


@domain.aggregate
class Book:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price = Float(default=0.0)
    description = Text()


@domain.command(part_of=Book)
class AddBook:
    title = String(max_length=200, required=True)
    author = String(max_length=150, required=True)
    isbn = String(max_length=13)
    price_amount = Float(required=True)
    description = Text()


domain.init(traverse=False)


# import_vintage_press.py
def import_catalog(csv_path: str):
    """Import the Vintage Press catalog with BULK priority."""
    with domain.domain_context():
        with processing_priority(Priority.BULK):
            with open(csv_path) as f:
                reader = csv.DictReader(f)
                for i, row in enumerate(reader, 1):
                    domain.process(
                        AddBook(
                            title=row["title"],
                            author=row["author"],
                            isbn=row.get("isbn", ""),
                            price_amount=float(row["price"]),
                        )
                    )
                    if i % 10000 == 0:
                        print(f"Progress: {i:,} books imported...")
    print(f"Import complete: {i:,} books imported with BULK priority.")


if __name__ == "__main__":
    import_catalog(sys.argv[1])
```