Skip to content

BaseEventStore

Event store interface for event-sourced persistence. All event store adapters (Memory, MessageDB, etc.) implement this contract.

See Event Store Adapters for concrete adapter configuration.

This class outlines the base event store capabilities to be implemented in all supported event store adapters.

It is also a marker interface for registering event store classes with the domain.

Source code in src/protean/port/event_store.py
35
36
37
38
39
40
def __init__(
    self, name: str, domain: Any, conn_info: Dict[str, str]
) -> None:  # FIXME Any should be Domain
    """Initialize the event store adapter.

    Args:
        name: Registered name of this event store adapter.
        domain: The domain this store belongs to (typed ``Any`` pending
            the FIXME above).
        conn_info: Connection configuration for the adapter.
    """
    self.name = name
    self.conn_info = conn_info
    self.domain = domain

close

close() -> None

Close the event store and release all connections.

Subclasses that hold external resources (connection pools, sockets, etc.) should override this to perform cleanup. The default implementation is a no-op so that adapters without external resources (e.g. the in-memory store) work without changes.

Source code in src/protean/port/event_store.py
42
43
44
45
46
47
48
49
def close(self) -> None:
    """Release any resources held by the event store.

    The base implementation deliberately does nothing.  Adapters that
    manage external resources (connection pools, sockets, etc.) are
    expected to override this method with real cleanup, while
    resource-free adapters such as the in-memory store can rely on
    this no-op default unchanged.
    """

load_aggregate

load_aggregate(part_of: Type[BaseAggregate], identifier: str, *, at_version: int | None = None, as_of: datetime | None = None) -> Optional[BaseAggregate]

Load an aggregate from underlying events.

By default, reconstitutes the aggregate to its current (latest) state. When at_version or as_of is provided, reconstitutes a historical snapshot of the aggregate — a temporal query.

PARAMETER DESCRIPTION
part_of

The EventSourced Aggregate's class.

TYPE: Type[BaseAggregate]

identifier

Unique aggregate identifier.

TYPE: str

at_version

Reconstitute to this exact version (0-indexed). Version 0 is the state after the first event.

TYPE: int | None DEFAULT: None

as_of

Reconstitute the aggregate as of this timestamp. Only events written on or before as_of are applied.

TYPE: datetime | None DEFAULT: None

RETURNS DESCRIPTION

Optional[BaseAggregate]

The fully-formed aggregate, or None when no events exist (and no temporal parameter was given that would raise instead).

Source code in src/protean/port/event_store.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def load_aggregate(
    self,
    part_of: Type[BaseAggregate],
    identifier: str,
    *,
    at_version: int | None = None,
    as_of: datetime | None = None,
) -> Optional[BaseAggregate]:
    """Reconstitute an aggregate from its event stream.

    Without temporal parameters, the aggregate is rebuilt to its
    current (latest) state.  Supplying ``at_version`` or ``as_of``
    rebuilds a historical view instead — a *temporal query*.  When
    both are supplied, ``as_of`` takes precedence.

    Args:
        part_of: The EventSourced Aggregate's class.
        identifier: Unique aggregate identifier.
        at_version: Reconstitute to this exact version (0-indexed).
            Version 0 is the state after the first event.
        as_of: Reconstitute the aggregate as of this timestamp.
            Only events written on or before ``as_of`` are applied.

    Returns:
        The fully-formed aggregate, or ``None`` when no events exist
        (and no temporal param was given that would raise instead).
    """
    # Fast path: no temporal constraint — load the latest state.
    if as_of is None and at_version is None:
        return self._load_aggregate_current(part_of, identifier)
    # ``as_of`` wins when both temporal parameters are provided.
    if as_of is not None:
        return self._load_aggregate_as_of(part_of, identifier, as_of)
    return self._load_aggregate_at_version(part_of, identifier, at_version)

create_snapshot

create_snapshot(part_of: Type[BaseAggregate], identifier: str) -> bool

Create a snapshot for a specific event-sourced aggregate instance.

Reads the full event stream for the aggregate, reconstructs it via from_events(), and writes a snapshot to the snapshot stream. This bypasses the snapshot threshold -- manual triggers always create a snapshot regardless of event count.

PARAMETER DESCRIPTION
part_of

The EventSourced Aggregate class

TYPE: Type[BaseAggregate]

identifier

Unique aggregate identifier

TYPE: str

RETURNS DESCRIPTION
bool

True if a snapshot was created.

RAISES DESCRIPTION
IncorrectUsageError

If the aggregate is not event-sourced.

ObjectNotFoundError

If no events exist for the given identifier.

Source code in src/protean/port/event_store.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
def create_snapshot(self, part_of: Type[BaseAggregate], identifier: str) -> bool:
    """Create a snapshot for a specific event-sourced aggregate instance.

    Reads the aggregate's full event stream, rebuilds the aggregate via
    ``from_events()``, and writes its state to the snapshot stream.
    Manual triggers always snapshot, regardless of the configured
    snapshot threshold.

    Args:
        part_of: The EventSourced Aggregate class
        identifier: Unique aggregate identifier

    Returns:
        True if a snapshot was created.

    Raises:
        IncorrectUsageError: If the aggregate is not event-sourced.
        ObjectNotFoundError: If no events exist for the given identifier.
    """
    if not part_of.meta_.is_event_sourced:
        raise IncorrectUsageError(
            f"`{part_of.__name__}` is not an event-sourced aggregate"
        )

    # Full, fresh read — deliberately NOT seeded from an existing snapshot.
    stream_name = f"{part_of.meta_.stream_category}-{identifier}"
    raw_messages = list(self._read(stream_name))

    if not raw_messages:
        raise ObjectNotFoundError(
            f"`{part_of.__name__}` object with identifier {identifier} "
            f"does not exist."
        )

    domain_events = [
        Message.deserialize(raw).to_domain_object() for raw in raw_messages
    ]
    aggregate = part_of.from_events(domain_events)

    snapshot_stream = f"{part_of.meta_.stream_category}:snapshot-{identifier}"
    self._write(snapshot_stream, "SNAPSHOT", aggregate.to_dict())

    return True

create_snapshots

create_snapshots(part_of: Type[BaseAggregate]) -> int

Create snapshots for all instances of an event-sourced aggregate.

Discovers all unique aggregate identifiers in the stream category, then creates a snapshot for each.

PARAMETER DESCRIPTION
part_of

The EventSourced Aggregate class

TYPE: Type[BaseAggregate]

RETURNS DESCRIPTION
int

Number of snapshots created.

RAISES DESCRIPTION
IncorrectUsageError

If the aggregate is not event-sourced.

Source code in src/protean/port/event_store.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
def create_snapshots(self, part_of: Type[BaseAggregate]) -> int:
    """Create snapshots for every instance of an event-sourced aggregate.

    Discovers all unique aggregate identifiers in the stream category
    and snapshots each one via :meth:`create_snapshot`.

    Args:
        part_of: The EventSourced Aggregate class

    Returns:
        Number of snapshots created.

    Raises:
        IncorrectUsageError: If the aggregate is not event-sourced.
    """
    if not part_of.meta_.is_event_sourced:
        raise IncorrectUsageError(
            f"`{part_of.__name__}` is not an event-sourced aggregate"
        )

    total = 0
    for identifier in self._stream_identifiers(part_of.meta_.stream_category):
        self.create_snapshot(part_of, identifier)
        total += 1

    return total

trace_causation

trace_causation(message_id: str | Message) -> list[Message]

Walk UP the causation chain from a message to the root.

Returns an ordered list of Messages from the root command (first) to the given message (last). The given message itself is included.

PARAMETER DESCRIPTION
message_id

A Protean message ID string (headers.id) or a Message object.

TYPE: str | Message

RETURNS DESCRIPTION

list[Message]

List of Message objects in causal order (root first, target last).

RAISES DESCRIPTION
ValueError

If the message cannot be found in the event store.

Source code in src/protean/port/event_store.py
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
def trace_causation(self, message_id: str | Message) -> list[Message]:
    """Walk UP the causation chain from a message to the root.

    Produces the ancestry of the given message as an ordered list:
    root command first, the given message last.  The given message
    itself is included.

    Args:
        message_id: A Protean message ID string (``headers.id``) or
            a :class:`Message` object.

    Returns:
        List of :class:`Message` objects in causal order (root first,
        target last).

    Raises:
        ValueError: If the message cannot be found in the event store.
    """
    target_id, group = self._resolve_and_load_group(message_id)

    # Index raw messages by their headers.id for O(1) parent lookups.
    raw_by_id: dict[str, dict[str, Any]] = {}
    for raw in group:
        header_id = self._extract_message_id(raw)
        if header_id:
            raw_by_id[header_id] = raw

    # Follow causation links from the target toward the root; the
    # `seen` set guards against cycles, and a missing parent ends
    # the walk.
    ancestry: list[dict[str, Any]] = []
    seen: set[str] = set()
    cursor: str | None = target_id
    while cursor and cursor not in seen:
        seen.add(cursor)
        raw = raw_by_id.get(cursor)
        if raw is None:
            break
        ancestry.append(raw)
        cursor = self._extract_causation_id(raw)

    # ancestry was collected target-first; emit root-first.
    return [Message.deserialize(raw) for raw in reversed(ancestry)]

trace_effects

trace_effects(message_id: str | Message, *, recursive: bool = True) -> list[Message]

Walk DOWN the causation chain to find all effects of a message.

Returns messages that were caused by the given message, ordered by global_position (chronological order).

PARAMETER DESCRIPTION
message_id

A Protean message ID string (headers.id) or a Message object.

TYPE: str | Message

recursive

If True (default), return the full subtree of effects. If False, return only direct children.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION

list[Message]

List of Message objects caused by the given message, in chronological order. The given message itself is NOT included.

RAISES DESCRIPTION
ValueError

If the message cannot be found in the event store.

Source code in src/protean/port/event_store.py
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
def trace_effects(
    self, message_id: str | Message, *, recursive: bool = True
) -> list[Message]:
    """Walk DOWN the causation chain to find all effects of a message.

    Collects the messages caused by the given message, ordered by
    ``global_position`` (chronological order).

    Args:
        message_id: A Protean message ID string (``headers.id``) or
            a :class:`Message` object.
        recursive: If ``True`` (default), return the full subtree of
            effects.  If ``False``, return only direct children.

    Returns:
        List of :class:`Message` objects caused by the given message,
        in chronological order.  The given message itself is NOT included.

    Raises:
        ValueError: If the message cannot be found in the event store.
    """
    target_id, group = self._resolve_and_load_group(message_id)

    # Group raw messages under the id of the message that caused them.
    caused_by: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for raw in group:
        parent_id = self._extract_causation_id(raw)
        if parent_id:
            caused_by[parent_id].append(raw)

    def _by_position(raw: dict[str, Any]) -> Any:
        # Messages without a global_position sort as 0 (earliest).
        return raw.get("global_position", 0)

    if not recursive:
        direct_children = sorted(caused_by.get(target_id, []), key=_by_position)
        return [Message.deserialize(raw) for raw in direct_children]

    # Breadth-first traversal of the full effect subtree; `seen`
    # guards against cycles in the causation graph.
    collected: list[dict[str, Any]] = []
    pending: deque[str] = deque([target_id])
    seen: set[str] = {target_id}

    while pending:
        parent_id = pending.popleft()
        for raw in caused_by.get(parent_id, []):
            child_id = self._extract_message_id(raw)
            if child_id and child_id not in seen:
                seen.add(child_id)
                collected.append(raw)
                pending.append(child_id)

    collected.sort(key=_by_position)
    return [Message.deserialize(raw) for raw in collected]

build_causation_tree

build_causation_tree(correlation_id: str) -> CausationNode | None

Build a full causation tree for a correlation ID.

Returns the root node of the tree with children recursively populated.

PARAMETER DESCRIPTION
correlation_id

The correlation ID to trace.

TYPE: str

RETURNS DESCRIPTION

CausationNode | None

Root CausationNode with children recursively populated, or None if no messages were found.

Source code in src/protean/port/event_store.py
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
def build_causation_tree(self, correlation_id: str) -> CausationNode | None:
    """Build a full causation tree for a correlation ID.

    Returns the root node of the tree with children recursively populated.

    Args:
        correlation_id: The correlation ID to trace.

    Returns:
        Root :class:`CausationNode` with children, or ``None`` if no
        messages found.
    """
    group = self._load_correlation_group(correlation_id)
    if not group:
        return None

    # Build index and children map
    by_id: dict[str, dict[str, Any]] = {}
    children_map: dict[str, list[dict[str, Any]]] = defaultdict(list)
    roots: list[dict[str, Any]] = []

    for m in group:
        hid = self._extract_message_id(m)
        if hid:
            by_id[hid] = m
        cid = self._extract_causation_id(m)
        if cid:
            children_map[cid].append(m)
        else:
            # No causation_id at all — this message started a chain.
            roots.append(m)

    # Sort children by global_position for deterministic ordering
    for cid in children_map:
        children_map[cid].sort(key=lambda m: m.get("global_position", 0))

    # Shared across all recursive _build_node calls: guards against
    # cycles so each message becomes at most one node in the tree.
    visited: set[str] = set()

    def _build_node(raw_msg: dict[str, Any]) -> CausationNode:
        # Construct the node for raw_msg and, recursively, its subtree.
        hid = self._extract_message_id(raw_msg) or "?"
        visited.add(hid)

        # Defensive extraction: malformed/missing metadata degrades to
        # "?" placeholder fields instead of raising.
        metadata = raw_msg.get("metadata", {})
        if not isinstance(metadata, dict):
            metadata = {}
        headers = metadata.get("headers", {})
        if not isinstance(headers, dict):
            headers = {}
        domain_meta = metadata.get("domain", {})
        if not isinstance(domain_meta, dict):
            domain_meta = {}

        node = CausationNode(
            message_id=hid,
            message_type=raw_msg.get("type", headers.get("type", "?")),
            kind=domain_meta.get("kind", "?"),
            stream=raw_msg.get("stream_name", headers.get("stream", "?")),
            time=str(raw_msg.get("time", "")) if raw_msg.get("time") else None,
            global_position=raw_msg.get("global_position"),
        )

        for child_msg in children_map.get(hid, []):
            child_id = self._extract_message_id(child_msg)
            if child_id and child_id not in visited:
                node.children.append(_build_node(child_msg))

        return node

    if not roots:
        # All messages have causation_id set — pick the one whose
        # causation_id points outside the group
        root_candidates = [
            m for m in group if self._extract_causation_id(m) not in by_id
        ]
        roots = root_candidates if root_candidates else [group[0]]

    # NOTE(review): when multiple roots exist, only the earliest one
    # (by global_position) becomes the returned tree; later roots are
    # not reachable from the result.
    roots.sort(key=lambda m: m.get("global_position", 0))
    return _build_node(roots[0])

CausationNode

Tree node used by build_causation_tree() to represent the causation hierarchy of messages sharing a correlation_id.

A node in the causation tree, representing a single message and its effects.