Skip to content

BaseAggregate

Base class for aggregate root entities -- the primary building block for modeling domain concepts that enforce consistency rules and manage state changes.

See Aggregates guide for practical usage and Aggregates concept for design rationale.

Bases: BaseEntity

Base class for aggregate root entities -- the primary building block for modeling domain concepts.

Aggregates enforce consistency rules and define transaction boundaries. They inherit all entity capabilities (fields, identity, invariants) and add versioning for optimistic concurrency, event raising via raise_(), and event-sourcing support via _apply() / from_events().

Meta Options

| Option | Type | Description |
| --- | --- | --- |
| `is_event_sourced` | `bool` | Enable event-sourcing mode (default: `False`). |
| `fact_events` | `bool` | Auto-generate fact events on persistence (default: `False`). |
| `stream_category` | `str` | Override the event stream category name. |
| `provider` | `str` | The persistence provider name (default: `"default"`). |
| `schema_name` | `str` | The storage table/collection name. |
| `auto_add_id_field` | `bool` | Whether to auto-inject an `id` field (default: `True`). |
Source code in src/protean/core/aggregate.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Construct the aggregate, preserving its concurrency version.

    ``_version`` is a private attribute (Pydantic ``PrivateAttr``), and
    ``extra="forbid"`` would reject it as a regular field — so it is
    extracted before delegating to the parent constructor and restored
    onto the instance afterwards.
    """
    # Prefer an explicit keyword; otherwise pull it out of the first
    # positional dict that carries it (template-dict construction).
    version = kwargs.pop("_version", None)
    if version is None:
        version = next(
            (
                positional.pop("_version")
                for positional in args
                if isinstance(positional, dict) and "_version" in positional
            ),
            None,
        )

    super().__init__(*args, **kwargs)

    # Restore the extracted version; a brand-new aggregate starts at -1.
    self._version = version if version is not None else -1

    # An aggregate is its own root and its own owner.
    self._set_root_and_owner(self, self)

    # The next persisted state will carry the incremented version.
    self._next_version = self._version + 1

raise_

raise_(event: Any) -> None

Raise a domain event on this aggregate.

Enriches the event with metadata (identity, stream, sequence, checksum) and appends it to self._events.

Source code in src/protean/core/aggregate.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def raise_(self, event: Any) -> None:
    """Raise a domain event on this aggregate.

    Enriches the event with metadata (identity, stream, sequence,
    checksum) and appends it to ``self._events``.

    Args:
        event: A domain event instance whose ``meta_.part_of`` must be
            this aggregate's class.

    Raises:
        IncorrectUsageError: If the aggregate was loaded temporally
            (temporal aggregates are read-only).
        ConfigurationError: If the event class is not associated with
            this aggregate class.
    """
    # Guard: temporal aggregates are read-only
    if self._is_temporal:
        raise IncorrectUsageError(
            "Cannot raise events on a temporally-loaded aggregate. "
            "Temporal aggregates are read-only."
        )

    # Verify that event is associated with this aggregate
    if event.meta_.part_of != self.__class__:
        raise ConfigurationError(
            f"Event `{event.__class__.__name__}` is not associated with"
            f" aggregate `{self.__class__.__name__}`"
        )

    # Resolve this aggregate's identifier; None when no id field exists.
    id_field_name = getattr(self.__class__, _ID_FIELD_NAME, None)
    identifier = getattr(self, id_field_name) if id_field_name else None

    # Set Fact Event stream to be `<aggregate_stream_name>-fact`
    if event.__class__.__name__.endswith("FactEvent"):
        stream = f"{self.meta_.stream_category}-fact-{identifier}"
    else:
        stream = f"{self.meta_.stream_category}-{identifier}"

    if self.meta_.is_event_sourced:
        # ES aggregates advance their version on every non-fact event;
        # fact events do not move the version forward.
        if not event.__class__.__name__.endswith("FactEvent"):
            self._version += 1

        event_identity = f"{stream}-{self._version}"
        sequence_id = f"{self._version}"
    else:
        # Non-ES aggregates: sequence is `<version>.<event-number>` so
        # multiple events raised before persistence remain ordered.
        aggregate_version = max(self._version, self._next_version)
        event_number = len(self._events) + 1

        event_identity = f"{stream}-{aggregate_version}.{event_number}"
        sequence_id = f"{aggregate_version}.{event_number}"

    # Build fresh headers, carrying over the event's original timestamp
    # when one was already set.
    headers = MessageHeaders(
        id=event_identity,
        type=event.__class__.__type__,
        stream=stream,
        time=event._metadata.headers.time
        if (event._metadata.headers and event._metadata.headers.time)
        else None,
    )

    envelope = MessageEnvelope.build(event.payload)

    # Merge the event's existing domain metadata with stream/sequence
    # info and the domain-wide sync/async processing setting.
    domain_meta = DomainMeta(
        **{
            **event._metadata.domain.to_dict(),
            "stream_category": self.meta_.stream_category,
            "sequence_id": sequence_id,
            "asynchronous": current_domain.config["event_processing"]
            == Processing.ASYNC.value,
        }
    )

    metadata = Metadata(
        headers=headers,
        envelope=envelope,
        domain=domain_meta,
    )

    # Run event enrichers
    # Each registered enricher may contribute a dict that is merged into
    # the metadata extensions; metadata is rebuilt only when something
    # was actually added (Metadata appears to be immutable — rebuilt
    # rather than mutated).
    if current_domain._event_enrichers:
        extensions = dict(metadata.extensions)
        for enricher in current_domain._event_enrichers:
            result = enricher(event, self)
            if result:
                extensions.update(result)
        if extensions:
            metadata = Metadata(
                headers=metadata.headers,
                envelope=metadata.envelope,
                domain=metadata.domain,
                event_store=metadata.event_store,
                extensions=extensions,
            )

    # Rebuild the event with the enriched metadata and the expected
    # version used for optimistic concurrency at the event store.
    event_with_metadata = event.__class__(
        event.payload,
        _expected_version=self._event_position,
        _metadata=metadata,
    )

    self._event_position = self._event_position + 1
    self._events.append(event_with_metadata)

    # For ES aggregates, apply the event handler to mutate state in-place.
    # This makes @apply the single source of truth for state changes —
    # the same code path used during live processing and event replay.
    # We use atomic_change so that invariants are checked before and
    # after the handler runs, preserving the "always valid" guarantee.
    if self.meta_.is_event_sourced:
        is_fact_event = event.__class__.__name__.endswith("FactEvent")
        if not is_fact_event:
            with atomic_change(self):
                self._apply_handler(event_with_metadata)
from_events classmethod

from_events(events: list) -> BaseAggregate

Event-Sourcing: reconstruct an aggregate from a list of events.

Creates a blank aggregate via _create_for_reconstitution() and applies all events uniformly through _apply(). The first event's @apply handler must set ALL fields including identity.

Source code in src/protean/core/aggregate.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
@classmethod
def from_events(cls, events: list) -> "BaseAggregate":
    """Event-Sourcing: rebuild an aggregate by replaying ``events``.

    A blank instance is obtained from ``_create_for_reconstitution()``
    and every event is routed uniformly through ``_apply()``.  The
    first event's ``@apply`` handler is responsible for populating
    every field, including the identity.
    """
    instance = cls._create_for_reconstitution()

    for past_event in events:
        instance._apply(past_event)

    # Replay is complete — re-enable invariant validation.
    instance._disable_invariant_checks = False
    return instance