
Event Processing

Subscribers consume external broker messages, process managers coordinate multi-step workflows, and upcasters handle event schema evolution.

Guides: Subscribers · Process Managers · Event Upcasting


BaseSubscriber

Bases: Element, OptionsMixin

Base class for subscribers that consume messages from external message brokers and act as an anti-corruption layer at the domain boundary.

Subscribers listen to named broker streams, receive raw dict payloads (not typed domain events), and translate external data into domain operations. Implement the __call__ method to process incoming messages.

Meta Options

Option  Type  Description
broker  str   The broker adapter name to consume from (default: "default").
stream  str   The external stream/topic name to subscribe to. Required.
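As a rough illustration of the anti-corruption role described above, the sketch below translates a raw external payload into the shape a domain operation might expect. It deliberately avoids importing Protean so that it runs on its own: `SubscriberSketch` is a hypothetical stand-in for BaseSubscriber, and the payload fields, the `registered_payments` sink, and the option values are all invented for the example. The `broker` and `stream` class attributes only echo the documented option names; they are not how options are registered in a real application.

```python
from abc import ABC, abstractmethod


class SubscriberSketch(ABC):
    """Hypothetical stand-in for BaseSubscriber so the example runs without Protean."""

    @abstractmethod
    def __call__(self, payload: dict) -> None: ...


# Hypothetical in-memory sink standing in for a real domain operation.
registered_payments: list[dict] = []


class PaymentGatewaySubscriber(SubscriberSketch):
    """Translates a raw gateway payload into the domain's own vocabulary."""

    # Echo of the documented option names; the values are made up.
    broker = "default"
    stream = "payment_gateway"

    def __call__(self, payload: dict) -> None:
        # Anti-corruption step: map external field names and units
        # onto the shape the domain expects.
        registered_payments.append(
            {
                "order_id": payload["orderRef"],
                "amount": payload["amount_cents"] / 100,
            }
        )


# Simulate one raw message arriving from the external stream.
PaymentGatewaySubscriber()({"orderRef": "ord-1", "amount_cents": 1250})
```

Keeping the translation inside `__call__` means external field names such as `orderRef` never leak past the subscriber.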

__call__ abstractmethod

__call__(payload: dict) -> None

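Process a single message received from the subscribed stream. Subclasses must implement this method; payload is the raw message dict, not a typed domain event.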

Source code in src/protean/core/subscriber.py
@abstractmethod
def __call__(self, payload: dict) -> None:
    """Placeholder method for receiving notifications on event"""

handle_error classmethod

handle_error(exc: Exception, message: dict) -> None

Error handler method called when exceptions occur during broker message handling.

This method can be overridden in subclasses to provide custom error handling for exceptions that occur during message processing. It allows subscribers to recover from errors, log additional information, or perform cleanup operations.

When an exception occurs in a subscriber's __call__ method:

1. The exception is caught in Engine.handle_broker_message.
2. Details are logged with traceback information.
3. This handle_error method is called with the exception and original message.
4. Processing continues with the next message (the engine does not shut down).

If this method raises an exception itself, that exception is also caught and logged, but not propagated further.
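The flow described above can be sketched in plain Python. This is not the code of Engine.handle_broker_message; `dispatch`, `FlakySubscriber`, and the logger name are hypothetical and exist only to show the control flow: the failure is caught and logged, handle_error is called, an exception raised inside handle_error is swallowed, and the loop moves on.

```python
import logging

logger = logging.getLogger("subscriber_sketch")  # hypothetical logger name


def dispatch(subscriber_cls, messages: list[dict]) -> int:
    """Hypothetical stand-in for the engine loop; returns the number of successes."""
    succeeded = 0
    for message in messages:
        try:
            subscriber_cls()(message)
            succeeded += 1
        except Exception as exc:
            # Steps 1-2: catch the failure and log it with a traceback.
            logger.exception("Error handling message: %s", exc)
            try:
                # Step 3: give the subscriber a chance to react.
                subscriber_cls.handle_error(exc, message)
            except Exception:
                # A failure inside handle_error is logged, not propagated.
                logger.exception("handle_error itself failed")
        # Step 4: fall through to the next message either way.
    return succeeded


class FlakySubscriber:
    seen_errors: list[str] = []

    def __call__(self, payload: dict) -> None:
        if "id" not in payload:
            raise KeyError("id")

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:
        cls.seen_errors.append(type(exc).__name__)
        # Raised on purpose to show that the loop survives it.
        raise RuntimeError("cleanup failed")


ok_count = dispatch(FlakySubscriber, [{"id": 1}, {}, {"id": 2}])
```

In this sketch the second message fails, yet the third is still processed, which is the behavior the numbered steps describe.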

PARAMETER DESCRIPTION
exc

The exception that was raised during message handling

TYPE: Exception

message

The original message being processed when the exception occurred

TYPE: dict

RETURNS DESCRIPTION
None

None

Note
  • The default implementation does nothing, allowing processing to continue
  • Subclasses can override this method to implement custom error handling strategies
  • This method is called from a try/except block, so exceptions raised here won't crash the engine
Source code in src/protean/core/subscriber.py
@classmethod
def handle_error(cls, exc: Exception, message: dict) -> None:
    """Error handler method called when exceptions occur during broker message handling.

    This method can be overridden in subclasses to provide custom error handling
    for exceptions that occur during message processing. It allows subscribers to
    recover from errors, log additional information, or perform cleanup operations.

    When an exception occurs in a subscriber's __call__ method:
    1. The exception is caught in Engine.handle_broker_message
    2. Details are logged with traceback information
    3. This handle_error method is called with the exception and original message
    4. Processing continues with the next message (the engine does not shut down)

    If this method raises an exception itself, that exception is also caught and logged,
    but not propagated further.

    Args:
        exc (Exception): The exception that was raised during message handling
        message (dict): The original message being processed when the exception occurred

    Returns:
        None

    Note:
        - The default implementation does nothing, allowing processing to continue
        - Subclasses can override this method to implement custom error handling strategies
        - This method is called from a try/except block, so exceptions raised here won't crash the engine
    """

BaseProcessManager

BaseProcessManager(*args: Any, **kwargs: Any)

Bases: BaseModel, HandlerMixin, OptionsMixin

Base class for Process Managers.

A Process Manager combines handler-like event dispatch (from multiple aggregate streams) with aggregate-like stateful persistence (via auto-generated transition events in the event store).

Meta Options

Option                Type  Description
stream_categories     list  Stream categories to subscribe to.
aggregates            list  Aggregate classes to listen to (categories derived automatically).
subscription_type     str   The subscription type to use.
subscription_profile  str   A predefined configuration profile.
subscription_config   dict  Dictionary of custom configuration overrides.
Source code in src/protean/core/process_manager.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    if self.meta_.abstract is True:
        raise NotSupportedError(
            f"{self.__class__.__name__} class has been marked abstract"
            f" and cannot be instantiated"
        )

    # Support template dict pattern
    if args:
        merged: dict[str, Any] = {}
        for template in args:
            if not isinstance(template, dict):
                raise AssertionError(
                    f"Positional argument {template} passed must be a dict."
                )
            merged.update(template)
        merged.update(kwargs)
        kwargs = merged

    super().__init__(**kwargs)
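The template dict handling in the constructor above has a precedence order that is easy to miss: later positional dicts override earlier ones, and keyword arguments override every template. The sketch below isolates just that merge logic in a hypothetical helper, `merge_templates`, which is not part of Protean; the field names are invented.

```python
from typing import Any


def merge_templates(*args: Any, **kwargs: Any) -> dict[str, Any]:
    """Hypothetical helper reproducing only the merge step of the constructor."""
    merged: dict[str, Any] = {}
    for template in args:
        if not isinstance(template, dict):
            raise AssertionError(
                f"Positional argument {template} passed must be a dict."
            )
        # Later templates overwrite keys set by earlier ones.
        merged.update(template)
    # Keyword arguments win over every template.
    merged.update(kwargs)
    return merged


values = merge_templates(
    {"order_id": "ord-1", "status": "new"},
    {"status": "paid"},
    status="shipped",
)
```

Here `values["status"]` ends up as `"shipped"`, because the keyword argument is applied last.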

__track_id_field classmethod

__track_id_field() -> None

Find the field marked identifier=True and record its name.

Source code in src/protean/core/process_manager.py
@classmethod
def __track_id_field(cls) -> None:
    """Find the field marked ``identifier=True`` and record its name."""
    try:
        id_fld = next(
            field
            for _, field in getattr(cls, _FIELDS, {}).items()
            if getattr(field, "identifier", False)
        )
        setattr(cls, _ID_FIELD_NAME, id_fld.field_name)
    except StopIteration:
        pass

mark_as_complete

mark_as_complete() -> None

Mark this process manager as complete.

Once complete, no further events will be processed for this instance.

Source code in src/protean/core/process_manager.py
def mark_as_complete(self) -> None:
    """Mark this process manager as complete.

    Once complete, no further events will be processed for this instance.
    """
    self._is_complete = True
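To make the completion semantics concrete, here is a minimal sketch that assumes completed instances simply skip later events. The listing above only shows the flag being set; where Protean checks it is not shown here, so the guard in `handle` is an assumption, and `OrderFulfillmentSketch` and the event names are hypothetical.

```python
class OrderFulfillmentSketch:
    """Hypothetical stand-in for a process manager instance."""

    def __init__(self) -> None:
        self._is_complete = False
        self.handled: list[str] = []

    def mark_as_complete(self) -> None:
        self._is_complete = True

    def handle(self, event_name: str) -> None:
        # Assumed guard: a completed instance ignores further events.
        if self._is_complete:
            return
        self.handled.append(event_name)
        if event_name == "OrderShipped":
            # Final step of the workflow: close the process.
            self.mark_as_complete()


pm = OrderFulfillmentSketch()
for name in ["OrderPlaced", "OrderShipped", "OrderRefunded"]:
    pm.handle(name)
```

In this sketch the third event arrives after completion and is never recorded.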

to_dict

to_dict() -> dict[str, Any]

Return process manager data as a dictionary.

Source code in src/protean/core/process_manager.py
def to_dict(self) -> dict[str, Any]:
    """Return process manager data as a dictionary."""
    result: dict[str, Any] = {}
    for fname, shim in getattr(self, _FIELDS, {}).items():
        if fname.startswith("_"):
            continue
        result[fname] = shim.as_dict(getattr(self, fname, None))
    return result

BaseUpcaster

Bases: Element, OptionsMixin

Base class for event upcasters.

Subclasses must implement upcast() which receives the raw event payload dict (as stored in the event store) and returns a transformed dict compatible with the target version's schema.

Meta Options

Option        Type  Description
event_type    type  The event class this upcaster targets (current version).
from_version  str   Source version string (e.g. "v1").
to_version    str   Target version string (e.g. "v2").

upcast abstractmethod

upcast(data: dict) -> dict

Transform event data from from_version to to_version.

PARAMETER DESCRIPTION
data

The raw event payload dictionary as stored.

TYPE: dict

RETURNS DESCRIPTION
dict

The transformed payload dictionary compatible with to_version.

Source code in src/protean/core/upcaster.py
@abstractmethod
def upcast(self, data: dict) -> dict:
    """Transform event data from ``from_version`` to ``to_version``.

    Args:
        data: The raw event payload dictionary as stored.

    Returns:
        The transformed payload dictionary compatible with ``to_version``.
    """
    ...
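A typical upcast() body is a small dict-to-dict transformation. The sketch below assumes a v1 payload with a `name` field that v2 renames to `customer_name` and extends with a defaulted `currency` field. The class, the field names, and the default value are all hypothetical, and the class is a plain Python object rather than a registered BaseUpcaster subclass, so it runs without Protean.

```python
class RenameCustomerFieldSketch:
    """Hypothetical upcaster body; not registered with any domain."""

    # Echo of the documented option names; the values are made up.
    from_version = "v1"
    to_version = "v2"

    def upcast(self, data: dict) -> dict:
        # Copy first so the stored payload is never mutated in place.
        upgraded = dict(data)
        # v2 renamed "name" to "customer_name".
        upgraded["customer_name"] = upgraded.pop("name")
        # v2 added "currency"; older events get an assumed default.
        upgraded.setdefault("currency", "USD")
        return upgraded


v1_payload = {"order_id": "ord-1", "name": "Ada"}
v2_payload = RenameCustomerFieldSketch().upcast(v1_payload)
```

Returning a new dict instead of editing `data` keeps the transformation safe to re-run on the same stored payload.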