Subscription Types

Protean provides two subscription types for connecting handlers to message sources. Each is optimized for different use cases and provides different guarantees.

Overview

| Feature | StreamSubscription | EventStoreSubscription |
|---|---|---|
| Message Source | Redis Streams | Event Store |
| Delivery Pattern | Push (blocking read) | Poll |
| Consumer Groups | Yes | No |
| Horizontal Scaling | Yes | Limited |
| Dead Letter Queue | Yes | No |
| Automatic Retries | Yes | No |
| Position Tracking | Redis (automatic) | Event Store (manual) |
| Recommended For | Production workloads | Projections, replay, debugging |

StreamSubscription

StreamSubscription uses Redis Streams with consumer groups for reliable, scalable message processing. This is the recommended subscription type for production workloads.

How It Works

sequenceDiagram
    participant App as Application
    participant DB as Database
    participant OB as Outbox
    participant OP as Outbox Processor
    participant RS as Redis Stream
    participant SS as StreamSubscription
    participant H as Handler

    App->>DB: Save aggregate + events
    App->>OB: Write to outbox (same transaction)
    OP->>OB: Poll outbox
    OP->>RS: Publish to stream
    SS->>RS: XREADGROUP (blocking)
    RS-->>SS: Messages
    SS->>H: Process message
    H-->>SS: Success
    SS->>RS: XACK (acknowledge)

Key Features

Consumer Groups

Multiple instances of your application can process messages in parallel:

Stream: orders
├── Consumer Group: OrderEventHandler
│   ├── Consumer: order-handler-host1-12345-abc123
│   ├── Consumer: order-handler-host2-12346-def456
│   └── Consumer: order-handler-host3-12347-ghi789

Within a consumer group, each message is delivered to only one consumer, so adding application instances spreads the load and enables horizontal scaling.
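
To make the delivery behaviour concrete, here is a minimal in-memory sketch. It is not how Protean or Redis implement consumer groups; `ConsumerGroup`, `add`, `read`, and `ack` are hypothetical names introduced only for this illustration.

```python
from collections import deque


class ConsumerGroup:
    """Toy stand-in for a Redis consumer group (illustration only)."""

    def __init__(self, name):
        self.name = name
        self._queue = deque()  # messages not yet delivered to any consumer
        self.pending = {}      # message id -> consumer that currently owns it

    def add(self, message_id, payload):
        self._queue.append((message_id, payload))

    def read(self, consumer_name, count=1):
        """Hand out up to `count` undelivered messages to a single consumer."""
        batch = []
        while self._queue and len(batch) < count:
            message_id, payload = self._queue.popleft()
            self.pending[message_id] = consumer_name
            batch.append((message_id, payload))
        return batch

    def ack(self, message_id):
        """Acknowledge a message so it is no longer pending."""
        self.pending.pop(message_id, None)


group = ConsumerGroup("OrderEventHandler")
for i in range(6):
    group.add(f"{i}-0", {"order_id": i})

# Three consumers read from the same group; no message is handed out twice.
consumers = ["host1", "host2", "host3"]
delivered = {name: group.read(name, count=2) for name in consumers}
```

Each consumer ends up with a disjoint slice of the stream, which is why adding instances increases throughput instead of duplicating work.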

Blocking Reads

StreamSubscription reads with Redis's XREADGROUP command using its BLOCK option, which means:

  • No CPU-intensive polling
  • Low latency - messages are delivered as soon as available
  • Configurable timeout for periodic maintenance

# Internally uses blocking read
messages = broker.read_blocking(
    stream=stream_category,
    consumer_group=consumer_group,
    consumer_name=consumer_name,
    timeout_ms=5000,  # Block for up to 5 seconds
    count=100,        # Read up to 100 messages
)
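
The difference between a blocking read and a polling loop can be sketched without Redis. The example below uses Python's standard `queue.Queue` as a stand-in for the stream; `blocking_read` is a hypothetical helper written for this illustration, not part of Protean's broker API.

```python
import queue


def blocking_read(source, timeout_ms, count):
    """Block until a message arrives or the timeout elapses.

    `source` is a queue.Queue standing in for the stream (an assumption of
    this sketch; the real subscription reads from Redis).
    """
    messages = []
    try:
        # Wait for the first message without spinning the CPU.
        messages.append(source.get(timeout=timeout_ms / 1000))
    except queue.Empty:
        return messages  # timed out: the caller can run periodic maintenance
    # Drain whatever else is already available, up to `count`.
    while len(messages) < count:
        try:
            messages.append(source.get_nowait())
        except queue.Empty:
            break
    return messages


stream = queue.Queue()
for n in range(3):
    stream.put({"id": n})

first = blocking_read(stream, timeout_ms=50, count=2)   # two messages
second = blocking_read(stream, timeout_ms=50, count=2)  # the remaining one
third = blocking_read(stream, timeout_ms=50, count=2)   # empty after timeout
```

An empty result after the timeout is the natural point for housekeeping before the next read.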

Dead Letter Queue

Failed messages are moved to a DLQ after exhausting retries:

Stream: orders
├── Main stream: orders (normal messages)
└── DLQ stream: orders:dlq (failed messages)

DLQ messages include metadata for debugging:

{
    "original_stream": "orders",
    "original_id": "1234567890-0",
    "consumer_group": "OrderEventHandler",
    "consumer": "order-handler-host1-12345-abc123",
    "failed_at": "2024-01-15T10:30:00Z",
    "retry_count": 3,
}
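
As a rough sketch of how such an entry could be assembled, the helper below builds the same fields shown above. `dlq_stream_name` and `build_dlq_entry` are hypothetical names for this example; only the field names and the `orders:dlq` naming come from this page.

```python
from datetime import datetime, timezone


def dlq_stream_name(stream):
    """Derive the DLQ stream name from the main stream name."""
    return f"{stream}:dlq"


def build_dlq_entry(stream, message_id, consumer_group, consumer,
                    retry_count, failed_at=None):
    """Build the debugging metadata stored alongside a failed message."""
    # Default to "now" and normalise to UTC so the trailing "Z" is accurate.
    failed_at = (failed_at or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return {
        "original_stream": stream,
        "original_id": message_id,
        "consumer_group": consumer_group,
        "consumer": consumer,
        "failed_at": failed_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "retry_count": retry_count,
    }


entry = build_dlq_entry(
    stream="orders",
    message_id="1234567890-0",
    consumer_group="OrderEventHandler",
    consumer="order-handler-host1-12345-abc123",
    retry_count=3,
    failed_at=datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc),
)
```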

Automatic Retries

Failed messages are automatically retried with configurable delays:

@domain.event_handler(
    part_of=Order,
    subscription_config={
        "max_retries": 3,
        "retry_delay_seconds": 1,
    }
)
class OrderEventHandler:
    ...
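
The retry behaviour can be pictured as a small loop around the handler call. This is a sketch under stated assumptions, not Protean's implementation: `process_with_retries` is a hypothetical helper, and it assumes one initial attempt followed by up to `max_retries` retries with a fixed delay between them.

```python
import time


def process_with_retries(handler, message, max_retries=3,
                         retry_delay_seconds=1, sleep=time.sleep):
    """Return True on success, False once all retries are exhausted."""
    retries = 0
    while True:
        try:
            handler(message)
            return True
        except Exception:
            if retries >= max_retries:
                return False  # the caller would now move the message to the DLQ
            retries += 1
            sleep(retry_delay_seconds)  # wait before the next attempt


calls = []


def flaky_handler(message):
    """Fail on the first two calls, then succeed (simulates a transient fault)."""
    calls.append(message)
    if len(calls) < 3:
        raise RuntimeError("transient failure")


delays = []  # record delays instead of actually sleeping
ok = process_with_retries(flaky_handler, {"order_id": 1}, sleep=delays.append)
```

Passing `sleep` as a parameter keeps the sketch fast to run and easy to test; a real worker would use the default `time.sleep` or an async equivalent.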

StreamSubscription Configuration Options

| Option | Default | Description |
|---|---|---|
| messages_per_tick | 10 | Messages to read per batch |
| blocking_timeout_ms | 5000 | Blocking read timeout in milliseconds |
| max_retries | 3 | Retry attempts before moving to DLQ |
| retry_delay_seconds | 1 | Delay between retries |
| enable_dlq | true | Whether to use dead letter queue |
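
One way to picture how these defaults combine with a handler's `subscription_config` is a plain dictionary merge. The sketch below is illustrative only: `STREAM_DEFAULTS` and `resolve_stream_config` are hypothetical names, and rejecting unknown keys is an assumption of this example, not documented Protean behaviour.

```python
# Defaults copied from the options listed above.
STREAM_DEFAULTS = {
    "messages_per_tick": 10,
    "blocking_timeout_ms": 5000,
    "max_retries": 3,
    "retry_delay_seconds": 1,
    "enable_dlq": True,
}


def resolve_stream_config(overrides=None):
    """Overlay handler-level overrides on top of the defaults."""
    overrides = overrides or {}
    unknown = set(overrides) - set(STREAM_DEFAULTS)
    if unknown:
        # Assumption: fail loudly on typos instead of silently ignoring them.
        raise ValueError(f"Unknown option(s): {sorted(unknown)}")
    return {**STREAM_DEFAULTS, **overrides}


config = resolve_stream_config({"messages_per_tick": 100, "max_retries": 5})
```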

When to Use

Use StreamSubscription when you need:

  • Production reliability: At-least-once delivery with DLQ
  • Horizontal scaling: Multiple consumers processing in parallel
  • Retry handling: Automatic retries for transient failures
  • Low latency: Blocking reads minimize delay

EventStoreSubscription

EventStoreSubscription reads messages directly from the event store. It's designed for scenarios where you need direct access to the event stream.

How It Works

sequenceDiagram
    participant ES as Event Store
    participant ESS as EventStoreSubscription
    participant H as Handler
    participant PS as Position Store

    ESS->>PS: Load last position
    PS-->>ESS: Position: 42
    loop Polling Loop
        ESS->>ES: Read from position 43
        ES-->>ESS: Messages [43, 44, 45]
        ESS->>H: Process message 43
        H-->>ESS: Success
        ESS->>H: Process message 44
        H-->>ESS: Success
        ESS->>H: Process message 45
        H-->>ESS: Success
        ESS->>PS: Update position to 45
    end

Key Features

Direct Event Store Access

Reads events directly from the event store, preserving the complete event history:

messages = event_store.read(
    stream_category,
    position=current_position + 1,
    no_of_messages=messages_per_tick,
)
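
The polling cycle built around this read can be sketched with an in-memory store. `InMemoryEventStore` and `poll_once` are hypothetical names for this illustration, and the sketch assumes positions are zero-based with -1 meaning "nothing processed yet".

```python
class InMemoryEventStore:
    """Toy event store: the event at list index i has position i."""

    def __init__(self, events):
        self._events = list(events)

    def read(self, position, no_of_messages):
        """Return up to `no_of_messages` events starting at `position`."""
        return self._events[position:position + no_of_messages]


def poll_once(event_store, handler, current_position, messages_per_tick=10):
    """Process one batch and return the new last-processed position."""
    batch = event_store.read(
        position=current_position + 1,
        no_of_messages=messages_per_tick,
    )
    for event in batch:
        handler(event)
    return current_position + len(batch)


store = InMemoryEventStore(f"event-{n}" for n in range(5))
handled = []
position = -1  # assumption: -1 means no event has been processed yet
positions = []
for _ in range(4):  # four polling cycles, two messages per tick
    position = poll_once(store, handled.append, position, messages_per_tick=2)
    positions.append(position)
```

The last cycle finds no new events and leaves the position unchanged, which is the idle state a polling subscription sits in between ticks.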

Position Tracking

Position is stored in the event store itself as a special message type:

# Position stream name
f"position-{handler_name}-{stream_category}"

# Stored as Read position message
{
    "data": {"position": 145},
    "metadata": {
        "type": "Read",
        "kind": "read_position",
        "origin_stream": "orders"
    }
}
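
A small sketch of how the stream name and position message above might be produced and read back. The helper names are hypothetical, and treating the most recent message in the position stream as the current position (with -1 when none exists) is an assumption of this example.

```python
def position_stream_name(handler_name, stream_category):
    """Name of the stream that holds a handler's read position."""
    return f"position-{handler_name}-{stream_category}"


def build_position_message(position, origin_stream):
    """Build a read-position message in the shape shown above."""
    return {
        "data": {"position": position},
        "metadata": {
            "type": "Read",
            "kind": "read_position",
            "origin_stream": origin_stream,
        },
    }


def last_position(position_messages, default=-1):
    """Recover the position from the newest message, if there is one."""
    if not position_messages:
        return default
    return position_messages[-1]["data"]["position"]


name = position_stream_name("OrderEventHandler", "orders")
history = [build_position_message(p, "orders") for p in (45, 95, 145)]
```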

Origin Stream Filtering

Filter messages based on their origin stream - useful when handling events that were generated in response to specific commands:

@domain.event_handler(
    part_of=Order,
    source_stream="manage_order",  # Only events triggered by manage_order commands
)
class OrderEventHandler:
    ...
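
Conceptually, the filter keeps only messages whose metadata points at the requested origin. The sketch below assumes each message carries an `origin_stream` entry in its metadata and that matching is an exact string comparison; both are assumptions of this example, and `filter_by_origin` is a hypothetical helper.

```python
def filter_by_origin(messages, origin_stream=None):
    """Keep messages whose metadata names the given origin stream."""
    if origin_stream is None:
        return list(messages)  # no filter configured: pass everything through
    return [
        message
        for message in messages
        if message.get("metadata", {}).get("origin_stream") == origin_stream
    ]


messages = [
    {"type": "OrderPlaced", "metadata": {"origin_stream": "manage_order"}},
    {"type": "OrderShipped", "metadata": {"origin_stream": "shipping"}},
    {"type": "OrderCancelled", "metadata": {"origin_stream": "manage_order"}},
    {"type": "OrderArchived", "metadata": {}},
]

selected = filter_by_origin(messages, origin_stream="manage_order")
```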

EventStoreSubscription Configuration Options

| Option | Default | Description |
|---|---|---|
| messages_per_tick | 10 | Messages to read per batch |
| tick_interval | 1 | Seconds between polling cycles |
| position_update_interval | 10 | Messages between position writes |
| origin_stream | None | Filter by origin stream |
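
The effect of `position_update_interval` can be illustrated with a small tracker that persists the position only every N messages. `PositionTracker` is a hypothetical class for this sketch and does not mirror Protean's internals; the starting value of -1 is likewise an assumption.

```python
class PositionTracker:
    """Persist the read position once every `position_update_interval` messages."""

    def __init__(self, position_update_interval=10):
        self.interval = position_update_interval
        self.current = -1      # last position processed in memory
        self.persisted = -1    # last position written to the position stream
        self.writes = 0        # number of position writes performed
        self._since_write = 0

    def record(self, position):
        """Note a processed message and flush if the interval is reached."""
        self.current = position
        self._since_write += 1
        if self._since_write >= self.interval:
            self.flush()

    def flush(self):
        """Write the current position (a real store would append a message)."""
        self.persisted = self.current
        self.writes += 1
        self._since_write = 0


tracker = PositionTracker(position_update_interval=10)
for position in range(25):
    tracker.record(position)
```

In this sketch, a restart at this point would resume after the persisted position and reprocess the messages handled since the last write. A larger interval means fewer writes but more potential reprocessing, so handlers are safest when they are idempotent.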

When to Use

Use EventStoreSubscription when you need:

  • Projections: Building read models from event history
  • Event replay: Reprocessing historical events
  • Debugging: Inspecting event flow in development
  • Single-worker scenarios: When horizontal scaling isn't needed

Production Warning

When using EventStoreSubscription in production, Protean logs a warning:

⚠️ EventStoreSubscription is being used in production.
For production workloads, consider using StreamSubscription which provides:
transactional outbox pattern, automatic retry mechanisms,
dead letter queue, and horizontal scaling with consumer groups.

This warning appears when PROTEAN_ENV, PYTHON_ENV, ENV, or ENVIRONMENT is set to production, prod, or prd.
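
The check described above could look roughly like the following. This is a sketch, not Protean's code: `is_production` is a hypothetical helper, and comparing values case-insensitively after trimming whitespace is an assumption of this example.

```python
import os

# Variable names and values taken from the description above.
ENV_VARS = ("PROTEAN_ENV", "PYTHON_ENV", "ENV", "ENVIRONMENT")
PRODUCTION_VALUES = {"production", "prod", "prd"}


def is_production(environ=None):
    """Return True if any recognised variable marks a production environment."""
    environ = os.environ if environ is None else environ
    return any(
        # Assumption: ignore case and surrounding whitespace when comparing.
        environ.get(var, "").strip().lower() in PRODUCTION_VALUES
        for var in ENV_VARS
    )


in_prod = is_production({"PROTEAN_ENV": "production"})
in_dev = is_production({"PROTEAN_ENV": "development", "ENV": "staging"})
```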

Recommendation Summary

| Scenario | Recommended Type |
|---|---|
| Production event handlers | StreamSubscription |
| Production command handlers | StreamSubscription |
| Projections/read models | EventStoreSubscription |
| Event replay | EventStoreSubscription |
| Development/debugging | Either (EventStoreSubscription is simpler) |
| High-throughput processing | StreamSubscription |
| Single-server deployment | Either |

Configuration Examples

Note

If you do not specify a subscription_type or subscription_profile, the defaults from your domain configuration apply.

StreamSubscription Configuration

@domain.event_handler(
    part_of=Order,
    subscription_type="stream",
    subscription_config={
        "messages_per_tick": 100,
        "blocking_timeout_ms": 5000,
        "max_retries": 3,
        "retry_delay_seconds": 1,
        "enable_dlq": True,
    }
)
class OrderEventHandler:
    ...

Or using a profile:

@domain.event_handler(
    part_of=Order,
    subscription_profile="production",
)
class OrderEventHandler:
    ...

EventStoreSubscription Configuration

@domain.event_handler(
    part_of=Order,
    subscription_type="event_store",
    subscription_config={
        "messages_per_tick": 100,
        "tick_interval": 1,
        "position_update_interval": 10,
    }
)
class OrderEventHandler:
    ...

Or using a profile:

@domain.projector(
    part_of=Order,
    subscription_profile="projection",
)
class OrderProjector:
    ...

Next Steps

  • Configuration - Learn about configuration profiles and the priority hierarchy
  • Outbox Pattern - Understand how StreamSubscription uses the transactional outbox