Processing priorities
Priority-based event routing for production and backfill lanes. Events can be routed to different processing streams based on priority, enabling bulk migrations without disrupting real-time event processing.
See Priority Lanes for the conceptual explanation.
Priority
Bases: IntEnum
Message processing priority levels.
Higher values = higher priority. Production traffic uses the default (NORMAL = 0). Migration and bulk operations should use LOW or BULK so their events are routed to the backfill lane when priority lanes are enabled.
| ATTRIBUTE | DESCRIPTION |
|---|---|
BULK |
For bulk imports, re-indexing, and mass data operations. Events are routed to the backfill lane and processed only when no higher-priority work is pending.
|
LOW |
For background tasks, data migrations, and non-urgent processing. Routed to backfill lane.
|
NORMAL |
The default for all production traffic. Events flow through the primary lane and are processed immediately.
|
HIGH |
For expedited processing of time-sensitive operations. Processed via the primary lane with outbox priority ordering.
|
CRITICAL |
For system-critical operations like payment processing or security-related events. Highest outbox priority.
|
processing_priority
processing_priority(priority: 'Priority') -> Generator[None, None, None]
Context manager to set processing priority for all operations in scope.
All commands processed within this context will have their events tagged with the specified priority. When priority lanes are enabled, low-priority events (priority < threshold) are routed to the backfill Redis Stream and processed only when the primary stream is empty.
| PARAMETER | DESCRIPTION |
|---|---|
priority
|
A Priority enum member or integer value. Values below the configured threshold (default 0) are routed to the backfill lane.
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
None
|
Context manager does not yield a value.
TYPE::
|
Example
with processing_priority(Priority.LOW): ... domain.process(CreateCustomer(name="Migration User")) ... # Events go to backfill lane
with processing_priority(Priority.CRITICAL): ... domain.process(ProcessPayment(amount=100)) ... # Events get highest outbox priority
Note
Contexts can be nested. The innermost context wins:
with processing_priority(Priority.LOW): ... # priority is LOW here ... with processing_priority(Priority.HIGH): ... # priority is HIGH here ... # priority is LOW again
Priority is always restored after the context exits, even if an exception is raised within the block.
Source code in src/protean/utils/processing.py
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | |
current_priority
current_priority() -> int
Get the current processing priority from context.
Returns the priority set by the nearest enclosing processing_priority()
context manager, or Priority.NORMAL (0) if no context is active.
| RETURNS | DESCRIPTION |
|---|---|
int
|
The current priority value.
TYPE:
|
Example
current_priority() 0 with processing_priority(Priority.LOW): ... current_priority() -50
Source code in src/protean/utils/processing.py
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | |