Skip to content

Raising Events

DDD CQRS ES

An aggregate rarely exists in isolation — its state changes often mean that other parts of the system have to sync up. An order being placed needs to reserve inventory; a user changing their email needs to update downstream projections. Without a mechanism to communicate these changes, aggregates become tightly coupled, and the system loses its ability to evolve independently. In DDD, the mechanism to accomplish this is through Domain Events.

Delta Events

When an aggregate mutates, it also (preferably) raises one or more events to record the state change in time, as well as propagate it within and beyond the bounded context.

@banking.aggregate
class Account:
    account_number: Identifier(required=True, unique=True)
    balance: Float()
    overdraft_limit: Float(default=0.0)

    @invariant.post
    def balance_must_be_greater_than_or_equal_to_overdraft_limit(self):
        if self.balance < -self.overdraft_limit:
            raise InsufficientFundsException("Balance cannot be below overdraft limit")

    def withdraw(self, amount: float):
        self.balance -= amount  # Update account state (mutation)

        self.raise_(AccountWithdrawn(account_number=self.account_number, amount=amount))

The generated events are collected in the mutated aggregate:

In [1]: account = Account(account_number="1234", balance=1000.0, overdraft_limit=50.0)

In [2]: account.withdraw(500.0)

In [3]: account.to_dict()
Out[3]:
{'account_number': '1234',
 'balance': 500.0,
 'overdraft_limit': 50.0,
 'id': '37fc8d10-1209-41d2-a6fa-4f7312912212'}

In [4]: account._events
Out[4]: [<AccountWithdrawn: AccountWithdrawn object (
{'account_number': '1234',
 'amount': 500.0})>]

Events accumulate in _events until the aggregate is persisted. If an aggregate is never persisted (e.g. you construct it in a test and don't save it), the events remain in memory and are never dispatched.

Raising Events from Entities

Any entity in the aggregate cluster can raise events. But the events are collected in the aggregate alone. Aggregates are also responsible for consuming events and performing state changes on underlying entities, as we will see in the future.

If the event is being generated by an entity, it can access the enclosing aggregate's identity with the help of _owner property:

from datetime import date

from protean import Domain
from protean.fields import Date, HasMany, Identifier, String

domain = Domain()


@domain.aggregate
class Patron:
    name: String(required=True, max_length=50)
    status: String(choices=["ACTIVE", "INACTIVE"], default="ACTIVE")
    holds = HasMany("Hold")

    def cancel_hold(self, hold_id):
        self.get_one_from_holds(id=hold_id).cancel_hold()


@domain.event(part_of="Patron")
class HoldCanceled:
    hold_id: Identifier(required=True)
    book_id: Identifier(required=True)
    patron_id: Identifier(required=True)
    canceled_on: Date(default=date.today())


@domain.entity(part_of="Patron")
class Hold:
    book_id: Identifier(required=True)
    status: String(choices=["ACTIVE", "EXPIRED", "CANCELLED"], default="ACTIVE")
    placed_on: Date(default=date.today())

    def cancel_hold(self):
        self.status = "CANCELLED"
        self.raise_(
            HoldCanceled(
                hold_id=self.id,
                book_id=self.book_id,
                patron_id=self._owner.id,  # (1)
            )
        )
  1. self._owner here is the owning Patron object.

How raise_() Works

When you call self.raise_(event), Protean:

  1. Verifies the event is associated with this aggregate (event.meta_.part_of must match the aggregate class).
  2. Enriches the event with metadata — the aggregate's identity, stream name, a monotonically increasing sequence_id, and a checksum.
  3. Appends the event to self._events.
  4. For event-sourced aggregates, additionally invokes the corresponding @apply handler inside atomic_change() to mutate state (see below).

Events within a single aggregate are ordered by sequence_id, ensuring consumers process them in the order they were raised.

Note

raise_() cannot be called on a temporally-loaded aggregate (one loaded with a point-in-time query). Temporal aggregates are read-only.

Dispatching Events

Events are dispatched automatically when the aggregate is persisted through a repository. You do not need to publish events manually — when you call domain.repository_for(Aggregate).add(aggregate) within a Unit of Work, Protean persists the aggregate's accumulated events to the event store and forwards them to registered brokers.

Events are published to streams based on the aggregate's stream category. Each event is stored in a stream named <domain>::<stream_category>-<aggregate_id>, ensuring that all events for a specific aggregate instance are grouped together and can be processed in order.

Learn more about how stream categories organize message flows in the Stream Categories guide.

In [1]: account = Account(account_number="1234", balance=1000.0, overdraft_limit=50.0)

In [2]: account.withdraw(500.0)

In [3]: account._events
Out[3]: [<AccountWithdrawn: AccountWithdrawn object (
{'account_number': '1234', 'amount': 500.0})>]

In [4]: domain.repository_for(Account).add(account)  # Events are dispatched here

The events accumulated in account._events are written to the event store and forwarded to brokers as part of the repository's persistence operation. After persistence, the event list is cleared.

What Happens Next

From there, the async processing engine picks up events via subscriptions and delivers them to:

  • Event handlers — react to events and orchestrate side effects (e.g. syncing another aggregate, sending a notification).
  • Projectors — maintain read-optimized projections by processing events into denormalized views.
  • Subscribers — consume messages from external brokers at the domain boundary.

The event store is the source of truth for the async processing engine. Events are persisted durably before being delivered, so handlers can process them even after restarts.

Event Sourced Aggregates: raise_() and @apply

For event-sourced aggregates (is_event_sourced=True), raise_() does more than collect events — it automatically invokes the corresponding @apply handler to mutate the aggregate's state in-place. This makes @apply the single source of truth for all state mutations, whether the aggregate is processing live commands or being reconstructed from stored events.

from protean import apply

@domain.aggregate(is_event_sourced=True)
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(max_length=20, default="PENDING")

    @classmethod
    def place(cls, customer_name):
        order = cls._create_new()
        order.raise_(OrderPlaced(
            order_id=str(order.id),
            customer_name=customer_name,
        ))
        return order

    def confirm(self):
        self.raise_(OrderConfirmed(order_id=self.id))

    @apply
    def when_placed(self, event: OrderPlaced):
        self.customer_name = event.customer_name
        self.status = "PENDING"

    @apply
    def when_confirmed(self, event: OrderConfirmed):
        self.status = "CONFIRMED"

Key points for ES aggregates:

  • Business methods only raise events — they never mutate state directly. The @apply handler does the mutation.
  • raise_() wraps the @apply call inside atomic_change(), so invariants are checked before and after the state change.
  • Every event raised must have a corresponding @apply handler. Raising an event without one throws IncorrectUsageError.
  • Factory methods use _create_new() to create a blank aggregate with identity. The creation event's @apply handler populates all remaining state.

Note

For standard (non-ES) aggregates, raise_() only collects events — it does not call @apply handlers. State is mutated directly in the business method, and @apply is not used.

Fact Events

Fact events are automatically generated by Protean when an aggregate opts in with fact_events=True. Unlike delta events (which you raise explicitly in business methods), fact events are generated during repository persistence — each time the aggregate is saved, Protean snapshots the full aggregate state into a fact event.

The event name is of the format <AggregateName>FactEvent, and the stream name will be <stream_category>-fact-<aggregate_id>, where stream_category is the aggregate's stream category.

import json

from protean import Domain
from protean.fields import HasOne, String
from protean.utils.eventing import Message

domain = Domain(name="Authentication")


@domain.aggregate(fact_events=True)
class User:
    name: String(max_length=50, required=True)
    email: String(required=True)
    status: String(choices=["ACTIVE", "ARCHIVED"])

    account = HasOne("Account")


@domain.entity(part_of=User)
class Account:
    password_hash: String(max_length=512)


domain.init(traverse=False)
with domain.domain_context():
    user = User(name="John Doe", email="john.doe@example.com")

    # Persist the user
    domain.repository_for(User).add(user)

    event_message = domain.event_store.store.read(
        f"authentication::user-fact-{user.id}"
    )[0]
    event = Message.to_object(event_message)

    print(json.dumps(event.to_dict(), indent=4))

    """ Output:
    {
        "_metadata": {
            "id": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96-0.1",
            "type": "Authentication.UserFactEvent.v1",
            "fqn": "protean.container.UserFactEvent",
            "kind": "EVENT",
            "stream": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96",
            "origin_stream": null,
            "timestamp": "2024-07-18 22:01:02.364078+00:00",
            "version": "v1",
            "sequence_id": "0.1"
        },
        "_version": 0,
        "name": "John Doe",
        "email": "john.doe@example.com",
        "status": null,
        "account": null,
        "id": "781c4363-5e7e-4c53-a599-2cb2dc428d96"
    }
    """

The fact event for User aggregate in the above example is UserFactEvent and the output stream is user-fact-e97cef08-f11d-43eb-8a69-251a0828bbff


See also

Concept overview: Events — Domain events and their role in system communication.

Related guides:

  • Aggregate Mutation — How state changes work inside aggregates.
  • Invariants — Business rules checked before and after state changes.
  • Event Handlers — Processing events to orchestrate side effects.
  • Projections — Building read models from event streams.
  • Message Enrichment — Automatically add custom metadata (user context, tenant ID, audit data) to every event.
  • Message Tracing — Follow causal chains with correlation and causation IDs.

Patterns: