Events

DDD CQRS ES

Most applications have a definite state: their current state reflects past user input and interactions. It is advantageous to model these past changes as a series of discrete events. Domain events are the occurrences that domain experts care about, and they record what happened, as it happened.

In Protean, an Event is an immutable object that represents a significant occurrence or change in the domain. Events are raised by aggregates to signal that something noteworthy has happened, allowing other parts of the system to react - and sync - to these changes in a decoupled manner.

Defining Events

Event names should be descriptive and convey the specific change or occurrence in the domain clearly, ensuring that the purpose of the event is immediately understandable. Events are named as past-tense verbs to clearly indicate that an event has already occurred, such as OrderPlaced or PaymentProcessed.

You can define an event with the Domain.event decorator:

from enum import Enum

from protean import Domain
from protean.fields import Identifier, String

domain = Domain(name="Authentication")


class UserStatus(Enum):
    ACTIVE = "ACTIVE"
    ARCHIVED = "ARCHIVED"


@domain.event(part_of="User")
class UserActivated:
    user_id: Identifier(required=True)


@domain.event(part_of="User")
class UserRenamed:
    user_id: Identifier(required=True)
    name: String(required=True, max_length=50)


@domain.aggregate
class User:
    name: String(max_length=50, required=True)
    email: String(required=True)
    status: String(choices=UserStatus)

    def activate(self) -> None:
        self.status = UserStatus.ACTIVE.value
        self.raise_(UserActivated(user_id=self.id))

    def change_name(self, name: str) -> None:
        self.name = name
        self.raise_(UserRenamed(user_id=self.id, name=name))

Events are always connected to an aggregate class, specified with the part_of parameter in the decorator. The only exception is an event class marked abstract.

Abstract Events

Use the abstract option to define a base event with shared fields that concrete events can inherit:

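# Assumes `domain` and an `Order` aggregate are defined elsewhere.
from protean.fields import DateTime, Identifier, String


# Base event: holds shared fields only and is never raised directly
@domain.event(abstract=True)
class BaseOrderEvent:
    order_id = Identifier(required=True)
    occurred_at = DateTime(required=True)


# Concrete events inherit order_id and occurred_at from the base event
@domain.event(part_of=Order)
class OrderPlaced(BaseOrderEvent):
    customer_name = String(max_length=100)


@domain.event(part_of=Order)
class OrderCancelled(BaseOrderEvent):
    reason = String(max_length=500)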

Abstract events cannot be instantiated or raised directly. They don't require part_of since they are never emitted to a stream.


Synchronous vs Asynchronous Processing

Events in Protean can be processed either synchronously or asynchronously:

  • Synchronous processing: The event is processed immediately when raised. Event handlers are called in the same execution flow, and the operation is blocked until all event handlers have completed.
  • Asynchronous processing: The event is stored in the event store and processed later by a background worker. The operation continues without waiting for event handlers to complete.
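
The difference between the two modes can be pictured with a small, framework-free sketch. It does not use Protean's API: ToyDispatcher, raise_event, and drain are hypothetical names, and a deque stands in for the event store.

```python
from collections import deque


class ToyDispatcher:
    """Hypothetical stand-in for an event pipeline; not part of Protean."""

    def __init__(self, mode):
        if mode not in ("sync", "async"):
            raise ValueError(f"unknown mode: {mode}")
        self.mode = mode
        self.handlers = []
        self.pending = deque()  # stands in for the event store

    def raise_event(self, event):
        if self.mode == "sync":
            # Handlers run in the caller's execution flow; the caller waits.
            for handler in self.handlers:
                handler(event)
        else:
            # The event is only stored; a worker handles it later.
            self.pending.append(event)

    def drain(self):
        """What a background worker would do: handle the stored events."""
        while self.pending:
            event = self.pending.popleft()
            for handler in self.handlers:
                handler(event)


handled = []

sync_dispatcher = ToyDispatcher("sync")
sync_dispatcher.handlers.append(lambda e: handled.append(e["type"]))
sync_dispatcher.raise_event({"type": "UserActivated"})
handled_after_sync = list(handled)  # the handler has already run

async_dispatcher = ToyDispatcher("async")
async_dispatcher.handlers.append(lambda e: handled.append(e["type"]))
async_dispatcher.raise_event({"type": "UserRenamed"})
handled_before_drain = list(handled)  # the handler has not run yet
async_dispatcher.drain()  # now the stored event is handled
```

In the sketch, the synchronous caller sees the handler's effect as soon as raise_event returns, while the asynchronous caller sees it only after the worker step runs.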

Domain Configuration

You can configure the event processing mode through the domain configuration:

# Configure events to be processed synchronously
domain.config["event_processing"] = "sync"  # or "async"

In domain.toml:

event_processing = "sync"  # or "async"

By default, Protean sets event_processing to async in the domain configuration.

Event Processing Workflows

The workflow for event processing differs based on whether synchronous or asynchronous mode is used:

Synchronous Event Flow

sequenceDiagram
    autonumber
    Aggregate->>Aggregate: Raise event
    Aggregate->>Event Store: Store event (asynchronous=False)
    Aggregate->>Domain: Process event immediately
    Domain->>Event Handler: Process event
    Event Handler->>Event Handler: Handle event
    Event Handler-->>Domain: Return result
    Domain-->>Aggregate: Continue execution

Asynchronous Event Flow

sequenceDiagram
    autonumber
    Aggregate->>Aggregate: Raise event
    Aggregate->>Event Store: Store event (asynchronous=True)
    Aggregate-->>Client: Continue execution immediately

    Note over Protean Server: Later, asynchronously...

    Protean Server->>Event Store: Poll for unprocessed events
    Event Store-->>Protean Server: Return event
    Protean Server->>Event Handler: Process event
    Event Handler->>Event Handler: Handle event
    Protean Server->>Event Store: Update processed position

How Asynchronous Processing Works

Asynchronous event processing in Protean uses the server/engine component that:

  1. Creates subscriptions for event handlers to listen to their respective event streams
  2. Polls the event store for new events that haven't been processed yet
  3. Dispatches those events to the appropriate handlers

To run the Protean server for processing asynchronous events, use the CLI:

protean server --domain path/to/domain.py

See CLI documentation for more details about the server command and other available CLI options.

The server continually polls the event store for new events that have the asynchronous flag set to True in their metadata. When found, it dispatches them to the appropriate handlers, keeping track of processed events to avoid duplicate processing.
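
The polling loop described above can be sketched in plain Python. This is an illustration of the idea only, not Protean's engine: ToyEventStore, ToySubscription, and poll_once are hypothetical names, the store is an in-memory list, and the event names in the usage lines are made up.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ToyEventStore:
    """In-memory, append-only list standing in for an event store."""

    messages: list = field(default_factory=list)

    def append(self, event_type, asynchronous=True):
        self.messages.append({"type": event_type, "asynchronous": asynchronous})

    def read(self, after_position):
        # Return (position, message) pairs stored after the given position.
        return [
            (pos, msg)
            for pos, msg in enumerate(self.messages)
            if pos > after_position
        ]


@dataclass
class ToySubscription:
    """Remembers the last processed position so no event is handled twice."""

    store: ToyEventStore
    handler: Callable[[dict], None]
    position: int = -1

    def poll_once(self):
        handled = 0
        for pos, msg in self.store.read(self.position):
            if msg["asynchronous"]:  # only events flagged asynchronous
                self.handler(msg)
                handled += 1
            self.position = pos  # advance past every message that was read
        return handled


store = ToyEventStore()
store.append("OrderPlaced")
store.append("OrderCancelled")

seen = []
subscription = ToySubscription(store, seen.append)
first = subscription.poll_once()   # handles both stored events
second = subscription.poll_once()  # nothing new, nothing handled

store.append("OrderArchived", asynchronous=False)  # skipped by the poller
store.append("OrderRefunded")
third = subscription.poll_once()
```

Tracking a position rather than marking individual events keeps the bookkeeping to a single integer per subscription in this sketch.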

When to use each mode

  • Synchronous processing is useful when:
      • You need immediate consistency between different parts of your system
      • Event handlers perform essential operations that must complete before continuing
      • The operation is part of a transaction that needs to complete atomically

  • Asynchronous processing is beneficial when:
      • You want to improve system responsiveness by not blocking the execution flow
      • Event handlers might take a long time to process
      • You want to distribute load across background workers
      • You're implementing event-driven or reactive architectures

Relationship with Command Processing

Protean offers similar configuration options for commands through:

  • The command_processing domain configuration setting
  • The ability to specify the asynchronous parameter when processing commands

Both events and commands in Protean follow similar processing patterns, enabling you to build consistent, predictable workflows. You can configure both to suit your specific domain needs:

# Domain-wide configuration
domain.config["event_processing"] = "async"   # or "sync"
domain.config["command_processing"] = "sync"  # or "async"

This flexibility allows you to implement various architectural patterns like CQRS, Event Sourcing, and Event-Driven Architecture within your Protean applications.


Event Structure

An event is made of three parts:

Headers

trace_id

The trace_id is a unique identifier in UUID format that connects all processing originating from a request. Trace IDs give a detailed view of a request's journey through the system: each service interaction, the time taken, and where any delays occur.

Metadata

An event's metadata provides additional context about the event.

Sample metadata from an event:

{
    "id": "test::user-411b2ceb-9513-45d7-9e03-bbc0846fae93-0",
    "type": "Test.UserLoggedIn.v1",
    "fqn": "tests.event.test_event_metadata.UserLoggedIn",
    "kind": "EVENT",
    "stream": "test::user-411b2ceb-9513-45d7-9e03-bbc0846fae93",
    "origin_stream": null,
    "timestamp": "2024-08-16 15:30:27.977101+00:00",
    "version": 1,
    "sequence_id": "0",
    "asynchronous": true,
    "correlation_id": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6",
    "causation_id": "test::user:command-411b2ceb-9513-45d7-9e03-bbc0846fae93"
}

id

The unique identifier of the event. The event ID is a structured string, of the format <domain-name>::<aggregate-name>-<aggregate-id>-<sequence_id>.

The id value is simply an extension of the event's stream combined with the sequence_id. Read the section on sequence_id to understand possible values.
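
The relationship between id, stream, and sequence_id can be shown with two small helpers. They are hypothetical illustrations, not Protean functions, and the split assumes the sequence_id itself contains no hyphen.

```python
def event_id(stream, sequence_id):
    """Join a stream name and a sequence_id into an event id."""
    return f"{stream}-{sequence_id}"


def split_event_id(value):
    """Split an event id back into (stream, sequence_id) at the last hyphen."""
    stream, _, sequence_id = value.rpartition("-")
    return stream, sequence_id


sample_id = event_id("test::user-411b2ceb-9513-45d7-9e03-bbc0846fae93", "0")
print(sample_id)
print(split_event_id("authentication::user-1-0.1"))
```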

type

Type of the event. The format is <domain-name>.<event-class-name>.<event-version>. For example, Shipping.OrderShipped.v1.

fqn

Internal. The fully qualified name of the event. This is used by Protean to reconstruct objects from messages.

kind

Internal. Represents the kind of object enclosed in an event store message. Value is EVENT for Events. Metadata class is shared between Events and Commands, so possible values are EVENT and COMMAND.

stream

Name of the event stream. For example, the stream auth::user-1234 holds the messages for the User aggregate with identity 1234 in the Auth domain.

The stream name follows the pattern <domain>::<stream_category>-<aggregate_id>, where:

  • domain is the normalized domain name
  • stream_category is the aggregate's stream category
  • aggregate_id is the unique identifier of the aggregate instance

All events for a specific aggregate instance are stored in its dedicated stream, enabling event sourcing and message ordering guarantees.
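
The naming pattern can be expressed as a pair of helpers. This is a sketch under stated assumptions: the function names are hypothetical, normalization is assumed to be simple lowercasing (the actual rules may do more), and the parser assumes the stream category contains no hyphen.

```python
def stream_name(domain, stream_category, aggregate_id):
    """Build <domain>::<stream_category>-<aggregate_id>."""
    # Assumption: "normalized" is approximated here by lowercasing.
    return f"{domain.lower()}::{stream_category}-{aggregate_id}"


def parse_stream_name(stream):
    """Split a stream name into (domain, stream_category, aggregate_id)."""
    domain, _, rest = stream.partition("::")
    # Split at the first hyphen so identifiers such as UUIDs stay intact.
    category, _, aggregate_id = rest.partition("-")
    return domain, category, aggregate_id


print(stream_name("Authentication", "user", "1"))
print(parse_stream_name("auth::user-1234"))
```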

See the Stream Categories guide for comprehensive details on how stream categories organize and route messages.

origin_stream

Name of the stream that originated this event or command. origin_stream comes in handy when correlating related events or understanding causality.

timestamp

The timestamp of event generation in ISO 8601 format.

version

The version of the event class used to generate the event.

sequence_id

The sequence ID is the version of the aggregate when the event was generated, along with the sequence number of the event within the update.

For example, if the aggregate was updated twice, the first update would have a sequence ID of 1.1, and the second update would have a sequence ID of 2.1. If the next update generated two events, then the sequence ID of the second event would be 3.2.

If the aggregate is event-sourced, the sequence_id is a single integer of the position of the event in its stream.
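
The numbering scheme in the example above can be reproduced with a short sketch. Both functions are hypothetical helpers written for illustration; they are not part of Protean.

```python
def update_sequence_ids(aggregate_version, event_count):
    """Sequence IDs for the events raised by one update of a regular aggregate.

    Each ID is "<aggregate version>.<position of the event within the update>".
    """
    return [f"{aggregate_version}.{n}" for n in range(1, event_count + 1)]


def event_sourced_sequence_id(stream_position):
    """For an event-sourced aggregate, the ID is just the stream position."""
    return str(stream_position)


print(update_sequence_ids(1, 1))  # first update, one event
print(update_sequence_ids(2, 1))  # second update, one event
print(update_sequence_ids(3, 2))  # third update, two events
print(event_sourced_sequence_id(0))
```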

asynchronous

A boolean flag indicating whether the event should be processed asynchronously (true) or synchronously (false). This is determined by the domain's event_processing configuration. When true, the event is stored and processed later by the Protean server; when false, event handlers are invoked immediately during the same execution flow.

correlation_id

A string identifier that is constant across an entire causal chain of messages. Every command and event spawned from a single business operation shares the same correlation_id. It answers: "Which business operation does this message belong to?"

The correlation_id is typically generated at the earliest entry point (often the API gateway or frontend) and passed into domain.process(). If no external ID is provided, Protean auto-generates one (UUID4 hex, 32 characters).

# Pass an external correlation ID
domain.process(
    PlaceOrder(customer_id="cust-123"),
    correlation_id=request.headers["X-Correlation-ID"],
)
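
The fallback behaviour can be approximated at an application's entry point with the standard library. The helper name resolve_correlation_id is hypothetical; the sketch only mirrors the described rule (use the external ID if present, otherwise a UUID4 hex string) and is not Protean's internal code.

```python
import uuid
from typing import Optional


def resolve_correlation_id(external_id: Optional[str] = None) -> str:
    """Return the caller-supplied ID, or a fresh 32-character UUID4 hex string."""
    return external_id if external_id else uuid.uuid4().hex


supplied = resolve_correlation_id("req-42")
generated = resolve_correlation_id()
print(supplied, len(generated))
```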

See Message Tracing for details on how correlation IDs propagate through the system.

causation_id

The message ID (headers.id) of the immediate parent message that caused this one. It answers: "What directly triggered this message?"

For example, when a command handler raises an event, the event's causation_id is the command's headers.id. When an event handler dispatches a new command, that command's causation_id is the event's headers.id. Root messages (the first command in a chain) have causation_id = None.

Together with correlation_id, causation_id lets you reconstruct the full causation tree for any business operation.
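
As an illustration, the tree for one business operation can be rebuilt from message metadata alone. The sketch below works on plain dictionaries with made-up IDs; causation_tree and render are hypothetical helpers, not Protean functions.

```python
from collections import defaultdict


def causation_tree(messages, correlation_id):
    """Map each parent message id to the ids of the messages it caused."""
    children = defaultdict(list)
    for msg in messages:
        if msg["correlation_id"] == correlation_id:
            # Root messages have causation_id None and hang off the None key.
            children[msg["causation_id"]].append(msg["id"])
    return dict(children)


def render(children, parent=None, depth=0):
    """Flatten the tree into indented lines, depth-first."""
    lines = []
    for msg_id in children.get(parent, []):
        lines.append("  " * depth + msg_id)
        lines.extend(render(children, msg_id, depth + 1))
    return lines


messages = [
    {"id": "cmd-1", "correlation_id": "op-1", "causation_id": None},
    {"id": "evt-1", "correlation_id": "op-1", "causation_id": "cmd-1"},
    {"id": "cmd-2", "correlation_id": "op-1", "causation_id": "evt-1"},
    {"id": "evt-2", "correlation_id": "op-1", "causation_id": "cmd-2"},
    {"id": "evt-3", "correlation_id": "op-1", "causation_id": "cmd-1"},
    {"id": "cmd-9", "correlation_id": "op-2", "causation_id": None},
]

tree_lines = render(causation_tree(messages, "op-1"))
print("\n".join(tree_lines))
```

The correlation_id selects which messages belong to the operation, and the causation_id supplies the parent-child edges.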

Payload

The payload is a dictionary of key-value pairs that convey the information about the event.

The payload is made available as the body of the event, which also includes the event metadata. If you want to extract just the payload, you can use the payload property of the event.

In [1]: user = User(id="1", email="<EMAIL>", name="<NAME>")

In [2]: user.login()

In [3]: event = user._events[0]

In [4]: event
Out[4]: <UserLoggedIn: UserLoggedIn object ({'_metadata': {'id': '002::user-1-0.1', 'type': '002.UserLoggedIn.v1', 'fqn': '002.UserLoggedIn', 'kind': 'EVENT', 'stream': '002::user-1', 'origin_stream': None, 'timestamp': '2024-07-18 22:02:32.522360+00:00', 'version': 1, 'sequence_id': '0.1'}, 'user_id': '1'})>

In [5]: event.to_dict()
Out[5]:
{'_metadata': {'id': '002::user-1-0.1',
  'type': '002.UserLoggedIn.v1',
  'fqn': '002.UserLoggedIn',
  'kind': 'EVENT',
  'stream': '002::user-1',
  'origin_stream': None,
  'timestamp': '2024-07-18 22:02:32.522360+00:00',
  'version': 1,
  'sequence_id': '0.1'},
 'user_id': '1'}

In [6]: event.payload
Out[6]: {'user_id': '1'}

Versioning

Because events serve as API contracts of an aggregate with the rest of the ecosystem, they are versioned to signal changes to the contract.

Events have a default version of 1.

You can override and customize the version with the __version__ class attribute:

@domain.event(part_of=User)
class UserActivated:
    __version__ = 2

    user_id: Identifier(required=True)
    activated_at: DateTime(required=True)

When event schemas evolve (fields renamed, new required fields, changed structure), you can register upcasters that transform old event payloads to the current schema during deserialization. See the Event Upcasting guide for details, and the Event Versioning and Evolution pattern for broader versioning strategies.
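
The general idea behind upcasting can be sketched without the framework. The registry, decorator, and function names below are hypothetical and do not reflect Protean's upcaster API (see the Event Upcasting guide for that); the placeholder timestamp is likewise an assumption made for the example.

```python
UPCASTERS = {}  # (event name, from_version) -> function returning the next version's payload


def upcaster(event_name, from_version):
    """Register a function that lifts a payload from from_version to from_version + 1."""

    def register(func):
        UPCASTERS[(event_name, from_version)] = func
        return func

    return register


@upcaster("UserActivated", 1)
def user_activated_v1_to_v2(payload):
    # v2 added activated_at; old events get a placeholder value (an assumption).
    return {**payload, "activated_at": "1970-01-01T00:00:00+00:00"}


def upcast(event_name, version, payload, current_version):
    """Apply registered upcasters step by step until the current version is reached."""
    while version < current_version:
        payload = UPCASTERS[(event_name, version)](payload)
        version += 1
    return payload


old_payload = {"user_id": "1"}
upgraded = upcast("UserActivated", 1, old_payload, current_version=2)
print(upgraded)
```

Chaining single-step transformations means each new version only needs one extra function in this sketch, regardless of how old a stored event is.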

The configured version is reflected in version and type attributes of the generated event's metadata:

import json
from datetime import datetime, timezone

from protean import Domain
from protean.fields import DateTime, Identifier, String

domain = Domain(__name__, name="Authentication")


@domain.aggregate
class User:
    id: Identifier(identifier=True)
    email: String()
    name: String()
    status: String(choices=["INACTIVE", "ACTIVE", "ARCHIVED"], default="INACTIVE")

    def login(self):
        self.raise_(UserLoggedIn(user_id=self.id))

    def activate(self):
        self.status = "ACTIVE"
        self.raise_(UserActivated(user_id=self.id))


@domain.event(part_of="User")
class UserLoggedIn:
    user_id: Identifier(identifier=True)


@domain.event(part_of="User")
class UserActivated:
    __version__ = 2

    user_id: Identifier(required=True)
    activated_at: DateTime(required=True, default=lambda: datetime.now(timezone.utc))


domain.init(traverse=False)
with domain.domain_context():
    user = User(id="1", email="<EMAIL>", name="<NAME>")

    user.login()
    print(json.dumps(user._events[0].to_dict(), indent=4))

    """ Output:
    {
        "_metadata": {
            "id": "authentication::user-1-0.1",
            "type": "Authentication.UserLoggedIn.v1",
            "fqn": "__main__.UserLoggedIn",
            "kind": "EVENT",
            "stream": "authentication::user-1",
            "origin_stream": null,
            "timestamp": "2024-07-18 22:06:10.148226+00:00",
            "version": "v1",
            "sequence_id": "0.1"
        },
        "user_id": "1"
    }
    """

    user.activate()
    print(json.dumps(user._events[1].to_dict(), indent=4))

    """ Output:
    {
        "_metadata": {
            "id": "authentication::user-1-0.2",
            "type": "Authentication.UserActivated.v2",
            "fqn": "__main__.UserActivated",
            "kind": "EVENT",
            "stream": "authentication::user-1",
            "origin_stream": null,
            "timestamp": "2024-07-18 22:06:10.155603+00:00",
            "version": "v2",
            "sequence_id": "0.2"
        },
        "user_id": "1",
        "activated_at": "2024-07-18 22:06:10.155694+00:00"
    }
    """

End-to-End Event Flow

Here is a complete example showing the full lifecycle: defining an event, raising it from an aggregate, and handling it in an event handler.

from protean import Domain, handle
from protean.fields import Identifier, String, DateTime

domain = Domain(__file__, "Ordering")

# 1. Define the aggregate and event
@domain.aggregate
class Order:
    customer_name: String(max_length=100, required=True)
    status: String(max_length=20, default="DRAFT")

    def place(self):
        self.status = "PLACED"
        self.raise_(OrderPlaced(
            order_id=str(self.id),
            customer_name=self.customer_name,
        ))

@domain.event(part_of=Order)
class OrderPlaced:
    order_id = Identifier(required=True)
    customer_name = String(max_length=100)

# 2. Define a handler that reacts to the event
@domain.event_handler(part_of=Order)
class OrderPlacedNotification:
    @handle(OrderPlaced)
    def send_confirmation(self, event: OrderPlaced):
        print(f"Order {event.order_id} placed for {event.customer_name}")

When order.place() is called, the OrderPlaced event is raised and collected in order._events. Once the aggregate is persisted (via repository or Unit of Work), the event is dispatched to registered handlers — either synchronously or asynchronously depending on domain configuration.

See Raising Events for details on dispatching and the event-sourcing variant, and Event Handlers for the full handler guide.

Fact Events

A fact event encloses the entire state of the aggregate at that specific point in time. It contains all of the attributes and values necessary to completely describe the fact in the context of your business. You can think of a fact event similarly to how you may think of a row in a database: a complete set of data pertaining to the row at that point in time.

Fact events enable a pattern known as Event-carried State Transfer, which is one of the best ways to asynchronously distribute immutable state to all consumers who need it. With fact events, consumers do not have to build up the state themselves from multiple delta event types, which can be risky and error-prone, especially as data schemas evolve and change over time. Instead, they rely on the owning service to compute and produce a fully detailed fact event.

Fact events are generated automatically by the framework with the fact_events=True option in the domain.aggregate decorator. The generated event class is named <AggregateName>FactEvent and is written to a dedicated stream: <stream_category>-fact-<aggregate_id>. This separation lets internal consumers subscribe to granular delta events while external consumers subscribe to the fact stream for complete state snapshots.
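
The naming conventions and the consumer side of Event-carried State Transfer can be illustrated with a few lines of plain Python. All three helpers are hypothetical and the payloads are made up; the point is that a consumer of fact events replaces its local copy instead of applying deltas.

```python
def fact_event_name(aggregate_name):
    """Class name of the generated fact event: <AggregateName>FactEvent."""
    return f"{aggregate_name}FactEvent"


def fact_stream(stream_category, aggregate_id):
    """Dedicated fact stream: <stream_category>-fact-<aggregate_id>."""
    return f"{stream_category}-fact-{aggregate_id}"


def apply_fact(read_model, fact_payload):
    """A fact carries the full state, so the consumer simply overwrites its copy."""
    read_model[fact_payload["id"]] = dict(fact_payload)


read_model = {}
apply_fact(read_model, {"id": "1", "name": "John Doe", "status": "INACTIVE"})
apply_fact(read_model, {"id": "1", "name": "John Doe Jr.", "status": "ACTIVE"})

print(fact_event_name("User"), fact_stream("user", "1"))
print(read_model)
```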

Read about generating fact events in the section on raising events. For the integration pattern, see Fact Events as Integration Contracts.

Immutability

Event objects are immutable - they cannot be changed once created. This is important because events are meant to be used as a snapshot of the domain state at a specific point in time.

In [1]: user = User(name='John Doe', email='john@doe.com', status='ACTIVE')

In [2]: renamed = UserRenamed(user_id=user.id, name="John Doe Jr.")

In [3]: renamed.name = "John Doe Sr."
...
IncorrectUsageError: 'Event/Command Objects are immutable and cannot be modified once created'

Common Errors

| Exception | When it occurs |
|---|---|
| ValidationError | Event construction fails because a required field is missing or a field value is invalid. |
| IncorrectUsageError | Attempting to modify an event attribute after creation; events are immutable. |
| IncorrectUsageError | Non-abstract event defined without part_of; every concrete event must be associated with an aggregate. |
| ConfigurationError | Event raised on an aggregate it is not associated with (part_of mismatch). |

See also

Concept overview: Events — Domain events and their role in system communication.

Next steps:

  • Raising Events — How to raise events from aggregates and entities, dispatch them, and use event sourcing patterns.
  • Event Handlers — Consuming events and performing side effects.

Guides:

  • Message Tracing — Track the full causal chain of commands and events with correlation and causation IDs.
  • Event Upcasting — Transforming old event payloads to current schemas.

Patterns: