Bases: BaseEntity
Base class for aggregate root entities -- the primary building block for
modeling domain concepts.
Aggregates enforce consistency rules and define transaction boundaries.
They inherit all entity capabilities (fields, identity, invariants) and add
versioning for optimistic concurrency, event raising via raise_(), and
event-sourcing support via _apply() / from_events().
Meta Options
| Option |
Type |
Description |
is_event_sourced |
bool |
Enable event-sourcing mode (default: False). |
fact_events |
bool |
Auto-generate fact events on persistence (default: False). |
stream_category |
str |
Override the event stream category name. |
provider |
str |
The persistence provider name (default: "default"). |
schema_name |
str |
The storage table/collection name. |
auto_add_id_field |
bool |
Whether to auto-inject an id field (default: True). |
Source code in src/protean/core/aggregate.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 | def __init__(self, *args: Any, **kwargs: Any) -> None:
# Pop _version before Pydantic init (it's a PrivateAttr,
# and extra="forbid" would reject it). Restore after construction.
# Check both kwargs and positional dict args (template dict pattern).
version = kwargs.pop("_version", None)
if version is None:
for arg in args:
if isinstance(arg, dict) and "_version" in arg:
version = arg.pop("_version")
break
if version is None:
version = -1
super().__init__(*args, **kwargs)
# Restore _version from kwargs or default
self._version = version
# Set self as root and owner
self._set_root_and_owner(self, self)
# Increment version and set next version
self._next_version = self._version + 1
|
raise_
raise_(event: Any) -> None
Raise a domain event on this aggregate.
Enriches the event with metadata (identity, stream, sequence,
checksum) and appends it to self._events.
Source code in src/protean/core/aggregate.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242 | def raise_(self, event: Any) -> None:
"""Raise a domain event on this aggregate.
Enriches the event with metadata (identity, stream, sequence,
checksum) and appends it to ``self._events``.
"""
# Guard: temporal aggregates are read-only
if self._is_temporal:
raise IncorrectUsageError(
"Cannot raise events on a temporally-loaded aggregate. "
"Temporal aggregates are read-only."
)
# Verify that event is associated with this aggregate
if event.meta_.part_of != self.__class__:
raise ConfigurationError(
f"Event `{event.__class__.__name__}` is not associated with"
f" aggregate `{self.__class__.__name__}`"
)
id_field_name = getattr(self.__class__, _ID_FIELD_NAME, None)
identifier = getattr(self, id_field_name) if id_field_name else None
# Set Fact Event stream to be `<aggregate_stream_name>-fact`
if event.__class__.__name__.endswith("FactEvent"):
stream = f"{self.meta_.stream_category}-fact-{identifier}"
else:
stream = f"{self.meta_.stream_category}-{identifier}"
if self.meta_.is_event_sourced:
if not event.__class__.__name__.endswith("FactEvent"):
self._version += 1
event_identity = f"{stream}-{self._version}"
sequence_id = f"{self._version}"
else:
aggregate_version = max(self._version, self._next_version)
event_number = len(self._events) + 1
event_identity = f"{stream}-{aggregate_version}.{event_number}"
sequence_id = f"{aggregate_version}.{event_number}"
headers = MessageHeaders(
id=event_identity,
type=event.__class__.__type__,
stream=stream,
time=event._metadata.headers.time
if (event._metadata.headers and event._metadata.headers.time)
else None,
)
envelope = MessageEnvelope.build(event.payload)
domain_meta = DomainMeta(
**{
**event._metadata.domain.to_dict(),
"stream_category": self.meta_.stream_category,
"sequence_id": sequence_id,
"asynchronous": current_domain.config["event_processing"]
== Processing.ASYNC.value,
}
)
metadata = Metadata(
headers=headers,
envelope=envelope,
domain=domain_meta,
)
# Run event enrichers
if current_domain._event_enrichers:
extensions = dict(metadata.extensions)
for enricher in current_domain._event_enrichers:
result = enricher(event, self)
if result:
extensions.update(result)
if extensions:
metadata = Metadata(
headers=metadata.headers,
envelope=metadata.envelope,
domain=metadata.domain,
event_store=metadata.event_store,
extensions=extensions,
)
event_with_metadata = event.__class__(
event.payload,
_expected_version=self._event_position,
_metadata=metadata,
)
self._event_position = self._event_position + 1
self._events.append(event_with_metadata)
# For ES aggregates, apply the event handler to mutate state in-place.
# This makes @apply the single source of truth for state changes —
# the same code path used during live processing and event replay.
# We use atomic_change so that invariants are checked before and
# after the handler runs, preserving the "always valid" guarantee.
if self.meta_.is_event_sourced:
is_fact_event = event.__class__.__name__.endswith("FactEvent")
if not is_fact_event:
with atomic_change(self):
self._apply_handler(event_with_metadata)
|
from_events
classmethod
Event-Sourcing: reconstruct an aggregate from a list of events.
Creates a blank aggregate via _create_for_reconstitution() and
applies all events uniformly through _apply(). The first event's
@apply handler must set ALL fields including identity.
Source code in src/protean/core/aggregate.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404 | @classmethod
def from_events(cls, events: list) -> "BaseAggregate":
"""Event-Sourcing: reconstruct an aggregate from a list of events.
Creates a blank aggregate via ``_create_for_reconstitution()`` and
applies all events uniformly through ``_apply()``. The first event's
``@apply`` handler must set ALL fields including identity.
"""
aggregate = cls._create_for_reconstitution()
for event in events:
aggregate._apply(event)
aggregate._disable_invariant_checks = False
return aggregate
|