Processing priorities

Priority-based event routing for production and backfill lanes. Events can be routed to different processing streams based on priority, enabling bulk migrations without disrupting real-time event processing.

See Priority Lanes for the conceptual explanation.

Priority

Bases: IntEnum

Message processing priority levels.

Higher values mean higher priority. Production traffic uses the default, NORMAL (0). Migration and bulk operations should use LOW or BULK so that their events are routed to the backfill lane when priority lanes are enabled.

ATTRIBUTE DESCRIPTION
BULK

For bulk imports, re-indexing, and mass data operations. Events are routed to the backfill lane and processed only when no higher-priority work is pending.

LOW

For background tasks, data migrations, and non-urgent processing. Routed to backfill lane.

NORMAL

The default for all production traffic. Events flow through the primary lane and are processed immediately.

HIGH

For expedited processing of time-sensitive operations. Processed via the primary lane with outbox priority ordering.

CRITICAL

For system-critical operations like payment processing or security-related events. Highest outbox priority.
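Because Priority is an IntEnum, its members compare and sort as plain integers. The sketch below illustrates that ordering with a hypothetical stand-in class, `SketchPriority`. Only LOW = -50 and NORMAL = 0 are taken from this page; the values for BULK, HIGH, and CRITICAL are assumed placeholders chosen to respect the documented ordering and may differ from the library's actual values.

```python
from enum import IntEnum


class SketchPriority(IntEnum):
    """Hypothetical stand-in for Priority; not the library's definition."""

    BULK = -100      # assumed placeholder value
    LOW = -50        # matches the documented current_priority() example
    NORMAL = 0       # documented default
    HIGH = 50        # assumed placeholder value
    CRITICAL = 100   # assumed placeholder value


# IntEnum members compare as integers, so sorting orders them by priority.
ordered = sorted(SketchPriority, reverse=True)
names = [member.name for member in ordered]
```

Since the members are integers, comparisons such as `SketchPriority.LOW < SketchPriority.NORMAL` work without any conversion.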


processing_priority

processing_priority(priority: 'Priority') -> Generator[None, None, None]

Context manager to set processing priority for all operations in scope.

All commands processed within this context will have their events tagged with the specified priority. When priority lanes are enabled, low-priority events (priority < threshold) are routed to the backfill Redis Stream and processed only when the primary stream is empty.
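The routing rule described above can be sketched as a single comparison against the threshold. This is a minimal illustration, not the library's routing code: the function name `lane_for` and the string lane labels are hypothetical, and only the rule itself (priority below the threshold goes to the backfill lane, default threshold 0) comes from this page.

```python
def lane_for(priority: int, threshold: int = 0) -> str:
    """Return the lane a given priority would be routed to.

    Hypothetical helper: values strictly below the threshold go to the
    backfill lane, everything else stays on the primary lane.
    """
    return "backfill" if priority < threshold else "primary"


# With the default threshold of 0, only negative priorities are backfilled.
low_lane = lane_for(-50)
normal_lane = lane_for(0)
```

Note that the comparison is strict, so a priority equal to the threshold stays on the primary lane.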

PARAMETER DESCRIPTION
priority

A Priority enum member or integer value. Values below the configured threshold (default 0) are routed to the backfill lane.

TYPE: Priority

YIELDS DESCRIPTION
None

Context manager does not yield a value.

TYPE: None

Example

>>> with processing_priority(Priority.LOW):
...     domain.process(CreateCustomer(name="Migration User"))
...     # Events go to backfill lane

>>> with processing_priority(Priority.CRITICAL):
...     domain.process(ProcessPayment(amount=100))
...     # Events get highest outbox priority

Note

Contexts can be nested. The innermost context wins:

>>> with processing_priority(Priority.LOW):
...     # priority is LOW here
...     with processing_priority(Priority.HIGH):
...         # priority is HIGH here
...     # priority is LOW again

Priority is always restored after the context exits, even if an exception is raised within the block.

Source code in src/protean/utils/processing.py
@contextmanager
def processing_priority(priority: "Priority") -> Generator[None, None, None]:
    """Context manager to set processing priority for all operations in scope.

    All commands processed within this context will have their events tagged
    with the specified priority. When priority lanes are enabled, low-priority
    events (priority < threshold) are routed to the backfill Redis Stream
    and processed only when the primary stream is empty.

    Args:
        priority (Priority): A Priority enum member or integer value. Values below the
            configured threshold (default 0) are routed to the backfill lane.

    Yields:
        None: Context manager does not yield a value.

    Example:
        >>> with processing_priority(Priority.LOW):
        ...     domain.process(CreateCustomer(name="Migration User"))
        ...     # Events go to backfill lane

        >>> with processing_priority(Priority.CRITICAL):
        ...     domain.process(ProcessPayment(amount=100))
        ...     # Events get highest outbox priority

    Note:
        Contexts can be nested. The innermost context wins:

        >>> with processing_priority(Priority.LOW):
        ...     # priority is LOW here
        ...     with processing_priority(Priority.HIGH):
        ...         # priority is HIGH here
        ...     # priority is LOW again

        Priority is always restored after the context exits, even if an
        exception is raised within the block.
    """
    token = _processing_priority.set(int(priority))
    try:
        yield
    finally:
        _processing_priority.reset(token)

current_priority

current_priority() -> int

Get the current processing priority from context.

Returns the priority set by the nearest enclosing processing_priority() context manager, or Priority.NORMAL (0) if no context is active.

RETURNS DESCRIPTION
int

The current priority value.

TYPE: int

Example

>>> current_priority()
0
>>> with processing_priority(Priority.LOW):
...     current_priority()
-50

Source code in src/protean/utils/processing.py
def current_priority() -> int:
    """Get the current processing priority from context.

    Returns the priority set by the nearest enclosing ``processing_priority()``
    context manager, or ``Priority.NORMAL`` (0) if no context is active.

    Returns:
        int: The current priority value.

    Example:
        >>> current_priority()
        0
        >>> with processing_priority(Priority.LOW):
        ...     current_priority()
        -50
    """
    return _processing_priority.get()