Subscribers
DDD CQRS ES
Subscribers consume messages from external message brokers and other systems outside the domain boundary. They bridge external messaging infrastructure and the domain's internal handling, serving as the entry point for inter-system integration at the messaging level.
Subscribers vs. Event Handlers
Event handlers consume domain events from the internal event store and
are associated with an aggregate via part_of. Use them to react to changes
within your bounded context.
Subscribers consume raw dict payloads from external brokers and are
associated with a stream. Use them to consume messages from external
systems like payment gateways, shipping providers, or third-party webhooks.
See Event Handlers for handling domain events.
For background on how subscribers bridge external systems, see Subscribers concept.
Defining a Subscriber
Subscribers are defined with the Domain.subscriber decorator:
import logging
from protean import Domain
from protean.fields import Float, Identifier, String
logger = logging.getLogger(__name__)
domain = Domain()
domain.config["message_processing"] = "sync"
@domain.aggregate
class Payment:
order_id: Identifier(required=True)
amount: Float(required=True)
status: String(choices=["PENDING", "CONFIRMED", "FAILED"], default="PENDING")
def confirm(self):
self.status = "CONFIRMED"
@domain.subscriber(stream="payment_gateway") # (1)
class PaymentConfirmationSubscriber:
"""Consumes payment confirmation messages from an external payment gateway."""
def __call__(self, payload: dict) -> None: # (2)
order_id = payload["order_id"]
repo = domain.repository_for(Payment)
payment = repo.find_by(order_id=order_id)
payment.confirm() # (3)
repo.add(payment)
domain.init(traverse=False)
if __name__ == "__main__":
with domain.domain_context():
# Create a pending payment
payment = Payment(order_id="order-123", amount=49.99)
domain.repository_for(Payment).add(payment)
# Simulate an external payment gateway publishing a confirmation
domain.brokers["default"].publish(
"payment_gateway",
{"order_id": "order-123", "transaction_id": "txn-789"},
)
# Verify payment was confirmed by the subscriber
updated = domain.repository_for(Payment).get(payment.id)
print(f"Payment status: {updated.status}")
assert updated.status == "CONFIRMED"
print("Payment confirmed successfully!")
-
@domain.subscriber(stream="payment_gateway")registers this class as a subscriber listening to the"payment_gateway"broker stream. The default broker is used since nobrokerparameter is specified. -
The
__call__method receives a rawdictpayload. The structure of the payload depends entirely on what the external system publishes. -
The subscriber loads the relevant aggregate from the repository, applies business logic, and persists the changes.
Subscriber Workflow
sequenceDiagram
autonumber
External System->>Broker: Publish Message
Broker->>Broker: Store in Stream
Note over Broker,Subscriber: Asynchronous Processing (Engine)
Broker->>Engine: Deliver Message Batch
Engine->>Subscriber: __call__(payload)
Subscriber->>Subscriber: Process Message
Subscriber->>Repository: Load/Update Aggregate
Repository-->>Subscriber: Done
Subscriber->>Repository: Persist Changes
Subscriber-->>Engine: Success/Failure
Engine->>Broker: Acknowledge / Nack
-
External System Publishes Message: An external system sends a message to the broker on a named stream.
-
Broker Stores Message: The broker stores the message in the stream for later consumption.
-
Engine Delivers Messages: The Protean Engine's broker subscription polls the broker for new messages in batches.
-
Engine Invokes Subscriber: For each message, the engine instantiates the subscriber and calls its
__call__method with the rawdictpayload. -
Subscriber Processes Message: The subscriber extracts data from the payload and performs business logic.
-
Load/Update Aggregate: The subscriber may load and modify aggregates through repositories.
-
Persist Changes: Modified aggregates are persisted.
-
Acknowledge/Nack: On success, the message is acknowledged. On failure, it is negatively acknowledged for potential reprocessing.
Configuration Options
stream (required)
The name of the broker stream this subscriber listens to. This must be
specified; omitting it raises an IncorrectUsageError.
@domain.subscriber(stream="external_orders")
class ExternalOrderSubscriber:
def __call__(self, payload: dict) -> None:
...
broker (optional, default: "default")
The name of the broker to use, as configured in the domain configuration. If
not specified, the subscriber uses the "default" broker.
# Uses the default broker
@domain.subscriber(stream="order_events")
class OrderSubscriber:
def __call__(self, payload: dict) -> None:
...
# Uses a specific named broker
@domain.subscriber(stream="analytics_events", broker="analytics")
class AnalyticsSubscriber:
def __call__(self, payload: dict) -> None:
...
Note
The broker name must correspond to a broker configured in your domain
configuration. If the specified broker has not been configured, Protean
will raise a ConfigurationError during domain initialization.
See Brokers for details on configuring brokers.
Processing Modes
Synchronous Processing
When message_processing is set to "sync", subscribers are invoked
immediately within the publish() call. This is useful for development,
testing, and simple applications.
# domain.toml
message_processing = "sync"
In sync mode, when a message is published to a broker stream, the broker looks up all subscribers registered for that stream and invokes them inline before returning.
Asynchronous Processing
When message_processing is set to "async" (the default), messages are
stored in the broker and processed later by the Protean Engine. This is the
recommended mode for production.
# domain.toml
message_processing = "async"
In async mode, you must run the Protean server to process subscriber messages:
protean server
The engine creates a broker subscription for each registered subscriber, managing consumer groups, message batching, and acknowledgment automatically.
Learn more in Server and Subscriptions.
Error Handling
Subscribers support custom error handling through the optional handle_error
method. This method is called when an exception occurs during message
processing, allowing you to implement specialized error handling strategies.
The handle_error Method
You can define a handle_error class method in your subscriber to handle
exceptions:
@domain.subscriber(stream="payment_gateway")
class PaymentSubscriber:
def __call__(self, payload: dict) -> None:
# Processing logic that might raise exceptions
...
@classmethod
def handle_error(cls, exc: Exception, message: dict) -> None:
"""Custom error handling for message processing failures."""
logger.error(f"Failed to process payment message: {exc}")
# Perform recovery: store for retry, notify monitoring, etc.
...
How It Works
- When an exception occurs in a subscriber's
__call__method, the Protean Engine catches it. - The engine logs detailed error information including stack traces.
- The engine calls the subscriber's
handle_errorclassmethod, passing: - The original exception that was raised
- The message
dictbeing processed when the exception occurred - After
handle_errorcompletes, processing continues with the next message. - The failed message is negatively acknowledged for potential reprocessing by the broker.
Error Handler Failures
If an exception occurs within the handle_error method itself, the Protean
Engine will catch and log that exception as well, ensuring that the message
processing pipeline continues to function. This provides an additional layer
of resilience:
@classmethod
def handle_error(cls, exc: Exception, message: dict) -> None:
try:
# Error handling logic that might itself fail
...
except Exception as error_exc:
# The engine will catch and log this secondary exception
logger.error(f"Error handler failed: {error_exc}")
# Processing continues regardless
Best Practices
- Make error handlers robust and avoid complex logic that might fail.
- Use error handlers for logging, notification, and simple recovery.
- Do not throw exceptions from error handlers unless absolutely necessary.
- Consider implementing retry logic or dead-letter patterns for persistent failures.
Patterns
Payload Validation
Subscribers receive raw dicts from external systems. Validate shape and types before trusting the data:
@domain.subscriber(stream="payments")
class PaymentWebhookSubscriber:
REQUIRED_FIELDS = ("order_id", "status", "amount")
def __call__(self, payload: dict) -> None:
# Validate required fields
missing = [f for f in self.REQUIRED_FIELDS if f not in payload]
if missing:
logger.warning(f"Missing fields {missing}, skipping message")
return
# Validate types
if not isinstance(payload["amount"], (int, float)):
logger.warning(f"Invalid amount type: {type(payload['amount'])}")
return
# Safe to process
current_domain.process(
RecordPayment(
order_id=payload["order_id"],
status=payload["status"],
amount=payload["amount"],
)
)
Accessing Message Context
The Protean Engine sets g.message_in_context during subscriber processing,
using the same mechanism as event and command handlers. This gives subscribers
access to the broker message ID and stream name — useful for idempotency
checks, audit logging, and debugging:
from protean.utils.globals import g
@domain.subscriber(stream="orders")
class OrderSubscriber:
def __call__(self, payload: dict) -> None:
msg = g.message_in_context
# Use the broker message ID for idempotency
message_id = msg.metadata.headers.id
repo = current_domain.repository_for(ProcessedMessage)
if repo.find(message_id=message_id):
logger.info(f"Already processed {message_id}, skipping")
return
# Process the message
current_domain.process(
CreateShipment(order_id=payload["order_id"], items=payload["items"])
)
# Record that we processed this message
repo.add(ProcessedMessage(message_id=message_id))
The context Message wraps the broker metadata:
| Attribute | Description |
|---|---|
msg.metadata.headers.id |
The broker-assigned message identifier |
msg.metadata.headers.stream |
The broker stream from which the message was consumed |
msg.data |
The raw payload dict |
Because this is the same message_in_context used for domain events and
commands, any commands dispatched by the subscriber via domain.process()
automatically inherit the broker message ID as their causation_id, linking
the full trace chain back to the original external message. The
correlation_id from the source service is also preserved automatically,
stitching the causal chain across service boundaries.
The context is automatically set before __call__ is invoked and cleaned up
afterward — even if the subscriber raises an exception.
For a complete guide on how correlation and causation IDs flow across service boundaries, see Correlation and Causation IDs.
Subscribers vs. Event Handlers
| Aspect | Event Handler | Subscriber |
|---|---|---|
| Decorator | @domain.event_handler |
@domain.subscriber |
| Message source | Internal event store | External message broker |
| Association | part_of an aggregate |
stream on a broker |
| Payload type | Typed domain event objects | Raw dict payloads |
| Dispatch | @handle(EventClass) per event type |
Single __call__(payload) for all messages |
| Processing config | event_processing |
message_processing |
| Use case | React to domain changes within bounded context | Consume messages from external systems |
Use event handlers when you need to react to events raised by aggregates within your domain. Use subscribers when you need to integrate with external systems that publish messages to a broker.
Complete Example
Below is a comprehensive example showing two subscribers processing webhooks from different external systems -- a payment gateway and a shipping provider:
import logging
from protean import Domain
from protean.fields import Float, String
logger = logging.getLogger(__name__)
domain = Domain()
domain.config["message_processing"] = "sync"
@domain.aggregate
class Order:
customer_email: String(max_length=255, required=True)
total_amount: Float(required=True)
status: String(
choices=["PENDING", "PAID", "SHIPPED", "CANCELLED"], default="PENDING"
)
def mark_paid(self):
self.status = "PAID"
@domain.subscriber(stream="payment_gateway") # (1)
class PaymentWebhookSubscriber:
"""Processes payment webhook notifications from an external gateway.
This subscriber listens to the `payment_gateway` broker stream and updates
order status when payments are confirmed.
"""
def __call__(self, payload: dict) -> None: # (2)
order_id = payload["order_id"]
status = payload["status"]
if status == "SUCCESS":
repo = domain.repository_for(Order)
order = repo.get(order_id)
order.mark_paid()
repo.add(order)
logger.info(f"Order {order_id} marked as paid")
@classmethod
def handle_error(cls, exc: Exception, message: dict) -> None: # (3)
"""Handle processing errors gracefully."""
order_id = message.get("order_id", "unknown")
logger.error(f"Failed to process payment for order {order_id}: {exc}")
@domain.subscriber(stream="shipping_updates", broker="default") # (4)
class ShippingUpdateSubscriber:
"""Processes shipping status updates from an external logistics provider."""
def __call__(self, payload: dict) -> None:
order_id = payload["order_id"]
repo = domain.repository_for(Order)
order = repo.get(order_id)
order.status = "SHIPPED"
repo.add(order)
logger.info(f"Order {order_id} marked as shipped")
if __name__ == "__main__":
domain.init(traverse=False)
with domain.domain_context():
# Create an order
order = Order(customer_email="alice@example.com", total_amount=149.99)
domain.repository_for(Order).add(order)
print(f"Order created: {order.id} (status: {order.status})")
# Simulate external payment gateway confirming payment
domain.brokers["default"].publish(
"payment_gateway",
{
"order_id": str(order.id),
"status": "SUCCESS",
"transaction_id": "txn-42",
},
)
# Verify order is now paid
updated = domain.repository_for(Order).get(order.id)
print(f"After payment webhook: status={updated.status}")
assert updated.status == "PAID"
# Simulate external shipping provider sending an update
domain.brokers["default"].publish(
"shipping_updates",
{"order_id": str(order.id), "tracking_number": "TRACK-12345"},
)
# Verify order is now shipped
updated = domain.repository_for(Order).get(order.id)
print(f"After shipping webhook: status={updated.status}")
assert updated.status == "SHIPPED"
print("\nAll webhooks processed successfully!")
-
PaymentWebhookSubscriberlistens to the"payment_gateway"stream on the default broker. -
The
__call__method receives the raw webhook payload, extracts the order ID and status, and updates the order accordingly. -
The
handle_errorclassmethod provides custom error handling. If__call__raises an exception, this method is invoked with the exception and the original message. -
ShippingUpdateSubscriberlistens to a different stream ("shipping_updates"), demonstrating that multiple subscribers can coexist, each consuming from their own stream.
See also
Concept overview: Subscribers — Anti-corruption layer for external message consumption.
Patterns:
- Consuming Events from Other Domains — Patterns for integrating with external bounded contexts.
- Connecting Concepts Across Domains — Bridging domain boundaries with shared concepts.