Event Processing
Subscribers consume external broker messages, process managers coordinate multi-step workflows, and upcasters handle event schema evolution.
Guides: Subscribers · Process Managers · Event Upcasting
BaseSubscriber
Bases: Element, OptionsMixin
Base class for subscribers that consume messages from external message brokers and act as an anti-corruption layer at the domain boundary.
Subscribers listen to named broker streams, receive raw dict payloads
(not typed domain events), and translate external data into domain
operations. Implement the __call__ method to process incoming messages.
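As a rough illustration of the anti-corruption role described above, the sketch below uses a plain Python class as a stand-in for a subscriber. It does not import Protean, and the payload keys (`orderRef`, `amount`), the `RegisterPayment` command, and the `PaymentSubscriber` name are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RegisterPayment:
    """Hypothetical domain command produced from an external message."""

    order_id: str
    amount_cents: int


class PaymentSubscriber:
    """Stand-in for a BaseSubscriber subclass (assumption: not Protean code)."""

    def __init__(self) -> None:
        # Collected commands; a real subscriber would hand these to the domain.
        self.dispatched: list[RegisterPayment] = []

    def __call__(self, payload: dict) -> None:
        # Translate the broker's vocabulary (raw dict keys and string amounts)
        # into the domain's vocabulary (a typed command).
        command = RegisterPayment(
            order_id=str(payload["orderRef"]),
            amount_cents=round(float(payload["amount"]) * 100),
        )
        self.dispatched.append(command)


subscriber = PaymentSubscriber()
subscriber({"orderRef": 1042, "amount": "19.99"})
```

Keeping the translation inside `__call__` means the external field names never leak past the subscriber.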
Meta Options
| Option | Type | Description |
|---|---|---|
| broker | str | The broker adapter name to consume from (default: "default"). |
| stream | str | The external stream/topic name to subscribe to. Required. |
__call__
abstractmethod
__call__(payload: dict) -> None
Placeholder for the message handler. Subclasses override it to process each incoming message payload.
Source code in src/protean/core/subscriber.py
handle_error
classmethod
handle_error(exc: Exception, message: dict) -> None
Error handler method called when exceptions occur during broker message handling.
This method can be overridden in subclasses to provide custom error handling for exceptions that occur during message processing. It allows subscribers to recover from errors, log additional information, or perform cleanup operations.
When an exception occurs in a subscriber's __call__ method:
1. The exception is caught in Engine.handle_broker_message.
2. Details are logged with traceback information.
3. This handle_error method is called with the exception and the original message.
4. Processing continues with the next message (the engine does not shut down).
If this method raises an exception itself, that exception is also caught and logged, but not propagated further.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| exc | Exception | The exception that was raised during message handling |
| message | dict | The original message being processed when the exception occurred |

| RETURNS | DESCRIPTION |
|---|---|
| None | None |
Note
- The default implementation does nothing, allowing processing to continue
- Subclasses can override this method to implement custom error handling strategies
- This method is called from a try/except block, so exceptions raised here won't crash the engine
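The error-handling flow described above can be sketched in plain Python. This is a simplified stand-in, not the engine's actual code: the `handle_broker_message` function here only borrows the name mentioned in the text, and `FlakySubscriber` with its `failures` list is hypothetical.

```python
import logging

logger = logging.getLogger("subscriber-sketch")


class FlakySubscriber:
    """Stand-in subscriber that fails on malformed payloads (hypothetical)."""

    failures: list[tuple[str, dict]] = []

    def __call__(self, payload: dict) -> None:
        if "id" not in payload:
            raise ValueError("payload is missing 'id'")

    @classmethod
    def handle_error(cls, exc: Exception, message: dict) -> None:
        # Custom strategy: remember the failure for later inspection.
        cls.failures.append((str(exc), message))


def handle_broker_message(subscriber_cls, message: dict) -> bool:
    """Simplified dispatch step; returns True when processing succeeded."""
    try:
        subscriber_cls()(message)
        return True
    except Exception as exc:
        # Log the failure with its traceback, then give the subscriber a say.
        logger.exception("Error while handling message")
        try:
            subscriber_cls.handle_error(exc, message)
        except Exception:
            # Errors raised by the handler are logged, never propagated.
            logger.exception("handle_error itself failed")
        return False


# The failing message in the middle does not stop the one after it.
results = [
    handle_broker_message(FlakySubscriber, m) for m in ({"id": 1}, {}, {"id": 2})
]
```

Because both the message handler and the error handler run inside `try` blocks, one bad message cannot halt consumption of the stream.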
Source code in src/protean/core/subscriber.py
BaseProcessManager
BaseProcessManager(*args: Any, **kwargs: Any)
Bases: BaseModel, HandlerMixin, OptionsMixin
Base class for Process Managers.
A Process Manager combines handler-like event dispatch (from multiple aggregate streams) with aggregate-like stateful persistence (via auto-generated transition events in the event store).
Meta Options
| Option | Type | Description |
|---|---|---|
| stream_categories | list | Stream categories to subscribe to. |
| aggregates | list | Aggregate classes to listen to (categories derived automatically). |
| subscription_type | str | The subscription type to use. |
| subscription_profile | str | A predefined configuration profile. |
| subscription_config | dict | Dictionary of custom configuration overrides. |
Source code in src/protean/core/process_manager.py
__track_id_field
classmethod
__track_id_field() -> None
Find the field marked identifier=True and record its name.
Source code in src/protean/core/process_manager.py
mark_as_complete
mark_as_complete() -> None
Mark this process manager as complete.
Once complete, no further events will be processed for this instance.
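To make the completion semantics concrete, here is a minimal plain-Python sketch of a stateful coordinator that stops reacting once marked complete. It is not Protean's implementation: the `_is_complete` flag, the `handle` method, the event names, and the `OrderFulfillmentSketch` class are assumptions made for illustration only.

```python
from typing import Any


class OrderFulfillmentSketch:
    """Stand-in for a process manager instance (assumption: not Protean code)."""

    def __init__(self, order_id: str) -> None:
        self.order_id = order_id
        self.status = "new"
        self._is_complete = False  # hypothetical completion flag

    def mark_as_complete(self) -> None:
        self._is_complete = True

    def handle(self, event_type: str) -> bool:
        """Apply an event; return False if the instance was already complete."""
        if self._is_complete:
            return False
        if event_type == "PaymentConfirmed":
            self.status = "paid"
        elif event_type == "ShipmentDelivered":
            self.status = "delivered"
            self.mark_as_complete()
        return True

    def to_dict(self) -> dict[str, Any]:
        return {"order_id": self.order_id, "status": self.status}


pm = OrderFulfillmentSketch("ord-1")
# The third event arrives after completion and is ignored.
outcomes = [
    pm.handle(e)
    for e in ("PaymentConfirmed", "ShipmentDelivered", "PaymentConfirmed")
]
```

In this sketch the late `PaymentConfirmed` event leaves the state untouched, which mirrors the rule that a completed instance processes no further events.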
Source code in src/protean/core/process_manager.py
to_dict
to_dict() -> dict[str, Any]
Return process manager data as a dictionary.
Source code in src/protean/core/process_manager.py
BaseUpcaster
Bases: Element, OptionsMixin
Base class for event upcasters.
Subclasses must implement upcast() which receives the raw event
payload dict (as stored in the event store) and returns a transformed
dict compatible with the target version's schema.
Meta Options
| Option | Type | Description |
|---|---|---|
| event_type | type | The event class this upcaster targets (current version). |
| from_version | str | Source version string (e.g. "v1"). |
| to_version | str | Target version string (e.g. "v2"). |
upcast
abstractmethod
upcast(data: dict) -> dict
Transform event data from from_version to to_version.
| PARAMETER | TYPE | DESCRIPTION |
|---|---|---|
| data | dict | The raw event payload dictionary as stored. |

| RETURNS | DESCRIPTION |
|---|---|
| dict | The transformed payload dictionary compatible with |
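A dict-to-dict transformation of this kind might look like the sketch below. It is a plain Python stand-in rather than a real BaseUpcaster subclass, and the schema change it performs (renaming `total` to `total_amount` and defaulting `currency`) is a hypothetical example.

```python
class OrderPlacedV1ToV2:
    """Stand-in for a BaseUpcaster subclass (assumption: not Protean code)."""

    from_version = "v1"
    to_version = "v2"

    def upcast(self, data: dict) -> dict:
        # Work on a copy so the stored payload is left untouched.
        upgraded = dict(data)
        # Hypothetical change: v2 renamed `total` to `total_amount`
        # and introduced a `currency` field with a default value.
        upgraded["total_amount"] = upgraded.pop("total")
        upgraded.setdefault("currency", "USD")
        return upgraded


stored = {"order_id": "ord-1", "total": 25.0}
current = OrderPlacedV1ToV2().upcast(stored)
```

Returning a new dictionary instead of mutating the input keeps the original stored payload available for inspection or replay.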
Source code in src/protean/core/upcaster.py