Outbox Pattern

The outbox pattern ensures reliable message delivery by storing messages in the same database transaction as your business data, then publishing them to the message broker in a separate process. This guarantees that messages are never lost, even if the broker is temporarily unavailable.

Why Use the Outbox Pattern?

Without the outbox pattern, there's a risk of data inconsistency:

sequenceDiagram
    participant App as Application
    participant DB as Database
    participant Broker as Message Broker

    App->>DB: 1. Save order
    DB-->>App: Success
    App->>Broker: 2. Publish OrderCreated
    Note over Broker: Broker is down!
    Broker--xApp: Failed

    Note over App,Broker: Order saved but event lost!

With the outbox pattern:

sequenceDiagram
    participant App as Application
    participant DB as Database
    participant Outbox as Outbox Table
    participant OP as Outbox Processor
    participant Broker as Message Broker

    App->>DB: 1. Save order
    App->>Outbox: 2. Save event (same transaction)
    DB-->>App: Success (both committed)

    Note over OP,Broker: Later, asynchronously...

    OP->>Outbox: 3. Poll for messages
    OP->>Broker: 4. Publish OrderCreated
    Broker-->>OP: Success
    OP->>Outbox: 5. Mark as published

    Note over App,Broker: Both order and event are guaranteed!

How It Works

1. Event Storage

When you save an aggregate that raises events, Protean stores the events in the outbox table within the same transaction:

with domain.domain_context():
    order = Order.create(customer_id="123", items=[...])
    # Order raises OrderCreated event internally

    domain.repository_for(Order).add(order)
    # Transaction commits:
    # - Order saved to orders table
    # - OrderCreated saved to outbox table
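
The "raises OrderCreated event internally" step happens inside the aggregate itself. Below is a minimal, illustrative sketch of how that might look, assuming the `domain` object from the surrounding examples; the field names, event attributes, and item handling are assumptions for this example, not taken from this guide:

from protean.fields import Identifier, String

@domain.aggregate
class Order:
    customer_id = String(required=True)
    status = String(default="PENDING")

    @classmethod
    def create(cls, customer_id, items):
        order = cls(customer_id=customer_id)  # item handling elided for brevity
        # Raising the event queues it on the aggregate; when the repository
        # adds the order, the event is written to the outbox table in the
        # same transaction as the order row
        order.raise_(OrderCreated(order_id=order.id, customer_id=customer_id))
        return order

@domain.event(part_of=Order)
class OrderCreated:
    order_id = Identifier(required=True)
    customer_id = String(required=True)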

2. Outbox Processing

The OutboxProcessor runs as part of the Engine, polling the outbox table and publishing messages to the configured broker:

# Simplified flow
messages = outbox_repo.find_unprocessed(limit=10)

for message in messages:
    message.start_processing(worker_id)
    broker.publish(message.stream_name, message.data)
    message.mark_published()
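
The simplified flow above shows only the happy path. Conceptually, a publish failure is recorded on the message so the retry mechanism (described below) can pick it up later. The sketch that follows is illustrative only; the mark_failed() name is an assumption, not a documented API:

for message in messages:
    message.start_processing(worker_id)
    try:
        broker.publish(message.stream_name, message.data)
        message.mark_published()
    except Exception as exc:
        # Hypothetical failure hook: record the error so a retry can be
        # scheduled (or the message abandoned once max_attempts is exceeded)
        message.mark_failed(str(exc))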

3. Message Consumption

StreamSubscription consumers read from the broker stream:

@domain.event_handler(part_of=Inventory, stream_category="order")
class InventoryEventHandler:
    @handle(OrderCreated)
    def reserve_stock(self, event):
        # Process the event
        ...

Configuration

Enabling the Outbox

Enable the outbox pattern in your domain configuration:

# domain.toml
enable_outbox = true

[outbox]
broker = "default"         # Which broker to publish to
messages_per_tick = 10     # Messages processed per cycle
tick_interval = 1          # Seconds between cycles

# Retry configuration
[outbox.retry]
max_attempts = 3           # Maximum retry attempts
base_delay_seconds = 60    # Initial retry delay
max_backoff_seconds = 3600 # Maximum retry delay (1 hour)
backoff_multiplier = 2     # Exponential backoff multiplier
jitter = true              # Add randomization to delays
jitter_factor = 0.25       # ±25% randomization

# Cleanup configuration
[outbox.cleanup]
published_retention_hours = 168   # Keep published messages for 7 days
abandoned_retention_hours = 720   # Keep abandoned messages for 30 days
cleanup_interval_ticks = 86400    # Run cleanup roughly once a day (86400 ticks at tick_interval = 1s)

Broker Configuration

Ensure you have a broker configured for the outbox:

[brokers.default]
provider = "redis"
URI = "redis://localhost:6379/0"

Multiple Database Providers

If your domain uses multiple database providers, Protean creates an outbox processor for each one:

[databases.default]
provider = "postgresql"
database_uri = "postgresql://localhost/main"

[databases.analytics]
provider = "postgresql"
database_uri = "postgresql://localhost/analytics"

# Creates two outbox processors:
# - outbox-processor-default-to-default
# - outbox-processor-analytics-to-default

Outbox Message Lifecycle

Messages in the outbox go through several states:

stateDiagram-v2
    [*] --> PENDING: Event raised
    PENDING --> PROCESSING: Worker claims message
    PROCESSING --> PUBLISHED: Broker publish succeeds
    PROCESSING --> FAILED: Broker publish fails
    FAILED --> PENDING: Retry scheduled
    FAILED --> ABANDONED: Max retries exceeded
    PUBLISHED --> [*]: Cleanup removes
    ABANDONED --> [*]: Cleanup removes

Message States

State        Description
PENDING      Message waiting to be processed
PROCESSING   Message claimed by a worker
PUBLISHED    Successfully published to broker
FAILED       Publishing failed, may be retried
ABANDONED    Max retries exceeded, given up

Retry Mechanism

Failed messages are automatically retried with exponential backoff:

Attempt 1: Immediate
Attempt 2: 60 seconds later (base_delay)
Attempt 3: 120 seconds later (base_delay * 2)
Attempt 4: 240 seconds later (base_delay * 4)
... up to max_backoff_seconds

With jitter enabled (default), delays are randomized by ±25% to prevent thundering herd problems.
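
The delays above follow directly from the retry settings. Here is a rough sketch of the calculation (not Protean's actual implementation), using the default values shown in the configuration:

import random

def backoff_delay(failed_attempts, base_delay=60, multiplier=2,
                  max_backoff=3600, jitter=True, jitter_factor=0.25):
    """Seconds to wait before the next attempt, after `failed_attempts` failures."""
    delay = min(base_delay * multiplier ** (failed_attempts - 1), max_backoff)
    if jitter:
        # Randomize by +/-25% so many failed messages don't retry in lockstep
        delay *= 1 + random.uniform(-jitter_factor, jitter_factor)
    return delay

# 1 failure -> ~60s, 2 failures -> ~120s, 3 failures -> ~240s, capped at 3600s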

Retry Configuration

[outbox.retry]
max_attempts = 3           # Give up after 3 attempts
base_delay_seconds = 60    # Start with 1 minute delay
max_backoff_seconds = 3600 # Cap at 1 hour
backoff_multiplier = 2     # Double delay each attempt
jitter = true              # Add randomization
jitter_factor = 0.25       # ±25%

Message Cleanup

Old messages are automatically cleaned up based on retention settings:

[outbox.cleanup]
published_retention_hours = 168   # Remove published after 7 days
abandoned_retention_hours = 720   # Remove abandoned after 30 days
cleanup_interval_ticks = 86400    # Run cleanup daily (approx)

Cleanup removes:

  • Published messages: successfully delivered; retained for the audit trail until published_retention_hours elapses
  • Abandoned messages: failed after max retries; retained for investigation until abandoned_retention_hours elapses
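
Conceptually, each cleanup pass deletes messages in a terminal state that are older than the configured retention window. A rough sketch of the cutoff calculation, for illustration only:

from datetime import datetime, timedelta, timezone

published_retention_hours = 168
abandoned_retention_hours = 720

now = datetime.now(timezone.utc)
published_cutoff = now - timedelta(hours=published_retention_hours)   # 7 days ago
abandoned_cutoff = now - timedelta(hours=abandoned_retention_hours)   # 30 days ago
# A cleanup pass would delete PUBLISHED rows older than published_cutoff
# and ABANDONED rows older than abandoned_cutoff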

Multi-Worker Support

The outbox processor supports multiple workers processing messages concurrently. Each message is locked by a worker during processing to prevent duplicate delivery:

# Worker 1 claims message
success, result = message.start_processing("worker-1")
# success=True, message is locked

# Worker 2 tries to claim same message
success, result = message.start_processing("worker-2")
# success=False, result=ProcessingResult.ALREADY_LOCKED

Lock Behavior

  • Messages are locked when processing starts
  • Lock is released when processing completes (success or failure)
  • Stale locks (from crashed workers) are automatically released, as sketched below
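
One common way to recover from crashed workers is to treat a lock older than some maximum processing window as stale and return the message to PENDING. The sketch below is illustrative only; the expiry threshold, the locked_at/locked_by field names, and the plain-string statuses are assumptions, not Protean's documented model:

from datetime import datetime, timedelta, timezone

LOCK_EXPIRY = timedelta(minutes=5)  # assumed maximum processing window

def release_if_stale(message, now=None):
    """Return a PROCESSING message to PENDING if its lock has expired."""
    now = now or datetime.now(timezone.utc)
    if message.status == "PROCESSING" and now - message.locked_at > LOCK_EXPIRY:
        # The worker that claimed this message likely crashed; make the
        # message available for other workers to pick up again
        message.status = "PENDING"
        message.locked_by = None
        message.locked_at = None
    return message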

Monitoring

The outbox processor logs important events:

DEBUG: Found 10 messages to process
DEBUG: Published to orders: abc12345...
DEBUG: Outbox batch: 10/10 processed
INFO:  Outbox cleanup: removed 100 messages (95 published, 5 abandoned)

Key Metrics to Monitor

Metric             Description                        Action if High
Pending messages   Messages waiting to be processed   Increase workers or throughput
Failed messages    Messages that failed publishing    Check broker connectivity
Retry rate         Messages being retried             Investigate root cause
Processing time    Time to process a batch            Tune batch size

Best Practices

1. Size Your Batches Appropriately

[outbox]
messages_per_tick = 100  # Larger batches for high throughput
tick_interval = 0        # No delay between batches

2. Configure Retries Conservatively

[outbox.retry]
max_attempts = 5         # More attempts for critical messages
base_delay_seconds = 30  # Start with shorter delays

3. Monitor Abandoned Messages

Set up alerts for abandoned messages, since they indicate persistent problems:

# Query for abandoned messages
abandoned = outbox_repo.find_by_status(OutboxStatus.ABANDONED)
if abandoned:
    alert("Outbox has abandoned messages!")

4. Use Appropriate Retention

Balance audit requirements with storage costs:

[outbox.cleanup]
published_retention_hours = 24    # Keep 1 day for debugging
abandoned_retention_hours = 168   # Keep 7 days for investigation

Next Steps