Raising Events
An aggregate rarely exists in isolation: its state changes often mean that other parts of the system have to sync up. An order being placed needs to reserve inventory; a user changing their email needs to update downstream projections. Without a mechanism to communicate these changes, aggregates become tightly coupled, and the system loses its ability to evolve independently. In DDD, this mechanism is Domain Events.
Delta Events
When an aggregate mutates, it should also raise one or more events to record the state change at that point in time and to propagate it within and beyond the bounded context.
```python
@banking.aggregate
class Account:
    account_number: Identifier(required=True, unique=True)
    balance: Float()
    overdraft_limit: Float(default=0.0)

    @invariant.post
    def balance_must_be_greater_than_or_equal_to_overdraft_limit(self):
        if self.balance < -self.overdraft_limit:
            raise InsufficientFundsException("Balance cannot be below overdraft limit")

    def withdraw(self, amount: float):
        self.balance -= amount  # Update account state (mutation)
        self.raise_(AccountWithdrawn(account_number=self.account_number, amount=amount))
```
The generated events are collected in the mutated aggregate:
```python
In [1]: account = Account(account_number="1234", balance=1000.0, overdraft_limit=50.0)

In [2]: account.withdraw(500.0)

In [3]: account.to_dict()
Out[3]:
{'account_number': '1234',
 'balance': 500.0,
 'overdraft_limit': 50.0,
 'id': '37fc8d10-1209-41d2-a6fa-4f7312912212'}

In [4]: account._events
Out[4]: [<AccountWithdrawn: AccountWithdrawn object (
        {'account_number': '1234',
         'amount': 500.0})>]
```
Events accumulate in `_events` until the aggregate is persisted. If an aggregate is never persisted (e.g. you construct it in a test and don't save it), the events remain in memory and are never dispatched.
Raising Events from Entities
Any entity in the aggregate cluster can raise events, but the events are collected in the aggregate alone. Aggregates are also responsible for consuming events and applying state changes to underlying entities, as we will see later.
If an event is raised by an entity, the entity can access the enclosing aggregate's identity through the `_owner` property:
```python
from datetime import date

from protean import Domain
from protean.fields import Date, HasMany, Identifier, String

domain = Domain()


@domain.aggregate
class Patron:
    name: String(required=True, max_length=50)
    status: String(choices=["ACTIVE", "INACTIVE"], default="ACTIVE")
    holds = HasMany("Hold")

    def cancel_hold(self, hold_id):
        self.get_one_from_holds(id=hold_id).cancel_hold()


@domain.event(part_of="Patron")
class HoldCanceled:
    hold_id: Identifier(required=True)
    book_id: Identifier(required=True)
    patron_id: Identifier(required=True)
    canceled_on: Date(default=date.today())


@domain.entity(part_of="Patron")
class Hold:
    book_id: Identifier(required=True)
    status: String(choices=["ACTIVE", "EXPIRED", "CANCELLED"], default="ACTIVE")
    placed_on: Date(default=date.today())

    def cancel_hold(self):
        self.status = "CANCELLED"
        self.raise_(
            HoldCanceled(
                hold_id=self.id,
                book_id=self.book_id,
                patron_id=self._owner.id,  # (1)
            )
        )
```
1. `self._owner` here is the owning `Patron` object.
How raise_() Works
When you call `self.raise_(event)`, Protean:

- Verifies the event is associated with this aggregate (`event.meta_.part_of` must match the aggregate class).
- Enriches the event with metadata: the aggregate's identity, stream name, a monotonically increasing `sequence_id`, and a checksum.
- Appends the event to `self._events`.
- For event-sourced aggregates, additionally invokes the corresponding `@apply` handler inside `atomic_change()` to mutate state (see below).

Events within a single aggregate are ordered by `sequence_id`, ensuring consumers process them in the order they were raised.
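The collect-and-enrich behavior described above can be sketched in plain Python. This is an illustrative stand-in, not Protean's actual implementation; the `Event` and `Aggregate` classes and their attributes are made up for the sketch:

```python
# Illustrative sketch of raise_()-style event collection (not Protean internals).
from dataclasses import dataclass, field
from itertools import count


@dataclass
class Event:
    name: str
    payload: dict
    metadata: dict = field(default_factory=dict)


class Aggregate:
    def __init__(self, aggregate_id: str):
        self.id = aggregate_id
        self._events: list[Event] = []
        self._seq = count(1)  # monotonically increasing sequence

    def raise_(self, event: Event) -> None:
        # Enrich the event with the aggregate's identity and a sequence_id,
        # then collect it until the aggregate is persisted
        event.metadata.update(
            aggregate_id=self.id,
            sequence_id=next(self._seq),
        )
        self._events.append(event)


acct = Aggregate("1234")
acct.raise_(Event("AccountWithdrawn", {"amount": 500.0}))
acct.raise_(Event("AccountWithdrawn", {"amount": 100.0}))
# sequence_ids preserve the order in which events were raised
assert [e.metadata["sequence_id"] for e in acct._events] == [1, 2]
```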
Note
`raise_()` cannot be called on a temporally-loaded aggregate (one loaded with a point-in-time query). Temporal aggregates are read-only.
Dispatching Events
Events are dispatched automatically when the aggregate is persisted through a repository. You do not need to publish events manually: when you call `domain.repository_for(Aggregate).add(aggregate)` within a Unit of Work, Protean persists the aggregate's accumulated events to the event store and forwards them to registered brokers.
Events are published to streams based on the aggregate's stream category. Each event is stored in a stream named `<domain>::<stream_category>-<aggregate_id>`, ensuring that all events for a specific aggregate instance are grouped together and can be processed in order.
Learn more about how stream categories organize message flows in the Stream Categories guide.
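As a quick illustration of the naming scheme above, a stream name can be composed from the three parts (the helper function here is hypothetical, not part of Protean's API):

```python
# Hypothetical helper illustrating the <domain>::<stream_category>-<aggregate_id>
# naming scheme; Protean builds these names internally.
def stream_name(domain: str, stream_category: str, aggregate_id: str) -> str:
    return f"{domain}::{stream_category}-{aggregate_id}"


# All events for account "1234" in the banking domain land in one stream
assert stream_name("banking", "account", "1234") == "banking::account-1234"
```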
```python
In [1]: account = Account(account_number="1234", balance=1000.0, overdraft_limit=50.0)

In [2]: account.withdraw(500.0)

In [3]: account._events
Out[3]: [<AccountWithdrawn: AccountWithdrawn object (
        {'account_number': '1234', 'amount': 500.0})>]

In [4]: domain.repository_for(Account).add(account)  # Events are dispatched here
```
The events accumulated in `account._events` are written to the event store and forwarded to brokers as part of the repository's persistence operation. After persistence, the event list is cleared.
What Happens Next
From there, the async processing engine picks up events via subscriptions and delivers them to:
- Event handlers — react to events and orchestrate side effects (e.g. syncing another aggregate, sending a notification).
- Projectors — maintain read-optimized projections by processing events into denormalized views.
- Subscribers — consume messages from external brokers at the domain boundary.
The event store is the source of truth for the async processing engine. Events are persisted durably before being delivered, so handlers can process them even after restarts.
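The delivery fan-out described above can be sketched as a simple type-based dispatch. The real engine is asynchronous and durable; this synchronous stand-in uses made-up names (`EventBus`, `subscribe`, `deliver`) purely to show the shape of the idea:

```python
# Conceptual sketch of subscription-based delivery (illustrative names,
# not Protean's async processing engine).
from collections import defaultdict


class EventBus:
    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type: str, handler):
        # Handlers, projectors, and subscribers all register interest by type
        self._handlers[event_type].append(handler)

    def deliver(self, events):
        # Events are read from durable storage and handed to each subscriber
        for event in events:
            for handler in self._handlers[event["type"]]:
                handler(event)


notified = []
bus = EventBus()
bus.subscribe("AccountWithdrawn", lambda e: notified.append(e["amount"]))
bus.deliver([{"type": "AccountWithdrawn", "amount": 500.0}])
assert notified == [500.0]
```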
Event Sourced Aggregates: raise_() and @apply
For event-sourced aggregates (`is_event_sourced=True`), `raise_()` does more than collect events: it automatically invokes the corresponding `@apply` handler to mutate the aggregate's state in place. This makes `@apply` the single source of truth for all state mutations, whether the aggregate is processing live commands or being reconstructed from stored events.
```python
from protean import Domain, apply
from protean.fields import Identifier, String

domain = Domain()


# Event definitions (included here for completeness)
@domain.event(part_of="Order")
class OrderPlaced:
    order_id: Identifier(required=True)
    customer_name: String(required=True)


@domain.event(part_of="Order")
class OrderConfirmed:
    order_id: Identifier(required=True)


@domain.aggregate(is_event_sourced=True)
class Order:
    customer_name: String(max_length=150, required=True)
    status: String(max_length=20, default="PENDING")

    @classmethod
    def place(cls, customer_name):
        order = cls._create_new()
        order.raise_(OrderPlaced(
            order_id=str(order.id),
            customer_name=customer_name,
        ))
        return order

    def confirm(self):
        self.raise_(OrderConfirmed(order_id=self.id))

    @apply
    def when_placed(self, event: OrderPlaced):
        self.customer_name = event.customer_name
        self.status = "PENDING"

    @apply
    def when_confirmed(self, event: OrderConfirmed):
        self.status = "CONFIRMED"
```
Key points for ES aggregates:

- Business methods only raise events; they never mutate state directly. The `@apply` handler does the mutation.
- `raise_()` wraps the `@apply` call inside `atomic_change()`, so invariants are checked before and after the state change.
- Every event raised must have a corresponding `@apply` handler. Raising an event without one throws `IncorrectUsageError`.
- Factory methods use `_create_new()` to create a blank aggregate with identity. The creation event's `@apply` handler populates all remaining state.
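The split described in the points above, where business methods only raise and handlers only mutate, can be sketched in plain Python. This is an illustration of the event-sourcing idea, not Protean's implementation; the dict-based events and `when_*` dispatch are invented for the sketch:

```python
# Illustrative sketch: the same apply handlers serve live commands and replay.
class Order:
    def __init__(self):
        self.customer_name = None
        self.status = None
        self._events = []

    def _apply(self, event):
        # Dispatch to the handler for this event type; the ONLY place
        # where state actually changes
        getattr(self, f"when_{event['type']}")(event)

    def raise_(self, event):
        self._events.append(event)  # collect for persistence
        self._apply(event)          # mutate state immediately

    def when_placed(self, event):
        self.customer_name = event["customer_name"]
        self.status = "PENDING"

    def when_confirmed(self, event):
        self.status = "CONFIRMED"

    @classmethod
    def from_events(cls, events):
        # Reconstruction: fold stored events through the same handlers,
        # without re-raising them
        order = cls()
        for event in events:
            order._apply(event)
        return order


order = Order()
order.raise_({"type": "placed", "customer_name": "Jane"})
order.raise_({"type": "confirmed"})

replayed = Order.from_events(order._events)
assert replayed.status == "CONFIRMED"
assert replayed.customer_name == "Jane"
```

Because mutation lives only in the handlers, replaying the stored events is guaranteed to rebuild exactly the state the live commands produced.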
Note
For standard (non-ES) aggregates, `raise_()` only collects events; it does not call `@apply` handlers. State is mutated directly in the business method, and `@apply` is not used.
Fact Events
Fact events are generated automatically by Protean when an aggregate opts in with `fact_events=True`. Unlike delta events (which you raise explicitly in business methods), fact events are generated during repository persistence: each time the aggregate is saved, Protean snapshots the full aggregate state into a fact event.
The event name is of the format `<AggregateName>FactEvent`, and the stream name is `<domain>::<stream_category>-fact-<aggregate_id>`, where `stream_category` is the aggregate's stream category.
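The snapshot-on-save idea can be shown with a minimal sketch before looking at the real API below. The helper function and dict shape here are hypothetical, invented only to illustrate that a fact event carries the entire state rather than a delta:

```python
# Hypothetical sketch: a fact event is a full-state snapshot taken at
# persistence time, named <AggregateName>FactEvent.
def to_fact_event(aggregate_name: str, state: dict) -> dict:
    return {
        "type": f"{aggregate_name}FactEvent",
        **state,  # the entire current state, not just what changed
    }


fact = to_fact_event(
    "User",
    {"id": "u-1", "name": "John Doe", "email": "john.doe@example.com"},
)
assert fact["type"] == "UserFactEvent"
assert fact["name"] == "John Doe"
```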
```python
import json

from protean import Domain
from protean.fields import HasOne, String
from protean.utils.eventing import Message

domain = Domain(name="Authentication")


@domain.aggregate(fact_events=True)
class User:
    name: String(max_length=50, required=True)
    email: String(required=True)
    status: String(choices=["ACTIVE", "ARCHIVED"])
    account = HasOne("Account")


@domain.entity(part_of=User)
class Account:
    password_hash: String(max_length=512)


domain.init(traverse=False)
with domain.domain_context():
    user = User(name="John Doe", email="john.doe@example.com")

    # Persist the user
    domain.repository_for(User).add(user)

    event_message = domain.event_store.store.read(
        f"authentication::user-fact-{user.id}"
    )[0]
    event = Message.to_object(event_message)

    print(json.dumps(event.to_dict(), indent=4))

""" Output:
{
    "_metadata": {
        "id": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96-0.1",
        "type": "Authentication.UserFactEvent.v1",
        "fqn": "protean.container.UserFactEvent",
        "kind": "EVENT",
        "stream": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96",
        "origin_stream": null,
        "timestamp": "2024-07-18 22:01:02.364078+00:00",
        "version": "v1",
        "sequence_id": "0.1"
    },
    "_version": 0,
    "name": "John Doe",
    "email": "john.doe@example.com",
    "status": null,
    "account": null,
    "id": "781c4363-5e7e-4c53-a599-2cb2dc428d96"
}
"""
```
The fact event for the `User` aggregate in the above example is `UserFactEvent`, and it is written to the stream `authentication::user-fact-<aggregate_id>`.
See also
Concept overview: Events — Domain events and their role in system communication.
Related guides:
- Aggregate Mutation — How state changes work inside aggregates.
- Invariants — Business rules checked before and after state changes.
- Event Handlers — Processing events to orchestrate side effects.
- Projections — Building read models from event streams.
- Message Enrichment — Automatically add custom metadata (user context, tenant ID, audit data) to every event.
- Message Tracing — Follow causal chains with correlation and causation IDs.
Patterns:
- Design Events for Consumers — Structuring events so consumers can process them reliably.
- Fact Events as Integration Contracts — Using fact events for cross-domain communication.