CloudEvents Interoperability
CQRS ES
Protean is a compliant CloudEvents v1.0 producer
and consumer. When events need to cross bounded context boundaries -- external
APIs, Kafka topics, webhooks, or other Protean domains -- to_cloudevent() and
from_cloudevent() translate between Protean's internal metadata and the
CloudEvents standard without modifying your domain model.
Internally, Protean uses its own metadata structure (headers, domain meta, envelope) optimized for DDD, event sourcing, and causal chain tracking. CloudEvents serialization is an anti-corruption layer at the boundary: your domain code stays DDD-native, while external consumers see a standards-compliant event format.
Producing CloudEvents
Any Protean Message can be serialized to a CloudEvents v1.0 JSON object:
from protean.utils.eventing import Message
# Create a message from a domain event (as usual)
message = Message.from_domain_object(event)
# Serialize to CloudEvents format
cloud_event = message.to_cloudevent()
The resulting dict is a valid CloudEvents v1.0 JSON object:
{
"specversion": "1.0",
"id": "myapp::order-abc123-1",
"type": "MyApp.OrderPlaced.v1",
"source": "https://orders.example.com",
"time": "2026-03-02T10:30:00+00:00",
"subject": "abc123",
"datacontenttype": "application/json",
"proteankind": "EVENT",
"proteancorrelationid": "a1b2c3d4e5f6...",
"proteanchecksum": "sha256...",
"sequence": "1",
"data": {
"order_id": "abc123",
"customer_id": "cust-456",
"total": 99.99
}
}
Every CloudEvents attribute is derived from existing Protean metadata -- nothing is stored redundantly.
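Because the result is a plain dict, it can be handed to any transport. As a minimal sketch, assuming delivery to a webhook over HTTP in CloudEvents structured content mode (where the whole event travels in the body as `application/cloudevents+json`), the request could be built with the standard library. The `build_webhook_request` helper and the consumer URL are hypothetical and not part of Protean:

```python
import json
import urllib.request

# Example payload shaped like the output of message.to_cloudevent() above.
cloud_event = {
    "specversion": "1.0",
    "id": "myapp::order-abc123-1",
    "type": "MyApp.OrderPlaced.v1",
    "source": "https://orders.example.com",
    "datacontenttype": "application/json",
    "data": {"order_id": "abc123", "customer_id": "cust-456", "total": 99.99},
}


def build_webhook_request(event: dict, url: str) -> urllib.request.Request:
    """Wrap a CloudEvent dict in an HTTP POST using structured content mode."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/cloudevents+json; charset=utf-8"},
    )


# Hypothetical endpoint. Nothing is sent until urllib.request.urlopen(request) is called.
request = build_webhook_request(cloud_event, "https://consumer.example.com/events")
```

Keeping the event in the body means the receiver can pass the decoded JSON straight to from_cloudevent().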
Attribute Mapping
Required attributes
| CloudEvents | Derived from | Notes |
|---|---|---|
| specversion | Literal "1.0" | Always CloudEvents v1.0 |
| id | metadata.headers.id | Protean's composite message ID |
| type | metadata.headers.type | Protean format Domain.Event.v1 -- valid per spec |
| source | source_uri config or domain name | See Configuring source |
Optional attributes
| CloudEvents | Derived from | Notes |
|---|---|---|
| time | metadata.headers.time | RFC 3339 / ISO 8601 string |
| subject | Parsed from stream name | Aggregate identifier |
| datacontenttype | Literal "application/json" | Protean always uses JSON |
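The time format in the table matches what Python's standard library produces for timezone-aware datetimes. The following sketch shows the shape of the string and that it parses back losslessly. The `to_rfc3339` helper is hypothetical, and treating naive datetimes as UTC is an assumption of this example rather than documented Protean behavior:

```python
from datetime import datetime, timezone


def to_rfc3339(moment: datetime) -> str:
    """Format a datetime as an RFC 3339 string with an explicit UTC offset."""
    if moment.tzinfo is None:
        # Assumption: naive datetimes are interpreted as UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.isoformat()


stamp = to_rfc3339(datetime(2026, 3, 2, 10, 30, tzinfo=timezone.utc))
print(stamp)  # 2026-03-02T10:30:00+00:00

# The string round-trips back to an equal, timezone-aware datetime.
parsed = datetime.fromisoformat(stamp)
```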
Protean extensions
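Metadata that has no core CloudEvents attribute is carried as extension
attributes. Protean-specific fields use a protean prefix, while traceparent and
sequence keep the names defined by the CloudEvents Distributed Tracing and
Sequence extensions: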
| Extension | Derived from | Purpose |
|---|---|---|
| traceparent | metadata.headers.traceparent | W3C distributed trace context |
| sequence | metadata.domain.sequence_id | Event position in aggregate |
| proteansequencetype | Inferred from sequence_id | "Integer" (ES) or "DotNotation" (non-ES) |
| proteancorrelationid | metadata.domain.correlation_id | Constant across causal chain |
| proteancausationid | metadata.domain.causation_id | Parent message that caused this |
| proteanchecksum | metadata.envelope.checksum | SHA-256 payload integrity hash |
| proteankind | metadata.domain.kind | "EVENT" or "COMMAND" |
User-supplied extensions from message enrichers
(metadata.extensions) are merged into the top level of the CloudEvent.
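Since user-supplied extensions end up next to the standard attributes, their names have to follow the CloudEvents naming rule: lowercase ASCII letters and digits only. The sketch below illustrates one way to guard a merge. The `merge_extensions` helper is hypothetical, and rejecting collisions instead of overwriting is an assumption of this example, not a description of what Protean does internally:

```python
import re

# CloudEvents v1.0 attribute names: lowercase a-z and digits 0-9 only.
_ATTRIBUTE_NAME = re.compile(r"[a-z0-9]+")


def merge_extensions(cloud_event: dict, extensions: dict) -> dict:
    """Return a copy of cloud_event with extensions added at the top level."""
    merged = dict(cloud_event)
    for name, value in extensions.items():
        if not _ATTRIBUTE_NAME.fullmatch(name):
            raise ValueError(f"Invalid CloudEvents attribute name: {name!r}")
        if name in merged:
            # Assumption: refuse to overwrite an attribute that is already set.
            raise ValueError(f"Extension {name!r} collides with an existing attribute")
        merged[name] = value
    return merged


event = merge_extensions(
    {"specversion": "1.0", "id": "myapp::order-abc123-1"},
    {"tenantid": "acme"},
)
```

An enricher that emits a key such as `tenant_id` would fail this check, so short lowercase names are the safer choice for anything that may cross a boundary.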
Configuring source
The CloudEvents source attribute identifies the context in which the event
occurred -- in DDD terms, the bounded context. Protean derives it automatically,
but you can configure an explicit URI:
source_uri = "https://orders.example.com"
Fallback chain (when source_uri is not configured):
- Domain name → urn:protean:<normalized_domain_name>
- Stream category prefix → urn:protean:<domain_part>
- Last resort → "urn:protean:unknown"
For production multi-domain systems, always configure source_uri explicitly
so external consumers see a meaningful, stable identifier.
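The fallback chain above can be sketched as a small function. This is an illustration of the documented order only, not Protean's implementation: the `derive_source` and `normalize` helpers are hypothetical, the normalization rule (lowercase, non-alphanumerics collapsed to hyphens) is an assumption, and so is the `::` separator used to extract the domain part from a stream category.

```python
import re
from typing import Optional


def normalize(name: str) -> str:
    """Assumed normalization: lowercase, runs of other characters become '-'."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")


def derive_source(
    source_uri: Optional[str] = None,
    domain_name: Optional[str] = None,
    stream_category: Optional[str] = None,
) -> str:
    """Resolve the CloudEvents source following the documented fallback order."""
    if source_uri:
        return source_uri  # Explicit configuration always wins.
    if domain_name:
        return f"urn:protean:{normalize(domain_name)}"
    if stream_category:
        # Assumption: categories look like "myapp::order".
        domain_part = stream_category.split("::", 1)[0]
        return f"urn:protean:{domain_part}"
    return "urn:protean:unknown"


print(derive_source(source_uri="https://orders.example.com", domain_name="My App"))
print(derive_source(domain_name="My App"))
print(derive_source(stream_category="myapp::order"))
print(derive_source())
```

Note that the derived URNs change whenever the domain is renamed, which is the reason an explicit source_uri is the more stable choice.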
Consuming CloudEvents
Parse an incoming CloudEvents JSON object into a Protean Message:
import json

from protean.utils.eventing import Message

# In a subscriber or API endpoint
cloud_event_dict = json.loads(request.body)
message = Message.from_cloudevent(cloud_event_dict)
External events
When consuming events from a non-Protean system, the type string won't match
any registered Protean event. Access the data directly:
@domain.subscriber(stream="external-orders")
class ExternalOrderSubscriber:
    def __call__(self, payload: dict) -> None:
        message = Message.from_cloudevent(payload)

        # Access the event data
        order_id = message.data["order_id"]

        # Access CloudEvents-specific attributes
        source = message.metadata.extensions["ce_source"]
        subject = message.metadata.extensions.get("ce_subject")

        # Translate into a domain command
        current_domain.process(
            ImportOrder(external_id=order_id, source=source)
        )
Protean-to-Protean round-trip
When two Protean services communicate via CloudEvents, the type string is in Protean format and can be resolved back to the original domain object:
message = Message.from_cloudevent(cloud_event_dict)
# If the type is registered in this domain, reconstruct the event
event = message.to_domain_object()
What gets preserved
When parsing a CloudEvent, Protean maps attributes back to their internal locations:
| CE Attribute | Protean Destination |
|---|---|
| id | headers.id |
| type | headers.type |
| time | headers.time |
| source | extensions["ce_source"] |
| subject | extensions["ce_subject"] |
| traceparent | headers.traceparent |
| proteancorrelationid | domain.correlation_id |
| proteancausationid | domain.causation_id |
| proteanchecksum | envelope.checksum |
| proteankind | domain.kind |
| sequence | domain.sequence_id |
| Unknown extensions | extensions (preserved as-is) |
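To make the table concrete, here is a rough sketch that routes a CloudEvent dict into the four destinations using plain dicts. It is not how Protean builds its metadata objects: `split_attributes` is a hypothetical helper, and skipping specversion, datacontenttype, and data (which the table does not cover) is an assumption of this example.

```python
# CE attribute -> (metadata section, key), taken from the table above.
ATTRIBUTE_DESTINATIONS = {
    "id": ("headers", "id"),
    "type": ("headers", "type"),
    "time": ("headers", "time"),
    "traceparent": ("headers", "traceparent"),
    "source": ("extensions", "ce_source"),
    "subject": ("extensions", "ce_subject"),
    "proteancorrelationid": ("domain", "correlation_id"),
    "proteancausationid": ("domain", "causation_id"),
    "proteankind": ("domain", "kind"),
    "sequence": ("domain", "sequence_id"),
    "proteanchecksum": ("envelope", "checksum"),
}

# Assumption: these are handled by parsing itself and not stored as metadata.
STRUCTURAL = {"specversion", "datacontenttype", "data"}


def split_attributes(cloud_event: dict) -> dict:
    """Group CloudEvent attributes by the metadata section they map to."""
    sections = {"headers": {}, "domain": {}, "envelope": {}, "extensions": {}}
    for name, value in cloud_event.items():
        if name in STRUCTURAL:
            continue
        # Anything not in the mapping is kept under its own name in extensions.
        section, key = ATTRIBUTE_DESTINATIONS.get(name, ("extensions", name))
        sections[section][key] = value
    return sections


sections = split_attributes({
    "specversion": "1.0",
    "id": "myapp::order-abc123-1",
    "type": "MyApp.OrderPlaced.v1",
    "source": "https://orders.example.com",
    "proteankind": "EVENT",
    "tenantid": "acme",
    "data": {"order_id": "abc123"},
})
```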
Round-tripping
A CloudEvent produced by Protean can be consumed back with full fidelity:
original = Message.from_domain_object(event)
ce = original.to_cloudevent()
# ... send over the wire ...
restored = Message.from_cloudevent(ce)
assert restored.data == original.data
assert restored.metadata.headers.id == original.metadata.headers.id
assert restored.metadata.domain.correlation_id == original.metadata.domain.correlation_id
The source attribute is re-derived during the next to_cloudevent() call
(from domain config), so it is preserved in extensions["ce_source"] for
reference but not mapped to a dedicated internal field.
Validation
from_cloudevent() validates the incoming CloudEvent:
- Required attributes (specversion, id, type, source) must be present -- raises ValueError if missing.
- Spec version must be "1.0" -- raises ValueError otherwise.
- Checksum is computed from data if not provided via proteanchecksum.
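The three rules above can be approximated in a few lines, which is useful when pre-validating payloads before they reach from_cloudevent(). This is a sketch under stated assumptions, not Protean's code: `validate_cloudevent` is a hypothetical helper, and hashing a sorted-key compact JSON rendering of data with SHA-256 is an assumed canonical form that may differ from the checksum Protean computes.

```python
import hashlib
import json

REQUIRED_ATTRIBUTES = ("specversion", "id", "type", "source")


def validate_cloudevent(cloud_event: dict) -> str:
    """Check the required attributes and return the checksum to use."""
    missing = [name for name in REQUIRED_ATTRIBUTES if name not in cloud_event]
    if missing:
        raise ValueError(f"Missing required attributes: {', '.join(missing)}")
    if cloud_event["specversion"] != "1.0":
        raise ValueError(f"Unsupported specversion: {cloud_event['specversion']!r}")

    checksum = cloud_event.get("proteanchecksum")
    if checksum is None:
        # Assumption: SHA-256 over a deterministic JSON rendering of the data.
        canonical = json.dumps(
            cloud_event.get("data"), sort_keys=True, separators=(",", ":")
        )
        checksum = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return checksum


checksum = validate_cloudevent({
    "specversion": "1.0",
    "id": "evt-1",
    "type": "com.example.order.created",
    "source": "https://shop.example.com",
    "data": {"order_id": "abc123"},
})
```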
Related
- Message Tracing -- How correlation_id and causation_id flow through causal chains.
- Message Enrichment -- How to attach custom metadata to events and commands.
- Consuming Events from Other Domains -- The subscriber / anti-corruption layer pattern.
- CloudEvents as a Boundary Contract -- When and why to use CloudEvents at system boundaries.