Subscribers
Subscribers consume messages from external message brokers and other systems outside the domain boundary. They bridge external messaging infrastructure and the domain's internal handling, serving as the entry point for inter-system integration at the messaging level.
Subscribers vs. Event Handlers
Event handlers consume domain events from the internal event store and
are associated with an aggregate via part_of. Use them to react to changes
within your bounded context.
Subscribers consume raw dict payloads from external brokers and are
associated with a stream. Use them to consume messages from external
systems like payment gateways, shipping providers, or third-party webhooks.
See Event Handlers for handling domain events.
Key Facts
- Subscribers are registered with the @domain.subscriber decorator and must specify a stream name.
- Subscribers consume raw dict payloads from external message brokers, not typed domain events.
- Each subscriber must implement the __call__(self, payload: dict) method, which is invoked for every message on the stream.
- Subscribers are associated with a broker (defaults to "default") and a stream (required).
- In synchronous mode (message_processing = "sync"), subscribers are invoked immediately when a message is published. In asynchronous mode (the default), they are processed by the Protean Engine.
- Subscribers support error recovery through an optional handle_error classmethod.
- Unlike event handlers, subscribers do not use the @handle decorator for event-type dispatch; they receive all messages on their stream through a single __call__ method.
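Because a subscriber receives every message on its stream through __call__, any routing by message type has to happen inside that method. The following is a minimal plain-Python sketch of one way to do that. It assumes the external system puts a "type" key in each payload; that key, the message type names, and the GatewaySubscriber class are hypothetical, and the @domain.subscriber decorator is left out so the snippet runs without a domain.

```python
# Module-level record of what was handled, used only to make the sketch observable.
processed: list[str] = []


class GatewaySubscriber:
    """Plain-Python stand-in for a subscriber class (decorator omitted)."""

    def __call__(self, payload: dict) -> None:
        # Route on a payload field, since there is no @handle dispatch.
        handlers = {
            "payment.confirmed": self._on_confirmed,
            "payment.failed": self._on_failed,
        }
        handler = handlers.get(payload.get("type"))
        if handler is None:
            # Unknown message types are skipped here; raising is another option.
            processed.append("ignored")
            return
        handler(payload)

    def _on_confirmed(self, payload: dict) -> None:
        processed.append(f"confirmed:{payload['order_id']}")

    def _on_failed(self, payload: dict) -> None:
        processed.append(f"failed:{payload['order_id']}")


# A fresh instance per message, mirroring how the engine is described to invoke subscribers.
GatewaySubscriber()({"type": "payment.confirmed", "order_id": "order-123"})
GatewaySubscriber()({"type": "refund.issued", "order_id": "order-456"})
```

Whether to ignore or reject unknown message types is a design choice: ignoring keeps the stream moving, while raising surfaces contract changes early.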
Defining a Subscriber
Subscribers are defined with the Domain.subscriber decorator:
import logging

from protean import Domain
from protean.fields import Float, Identifier, String

logger = logging.getLogger(__name__)

domain = Domain()
domain.config["message_processing"] = "sync"


@domain.aggregate
class Payment:
    order_id = Identifier(required=True)
    amount = Float(required=True)
    status = String(choices=["PENDING", "CONFIRMED", "FAILED"], default="PENDING")

    def confirm(self):
        self.status = "CONFIRMED"


@domain.subscriber(stream="payment_gateway")  # (1)
class PaymentConfirmationSubscriber:
    """Consumes payment confirmation messages from an external payment gateway."""

    def __call__(self, payload: dict) -> None:  # (2)
        order_id = payload["order_id"]

        repo = domain.repository_for(Payment)
        payment = repo._dao.find_by(order_id=order_id)
        payment.confirm()  # (3)
        repo.add(payment)


domain.init(traverse=False)

if __name__ == "__main__":
    with domain.domain_context():
        # Create a pending payment
        payment = Payment(order_id="order-123", amount=49.99)
        domain.repository_for(Payment).add(payment)

        # Simulate an external payment gateway publishing a confirmation
        domain.brokers["default"].publish(
            "payment_gateway",
            {"order_id": "order-123", "transaction_id": "txn-789"},
        )

        # Verify payment was confirmed by the subscriber
        updated = domain.repository_for(Payment).get(payment.id)
        print(f"Payment status: {updated.status}")
        assert updated.status == "CONFIRMED"
        print("Payment confirmed successfully!")
1. @domain.subscriber(stream="payment_gateway") registers this class as a subscriber listening to the "payment_gateway" broker stream. The default broker is used since no broker parameter is specified.
2. The __call__ method receives a raw dict payload. The structure of the payload depends entirely on what the external system publishes.
3. The subscriber loads the relevant aggregate from the repository, applies business logic, and persists the changes.
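Since the payload's shape is dictated by the external system, it can help to check it before touching any aggregate. The sketch below is one possible approach, not part of Protean: the extract_confirmation helper and the REQUIRED_KEYS contract are hypothetical names introduced for illustration.

```python
# Hypothetical contract for the gateway's confirmation message.
REQUIRED_KEYS = ("order_id", "transaction_id")


def extract_confirmation(payload: dict) -> tuple[str, str]:
    """Return (order_id, transaction_id), or raise ValueError on a malformed payload."""
    missing = [key for key in REQUIRED_KEYS if key not in payload]
    if missing:
        raise ValueError(f"payload missing keys: {', '.join(missing)}")
    return str(payload["order_id"]), str(payload["transaction_id"])
```

A subscriber could call such a helper at the top of __call__, so a malformed message fails with a clear error instead of a KeyError deep in the business logic.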
Subscriber Workflow
sequenceDiagram
autonumber
External System->>Broker: Publish Message
Broker->>Broker: Store in Stream
Note over Broker,Subscriber: Asynchronous Processing (Engine)
Broker->>Engine: Deliver Message Batch
Engine->>Subscriber: __call__(payload)
Subscriber->>Subscriber: Process Message
Subscriber->>Repository: Load/Update Aggregate
Repository-->>Subscriber:
Subscriber->>Repository: Persist Changes
Subscriber-->>Engine: Success/Failure
Engine->>Broker: Acknowledge / Nack
- External System Publishes Message: An external system sends a message to the broker on a named stream.
- Broker Stores Message: The broker stores the message in the stream for later consumption.
- Engine Delivers Messages: The Protean Engine's broker subscription polls the broker for new messages in batches.
- Engine Invokes Subscriber: For each message, the engine instantiates the subscriber and calls its __call__ method with the raw dict payload.
- Subscriber Processes Message: The subscriber extracts data from the payload and performs business logic.
- Load/Update Aggregate: The subscriber may load and modify aggregates through repositories.
- Persist Changes: Modified aggregates are persisted.
- Acknowledge/Nack: On success, the message is acknowledged. On failure, it is negatively acknowledged for potential reprocessing.
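The delivery loop described in these steps can be approximated in plain Python. This is a toy model under stated assumptions, not Protean's engine or broker API: ToyBroker, EchoSubscriber, and run_once are hypothetical names, and a real broker would track acknowledgments per consumer group rather than in lists.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a single broker stream (not Protean's broker API)."""

    def __init__(self) -> None:
        self.pending: deque[dict] = deque()
        self.acked: list[dict] = []
        self.nacked: list[dict] = []

    def publish(self, payload: dict) -> None:
        self.pending.append(payload)

    def read_batch(self, size: int) -> list[dict]:
        batch = []
        while self.pending and len(batch) < size:
            batch.append(self.pending.popleft())
        return batch


class EchoSubscriber:
    def __call__(self, payload: dict) -> None:
        # Fail on payloads that lack the field this subscriber needs.
        if "order_id" not in payload:
            raise KeyError("order_id")


def run_once(broker: ToyBroker, subscriber_cls: type, batch_size: int = 10) -> None:
    """Deliver one batch: a fresh subscriber instance per message, then ack or nack."""
    for payload in broker.read_batch(batch_size):
        try:
            subscriber_cls()(payload)
        except Exception:
            broker.nacked.append(payload)
        else:
            broker.acked.append(payload)


broker = ToyBroker()
broker.publish({"order_id": "order-123"})
broker.publish({"unexpected": True})
run_once(broker, EchoSubscriber)
```

After run_once returns, the well-formed message sits in broker.acked and the malformed one in broker.nacked, mirroring the acknowledge/nack split in the last step.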
Configuration Options
stream (required)
The name of the broker stream this subscriber listens to. This must be
specified; omitting it raises an IncorrectUsageError.
@domain.subscriber(stream="external_orders")
class ExternalOrderSubscriber:
    def __call__(self, payload: dict) -> None:
        ...
broker (optional, default: "default")
The name of the broker to use, as configured in the domain configuration. If
not specified, the subscriber uses the "default" broker.
# Uses the default broker
@domain.subscriber(stream="order_events")
class OrderSubscriber:
    def __call__(self, payload: dict) -> None:
        ...


# Uses a specific named broker
@domain.subscriber(stream="analytics_events", broker="analytics")
class AnalyticsSubscriber:
    def __call__(self, payload: dict) -> None:
        ...
Note
The broker name must correspond to a broker configured in your domain
configuration. If the specified broker has not been configured, Protean
will raise a ConfigurationError during domain initialization.
See Brokers for details on configuring brokers.
Processing Modes
Synchronous Processing
When message_processing is set to "sync", subscribers are invoked
immediately within the publish() call. This is useful for development,
testing, and simple applications.
# domain.toml
message_processing = "sync"
In sync mode, when a message is published to a broker stream, the broker looks up all subscribers registered for that stream and invokes them inline before returning.
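To make the inline behaviour concrete, here is a toy broker that dispatches to every registered subscriber before publish() returns. It only illustrates the idea and assumes nothing about Protean's internals: InlineBroker, its register method, and the two subscriber classes are hypothetical.

```python
from collections import defaultdict


class InlineBroker:
    """Toy broker that dispatches inline, imitating the sync behaviour described above."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[type]] = defaultdict(list)

    def register(self, stream: str, subscriber_cls: type) -> None:
        self._subscribers[stream].append(subscriber_cls)

    def publish(self, stream: str, payload: dict) -> None:
        # Every subscriber on the stream runs before publish() returns.
        for subscriber_cls in self._subscribers[stream]:
            subscriber_cls()(payload)


calls: list[str] = []


class AuditSubscriber:
    def __call__(self, payload: dict) -> None:
        calls.append(f"audit:{payload['order_id']}")


class BillingSubscriber:
    def __call__(self, payload: dict) -> None:
        calls.append(f"billing:{payload['order_id']}")


broker = InlineBroker()
broker.register("payment_gateway", AuditSubscriber)
broker.register("payment_gateway", BillingSubscriber)
broker.publish("payment_gateway", {"order_id": "order-123"})
# Both subscribers have already run by the time publish() returns.
```

One consequence of inline dispatch is that an exception in a subscriber surfaces in the publisher's call stack, which is convenient in tests and one reason this mode suits development more than production.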
Asynchronous Processing
When message_processing is set to "async" (the default), messages are
stored in the broker and processed later by the Protean Engine. This is the
recommended mode for production.
# domain.toml
message_processing = "async"
In async mode, you must run the Protean server to process subscriber messages:
protean server
The engine creates a broker subscription for each registered subscriber, managing consumer groups, message batching, and acknowledgment automatically.
Learn more in Server and Subscriptions.
Error Handling
Subscribers support custom error handling through the optional handle_error
method. This method is called when an exception occurs during message
processing, allowing you to implement specialized error handling strategies.
The handle_error Method
You can define a handle_error class method in your subscriber to handle
exceptions:
@domain.subscriber(stream="payment_gateway")
class PaymentSubscriber:
    def __call__(self, payload: dict) -> None:
        # Processing logic that might raise exceptions
        ...

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:
        """Custom error handling for message processing failures."""
        logger.error(f"Failed to process payment message: {exc}")
        # Perform recovery: store for retry, notify monitoring, etc.
        ...
How It Works
- When an exception occurs in a subscriber's __call__ method, the Protean Engine catches it.
- The engine logs detailed error information including stack traces.
- The engine calls the subscriber's handle_error classmethod, passing:
    - The original exception that was raised
    - The message dict being processed when the exception occurred
- After handle_error completes, processing continues with the next message.
- The failed message is negatively acknowledged for potential reprocessing by the broker.
Error Handler Failures
If an exception occurs within the handle_error method itself, the Protean
Engine will catch and log that exception as well, ensuring that the message
processing pipeline continues to function. This provides an additional layer
of resilience:
@classmethod
def handle_error(cls, exc: Exception, message: dict) -> None:
    try:
        # Error handling logic that might itself fail
        ...
    except Exception as error_exc:
        # Caught locally here; an exception that escaped handle_error
        # would be caught and logged by the engine instead
        logger.error(f"Error handler failed: {error_exc}")
        # Processing continues regardless
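The error path described in this section (catch, log, call handle_error, guard against a failing handler, then nack) can be sketched in plain Python. This is an approximation for illustration only, not the engine's actual code: the process function, the logger name, and FlakySubscriber are hypothetical.

```python
import logging

logger = logging.getLogger("subscriber_sketch")


def process(subscriber_cls: type, message: dict) -> bool:
    """Return True to acknowledge the message, False to nack it."""
    try:
        subscriber_cls()(message)
        return True
    except Exception as exc:
        # Log the failure together with its stack trace.
        logger.exception("Subscriber %s failed", subscriber_cls.__name__)
        handler = getattr(subscriber_cls, "handle_error", None)
        if handler is not None:
            try:
                handler(exc, message)
            except Exception:
                # A failing error handler is logged and otherwise ignored.
                logger.exception("handle_error itself failed")
        return False


class FlakySubscriber:
    errors: list[str] = []

    def __call__(self, payload: dict) -> None:
        raise RuntimeError("gateway timeout")

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:
        cls.errors.append(f"{message['order_id']}: {exc}")


acknowledged = process(FlakySubscriber, {"order_id": "order-123"})
```

Here acknowledged ends up False, and FlakySubscriber.errors records the failure, so the caller would nack the message and move on to the next one.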
Best Practices
- Make error handlers robust and avoid complex logic that might fail.
- Use error handlers for logging, notification, and simple recovery.
- Do not raise exceptions from error handlers unless absolutely necessary.
- Consider implementing retry logic or dead-letter patterns for persistent failures.
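As one way to apply the last point, a handle_error classmethod can count failures per message and park a message once a retry budget is used up. The sketch below keeps its state in class attributes purely for illustration; MAX_ATTEMPTS, ShippingSubscriber, and the dead_letters list are hypothetical, and a real deployment would likely use durable storage instead of in-process memory.

```python
MAX_ATTEMPTS = 3  # hypothetical retry budget


class ShippingSubscriber:
    attempts: dict[str, int] = {}
    dead_letters: list[dict] = []

    def __call__(self, payload: dict) -> None:
        # Always fails, to exercise the error path in this sketch.
        raise ValueError("malformed tracking number")

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:
        # Keep the handler simple: count the failure, park the message if needed.
        key = str(message.get("order_id", "unknown"))
        cls.attempts[key] = cls.attempts.get(key, 0) + 1
        if cls.attempts[key] >= MAX_ATTEMPTS:
            cls.dead_letters.append({"message": message, "error": str(exc)})


# Simulate the same message being redelivered until the budget is exhausted.
message = {"order_id": "order-123"}
for _ in range(MAX_ATTEMPTS):
    try:
        ShippingSubscriber()(message)
    except Exception as exc:
        ShippingSubscriber.handle_error(exc, message)
```

The handler itself does nothing that is likely to fail, which keeps it in line with the first two practices above.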
Subscribers vs. Event Handlers
| Aspect | Event Handler | Subscriber |
|---|---|---|
| Decorator | @domain.event_handler | @domain.subscriber |
| Message source | Internal event store | External message broker |
| Association | part_of an aggregate | stream on a broker |
| Payload type | Typed domain event objects | Raw dict payloads |
| Dispatch | @handle(EventClass) per event type | Single __call__(payload) for all messages |
| Processing config | event_processing | message_processing |
| Use case | React to domain changes within bounded context | Consume messages from external systems |
Use event handlers when you need to react to events raised by aggregates within your domain. Use subscribers when you need to integrate with external systems that publish messages to a broker.
Complete Example
Below is a comprehensive example showing two subscribers processing webhooks from different external systems -- a payment gateway and a shipping provider:
import logging

from protean import Domain
from protean.fields import Float, String

logger = logging.getLogger(__name__)

domain = Domain()
domain.config["message_processing"] = "sync"


@domain.aggregate
class Order:
    customer_email = String(max_length=255, required=True)
    total_amount = Float(required=True)
    status = String(
        choices=["PENDING", "PAID", "SHIPPED", "CANCELLED"], default="PENDING"
    )

    def mark_paid(self):
        self.status = "PAID"


@domain.subscriber(stream="payment_gateway")  # (1)
class PaymentWebhookSubscriber:
    """Processes payment webhook notifications from an external gateway.

    This subscriber listens to the `payment_gateway` broker stream and updates
    order status when payments are confirmed.
    """

    def __call__(self, payload: dict) -> None:  # (2)
        order_id = payload["order_id"]
        status = payload["status"]

        if status == "SUCCESS":
            repo = domain.repository_for(Order)
            order = repo.get(order_id)
            order.mark_paid()
            repo.add(order)
            logger.info(f"Order {order_id} marked as paid")

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:  # (3)
        """Handle processing errors gracefully."""
        order_id = message.get("order_id", "unknown")
        logger.error(f"Failed to process payment for order {order_id}: {exc}")


@domain.subscriber(stream="shipping_updates", broker="default")  # (4)
class ShippingUpdateSubscriber:
    """Processes shipping status updates from an external logistics provider."""

    def __call__(self, payload: dict) -> None:
        order_id = payload["order_id"]

        repo = domain.repository_for(Order)
        order = repo.get(order_id)
        order.status = "SHIPPED"
        repo.add(order)
        logger.info(f"Order {order_id} marked as shipped")


if __name__ == "__main__":
    domain.init(traverse=False)

    with domain.domain_context():
        # Create an order
        order = Order(customer_email="alice@example.com", total_amount=149.99)
        domain.repository_for(Order).add(order)
        print(f"Order created: {order.id} (status: {order.status})")

        # Simulate external payment gateway confirming payment
        domain.brokers["default"].publish(
            "payment_gateway",
            {
                "order_id": str(order.id),
                "status": "SUCCESS",
                "transaction_id": "txn-42",
            },
        )

        # Verify order is now paid
        updated = domain.repository_for(Order).get(order.id)
        print(f"After payment webhook: status={updated.status}")
        assert updated.status == "PAID"

        # Simulate external shipping provider sending an update
        domain.brokers["default"].publish(
            "shipping_updates",
            {"order_id": str(order.id), "tracking_number": "TRACK-12345"},
        )

        # Verify order is now shipped
        updated = domain.repository_for(Order).get(order.id)
        print(f"After shipping webhook: status={updated.status}")
        assert updated.status == "SHIPPED"

        print("\nAll webhooks processed successfully!")
1. PaymentWebhookSubscriber listens to the "payment_gateway" stream on the default broker.
2. The __call__ method receives the raw webhook payload, extracts the order ID and status, and updates the order accordingly.
3. The handle_error classmethod provides custom error handling. If __call__ raises an exception, this method is invoked with the exception and the original message.
4. ShippingUpdateSubscriber listens to a different stream ("shipping_updates"), demonstrating that multiple subscribers can coexist, each consuming from their own stream.