Events
Most applications have a definite state - they reflect past user input and interactions in their current state. It is advantageous to model these past changes as a series of discrete events. Domain events happen to be those activities that domain experts care about and represent what happened as-is.
In Protean, an Event
is an immutable object that represents a significant
occurrence or change in the domain. Events are raised by aggregates to signal
that something noteworthy has happened, allowing other parts of the system to
react - and sync - to these changes in a decoupled manner.
Defining Events
Event names should be descriptive and convey the specific change or occurrence
in the domain clearly, ensuring that the purpose of the event is immediately
understandable. Events are named as past-tense verbs to clearly indicate
that an event has already occurred, such as OrderPlaced
or PaymentProcessed
.
You can define an event with the Domain.event
decorator:
from enum import Enum
from protean import Domain
from protean.fields import Identifier, String
domain = Domain(__file__, name="Authentication", load_toml=False)
class UserStatus(Enum):
ACTIVE = "ACTIVE"
ARCHIVED = "ARCHIVED"
@domain.event(part_of="User")
class UserActivated:
user_id = Identifier(required=True)
@domain.event(part_of="User")
class UserRenamed:
user_id = Identifier(required=True)
name = String(required=True, max_length=50)
@domain.aggregate
class User:
name = String(max_length=50, required=True)
email = String(required=True)
status = String(choices=UserStatus)
def activate(self) -> None:
self.status = UserStatus.ACTIVE.value
self.raise_(UserActivated(user_id=self.id))
def change_name(self, name: str) -> None:
self.name = name
self.raise_(UserRenamed(user_id=self.id, name=name))
Events are always connected to an Aggregate class, specified with the
part_of
param in the decorator. An exception to this rule is when the
Event class has been marked Abstract.
Event Structure
An event is made of three parts:
Headers
trace_id
The trace_id
is a unique identifier of UUID format, that connects all
processing originating from a request. Trace IDs provide a detailed view of
the request's journey through the system. It helps in understanding the
complete flow of a request, showing each service interaction, the time taken,
and where any delays occur.
Metadata
An event's metadata provides additional context about the event.
Sample metadata from an event:
{
"id": "test::user-411b2ceb-9513-45d7-9e03-bbc0846fae93-0",
"type": "Test.UserLoggedIn.v1",
"fqn": "tests.event.test_event_metadata.UserLoggedIn",
"kind": "EVENT",
"stream": "test::user-411b2ceb-9513-45d7-9e03-bbc0846fae93",
"origin_stream": null,
"timestamp": "2024-08-16 15:30:27.977101+00:00",
"version": "v1",
"sequence_id": "0",
"payload_hash": 2438879608558888394
}
id
The unique identifier of the event. The event ID is a structured string, of the
format <domain-name>::<aggregate-name>-<aggregate-id>-<sequence_id>
.
The id
value is simply an extension of the event's stream combined with the
sequence_id
. Read the section on sequence_id
to understand possible values.
type
Type of the event. Format is <domain-name>.<event-class-name>.<event-version>
.
For e.g. Shipping.OrderShipped.v1
.
fqn
Internal. The fully qualified name of the event. This is used by Protean to resconstruct objects from messages.
kind
Internal. Represents the kind of object enclosed in an event store message.
Value is EVENT
for Events. Metadata
class is shared between Events and
Commands, so possible values are EVENT
and COMMAND
.
stream
Name of the event stream. E.g. Stream auth::user-1234
encloses messages
related to User
aggregate in the Auth
domain with identity 1234
.
origin_stream
Name of the stream that originated this event or command. origin_stream
comes
handy when correlating related events or understanding causality.
timestamp
The timestamp of event generation in ISO 8601 format.
version
The version of the event class used to generate the event.
sequence_id
The sequence ID is the version of the aggregate when the event was generated, along with the sequence number of the event within the update.
For example, if the aggregate was updated twice, the first update would have a
sequence ID of 1.1
, and the second update would have a sequence ID of 2.1
.
If the next update generated two events, then the sequence ID of the second
event would be 3.2
.
If the aggregate is event-sourced, the sequence_id
is a single integer of the
position of the event in its stream.
payload_hash
The payload_hash
serves as a unique fingerprint for the event's
payload. It is generated by hashing the stringified event payload
json with sorted keys.
payload_hash
can be used to verify the integrity of the payload and in
implementing idempotent operations.
Payload
The payload is a dictionary of key-value pairs that convey the information about the event.
The payload is made available as the body of the event, which also includes
the event metadata. If you want to extract just the payload, you can use the
payload
property of the event.
In [1]: user = User(id="1", email="<EMAIL>", name="<NAME>")
In [2]: user.login()
In [3]: event = user._events[0]
In [4]: event
Out[4]: <UserLoggedIn: UserLoggedIn object ({'_metadata': {'id': '002::user-1-0.1', 'type': '002.UserLoggedIn.v1', 'fqn': '002.UserLoggedIn', 'kind': 'EVENT', 'stream': '002::user-1', 'origin_stream': None, 'timestamp': '2024-07-18 22:02:32.522360+00:00', 'version': 'v1', 'sequence_id': '0.1', 'payload_hash': 2731902408806877088}, 'user_id': '1'})>
In [5]: event.to_dict()
Out[5]:
{'_metadata': {'id': '002::user-1-0.1',
'type': '002.UserLoggedIn.v1',
'fqn': '002.UserLoggedIn',
'kind': 'EVENT',
'stream': '002::user-1',
'origin_stream': None,
'timestamp': '2024-07-18 22:02:32.522360+00:00',
'version': 'v1',
'sequence_id': '0.1',
'payload_hash': 2731902408806877088},
'user_id': '1'}
In [6]: event.payload
Out[6]: {'user_id': '1'}
Versioning
Because events serve as API contracts of an aggregate with the rest of the ecosystem, they are versioned to signal changes to contract.
Events have a default version of v1.
You can override and customize the version with the __version__
class
attribute:
@domain.event(part_of=User)
class UserActivated:
__version__ = "v2"
user_id = Identifier(required=True)
activated_at = DateTime(required=True)
The configured version is reflected in version
and type
attributes of the
generated event's metadata:
import json
from datetime import datetime, timezone
from protean import BaseEvent, Domain
from protean.fields import DateTime, Identifier, String
domain = Domain(__name__, name="Authentication", load_toml=False)
@domain.aggregate
class User:
id = Identifier(identifier=True)
email = String()
name = String()
status = String(choices=["INACTIVE", "ACTIVE", "ARCHIVED"], default="INACTIVE")
def login(self):
self.raise_(UserLoggedIn(user_id=self.id))
def activate(self):
self.status = "ACTIVE"
self.raise_(UserActivated(user_id=self.id))
@domain.event(part_of="User")
class UserLoggedIn(BaseEvent):
user_id = Identifier(identifier=True)
@domain.event(part_of="User")
class UserActivated:
__version__ = "v2"
user_id = Identifier(required=True)
activated_at = DateTime(required=True, default=lambda: datetime.now(timezone.utc))
domain.init(traverse=False)
with domain.domain_context():
user = User(id="1", email="<EMAIL>", name="<NAME>")
user.login()
print(json.dumps(user._events[0].to_dict(), indent=4))
""" Output:
{
"_metadata": {
"id": "authentication::user-1-0.1",
"type": "Authentication.UserLoggedIn.v1",
"fqn": "__main__.UserLoggedIn",
"kind": "EVENT",
"stream": "authentication::user-1",
"origin_stream": null,
"timestamp": "2024-07-18 22:06:10.148226+00:00",
"version": "v1",
"sequence_id": "0.1",
"payload_hash": 6154717103144054927
},
"user_id": "1"
}
"""
user.activate()
print(json.dumps(user._events[1].to_dict(), indent=4))
""" Output:
{
"_metadata": {
"id": "authentication::user-1-0.2",
"type": "Authentication.UserActivated.v2",
"fqn": "__main__.UserActivated",
"kind": "EVENT",
"stream": "authentication::user-1",
"origin_stream": null,
"timestamp": "2024-07-18 22:06:10.155603+00:00",
"version": "v2",
"sequence_id": "0.2",
"payload_hash": -3600345200911557224
},
"user_id": "1",
"activated_at": "2024-07-18 22:06:10.155694+00:00"
}
"""
Fact Events
A fact event encloses the entire state of the aggregate at that specific point in time. It contains all of the attributes and values necessary to completely describe the fact in the context of your business. You can think of a fact event similarly to how you may think of a row in a database: a complete set of data pertaining to the row at that point in time.
Fact events enable a pattern known as Event-carried State Transfer, which is one of the best ways to asynchronously distribute immutable state to all consumers who need it. With fact events, consumers do not have to build up the state themselves from multiple delta event types, which can be risky and error-prone, especially as data schemas evolve and change over time. Instead, they rely on the owning service to compute and produce a fully detailed fact event.
Fact events are generated automatically by the framework with the
fact_events=True
option in the domain.aggregate
decorator.
Read about generating fact events in the section on raising events.
Immutability
Event objects are immutable - they cannot be changed once created. This is important because events are meant to be used as a snapshot of the domain state at a specific point in time.
In [1]: user = User(name='John Doe', email='john@doe.com', status='ACTIVE')
In [2]: renamed = UserRenamed(user_id=user.id, name="John Doe Jr.")
In [3]: renamed.name = "John Doe Sr."
...
IncorrectUsageError: 'Event/Command Objects are immutable and cannot be modified once created'