Enrich Messages with Cross-Cutting Metadata
The Problem
Events and commands in a domain-driven system carry a business payload -- the fields that matter to the domain model. But downstream processing often needs more than the business payload. Multi-tenant applications need to know which tenant an event belongs to. Audit systems need the user who triggered the change. Distributed tracing requires request IDs that thread through every message in a chain. Feature flag evaluations may need to travel with the event so that handlers apply the same flags that were active when the event was raised.
The naive solution is to add these fields directly to every event and command class:
@domain.event(part_of=Order)
class OrderPlaced(BaseEvent):
    order_id: Identifier(required=True)
    customer_id: Identifier(required=True)
    items: List(required=True)
    total: Float()

    # Cross-cutting concerns polluting the domain model
    tenant_id: String(required=True)
    user_id: String()
    request_id: String()
    feature_flags: Dict()
This creates several problems:
- Domain model pollution. The OrderPlaced event now mixes business facts (order_id, items, total) with infrastructure concerns (tenant_id, request_id, feature_flags). The event's purpose -- recording what happened in the business -- is diluted by operational plumbing.
- Repetition across every event and command. Every event class needs the same cross-cutting fields. Every raise_() call must populate them. Every command must carry them. Miss one, and downstream processing silently loses context.
- Coupling between domain logic and operational context. The aggregate's place_order() method now needs to know about tenants, users, and feature flags. It reaches into request-scoped globals to populate fields that have nothing to do with order placement.
- Schema evolution friction. Adding a new cross-cutting field (say, deployment_version) requires updating every event class, every raise_() call, and every handler that reads the field. The change fans out across the entire domain.
- Testing complexity. Unit tests for business logic must now construct events with tenant IDs, user IDs, and request IDs, even when testing something as simple as "placing an order emits OrderPlaced."
The root cause: cross-cutting metadata is being treated as domain data instead of message infrastructure.
The Pattern
Separate cross-cutting metadata from the domain model by using message
enrichment hooks. Register functions that automatically inject operational
context into every event and command, storing it in metadata.extensions
rather than in the event's business payload.
 Domain Model                  Enrichment Layer             Message Store
┌─────────────────┐           ┌──────────────────┐         ┌──────────────┐
│ OrderPlaced     │ raise_()  │ Event Enrichers  │         │ Event Store  │
│   order_id      │ ────────► │   +tenant_id     │ ──────► │ payload:     │
│   customer_id   │           │   +user_id       │         │   order_id   │
│   items         │           │   +request_id    │         │ extensions:  │
│   total         │           │                  │         │   tenant_id  │
│                 │           │ (from g context) │         │   user_id    │
└─────────────────┘           └──────────────────┘         └──────────────┘
The enrichment layer sits between the domain model and the message store:
- Enrichers are registered once during domain initialization. They are plain callables -- no base class, no decorator protocol.
- Enrichers read from ambient context (Protean's g, which is a thread-local/request-scoped namespace) rather than from the event payload or aggregate state. This keeps the domain model unaware of operational concerns.
- Enriched data lands in metadata.extensions, a dict on the message's metadata that is persisted alongside headers, envelope, and domain meta. Extensions survive serialization, round-trip through the event store, and are available to every downstream handler.
- Downstream handlers access extensions via message.metadata.extensions or event._metadata.extensions. They never see cross-cutting data mixed into the event's business fields.
The result: the domain model stays clean. Every event and command automatically carries the operational context that infrastructure needs. Adding a new cross-cutting concern means registering one enricher, not updating every event class.
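To make the mechanism concrete, here is a minimal, framework-free sketch of an enrichment hook. It is not Protean's implementation: `context`, `register_event_enricher`, and `build_extensions` are hypothetical stand-ins for `g`, the enricher registration, and whatever the framework does inside raise_(). It only illustrates the idea of running plain callables and merging their results into a dict kept apart from the payload.

```python
from types import SimpleNamespace

# Hypothetical stand-in for a request-scoped namespace such as Protean's g.
context = SimpleNamespace()

# Registry of enricher callables, filled once at startup.
event_enrichers = []


def register_event_enricher(fn):
    """Add a plain callable to the registry and return it unchanged."""
    event_enrichers.append(fn)
    return fn


def build_extensions(event, aggregate):
    """Run every enricher in registration order and merge the results."""
    extensions = {}
    for enricher in event_enrichers:
        result = enricher(event, aggregate)
        if result:  # tolerate enrichers that return None or {}
            extensions.update(result)
    return extensions


@register_event_enricher
def tenant_enricher(event, aggregate):
    return {"tenant_id": getattr(context, "tenant_id", None)}


@register_event_enricher
def audit_enricher(event, aggregate):
    return {"user_id": getattr(context, "user_id", None)}


# Simulate one request: set ambient context, then "raise" an event.
context.tenant_id = "acme-corp"
context.user_id = "user-42"
payload = {"order_id": "ord-1", "total": 99.99}
extensions = build_extensions(payload, aggregate=None)
```

In this sketch the payload dict is never touched. The extensions dict is built entirely from ambient context, which is the separation the pattern relies on.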
Applying the Pattern
Setting Up Tenant Context
In a multi-tenant SaaS application, every request arrives with a tenant
identifier -- typically extracted from an authentication token, a request
header, or a subdomain. The application stores this in Protean's global
context g early in the request lifecycle.
# middleware.py -- FastAPI middleware that sets tenant context
from uuid import uuid4

from protean.utils.globals import g


class TenantContextMiddleware:
    """Extract tenant_id from the auth token and store it in g."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            # Extract tenant from auth header (simplified).
            # decode_tenant_from_token / decode_user_from_token are
            # application-defined helpers, not shown here.
            headers = dict(scope.get("headers", []))
            token = headers.get(b"authorization", b"").decode()
            tenant_id = decode_tenant_from_token(token)
            user_id = decode_user_from_token(token)

            g.tenant_id = tenant_id
            g.user_id = user_id
            # Reuse the caller's request ID, or generate one if absent
            g.request_id = headers.get(
                b"x-request-id", b""
            ).decode() or str(uuid4())

        await self.app(scope, receive, send)
Registering Enrichers
With tenant context available in g, register enrichers that inject it into
every event and command. Do this during domain setup, before domain.init():
# domain.py -- Domain initialization with enrichers
from protean import Domain
from protean.utils.globals import g

domain = Domain(__file__, "SaasApp")


# --- Event enrichers ---

@domain.event_enricher
def enrich_event_with_tenant(event, aggregate):
    """Inject tenant context into every domain event."""
    return {
        "tenant_id": getattr(g, "tenant_id", None),
    }


@domain.event_enricher
def enrich_event_with_audit(event, aggregate):
    """Inject user and request context for audit trails."""
    return {
        "user_id": getattr(g, "user_id", None),
        "request_id": getattr(g, "request_id", None),
    }


# --- Command enrichers ---

@domain.command_enricher
def enrich_command_with_tenant(command):
    """Inject tenant context into every command."""
    return {
        "tenant_id": getattr(g, "tenant_id", None),
    }


@domain.command_enricher
def enrich_command_with_audit(command):
    """Inject user and request context into every command."""
    return {
        "user_id": getattr(g, "user_id", None),
        "request_id": getattr(g, "request_id", None),
    }
Clean Domain Model
With enrichers handling cross-cutting metadata, the domain model carries only business data:
@domain.aggregate
class Order:
    order_id: Auto(identifier=True)
    customer_id: Identifier(required=True)
    status: String(default="draft")
    items: HasMany(OrderItem)
    total: Float(default=0.0)

    def place(self) -> None:
        """Place the order and emit a domain event."""
        self._validate_can_place()
        self.status = "placed"
        self.raise_(OrderPlaced(
            order_id=self.order_id,
            customer_id=self.customer_id,
            items=[item.to_dict() for item in self.items],
            total=self.total,
        ))


@domain.event(part_of=Order)
class OrderPlaced(BaseEvent):
    order_id: Identifier(required=True)
    customer_id: Identifier(required=True)
    items: List(required=True)
    total: Float()
    # No tenant_id, no user_id, no request_id.
    # Enrichers handle that.
When order.place() calls raise_(), Protean runs all registered event
enrichers. The resulting event has:
- Payload: order_id, customer_id, items, total
- Extensions: tenant_id, user_id, request_id
Consuming Enriched Metadata
Downstream handlers access extensions from the message metadata. In async processing, the server deserializes the stored message and enriched extensions are available on the message object.
Tenant-Scoped Event Handler
@domain.event_handler(part_of=Inventory)
class InventoryEventHandler(BaseEventHandler):
    @handle(OrderPlaced)
    def reserve_stock(self, event: OrderPlaced):
        # Access enriched metadata from the message in context
        tenant_id = g.message_in_context.metadata.extensions.get("tenant_id")

        repo = current_domain.repository_for(Inventory)
        for item in event.items:
            inventory = repo.get(item["product_id"])
            inventory.reserve(
                order_id=event.order_id,
                quantity=item["quantity"],
                tenant_id=tenant_id,  # Pass to domain logic if needed
            )
            repo.add(inventory)
Audit-Aware Projector
@domain.projector(part_of=OrderAuditProjection)
class OrderAuditProjector(BaseProjector):
    @handle(OrderPlaced)
    def on_order_placed(self, event: OrderPlaced):
        extensions = g.message_in_context.metadata.extensions

        repo = current_domain.repository_for(OrderAuditProjection)
        repo.add(OrderAuditProjection(
            order_id=event.order_id,
            action="placed",
            # The audit enricher always sets the key (possibly to None),
            # so fall back with `or` rather than a .get() default.
            performed_by=extensions.get("user_id") or "unknown",
            request_id=extensions.get("request_id"),
            tenant_id=extensions.get("tenant_id"),
            timestamp=g.message_in_context.metadata.headers.time,
        ))
Conditional Enrichment
Enrichers can inspect the event or aggregate to decide what metadata to add. This is useful when different event types need different extensions:
@domain.event_enricher
def enrich_with_feature_flags(event, aggregate):
    """Attach active feature flags only for events that need them."""
    # Only enrich billing-related events
    if event.__class__.__name__ in ("InvoiceGenerated", "PaymentProcessed"):
        return {
            "feature_flags": {
                "new_pricing_model": is_feature_enabled("new_pricing_model"),
                "tax_v2": is_feature_enabled("tax_v2"),
            }
        }
    return {}
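The enricher above selects events by comparing class names as strings. One alternative, sketched here with stand-in classes and without any framework, is to match on the classes themselves, so a renamed event fails loudly at import time instead of silently losing its flags. Everything in this block (the event classes, `ENABLED_FLAGS`, and this `is_feature_enabled`) is a hypothetical placeholder for illustration, not part of Protean.

```python
# Stand-in event classes; in a real domain these would be the registered events.
class InvoiceGenerated:
    pass


class PaymentProcessed:
    pass


class OrderPlaced:
    pass


# Hypothetical flag store; a real system would query its feature-flag service.
ENABLED_FLAGS = {"new_pricing_model"}


def is_feature_enabled(name):
    return name in ENABLED_FLAGS


# Events that should carry feature flags, referenced by class rather than name.
BILLING_EVENTS = (InvoiceGenerated, PaymentProcessed)


def enrich_with_feature_flags(event, aggregate):
    """Return feature flags for billing events, nothing for everything else."""
    if isinstance(event, BILLING_EVENTS):
        return {
            "feature_flags": {
                "new_pricing_model": is_feature_enabled("new_pricing_model"),
                "tax_v2": is_feature_enabled("tax_v2"),
            }
        }
    return {}


billing_extensions = enrich_with_feature_flags(InvoiceGenerated(), None)
order_extensions = enrich_with_feature_flags(OrderPlaced(), None)
```

Note that isinstance also matches subclasses of the listed events, which may or may not be what you want. The name-based check in the original matches only exact class names.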
Combining Enrichers with Correlation IDs
Message enrichment complements Protean's built-in
message tracing. Correlation and causation IDs live in
metadata.domain and are managed automatically. Extensions hold additional
context that the framework does not manage:
@domain.event_enricher
def enrich_with_tracing(event, aggregate):
    """Bridge domain enrichment with external distributed tracing."""
    traceparent = getattr(g, "traceparent", None)
    return {
        "trace_id": traceparent.trace_id if traceparent else None,
        "span_id": traceparent.parent_id if traceparent else None,
    }
This lets you correlate domain events with infrastructure spans in tools like Jaeger, Zipkin, or Datadog, while keeping the domain model free of tracing infrastructure.
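The enricher above assumes g.traceparent is an object exposing trace_id and parent_id. How that object gets there is up to the application. As one possibility, the sketch below parses a W3C Trace Context traceparent header (the four-field version-traceid-parentid-flags layout) into such an object, which middleware could then store in g. The names `TraceParent` and `parse_traceparent` are hypothetical, and the parser deliberately handles only that four-field layout.

```python
import re
from typing import NamedTuple, Optional


class TraceParent(NamedTuple):
    version: str
    trace_id: str   # 32 lowercase hex characters
    parent_id: str  # 16 lowercase hex characters (the caller's span ID)
    flags: str


# version-traceid-parentid-flags, all lowercase hex
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def parse_traceparent(header: Optional[str]) -> Optional[TraceParent]:
    """Return a TraceParent, or None if the header is missing or malformed."""
    if not header:
        return None
    match = _TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    parsed = TraceParent(*match.groups())
    # All-zero trace or parent IDs are invalid per the specification.
    if parsed.trace_id == "0" * 32 or parsed.parent_id == "0" * 16:
        return None
    return parsed


example = parse_traceparent(
    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
)
```

Returning None for bad input matches the enricher's existing fallback: a missing or malformed header yields None values in extensions rather than an exception while the event is being raised.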
Functional Registration
The decorator form (@domain.event_enricher) is convenient when enrichers are
defined alongside the domain. For enrichers defined in separate modules or
registered conditionally, use the functional form:
# enrichers.py -- reusable enrichers for multiple domains
from protean.utils.globals import g


def tenant_enricher(event, aggregate):
    return {"tenant_id": getattr(g, "tenant_id", None)}


def audit_enricher(event, aggregate):
    return {
        "user_id": getattr(g, "user_id", None),
        "request_id": getattr(g, "request_id", None),
    }


def tenant_command_enricher(command):
    return {"tenant_id": getattr(g, "tenant_id", None)}


# domain.py -- register enrichers from the module
from protean import Domain

from myapp.enrichers import (
    tenant_enricher,
    audit_enricher,
    tenant_command_enricher,
)

domain = Domain(__file__, "SaasApp")
domain.register_event_enricher(tenant_enricher)
domain.register_event_enricher(audit_enricher)
domain.register_command_enricher(tenant_command_enricher)
Testing with Enrichers
Because enrichers read from g, tests that need enriched metadata must set up
the global context. Tests that only care about business logic can ignore
enrichers entirely -- the enrichers still run, but every extension value is None:
class TestOrderPlacement:
    def test_business_logic_without_enrichment(self, test_domain):
        """Enrichers run but produce None values -- harmless."""
        order = Order(
            customer_id="cust-1",
            total=99.99,
        )
        order.add_item(product_id="prod-1", quantity=2, price=49.99)
        order.place()

        assert order.status == "placed"
        assert len(order._events) == 1
        assert isinstance(order._events[0], OrderPlaced)

    def test_enrichment_with_tenant_context(self, test_domain):
        """Set up g to verify enrichers populate extensions."""
        g.tenant_id = "acme-corp"
        g.user_id = "user-42"
        g.request_id = "req-abc-123"

        order = Order(
            customer_id="cust-1",
            total=99.99,
        )
        order.add_item(product_id="prod-1", quantity=2, price=49.99)
        order.place()

        event = order._events[0]
        assert event._metadata.extensions["tenant_id"] == "acme-corp"
        assert event._metadata.extensions["user_id"] == "user-42"
        assert event._metadata.extensions["request_id"] == "req-abc-123"

        # Business payload is clean
        assert not hasattr(event, "tenant_id")
        assert not hasattr(event, "user_id")
End-to-End: Multi-Tenant SaaS
Putting it all together, here is the flow for a multi-tenant order placement:
1. HTTP Request arrives
   ├── Middleware extracts tenant_id, user_id, request_id
   └── Stores them in g

2. API endpoint calls domain.process(PlaceOrder(...))
   ├── Command enrichers run → extensions: {tenant_id, user_id, request_id}
   ├── Command is stored in event store with extensions
   └── Command handler invoked

3. Handler calls order.place()
   ├── Aggregate raises OrderPlaced
   ├── Event enrichers run → extensions: {tenant_id, user_id, request_id}
   └── Event stored in aggregate._events

4. UoW commits
   ├── Events written to event store (with extensions)
   └── Outbox records created (with extensions)

5. Server picks up events
   ├── Event handler reads extensions from g.message_in_context
   ├── Projector builds tenant-scoped audit trail
   └── Extensions available for filtering, routing, and observability
Anti-Patterns
Adding Tenant ID to Every Event Class
# Anti-pattern: cross-cutting fields in event payload
@domain.event(part_of=Order)
class OrderPlaced(BaseEvent):
order_id: Identifier(required=True)
tenant_id: String(required=True) # Infrastructure concern
user_id: String() # Infrastructure concern
request_id: String() # Infrastructure concern
items: List(required=True)
total: Float()
Every event class repeats the same fields. Every raise_() call must populate
them. The event schema mixes business facts with operational plumbing. When
you add a new cross-cutting field, every event class and every raise_() call
must change.
Fix: Remove cross-cutting fields from the event payload. Register enrichers
that inject them into metadata.extensions.
Accessing g Directly in Domain Logic
# Anti-pattern: aggregate reaching into request context
@domain.aggregate
class Order:
def place(self) -> None:
self.status = "placed"
self.raise_(OrderPlaced(
order_id=self.order_id,
customer_id=self.customer_id,
items=[item.to_dict() for item in self.items],
total=self.total,
tenant_id=g.tenant_id, # Aggregate knows about g
user_id=g.user_id, # Aggregate knows about request context
))
The aggregate now depends on the request context. It cannot be tested without
setting up g. It conflates domain behavior (placing an order) with
infrastructure plumbing (extracting tenant context).
Fix: Let enrichers handle g access. The aggregate's raise_() call
passes only business data. Enrichers, which are explicitly registered as
infrastructure hooks, read from g.
Building One Monolithic Enricher
# Anti-pattern: one enricher doing everything
@domain.event_enricher
def enrich_everything(event, aggregate):
return {
"tenant_id": getattr(g, "tenant_id", None),
"user_id": getattr(g, "user_id", None),
"request_id": getattr(g, "request_id", None),
"ip_address": getattr(g, "ip_address", None),
"feature_flags": get_all_feature_flags(),
"deployment_version": os.environ.get("VERSION"),
"region": os.environ.get("REGION"),
"trace_id": extract_trace_id(),
"session_id": getattr(g, "session_id", None),
}
A single enricher that returns everything is harder to test, harder to compose, and harder to disable selectively. If feature flag evaluation fails, the entire enricher fails and the event is not appended.
Fix: Separate enrichers by concern. One for tenancy, one for audit, one for feature flags, one for tracing. Each can be tested, enabled, or disabled independently.
Performing I/O in Enrichers
# Anti-pattern: database call inside an enricher
@domain.event_enricher
def enrich_with_tenant_name(event, aggregate):
# This runs for EVERY event raised
tenant = TenantRepository.get(g.tenant_id)
return {"tenant_name": tenant.name}
Enrichers run synchronously inside raise_(). A database call in an enricher
means every raise_() incurs a round-trip. If the database is slow or
unavailable, event raising fails.
Fix: Keep enrichers fast. Read only from in-memory context (g, environment
variables, cached values). If you need data from a database, load it once per
request in middleware and store it in g.
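A minimal sketch of that fix, using a plain namespace as a stand-in for g and a counter to show how often the lookup runs. `fetch_tenant_name`, `begin_request`, and `context` are hypothetical names for illustration: the slow lookup happens once at the start of the request, and the enricher only reads the cached value.

```python
from types import SimpleNamespace

# Hypothetical stand-in for Protean's request-scoped g.
context = SimpleNamespace()

# Records every simulated database round-trip so the cost is visible.
lookups = []


def fetch_tenant_name(tenant_id):
    """Hypothetical slow lookup standing in for a repository/database call."""
    lookups.append(tenant_id)
    return {"acme-corp": "Acme Corporation"}.get(tenant_id)


def begin_request(tenant_id):
    """Middleware step: perform the I/O once and cache the result in context."""
    context.tenant_id = tenant_id
    context.tenant_name = fetch_tenant_name(tenant_id)


def enrich_with_tenant_name(event, aggregate):
    """Enricher: reads only in-memory context, never touches the database."""
    return {"tenant_name": getattr(context, "tenant_name", None)}


# One request that raises three events triggers exactly one lookup.
begin_request("acme-corp")
results = [enrich_with_tenant_name({"seq": i}, None) for i in range(3)]
```

The trade-off is that the cached value reflects the state at the start of the request. For slowly changing data such as a tenant name, that is usually acceptable.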
Summary
| Aspect | Payload Fields | Message Enrichment |
|---|---|---|
| Where metadata lives | Mixed into event/command fields | Separate in metadata.extensions |
| Domain model impact | Polluted with infrastructure | Clean business-only fields |
| Registration | Per-event, per-command | Once per domain, applies globally |
| Adding new context | Update every event class and raise_() call | Register one enricher |
| Testing business logic | Must construct infrastructure fields | Enrichers produce None; irrelevant |
| Downstream access | event.tenant_id | message.metadata.extensions["tenant_id"] |
| Persistence | Event payload (business schema) | Extensions dict (infrastructure schema) |
| Serialization | Survives round-trips | Survives round-trips |
The principle: event and command payloads carry business facts. Operational
context -- tenancy, audit, tracing, feature flags -- belongs in
metadata.extensions, injected automatically by enrichers that read from
ambient context. The domain model never knows, and never needs to know,
about the infrastructure that surrounds it.
Related reading
Patterns:
- Message Tracing -- Correlation and causation IDs for traceability.
- Consuming Events from Other Domains -- Enriching external events during translation.
Guides:
- Raising Events -- How events are raised and enriched.
- Message Tracing -- Correlation and causation IDs.