Subscription Configuration
Protean lets you configure subscriptions at multiple levels: on the handler, in the domain configuration file, and per environment. Configuration follows a priority hierarchy, with more specific settings overriding general ones.
Configuration Priority Hierarchy
Configuration is resolved using this priority order (highest to lowest):
- Handler Meta subscription_config - Explicit configuration dict on handler
- Handler Meta subscription_profile - Profile specified on handler
- Handler Meta subscription_type - Type specified on handler
- Server config handler-specific - server.subscriptions.HandlerName
- Server config defaults - server.default_subscription_*
- Profile defaults - Defaults from the resolved profile
- Hardcoded defaults - Built-in fallback values
flowchart TD
A[Handler Meta subscription_config] --> B[Handler Meta subscription_profile]
B --> C[Handler Meta subscription_type]
C --> D[Server subscriptions.HandlerName]
D --> E[Server default_subscription_*]
E --> F[Profile Defaults]
F --> G[Hardcoded Defaults]
style A fill:#e1f5fe
style B fill:#e1f5fe
style C fill:#e1f5fe
style D fill:#fff3e0
style E fill:#fff3e0
style F fill:#e8f5e9
style G fill:#fce4ec
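The hierarchy above can be pictured as a stack of dictionaries in which the first layer that defines a key wins. The sketch below is one literal reading of that list, not Protean's actual resolver: resolve_subscription_config, PROFILES, and HARDCODED_DEFAULTS are hypothetical names, only the production profile is reproduced, and the server-defaults layer is reduced to the subscription type.

```python
from collections import ChainMap

# Hypothetical stand-ins; values are copied from the tables in this document.
PROFILES = {
    "production": {
        "subscription_type": "stream",
        "messages_per_tick": 100,
        "tick_interval": 0,
        "blocking_timeout_ms": 5000,
        "max_retries": 3,
        "retry_delay_seconds": 1,
        "enable_dlq": True,
        "position_update_interval": 10,
    },
}
HARDCODED_DEFAULTS = {
    "subscription_type": "stream",
    "messages_per_tick": 10,
    "tick_interval": 1,
}


def resolve_subscription_config(handler_meta, server_handler, server_defaults):
    """Layer the sources so that earlier layers win (assumed semantics)."""
    handler_type = handler_meta.get("subscription_type")
    default_type = server_defaults.get("default_subscription_type")
    # Profile named in server config: handler-specific entry first, then default.
    server_profile = server_handler.get("profile") or server_defaults.get(
        "default_subscription_profile"
    )
    layers = ChainMap(
        # Handler Meta subscription_config
        handler_meta.get("subscription_config", {}),
        # Handler Meta subscription_profile
        PROFILES.get(handler_meta.get("subscription_profile"), {}),
        # Handler Meta subscription_type
        {"subscription_type": handler_type} if handler_type else {},
        # Server config handler-specific values
        {k: v for k, v in server_handler.items() if k != "profile"},
        # Server config defaults (only the type, to keep the sketch short)
        {"subscription_type": default_type} if default_type else {},
        # Defaults from the profile resolved via server config
        PROFILES.get(server_profile, {}),
        # Built-in fallback values
        HARDCODED_DEFAULTS,
    )
    return dict(layers)


resolved = resolve_subscription_config(
    handler_meta={"subscription_config": {"enable_dlq": False}},
    server_handler={"messages_per_tick": 500, "max_retries": 10},
    server_defaults={"default_subscription_profile": "production"},
)
print(resolved["messages_per_tick"], resolved["enable_dlq"])  # 500 False
```

ChainMap is used here only because its lookup order mirrors the "first match wins" rule; a chain of dict.update calls in reverse order would give the same result.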
Configuration Profiles
Profiles provide pre-configured settings optimized for common scenarios. Using profiles simplifies configuration while still allowing overrides.
Available Profiles
| Profile | Type | Description |
|---|---|---|
| production | Stream | High throughput with reliability guarantees |
| fast | Stream | Low-latency processing |
| batch | Stream | High-volume batch processing |
| debug | Stream | Development and debugging |
| projection | Event Store | Building read models |
Profile Defaults
production Profile
Optimized for production workloads with reliability guarantees:
{
    "subscription_type": "stream",
    "messages_per_tick": 100,
    "tick_interval": 0,
    "blocking_timeout_ms": 5000,
    "max_retries": 3,
    "retry_delay_seconds": 1,
    "enable_dlq": True,
    "position_update_interval": 10,
}
fast Profile
Optimized for low-latency processing:
{
    "subscription_type": "stream",
    "messages_per_tick": 10,
    "tick_interval": 0,
    "blocking_timeout_ms": 100,
    "max_retries": 2,
    "retry_delay_seconds": 0,
    "enable_dlq": True,
    "position_update_interval": 5,
}
batch Profile
Optimized for high-throughput batch processing:
{
    "subscription_type": "stream",
    "messages_per_tick": 500,
    "tick_interval": 0,
    "blocking_timeout_ms": 10000,
    "max_retries": 5,
    "retry_delay_seconds": 2,
    "enable_dlq": True,
    "position_update_interval": 50,
}
debug Profile
Optimized for development and debugging:
{
    "subscription_type": "stream",
    "messages_per_tick": 1,
    "tick_interval": 1,
    "blocking_timeout_ms": 1000,
    "max_retries": 1,
    "retry_delay_seconds": 0,
    "enable_dlq": False,
    "position_update_interval": 1,
}
projection Profile
Optimized for building read models:
{
    "subscription_type": "event_store",
    "messages_per_tick": 100,
    "tick_interval": 0,
    "position_update_interval": 10,
    "enable_dlq": False,  # Not supported for event_store
}
Configuration Options Reference
Common Options
| Option | Type | Default | Description |
|---|---|---|---|
| subscription_type | string | "stream" | Subscription type: "stream" or "event_store" |
| messages_per_tick | int | 10 | Messages to process per batch |
| tick_interval | int | 1 | Seconds between polling cycles |
StreamSubscription Options
| Option | Type | Default | Description |
|---|---|---|---|
| blocking_timeout_ms | int | 5000 | Blocking read timeout in milliseconds |
| max_retries | int | 3 | Retry attempts before moving to DLQ |
| retry_delay_seconds | int | 1 | Delay between retries |
| enable_dlq | bool | true | Enable dead letter queue |
EventStoreSubscription Options
| Option | Type | Default | Description |
|---|---|---|---|
| position_update_interval | int | 10 | Messages between position writes |
| origin_stream | string | None | Filter by origin stream category |
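For readers who prefer typed settings, the reference tables can be mirrored as dataclasses. This is a hypothetical helper for illustration only: the class names are assumptions and are not part of Protean, and the defaults are simply copied from the tables above.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class CommonOptions:
    """Options shared by both subscription types."""
    subscription_type: str = "stream"
    messages_per_tick: int = 10
    tick_interval: int = 1  # seconds between polling cycles


@dataclass(frozen=True)
class StreamOptions(CommonOptions):
    """StreamSubscription options."""
    blocking_timeout_ms: int = 5000
    max_retries: int = 3
    retry_delay_seconds: int = 1
    enable_dlq: bool = True


@dataclass(frozen=True)
class EventStoreOptions(CommonOptions):
    """EventStoreSubscription options."""
    subscription_type: str = "event_store"
    position_update_interval: int = 10
    origin_stream: Optional[str] = None


# asdict() turns the typed options back into a plain dictionary.
options = asdict(StreamOptions(messages_per_tick=50))
print(options["messages_per_tick"], options["max_retries"])  # 50 3
```

Keeping the defaults in one typed place makes typos in option names fail at construction time instead of being silently ignored.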
Configuration Methods
1. Handler Decorator Options
Configure directly on the handler decorator:
@domain.event_handler(
    part_of=Order,
    subscription_type="stream",
    subscription_profile="production",
    subscription_config={
        "messages_per_tick": 50,
        "enable_dlq": True,
    }
)
class OrderEventHandler:
    @handle(OrderCreated)
    def handle_order_created(self, event):
        ...
The options combine with priority:
- subscription_config values override everything
- subscription_profile provides base defaults
- subscription_type sets the subscription type
2. Domain Configuration File
Configure in domain.toml:
[server]
# Default settings for all handlers
default_subscription_type = "stream"
default_subscription_profile = "production"
messages_per_tick = 100
# StreamSubscription defaults
[server.stream_subscription]
blocking_timeout_ms = 5000
max_retries = 3
retry_delay_seconds = 1
enable_dlq = true
# EventStoreSubscription defaults
[server.event_store_subscription]
position_update_interval = 10
# Handler-specific overrides
[server.subscriptions.OrderEventHandler]
profile = "fast"
messages_per_tick = 50
[server.subscriptions.InventoryProjector]
subscription_type = "event_store"
profile = "projection"
3. Environment-Specific Configuration
Use environment sections for different deployment environments:
# Development defaults
[server]
default_subscription_profile = "debug"
# Production overrides
[prod.server]
default_subscription_profile = "production"
messages_per_tick = 200
[prod.server.stream_subscription]
max_retries = 5
enable_dlq = true
# Staging overrides
[staging.server]
default_subscription_profile = "fast"
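One way to picture environment sections is as an overlay that is deep-merged onto the base server table. The sketch assumes that merge behavior (nested tables merge key by key, scalar values are replaced); overlay is a hypothetical helper, the dictionaries stand in for parsed TOML, and the base stream_subscription values are illustrative.

```python
def overlay(base: dict, override: dict) -> dict:
    """Return a new dict with `override` deep-merged onto `base`."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Nested table on both sides: merge recursively.
            merged[key] = overlay(merged[key], value)
        else:
            # Scalar, or key missing in base: the override wins.
            merged[key] = value
    return merged


# Stand-in for [server]; the nested values are illustrative.
base_server = {
    "default_subscription_profile": "debug",
    "stream_subscription": {"blocking_timeout_ms": 5000, "max_retries": 3},
}
# Stand-in for [prod.server] and [prod.server.stream_subscription].
prod_server = {
    "default_subscription_profile": "production",
    "messages_per_tick": 200,
    "stream_subscription": {"max_retries": 5, "enable_dlq": True},
}

effective = overlay(base_server, prod_server)
print(effective["stream_subscription"])
# {'blocking_timeout_ms': 5000, 'max_retries': 5, 'enable_dlq': True}
```

Under this assumption, settings that the environment section does not mention (blocking_timeout_ms here) carry over from the base table.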
Configuration Examples
Example 1: Production Event Handler
A handler with production settings and custom batch size:
@domain.event_handler(
    part_of=Order,
    subscription_profile="production",
    subscription_config={
        "messages_per_tick": 200,  # Higher throughput
    }
)
class OrderEventHandler:
    ...
Resolved configuration:
{
    "subscription_type": "stream",  # From profile
    "messages_per_tick": 200,  # From subscription_config (override)
    "blocking_timeout_ms": 5000,  # From profile
    "max_retries": 3,  # From profile
    "retry_delay_seconds": 1,  # From profile
    "enable_dlq": True,  # From profile
}
Example 2: Low-Latency Handler
A handler optimized for fast processing:
@domain.event_handler(
    part_of=Notification,
    subscription_profile="fast",
)
class NotificationEventHandler:
    ...
Resolved configuration:
{
    "subscription_type": "stream",
    "messages_per_tick": 10,
    "blocking_timeout_ms": 100,  # Short timeout for quick response
    "max_retries": 2,
    "retry_delay_seconds": 0,  # No delay between retries
    "enable_dlq": True,
}
Example 3: Projection Handler
A projector using EventStoreSubscription:
@domain.projector(
    stream_categories=["order", "inventory"],
    subscription_profile="projection",
)
class OrderSummaryProjector:
    ...
Resolved configuration:
{
    "subscription_type": "event_store",  # Direct event store access
    "messages_per_tick": 100,
    "tick_interval": 0,
    "position_update_interval": 10,
    "enable_dlq": False,  # Not supported
}
Example 4: Mixed Configuration
Combining domain config with handler config:
# domain.toml
[server]
default_subscription_profile = "production"
[server.subscriptions.SpecialHandler]
messages_per_tick = 500
max_retries = 10
@domain.event_handler(
    part_of=Order,
    subscription_config={
        "enable_dlq": False,  # Highest priority - overrides everything
    }
)
class SpecialHandler:
    ...
Resolved configuration:
{
    "subscription_type": "stream",  # From profile (via server default)
    "messages_per_tick": 500,  # From server.subscriptions.SpecialHandler
    "max_retries": 10,  # From server.subscriptions.SpecialHandler
    "enable_dlq": False,  # From handler subscription_config (override)
    "blocking_timeout_ms": 5000,  # From profile
    "retry_delay_seconds": 1,  # From profile
}
Example 5: Origin Stream Filtering
Filter events by their origin stream:
@domain.event_handler(
    part_of=Notification,
    stream_category="order",
    source_stream="user_actions",  # Only events from user_actions origin
    subscription_type="event_store",
)
class UserActionNotificationHandler:
    """Send notifications only for user-initiated order events."""

    @handle(OrderShipped)
    def notify_user(self, event):
        # Only called for OrderShipped events that originated from user_actions
        ...
Configuration Validation
Protean validates configuration and provides helpful error messages:
Invalid Subscription Type
@domain.event_handler(subscription_type="invalid")
ConfigurationError: Unknown subscription type: 'invalid'.
Valid types are: stream, event_store
Invalid Profile
@domain.event_handler(subscription_profile="nonexistent")
ConfigurationError: Unknown subscription profile: 'nonexistent'.
Valid profiles are: production, fast, batch, debug, projection
Incompatible Options
@domain.event_handler(
    subscription_type="event_store",
    subscription_config={"enable_dlq": True}
)
ConfigurationError: Invalid subscription configuration:
enable_dlq is not supported for EVENT_STORE subscription type.
EventStoreSubscription handles messages without DLQ support.
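The three checks above can be approximated in a few lines, which is handy for linting configuration before the server starts. This is a hypothetical sketch, not Protean's validator: validate_subscription and the local ConfigurationError class are stand-ins, and the valid values are taken from the error messages shown above.

```python
VALID_TYPES = ("stream", "event_store")
VALID_PROFILES = ("production", "fast", "batch", "debug", "projection")


class ConfigurationError(Exception):
    """Local stand-in so the sketch runs without Protean installed."""


def validate_subscription(subscription_type=None, subscription_profile=None,
                          subscription_config=None):
    """Raise ConfigurationError for the three cases described above."""
    if subscription_type is not None and subscription_type not in VALID_TYPES:
        raise ConfigurationError(
            f"Unknown subscription type: '{subscription_type}'. "
            f"Valid types are: {', '.join(VALID_TYPES)}"
        )
    if subscription_profile is not None and subscription_profile not in VALID_PROFILES:
        raise ConfigurationError(
            f"Unknown subscription profile: '{subscription_profile}'. "
            f"Valid profiles are: {', '.join(VALID_PROFILES)}"
        )
    config = subscription_config or {}
    # DLQ is only meaningful for stream subscriptions.
    if subscription_type == "event_store" and config.get("enable_dlq"):
        raise ConfigurationError(
            "enable_dlq is not supported for EVENT_STORE subscription type."
        )


try:
    validate_subscription(subscription_type="event_store",
                          subscription_config={"enable_dlq": True})
except ConfigurationError as exc:
    print(exc)
```

The sketch only looks at the explicitly passed subscription_type; a type implied by a profile (for example projection) would need the resolved configuration instead.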
Best Practices
1. Use Profiles as a Starting Point
Start with a profile and override specific options:
@domain.event_handler(
    part_of=Order,
    subscription_profile="production",
    subscription_config={
        "messages_per_tick": 50,  # Adjust based on your workload
    }
)
2. Configure at the Right Level
- Handler level: Handler-specific tuning
- Server config: Domain-wide defaults
- Environment config: Environment-specific overrides
3. Use debug Profile in Development
# domain.toml (development)
[server]
default_subscription_profile = "debug"
4. Monitor and Adjust
Start with conservative settings and adjust based on monitoring:
# Start conservative
subscription_config={"messages_per_tick": 10}
# Increase after monitoring shows capacity
subscription_config={"messages_per_tick": 100}
5. Enable DLQ in Production
Always enable the DLQ for production stream subscriptions so that failed messages are captured (event_store subscriptions do not support it):
subscription_config={"enable_dlq": True}
Next Steps
- Subscription Types - Deep dive into subscription types
- Running the Server - CLI options and deployment