Testing DSL

Fluent test helpers for event-sourced aggregates. The given function provides a Pythonic DSL for integration tests that exercise the full command processing pipeline: command -> handler -> aggregate -> events.

See Testing guide for practical usage.

Testing DSL for Protean.

Provides fluent, Pythonic DSLs for testing event-sourced aggregates, process managers, projections, and domain invariants.

Event-sourcing tests

The three words::

given(Order, order_created, order_confirmed).process(initiate_payment)

"Given an Order after order_created and order_confirmed, process initiate_payment."

After .process(), assert with plain Python::

assert order.accepted
assert PaymentPending in order.events
assert order.events[PaymentPending].payment_id == "pay-001"
assert order.status == "Payment_Pending"

Multi-command chaining::

order = (
    given(Order)
    .process(CreateOrder(order_id=oid, customer="Alice", amount=99.99))
    .process(ConfirmOrder(order_id=oid))
    .process(InitiatePayment(order_id=oid, payment_id="pay-001"))
)

assert order.accepted
assert order.status == "Payment_Pending"

Process manager tests

When the first argument is a process manager class, given() returns a ProcessManagerResult that feeds events through the PM's handlers::

result = given(
    OrderFulfillmentPM,
    OrderPlaced(order_id="o1", customer_id="c1", total=100.0),
    PaymentConfirmed(payment_id="p1", order_id="o1", amount=100.0),
)
assert result.status == "awaiting_shipment"
assert not result.is_complete
assert result.transition_count == 2

Alternatively, pass the events first and name the process manager in .results_in()::

result = given(
    OrderPlaced(order_id="o1", ...),
    PaymentConfirmed(order_id="o1", ...),
).results_in(OrderFulfillmentPM, id="o1")

Projection tests

When called with event instances only (no class), given() returns an EventSequence for testing projections::

result = given(
    Registered(user_id="u1", name="Alice"),
    Transacted(user_id="u1", amount=100),
).then(Balances, id="u1")

result.has(name="Alice", balance=100)
assert result.projection.balance == 100

Invariant helpers

assert_invalid and assert_valid test validation behavior::

assert_invalid(
    lambda: OrderPlacementService(order, [empty_inventory])(),
    message="Product is out of stock",
)

assert_valid(lambda: OrderPlacementService(order, [sufficient_inventory])())

AggregateResult

AggregateResult(
    aggregate_cls: type,
    given_events: list[Any] | None = None,
)

The result of processing a command against an event-sourced aggregate.

Proxies attribute access to the underlying aggregate, so order.status works directly.

Supports multi-command chaining — call .process() repeatedly to build up aggregate state through the real pipeline::

order = (
    given(Order)
    .process(CreateOrder(order_id=oid, customer="Alice", amount=99.99))
    .process(ConfirmOrder(order_id=oid))
    .process(InitiatePayment(order_id=oid, payment_id="pay-001"))
)

Instances are created by given(); do not construct AggregateResult directly.

Source code in src/protean/testing.py
def __init__(
    self, aggregate_cls: type, given_events: list[Any] | None = None
) -> None:
    self._aggregate_cls = aggregate_cls
    self._given_events = list(given_events or [])
    self._aggregate = None
    self._new_events: EventLog = EventLog([])
    self._all_events: list[Any] = []
    self._rejection: Exception | None = None
    self._processed: bool = False
    self._aggregate_id: Any = None
    self._event_count: int = 0
    self._seeded: bool = False

events property

events: EventLog

New events raised by the last command (EventLog).

all_events property

all_events: EventLog

All events raised across all .process() calls (EventLog).

rejection property

rejection: Exception | None

The exception if the command was rejected, or None.

accepted property

accepted: bool

True if the last command was processed without exception.

rejected property

rejected: bool

True if the last command raised an exception.

rejection_messages property

rejection_messages: list[str]

Flat list of error messages from the rejection.

For ValidationError, flattens the messages dict values. For other exceptions, returns [str(exc)]. Returns [] if no rejection.

Examples::

assert "Order must be confirmed" in result.rejection_messages
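
The flattening rule described above can be sketched in isolation. This is an illustration only, not Protean's implementation: `ValidationError` below is a hypothetical stand-in that carries a `messages` dict mapping field names to lists of strings, and `flatten_rejection` is a hypothetical helper name.

```python
from typing import Optional


class ValidationError(Exception):
    """Hypothetical stand-in: carries a dict of field name -> list of messages."""

    def __init__(self, messages: dict) -> None:
        super().__init__(messages)
        self.messages = messages


def flatten_rejection(exc: Optional[Exception]) -> list:
    """Return a flat list of error messages, following the documented rule."""
    if exc is None:
        return []  # no rejection
    if isinstance(exc, ValidationError):
        # Flatten the dict values (each assumed to be a list of strings)
        return [msg for msgs in exc.messages.values() for msg in msgs]
    return [str(exc)]  # any other exception


validation = ValidationError(
    {"_entity": ["Order must be confirmed"], "amount": ["is required"]}
)
print(flatten_rejection(validation))
print(flatten_rejection(RuntimeError("boom")))
print(flatten_rejection(None))
```

Because the result is always a list, a test can use `in` on it without first checking whether a rejection occurred.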

aggregate property

aggregate

The raw aggregate instance, if needed directly.

after

after(*events) -> AggregateResult

Accumulate more history events (for BDD "And given" steps).

Returns self for chaining::

order = given(Order, order_created)
order = order.after(order_confirmed)
order = order.after(payment_pending)

Source code in src/protean/testing.py
def after(self, *events) -> AggregateResult:
    """Accumulate more history events (for BDD "And given" steps).

    Returns self for chaining::

        order = given(Order, order_created)
        order = order.after(order_confirmed)
        order = order.after(payment_pending)
    """
    self._given_events.extend(events)
    return self
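
Since `after` extends the stored history and returns `self`, the reassignments in the docstring are optional and the calls can also be chained. A minimal sketch, using a hypothetical `HistoryBuilder` stand-in that mirrors only the method shown above, with plain strings in place of event objects:

```python
class HistoryBuilder:
    """Hypothetical stand-in that mirrors only the `after` method shown above."""

    def __init__(self, *events) -> None:
        self._given_events = list(events)

    def after(self, *events) -> "HistoryBuilder":
        # Same two steps as the listing: extend the history, return self
        self._given_events.extend(events)
        return self


order = HistoryBuilder("order_created")
same = order.after("order_confirmed").after("payment_pending")

print(same is order)          # the chain hands back the original object
print(order._given_events)    # history accumulated in call order
```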

process

process(
    command, *, correlation_id: str | None = None
) -> AggregateResult

Dispatch a command through the domain's full processing pipeline.

Seeds the event store with the given events (on the first call only), then calls domain.process(command), which routes the command through the real command handler, repository, and unit of work.

Can be called multiple times to chain commands::

result = (
    given(Order)
    .process(CreateOrder(...))
    .process(ConfirmOrder(...))
)

After each call:

  • .events contains events from the last command only.
  • .all_events contains events from all commands.
  • .accepted / .rejected reflects the last command.

Returns self for chaining.

Source code in src/protean/testing.py
def process(self, command, *, correlation_id: str | None = None) -> AggregateResult:
    """Dispatch a command through the domain's full processing pipeline.

    Seeds the event store with given events (on first call only),
    then calls ``domain.process(command)`` which routes through the
    real command handler, repository, and unit of work.

    Can be called multiple times to chain commands::

        result = (
            given(Order)
            .process(CreateOrder(...))
            .process(ConfirmOrder(...))
        )

    After each call:

    - ``.events`` contains events from the **last** command only.
    - ``.all_events`` contains events from **all** commands.
    - ``.accepted`` / ``.rejected`` reflects the **last** command.

    Returns self for chaining.
    """
    from protean.utils.globals import current_domain

    domain = current_domain
    self._processed = True
    self._rejection = None  # Reset for this command

    # Seed event store with given events (first call only)
    if self._given_events and not self._seeded:
        self._aggregate_id = self._seed_events(domain)
        self._event_count = len(self._given_events)
        self._seeded = True

    # Process command through the domain
    try:
        result = domain.process(
            command, asynchronous=False, correlation_id=correlation_id
        )
    except Exception as exc:
        self._rejection = exc
        # On rejection, load aggregate from event store to reflect
        # the state before the failed command
        if self._aggregate_id is not None:
            self._aggregate = domain.event_store.store.load_aggregate(
                self._aggregate_cls, str(self._aggregate_id)
            )
        self._new_events = EventLog([])
        return self

    # Determine aggregate_id if not known (e.g. create commands)
    if self._aggregate_id is None:
        self._aggregate_id = result

    aggregate_id_str = str(self._aggregate_id)

    # Load aggregate from event store
    self._aggregate = domain.event_store.store.load_aggregate(
        self._aggregate_cls, aggregate_id_str
    )

    # Read new events (those beyond previously seen events)
    stream = f"{self._aggregate_cls.meta_.stream_category}-{aggregate_id_str}"
    all_messages = domain.event_store.store.read(stream)
    new_events = [m.to_domain_object() for m in all_messages[self._event_count :]]
    self._new_events = EventLog(new_events)
    self._all_events.extend(new_events)
    self._event_count = len(all_messages)

    return self
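
The bookkeeping at the end of `process` (slice the stream at the last known count, then advance the count) is what separates `.events` from `.all_events`. The sketch below reproduces only that arithmetic. `EventWindow` and its attribute names are hypothetical, and strings stand in for stored messages.

```python
from typing import Any


class EventWindow:
    """Hypothetical stand-in for the event-count bookkeeping in `process`."""

    def __init__(self, seeded_count: int = 0) -> None:
        self.event_count = seeded_count      # events already seen (seeded history)
        self.new_events: list = []
        self.all_events: list = []

    def observe(self, stream_messages: list) -> None:
        """Record whatever was appended to the stream since the last call."""
        self.new_events = stream_messages[self.event_count:]
        self.all_events.extend(self.new_events)
        self.event_count = len(stream_messages)


stream: list = ["OrderCreated", "OrderConfirmed"]   # two seeded history events
window = EventWindow(seeded_count=len(stream))

stream.append("PaymentPending")                     # first command raises one event
window.observe(stream)
first = list(window.new_events)

stream.extend(["PaymentCaptured", "OrderPaid"])     # second command raises two
window.observe(stream)

print(first)
print(window.new_events)
print(window.all_events)
```

In this sketch the seeded history never enters `all_events`, because the count starts at the number of given events. The listing above initializes its count the same way, which suggests that `.all_events` covers only events raised by commands.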

__getattr__

__getattr__(name: str)

Proxy attribute access to the underlying aggregate.

This makes order.status, order.items, order.pricing work directly on the result object.

Source code in src/protean/testing.py
def __getattr__(self, name: str):
    """Proxy attribute access to the underlying aggregate.

    This makes ``order.status``, ``order.items``, ``order.pricing``
    work directly on the result object.
    """
    # Avoid infinite recursion on private/dunder attrs
    if name.startswith("_"):
        raise AttributeError(name)
    if self._aggregate is not None:
        return getattr(self._aggregate, name)
    raise AttributeError(
        f"'{type(self).__name__}' object has no attribute '{name}'. "
        f"Did you call .process() first?"
    )
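
The proxy pattern in the listing can be tried without Protean. The sketch below is a stand-in, with `Proxy`, `_target`, and `Order` as hypothetical names. It shows the three paths: the hint raised before a target exists, delegation once it does, and the underscore guard that stops recursion when `__init__` never ran.

```python
class Proxy:
    """Hypothetical stand-in using the same __getattr__ logic as the listing."""

    def __init__(self) -> None:
        self._target = None

    def __getattr__(self, name: str):
        # Only called when normal lookup fails. Without this guard, a missing
        # `_target` would re-enter __getattr__ indefinitely.
        if name.startswith("_"):
            raise AttributeError(name)
        if self._target is not None:
            return getattr(self._target, name)
        raise AttributeError(
            f"'{type(self).__name__}' object has no attribute '{name}'. "
            f"Did you call .process() first?"
        )


class Order:
    status = "confirmed"


result = Proxy()
try:
    result.status                     # no target yet
except AttributeError as exc:
    early_error = str(exc)

result._target = Order()
delegated = result.status             # now forwarded to the target

bare = Proxy.__new__(Proxy)           # skips __init__, so `_target` is missing
try:
    bare.status
except AttributeError as exc:
    guard_error = str(exc)

print(early_error)
print(delegated)
print(guard_error)
```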

_seed_events

_seed_events(domain) -> Any

Write given events to the event store and process handlers.

Reconstitutes the aggregate from events to determine its identity, then enriches each event with proper metadata and appends to the event store so that domain.process() can load the aggregate via its repository.

Also runs synchronous event handlers (projectors, etc.) for each seeded event, mirroring what UoW commit does. This ensures projections and other side effects are in place when the command under test is processed.

Returns the aggregate identifier.

Source code in src/protean/testing.py
def _seed_events(self, domain) -> Any:
    """Write given events to the event store and process handlers.

    Reconstitutes the aggregate from events to determine its identity,
    then enriches each event with proper metadata and appends to the
    event store so that ``domain.process()`` can load the aggregate
    via its repository.

    Also runs synchronous event handlers (projectors, etc.) for each
    seeded event, mirroring what UoW commit does. This ensures
    projections and other side effects are in place when the command
    under test is processed.

    Returns the aggregate identifier.
    """
    from protean.utils import Processing

    event_store = domain.event_store.store

    # Reconstitute aggregate to discover its identity
    temp_aggregate = self._aggregate_cls.from_events(self._given_events)
    id_field_name = getattr(self._aggregate_cls, _ID_FIELD_NAME)
    aggregate_id = getattr(temp_aggregate, id_field_name)

    stream_category = self._aggregate_cls.meta_.stream_category
    stream = f"{stream_category}-{aggregate_id}"

    enriched_events = []
    for i, event in enumerate(self._given_events):
        version = i + 1
        event_identity = f"{stream}-{version}"

        headers = MessageHeaders(
            id=event_identity,
            type=event.__class__.__type__,
            stream=stream,
            time=event._metadata.headers.time
            if (event._metadata.headers and event._metadata.headers.time)
            else None,
        )

        envelope = MessageEnvelope.build(event.payload)

        domain_meta = DomainMeta(
            kind="EVENT",
            fqn=fqn(event.__class__),
            stream_category=stream_category,
            version=event.__class__.__version__,
            sequence_id=str(version),
            asynchronous=False,
        )

        metadata = Metadata(
            headers=headers,
            envelope=envelope,
            domain=domain_meta,
        )

        enriched = event.__class__(
            event.payload,
            _expected_version=i - 1,
            _metadata=metadata,
        )

        event_store.append(enriched)
        enriched_events.append(enriched)

    # Process event handlers (projectors, etc.) for seeded events,
    # just like UoW commit does for synchronous processing.
    if domain.config["event_processing"] == Processing.SYNC.value:
        for enriched in enriched_events:
            handler_classes = domain.handlers_for(enriched)
            for handler_cls in handler_classes:
                handler_cls._handle(enriched)

    return aggregate_id
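
The identifiers assigned during seeding follow directly from the loop index. The sketch below reproduces only that arithmetic from the listing. `seed_plan` is a hypothetical helper, and the stream category `"order"` and identifier `"o1"` are made-up values.

```python
def seed_plan(stream_category: str, aggregate_id: str, event_count: int) -> list:
    """Compute the per-event identifiers the seeding loop would assign."""
    stream = f"{stream_category}-{aggregate_id}"
    plan = []
    for i in range(event_count):
        version = i + 1                       # versions are 1-based
        plan.append(
            {
                "id": f"{stream}-{version}",  # event identity
                "stream": stream,
                "sequence_id": str(version),
                "expected_version": i - 1,    # -1 for the first event
            }
        )
    return plan


plan = seed_plan("order", "o1", 2)
for entry in plan:
    print(entry)
```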

EventLog

EventLog(events: list[Any])

A collection of domain events with Pythonic access.

Supports in (contains by type), [] (getitem by type or index), len, bool, iteration, .get(), .of_type(), .types, .first, and .last.

Examples::

assert PaymentPending in log
assert log[PaymentPending].payment_id == "pay-001"
assert log.get(PaymentFailed) is None
assert log.types == [PaymentPending]
assert len(log) == 1
assert log.first is placed_event
assert log                          # truthy when non-empty

Source code in src/protean/testing.py
def __init__(self, events: list[Any]) -> None:
    self._events = list(events)

types property

types: list[type]

Ordered list of event types.

first property

first: Any | None

First event, or None if empty.

last property

last: Any | None

Last event, or None if empty.

__contains__

__contains__(event_cls: type) -> bool

Check if an event of this type exists.

Source code in src/protean/testing.py
def __contains__(self, event_cls: type) -> bool:
    """Check if an event of this type exists."""
    return any(isinstance(e, event_cls) for e in self._events)
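
Because the check uses `isinstance`, a base event class also matches its subclasses. A small sketch of that consequence, with hypothetical event classes and a free function `contains` standing in for the method:

```python
class PaymentEvent:
    """Hypothetical base class for payment events."""


class PaymentPending(PaymentEvent):
    pass


class PaymentFailed(PaymentEvent):
    pass


def contains(events: list, event_cls: type) -> bool:
    # Same expression as the listing, applied to a plain list
    return any(isinstance(e, event_cls) for e in events)


events = [PaymentPending()]

print(contains(events, PaymentPending))   # exact type
print(contains(events, PaymentEvent))     # base class also matches
print(contains(events, PaymentFailed))    # sibling type does not
```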

__getitem__

__getitem__(key: type | int) -> Any

Access by event class (first match) or by index.

Raises KeyError if an event class is not found.

Source code in src/protean/testing.py
def __getitem__(self, key: type | int) -> Any:
    """Access by event class (first match) or by index.

    Raises ``KeyError`` if an event class is not found.
    """
    if isinstance(key, type):
        for e in self._events:
            if isinstance(e, key):
                return e
        raise KeyError(f"No {key.__name__} event found")
    return self._events[key]

get

get(event_cls: type, default: Any = None) -> Any

Safe access by event class. Returns default if not found.

Source code in src/protean/testing.py
def get(self, event_cls: type, default: Any = None) -> Any:
    """Safe access by event class. Returns *default* if not found."""
    for e in self._events:
        if isinstance(e, event_cls):
            return e
    return default
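
The two accessors differ only in how they treat a missing type: `[]` raises, `.get()` returns the default. The sketch below copies both methods into a hypothetical `MiniLog` stand-in so the difference can be run outside Protean. The event classes are made up for the example.

```python
from typing import Any


class MiniLog:
    """Hypothetical stand-in holding only the two accessors shown above."""

    def __init__(self, events: list) -> None:
        self._events = list(events)

    def __getitem__(self, key) -> Any:
        if isinstance(key, type):
            for e in self._events:
                if isinstance(e, key):
                    return e
            raise KeyError(f"No {key.__name__} event found")
        return self._events[key]

    def get(self, event_cls: type, default: Any = None) -> Any:
        for e in self._events:
            if isinstance(e, event_cls):
                return e
        return default


class PaymentPending:
    def __init__(self, payment_id: str) -> None:
        self.payment_id = payment_id


class PaymentFailed:
    pass


first, second = PaymentPending("pay-001"), PaymentPending("pay-002")
log = MiniLog([first, second])

print(log[PaymentPending].payment_id)   # class key: first match wins
print(log[-1].payment_id)               # integer key: ordinary list indexing
print(log.get(PaymentFailed))           # missing type: returns the default

try:
    log[PaymentFailed]                  # missing type with []: raises
except KeyError as exc:
    print(exc)
```

Use `[]` when the event is expected, so that a missing event fails the test with a `KeyError`. Use `.get()` when absence is the thing being asserted.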

of_type

of_type(event_cls: type) -> list[Any]

Return all events of the given type.

Source code in src/protean/testing.py
def of_type(self, event_cls: type) -> list[Any]:
    """Return all events of the given type."""
    return [e for e in self._events if isinstance(e, event_cls)]

given

given(
    cls_or_event: type | "BaseEvent", *events: "BaseEvent"
) -> AggregateResult | ProcessManagerResult | EventSequence

Start a test sentence.

Polymorphic entry point:

  • given(AggregateClass, *events) — returns an AggregateResult for event-sourcing tests.
  • given(ProcessManagerClass, *events) — returns a ProcessManagerResult for process manager tests.
  • given(event, *events) — returns an EventSequence for projection or process manager tests (via .results_in()).

Examples::

# Event-sourcing test
given(Order)                                    # no history
given(Order, order_created)                     # one event
given(Order, order_created, order_confirmed)    # multiple events

# Process manager test
given(OrderFulfillmentPM, order_placed, payment_confirmed)

# Projection test
given(Registered(user_id="u1", name="Alice"))
given(registered_event, transacted_event)

Source code in src/protean/testing.py
def given(
    cls_or_event: type | "BaseEvent", *events: "BaseEvent"
) -> AggregateResult | ProcessManagerResult | EventSequence:
    """Start a test sentence.

    Polymorphic entry point:

    - ``given(AggregateClass, *events)`` — returns an ``AggregateResult`` for
      event-sourcing tests.
    - ``given(ProcessManagerClass, *events)`` — returns a
      ``ProcessManagerResult`` for process manager tests.
    - ``given(event, *events)`` — returns an ``EventSequence`` for projection
      or process manager tests (via ``.results_in()``).

    Examples::

        # Event-sourcing test
        given(Order)                                    # no history
        given(Order, order_created)                     # one event
        given(Order, order_created, order_confirmed)    # multiple events

        # Process manager test
        given(OrderFulfillmentPM, order_placed, payment_confirmed)

        # Projection test
        given(Registered(user_id="u1", name="Alice"))
        given(registered_event, transacted_event)
    """
    if isinstance(cls_or_event, type):
        from protean.core.process_manager import BaseProcessManager

        if issubclass(cls_or_event, BaseProcessManager):
            return ProcessManagerResult(cls_or_event, list(events))
        return AggregateResult(cls_or_event, list(events))
    # All arguments are event instances → projection / PM testing path
    return EventSequence([cls_or_event, *events])
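
The dispatch rule can be exercised with stand-ins. In the sketch below every class is a hypothetical placeholder, and `route` returns a label instead of a real result object. Only the branching mirrors the listing.

```python
class BaseProcessManager:
    """Hypothetical placeholder for the process manager base class."""


class OrderFulfillmentPM(BaseProcessManager):
    pass


class Order:
    """Hypothetical aggregate class."""


class OrderPlaced:
    """Hypothetical event class."""


def route(cls_or_event, *events) -> tuple:
    """Mirror the branching in `given`, returning a label for the chosen path."""
    if isinstance(cls_or_event, type):
        if issubclass(cls_or_event, BaseProcessManager):
            return ("ProcessManagerResult", cls_or_event, list(events))
        return ("AggregateResult", cls_or_event, list(events))
    # First argument is an instance, so every argument is treated as an event
    return ("EventSequence", [cls_or_event, *events])


placed = OrderPlaced()

print(route(Order, placed)[0])
print(route(OrderFulfillmentPM, placed)[0])
print(route(placed)[0])
print(route(OrderPlaced)[0])   # a class, not an instance
```

The last call shows a consequence of this branching: any class that is not a process manager takes the aggregate path, so passing an event class without instantiating it does not produce an `EventSequence`.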