Subscriptions
Subscriptions are the mechanism that connects handlers to message sources. They manage the flow of messages from event stores and message brokers to your handlers, tracking position, handling errors, and ensuring reliable delivery.
What is a Subscription?
A subscription is a long-running process that:
- Polls for messages from a message source (event store or broker stream)
- Delivers messages to the appropriate handler
- Tracks position to resume from where it left off
- Handles failures with retries and error callbacks
```mermaid
sequenceDiagram
    participant S as Subscription
    participant MS as Message Source
    participant H as Handler
    participant E as Engine

    loop Continuous Processing
        S->>MS: Poll for messages
        MS-->>S: Return message batch
        S->>E: handle_message(handler, message)
        E->>H: Process message
        H-->>E: Success/Failure
        E-->>S: Result
        S->>S: Update position / Handle failure
    end
```
Types of Subscriptions
Protean provides two types of subscriptions:
Handler Subscriptions
Handler subscriptions connect event handlers, command handlers, and projectors to message sources. They can use either:
- StreamSubscription: For production workloads using Redis Streams
- EventStoreSubscription: For reading directly from the event store
The `SubscriptionFactory` automatically selects the appropriate type based on configuration.
Broker Subscriptions
Broker subscriptions connect subscribers to external message brokers. They are used for consuming messages from systems outside your domain:
```python
@domain.subscriber(broker="default", stream="external_orders")
class ExternalOrderSubscriber:
    @handle("OrderReceived")
    def handle_order_received(self, event):
        # Process external order
        ...
```
Stream Categories
Subscriptions connect handlers to stream categories, which are logical groupings of related messages (events and commands). Stream categories determine how messages are organized and routed from message sources (event stores, brokers) to handlers.
How Subscriptions Use Stream Categories
When you define a handler, it subscribes to a specific stream category:
```python
@domain.event_handler(part_of=Order)
class OrderEventHandler:
    # Subscribes to Order's stream category ("order")
    ...
```
The engine creates a subscription that:
- Polls the stream category for new messages
- Delivers messages to the handler
- Tracks the handler's position in the stream
- Handles errors and retries
Default Stream Category Subscription
Handlers automatically subscribe to their associated aggregate's stream category:
```python
@domain.aggregate
class Order:
    ...  # Stream category: "order"


@domain.event_handler(part_of=Order)
class OrderEventHandler:
    # Automatically subscribes to "order" stream category
    ...
```
Custom Stream Category Subscription
Handlers can subscribe to a different stream category:
```python
@domain.event_handler(part_of=Order, stream_category="all_orders")
class OrderReportHandler:
    # Subscribes to "all_orders" instead of Order's default
    ...
```
Cross-Aggregate Subscriptions
Handlers can be part of one aggregate but subscribe to another aggregate's stream category for cross-aggregate coordination:
```python
@domain.event_handler(part_of=Inventory, stream_category="order")
class InventoryEventHandler:
    """Updates inventory based on order events."""

    @handle(OrderShipped)
    def reduce_stock(self, event):
        # React to Order events while being part of Inventory
        ...
```
For comprehensive details on stream categories, including message organization, routing patterns, event sourcing, and best practices, see the Stream Categories guide.
Subscription Lifecycle
Initialization
When a subscription starts, it performs type-specific initialization:
- StreamSubscription: Ensures consumer group exists in Redis
- EventStoreSubscription: Loads last processed position from event store
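For StreamSubscription, "ensuring the consumer group exists" boils down to a Redis XGROUP CREATE call. A minimal standalone sketch using redis-py; the function name and arguments are illustrative, not Protean's internal API:

```python
import redis.asyncio as redis
from redis.exceptions import ResponseError


async def ensure_consumer_group(client: redis.Redis, stream: str, group: str) -> None:
    """Create the consumer group for a stream if it does not already exist."""
    try:
        # mkstream=True also creates the stream itself when it is missing
        await client.xgroup_create(name=stream, groupname=group, id="0", mkstream=True)
    except ResponseError as exc:
        # BUSYGROUP means the group already exists, which is the desired state
        if "BUSYGROUP" not in str(exc):
            raise
```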
Polling Loop
Subscriptions run a continuous polling loop:
```python
async def poll(self):
    while self.keep_going and not self.engine.shutting_down:
        # Get next batch of messages
        messages = await self.get_next_batch_of_messages()

        if messages:
            # Process the batch
            await self.process_batch(messages)

        # Sleep between ticks (if configured)
        await asyncio.sleep(self.tick_interval)
```
Message Processing
For each message in a batch:
- Deserialize the message
- Call `engine.handle_message(handler, message)`
- On success: Acknowledge the message and update position
- On failure: Retry or move to the dead letter queue
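Put together, batch processing looks roughly like the following sketch. Names such as `deserialize`, `acknowledge`, and `handle_failure` are placeholders rather than the real internals:

```python
async def process_batch(self, messages):
    """Deliver each raw message to the handler and acknowledge or retry it."""
    for raw_message in messages:
        message = self.deserialize(raw_message)  # placeholder for the real deserialization step
        try:
            await self.engine.handle_message(self.handler, message)
        except Exception:
            # Retry, or move the message to the dead letter queue
            await self.handle_failure(message)
        else:
            # Acknowledge the message and advance the tracked position
            await self.acknowledge(message)
```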
Shutdown
During shutdown, subscriptions:
- Stop accepting new messages
- Complete processing of current batch
- Persist final position
- Clean up resources
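A rough sketch of that sequence, using placeholder attribute and method names rather than Protean's actual internals:

```python
async def shutdown(self):
    """Stop polling, drain in-flight work, and persist the final position."""
    self.keep_going = False               # poll loop exits after the current iteration
    await self.wait_for_current_batch()   # placeholder: let in-flight messages finish
    self.persist_position()               # placeholder: write the last processed position
    await self.close_connections()        # placeholder: release broker / store resources
```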
Position Tracking
Subscriptions track their position to ensure messages are not reprocessed after restarts.
EventStoreSubscription Position
EventStoreSubscription stores position in the event store itself:
```python
# Position stream name
f"position-{subscriber_name}-{stream_category}"

# Position is stored as a message
{
    "data": {"position": 42},
    "metadata": {"type": "Read", ...}
}
```
Position is updated periodically, based on the `position_update_interval` setting.
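Interval-based updates trade a little redelivery on restart for far fewer writes: the position message is only persisted every few messages. A sketch of that checkpointing logic, with illustrative attribute and method names:

```python
async def update_current_position(self, position: int) -> None:
    """Record the position in memory and checkpoint it periodically."""
    self.current_position = position
    self.messages_since_last_write += 1

    if self.messages_since_last_write >= self.position_update_interval:
        # Writes the "Read" message shown above to the position stream
        await self.write_position_to_store(position)
        self.messages_since_last_write = 0
```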
StreamSubscription Position
StreamSubscription uses Redis Streams' built-in consumer group tracking:
- Messages are acknowledged after successful processing
- Pending messages are tracked automatically
- Failed messages can be retried or moved to DLQ
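The underlying Redis commands are XREADGROUP to fetch new entries for the group and XACK to acknowledge them. A standalone redis-py sketch of that pattern; the stream, group, and consumer names are illustrative:

```python
import redis.asyncio as redis


async def read_and_ack(client: redis.Redis, stream: str, group: str, consumer: str) -> None:
    """Read a batch for this consumer group and acknowledge processed entries."""
    batch = await client.xreadgroup(
        groupname=group, consumername=consumer, streams={stream: ">"}, count=10
    )
    for _stream_name, entries in batch:
        for entry_id, fields in entries:
            # ... hand the message to the handler here ...
            await client.xack(stream, group, entry_id)  # removes the entry from the pending list
```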
Error Handling
Subscriptions handle errors at multiple levels:
Handler Errors
When a handler raises an exception:
- The exception is logged with full context
- The handler's `handle_error` method is called (if defined)
- The subscription continues processing
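To hook into this, define a `handle_error` method on the handler class. The classmethod signature shown here (`exc`, `message`) is an assumption; check it against your Protean version:

```python
import logging

logger = logging.getLogger(__name__)


@domain.event_handler(part_of=Order)
class OrderEventHandler:
    @handle(OrderShipped)
    def notify_shipment(self, event):
        ...

    @classmethod
    def handle_error(cls, exc, message):
        # Invoked by the subscription when a handler method raises.
        # Log, emit a metric, or route the message somewhere for inspection;
        # processing of subsequent messages continues either way.
        logger.error("Failed to process %s: %s", message, exc)
```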
Subscription Errors
For transient errors (network issues, etc.):
- Exponential backoff between retries
- Configurable maximum retries
- Dead letter queue for persistent failures (StreamSubscription only)
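The retry behaviour follows the familiar exponential backoff pattern. A generic sketch of that idea; the parameter names are illustrative and not Protean configuration keys:

```python
import asyncio
import random


async def with_backoff(operation, max_retries: int = 5, base_delay: float = 0.5):
    """Retry a transient operation, doubling the delay (plus jitter) on each attempt."""
    for attempt in range(max_retries):
        try:
            return await operation()
        except ConnectionError:
            if attempt == max_retries - 1:
                raise  # retries exhausted; escalate (e.g. to a dead letter queue)
            await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))
```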
Fatal Errors
For unrecoverable errors:
- The engine's exception handler is triggered
- Graceful shutdown is initiated
- Exit code indicates failure
Next Steps
- Subscription Types - Deep dive into StreamSubscription vs EventStoreSubscription
- Configuration - Configure subscriptions with profiles and options