Consuming Events from Other Domains
The Problem
Your Fulfillment domain needs to react when the Order domain places an order.
The Order domain publishes an OrderPlaced event to a message broker. Your
domain subscribes to the broker channel and receives the event.
But the event was designed by the Order domain, using the Order domain's language. It contains fields named according to the Order domain's ubiquitous language, structured according to the Order domain's aggregate boundaries, and versioned according to the Order domain's release schedule.
If your Fulfillment domain consumes this event directly:
-
Language coupling. Your code uses the Order domain's terminology. If the Order domain renames
itemstoline_items, your handler breaks. -
Schema coupling. Your domain depends on the exact structure of the external event. Field additions, removals, or type changes propagate into your domain.
-
Release coupling. When the Order domain deploys a new event version, your domain must update simultaneously -- or fail.
-
Conceptual leaking. The Order domain's
OrderPlacedevent carries data shaped for the Order context. Your Fulfillment domain needs a subset of that data, differently structured. Processing the raw external event forces Fulfillment to think in Order domain terms.
The root cause: consuming external events without translation creates coupling between bounded contexts that should be autonomous.
The Pattern
Use a subscriber at the domain boundary as an anti-corruption layer. The subscriber receives the external event, translates it into your domain's language, and dispatches an internal command or domain event.
External Domain Your Domain Boundary Your Domain
┌───────────┐ Broker ┌──────────────────┐ Command/Event ┌─────────┐
│ Order │ ──────────► │ Subscriber │ ──────────────► │ Handler │
│ domain │ OrderPlaced│ (anti-corruption) │ CreateShipment │ │
│ │ │ translates to │ (your language) │ │
└───────────┘ │ your language │ └─────────┘
└──────────────────┘
The subscriber is the only place in your domain that knows about the external event's structure. Everything downstream works with your domain's own commands and events.
How Protean Supports This
Subscribers
Protean's @domain.subscriber listens to a broker channel and receives
messages from external domains:
@domain.subscriber(channel="orders")
class OrderEventsSubscriber(BaseSubscriber):
@handle(ExternalOrderPlaced)
def on_order_placed(self, event: ExternalOrderPlaced):
# Translate external event into internal command
current_domain.process(
CreateShipment(
order_id=event.order_id,
customer_id=event.customer_id,
items=[
{"product_id": item["product_id"],
"quantity": item["quantity"]}
for item in event.items
],
shipping_priority="standard",
)
)
The subscriber:
1. Receives the external ExternalOrderPlaced event from the broker
2. Extracts the data it needs
3. Constructs an internal CreateShipment command in the Fulfillment domain's language
4. Dispatches the command for processing by the domain's own handlers
Define Your Own Event Classes
Even though the external domain published the event, your domain defines its own class to deserialize it:
# Your domain's representation of the external event
# NOT imported from the Order domain
@domain.event(part_of=Shipment)
class ExternalOrderPlaced(BaseEvent):
order_id = Identifier(required=True)
customer_id = Identifier(required=True)
items = List(required=True)
total = Float()
placed_at = DateTime()
This class mirrors the external event's structure but belongs to your domain. If the external domain adds fields you don't need, your class ignores them. If they change field names, you update this one class -- not your entire domain.
The Translation Layer
Translating to Commands
The most common pattern: translate external events into internal commands. This funnels external stimuli through your domain's normal command processing pipeline:
@domain.subscriber(channel="payments")
class PaymentEventsSubscriber(BaseSubscriber):
@handle(ExternalPaymentConfirmed)
def on_payment_confirmed(self, event: ExternalPaymentConfirmed):
current_domain.process(
MarkOrderPaid(
order_id=event.reference_id, # External field name
payment_id=event.transaction_id, # Different naming
amount=event.amount,
currency=event.currency_code, # Different naming
paid_at=event.confirmed_at, # Different naming
)
)
Notice the translation: the external domain uses reference_id, your domain
uses order_id. The external domain uses transaction_id, you use
payment_id. The subscriber maps between the two vocabularies.
Translating to Domain Events
When the external event should trigger reactive processing across your domain (not a specific command), translate it into an internal domain event:
@domain.subscriber(channel="inventory-external")
class ExternalInventorySubscriber(BaseSubscriber):
@handle(ExternalStockDepleted)
def on_stock_depleted(self, event: ExternalStockDepleted):
# Translate to internal event
repo = current_domain.repository_for(Product)
product = repo.get(event.sku)
product.raise_(ProductBecameUnavailable(
product_id=product.product_id,
reason="external_stock_depleted",
))
repo.add(product)
The internal ProductBecameUnavailable event triggers your domain's own
event handlers and projectors, all speaking your domain's language.
Filtering Irrelevant Events
Not every external event is relevant to your domain. The subscriber filters:
@domain.subscriber(channel="orders")
class OrderEventsSubscriber(BaseSubscriber):
@handle(ExternalOrderPlaced)
def on_order_placed(self, event: ExternalOrderPlaced):
# Only create shipments for physical items
physical_items = [
item for item in event.items
if item.get("type") != "digital"
]
if not physical_items:
return # Nothing for Fulfillment to do
current_domain.process(
CreateShipment(
order_id=event.order_id,
customer_id=event.customer_id,
items=physical_items,
)
)
Enriching External Data
Sometimes your domain needs more context than the external event provides. The subscriber can enrich the data from your own domain's state:
@domain.subscriber(channel="orders")
class OrderEventsSubscriber(BaseSubscriber):
@handle(ExternalOrderPlaced)
def on_order_placed(self, event: ExternalOrderPlaced):
# Enrich with data from our domain
customer_repo = current_domain.repository_for(CustomerProfile)
customer = customer_repo.get(event.customer_id)
current_domain.process(
CreateShipment(
order_id=event.order_id,
customer_id=event.customer_id,
shipping_address=customer.default_shipping_address,
shipping_priority=customer.shipping_tier,
items=event.items,
)
)
Handling External Schema Changes
When the external domain changes its event schema, only the subscriber needs to update:
Before: External event v1
class ExternalOrderPlaced(BaseEvent):
order_id = Identifier(required=True)
items = List(required=True)
total = Float()
After: External event v2 (renamed field)
class ExternalOrderPlaced(BaseEvent):
order_id = Identifier(required=True)
line_items = List(required=True) # Renamed from 'items'
subtotal = Float() # Renamed from 'total'
tax = Float()
total = Float() # Now includes tax
Only the subscriber changes:
@handle(ExternalOrderPlaced)
def on_order_placed(self, event: ExternalOrderPlaced):
# Update the field mapping
items = getattr(event, 'line_items', None) or getattr(event, 'items', [])
current_domain.process(
CreateShipment(
order_id=event.order_id,
items=items,
)
)
Your domain's CreateShipment command, Shipment aggregate, and all handlers
remain unchanged. The schema change is absorbed by the subscriber.
Error Handling
External events may be malformed, carry unexpected data, or reference entities that don't exist in your domain:
@domain.subscriber(channel="orders")
class OrderEventsSubscriber(BaseSubscriber):
@handle(ExternalOrderPlaced)
def on_order_placed(self, event: ExternalOrderPlaced):
# Validate the external data before trusting it
if not event.order_id:
logger.warning(
f"Received OrderPlaced without order_id, skipping"
)
return
if not event.items:
logger.warning(
f"Received OrderPlaced with no items for {event.order_id}"
)
return
try:
current_domain.process(
CreateShipment(
order_id=event.order_id,
customer_id=event.customer_id,
items=event.items,
)
)
except ValidationError as e:
logger.error(
f"Failed to process OrderPlaced {event.order_id}: {e}"
)
# Don't re-raise -- the external event was received,
# the error is in our translation or domain rules.
# Log for investigation rather than blocking the subscription.
Key principle: Never trust external data. Validate it at the subscriber boundary before passing it into your domain.
Anti-Patterns
Importing Event Classes from the External Domain
# Anti-pattern: importing from another domain
from order_domain.events import OrderPlaced
@domain.subscriber(channel="orders")
class OrderEventsSubscriber(BaseSubscriber):
@handle(OrderPlaced) # Direct dependency on external code
def on_order_placed(self, event: OrderPlaced):
...
This creates a code-level dependency. Your domain can't deploy without the Order domain's code. See Sharing Event Classes Across Domains for alternatives.
Processing External Events Without Translation
# Anti-pattern: using external event directly in handler
@domain.event_handler(part_of=Shipment)
class ShipmentEventHandler(BaseEventHandler):
@handle(ExternalOrderPlaced) # External event in an internal handler
def on_order_placed(self, event: ExternalOrderPlaced):
shipment = Shipment(
order_id=event.order_id,
items=event.items, # External field structure leaks in
)
...
The internal event handler now depends on the external event's structure. Use a subscriber to translate first.
Calling Back to the External Domain
# Anti-pattern: calling external API from subscriber
@handle(ExternalOrderPlaced)
def on_order_placed(self, event: ExternalOrderPlaced):
# DON'T call back to the Order domain's API
order_details = requests.get(
f"http://order-service/orders/{event.order_id}"
)
...
If the subscriber needs more data than the event carries, either request that the external domain enrich its events (see Design Events for Consumers) or maintain a local projection of the external data.
Summary
| Aspect | Direct Consumption | Subscriber Translation |
|---|---|---|
| External schema coupling | High (throughout domain) | Low (subscriber only) |
| Language alignment | External terms leak in | Your domain's terms |
| Deployment independence | Low (must deploy together) | High (subscriber absorbs changes) |
| Error handling | Scattered | Centralized in subscriber |
| Testability | External dependency in tests | Mock external, test internal |
The principle: subscribers are anti-corruption layers. They receive external events, translate them into your domain's language, and dispatch internal commands or events. Nothing downstream knows or cares that the stimulus came from outside.