Skip to content

Chapter 19: The Great Catalog Import — Priority Lanes

Bookshelf has acquired the catalog of Vintage Press — 50,000 books that need to be imported. Running 50,000 AddBook commands through the normal pipeline would flood the Redis streams and starve real-time orders and browsing. We need a way to run the import without affecting production traffic.

Priority Lanes

Protean supports priority lanes — two separate processing streams:

Lane Purpose Priority Levels
Primary Normal production traffic NORMAL, HIGH, CRITICAL
Backfill Bulk operations, migrations BULK, LOW

The server always drains the primary lane first. Backfill messages are only processed when the primary lane is empty. This guarantees that production traffic is never delayed by bulk operations.

Configuration

Enable priority lanes in domain.toml:

[server]
priority_lanes = true

The Import Script

# import_vintage_press.py
def import_catalog(csv_path: str):
    """Import the Vintage Press catalog with BULK priority."""
    with domain.domain_context():
        with processing_priority(Priority.BULK):
            with open(csv_path) as f:
                reader = csv.DictReader(f)
                for i, row in enumerate(reader, 1):
                    domain.process(
                        AddBook(
                            title=row["title"],
                            author=row["author"],
                            isbn=row.get("isbn", ""),
                            price_amount=float(row["price"]),
                        )
                    )
                    if i % 10000 == 0:
                        print(f"Progress: {i:,} books imported...")

            print(f"Import complete: {i:,} books imported with BULK priority.")


if __name__ == "__main__":
    import sys

    import_catalog(sys.argv[1])

The processing_priority(Priority.BULK) context manager tags all commands dispatched within it as bulk priority. These flow to the backfill stream instead of the primary stream.

You can also set priority per command:

domain.process(
    AddBook(title="...", author="...", price_amount=9.99),
    priority=Priority.BULK,
)

Running the Import

Start the import in one terminal while the server processes both lanes:

# Terminal 1 — server (processes both lanes)
$ protean server --domain bookshelf

# Terminal 2 — run the bulk import
$ python import_vintage_press.py
Importing 50,000 books with BULK priority...
Progress: 10,000 / 50,000
Progress: 20,000 / 50,000
...

# Terminal 3 — production traffic continues normally
$ curl -X POST http://localhost:8000/orders ...   # instant response

Monitoring the Import

Check the backfill lane progress:

$ protean subscriptions status --domain bookshelf
Subscription              Stream                    Lag (primary)  Lag (backfill)
BookCommandHandler        bookshelf::book:command   0              32,451
BookCatalogProjector      bookshelf::book           0              28,109
...

The primary lane stays at zero lag — production traffic is unaffected. The backfill lane processes the import at its own pace.

Priority Levels

Level Value Use Case
BULK -100 Data migrations, large imports
LOW -50 Background tasks, non-urgent updates
NORMAL 0 Standard production traffic (default)
HIGH 50 Time-sensitive operations
CRITICAL 100 System-critical operations

What We Built

  • Priority lanes for isolating bulk operations from production traffic.
  • A bulk import script using processing_priority(Priority.BULK).
  • Understanding of how the server prioritizes primary over backfill lanes.

Part IV is complete! The bookstore now has full production operations: message tracing, dead-letter queue management, monitoring, and bulk import support. In the next chapter, we will tackle advanced patterns starting with process managers.

Full Source

import csv

from protean import Domain
from protean.fields import Float, String, Text
from protean.utils.processing import Priority, processing_priority

domain = Domain("bookshelf")


@domain.aggregate
class Book:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price: Float(default=0.0)
    description: Text()


@domain.command(part_of=Book)
class AddBook:
    title: String(max_length=200, required=True)
    author: String(max_length=150, required=True)
    isbn: String(max_length=13)
    price_amount: Float(required=True)
    description: Text()


domain.init(traverse=False)


# import_vintage_press.py
def import_catalog(csv_path: str):
    """Import the Vintage Press catalog with BULK priority."""
    with domain.domain_context():
        with processing_priority(Priority.BULK):
            with open(csv_path) as f:
                reader = csv.DictReader(f)
                for i, row in enumerate(reader, 1):
                    domain.process(
                        AddBook(
                            title=row["title"],
                            author=row["author"],
                            isbn=row.get("isbn", ""),
                            price_amount=float(row["price"]),
                        )
                    )
                    if i % 10000 == 0:
                        print(f"Progress: {i:,} books imported...")

            print(f"Import complete: {i:,} books imported with BULK priority.")


if __name__ == "__main__":
    import sys

    import_catalog(sys.argv[1])

Next

Chapter 20: Orchestrating Multi-Step Workflows →