Raising Events
An aggregate rarely exists in isolation: its state changes often mean that other parts of the system have to sync up. In DDD, the mechanism for accomplishing this is Domain Events.
Delta Events
When an aggregate mutates, it also (preferably) raises one or more events to record the state change in time, as well as propagate it within and beyond the bounded context.
@banking.aggregate
class Account:
    account_number = Identifier(required=True, unique=True)
    balance = Float()
    overdraft_limit = Float(default=0.0)

    @invariant.post
    def balance_must_be_greater_than_or_equal_to_overdraft_limit(self):
        if self.balance < -self.overdraft_limit:
            raise InsufficientFundsException("Balance cannot be below overdraft limit")

    def withdraw(self, amount: float):
        self.balance -= amount  # Update account state (mutation)

        self.raise_(AccountWithdrawn(account_number=self.account_number, amount=amount))
The generated events are collected in the mutated aggregate:
In [1]: account = Account(account_number="1234", balance=1000.0, overdraft_limit=50.0)
In [2]: account.withdraw(500.0)
In [3]: account.to_dict()
Out[3]:
{'account_number': '1234',
'balance': 500.0,
'overdraft_limit': 50.0,
'id': '37fc8d10-1209-41d2-a6fa-4f7312912212'}
In [4]: account._events
Out[4]: [<AccountWithdrawn: AccountWithdrawn object (
{'account_number': '1234',
'amount': 500.0})>]
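To make the collection mechanics concrete, here is a minimal plain-Python sketch that does not use Protean at all. `SketchAccount` and the dataclass version of `AccountWithdrawn` are hypothetical stand-ins; only the names `raise_` and `_events` mirror the session above. Protean's real implementation does more (event metadata, versioning, invariant decorators), so treat this as an illustration of the idea rather than the library's code.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class AccountWithdrawn:
    """Immutable record of a withdrawal (hypothetical stand-in for an event)."""
    account_number: str
    amount: float


@dataclass
class SketchAccount:
    """Plain-Python stand-in for an aggregate; not Protean's implementation."""
    account_number: str
    balance: float = 0.0
    overdraft_limit: float = 0.0
    _events: list = field(default_factory=list)

    def raise_(self, event) -> None:
        # Record the event on the aggregate; nothing is dispatched yet.
        self._events.append(event)

    def withdraw(self, amount: float) -> None:
        # Enforce the overdraft rule before committing the change.
        if self.balance - amount < -self.overdraft_limit:
            raise ValueError("Balance cannot be below overdraft limit")
        self.balance -= amount
        self.raise_(AccountWithdrawn(self.account_number, amount))


account = SketchAccount(account_number="1234", balance=1000.0, overdraft_limit=50.0)
account.withdraw(500.0)
print(account.balance)  # 500.0
print(account._events)
```

The point of the design is that mutation and event recording happen in the same method, so the list of pending events always matches the state changes made so far.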
Raising Events from Entities
Any entity in the aggregate cluster can raise events, but the events are collected in the aggregate alone. Aggregates are also responsible for consuming events and performing state changes on underlying entities, as we will see later.
If the event is being generated by an entity, it can access the enclosing aggregate's identity with the help of the _owner property:
from datetime import date

from protean import Domain
from protean.fields import Date, HasMany, Identifier, String

domain = Domain(__file__, load_toml=False)


@domain.aggregate
class Patron:
    name = String(required=True, max_length=50)
    status = String(choices=["ACTIVE", "INACTIVE"], default="ACTIVE")
    holds = HasMany("Hold")

    def cancel_hold(self, hold_id):
        # Delegate to the Hold entity, which raises the event
        self.get_one_from_holds(id=hold_id).cancel_hold()


@domain.event(part_of="Patron")
class HoldCanceled:
    hold_id = Identifier(required=True)
    book_id = Identifier(required=True)
    patron_id = Identifier(required=True)
    # Pass the callable so the date is computed per object, not once at import
    canceled_on = Date(default=date.today)


@domain.entity(part_of="Patron")
class Hold:
    book_id = Identifier(required=True)
    status = String(choices=["ACTIVE", "EXPIRED", "CANCELLED"], default="ACTIVE")
    placed_on = Date(default=date.today)

    def cancel_hold(self):
        self.status = "CANCELLED"
        self.raise_(
            HoldCanceled(
                hold_id=self.id,
                book_id=self.book_id,
                patron_id=self._owner.id,  # (1)
            )
        )
(1) self._owner here is the owning Patron object.
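The relationship between an entity and its owner can be sketched in plain Python as follows. This is an assumption-laden illustration, not Protean's code: `SketchPatron`, `SketchHold`, `add_hold`, and the dictionary-shaped event are all hypothetical, and only `_owner`, `_events`, and `raise_` echo names from the example above. The sketch shows one way an entity can hand its events to the enclosing aggregate so that they end up in a single place.

```python
class SketchPatron:
    """Hypothetical aggregate root: owns holds and collects every event."""

    def __init__(self, patron_id: str) -> None:
        self.id = patron_id
        self.holds = {}
        self._events = []

    def add_hold(self, hold: "SketchHold") -> None:
        # Give the entity a back-reference to its enclosing aggregate.
        hold._owner = self
        self.holds[hold.id] = hold

    def cancel_hold(self, hold_id: str) -> None:
        self.holds[hold_id].cancel_hold()


class SketchHold:
    """Hypothetical child entity that raises events through its owner."""

    def __init__(self, hold_id: str, book_id: str) -> None:
        self.id = hold_id
        self.book_id = book_id
        self.status = "ACTIVE"
        self._owner = None

    def raise_(self, event: dict) -> None:
        # The entity keeps no event list of its own.
        self._owner._events.append(event)

    def cancel_hold(self) -> None:
        self.status = "CANCELLED"
        self.raise_(
            {
                "type": "HoldCanceled",
                "hold_id": self.id,
                "book_id": self.book_id,
                "patron_id": self._owner.id,
            }
        )


patron = SketchPatron("patron-1")
patron.add_hold(SketchHold("hold-1", "book-1"))
patron.cancel_hold("hold-1")
print(patron._events)
```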
Dispatching Events
These events are dispatched automatically to registered brokers when the aggregate is persisted. We will explore this when we discuss repositories, but you can also manually publish the events to the rest of the system with domain.publish().
In [1]: order = Order(
...: customer_id=1, premium_customer=True,
...: items=[
...: OrderItem(product_id=1, quantity=2, price=10.0),
...: OrderItem(product_id=2, quantity=1, price=20.0),
...: ]
...: )
In [2]: order.confirm()
In [3]: order._events
Out[3]:
[<OrderConfirmed: OrderConfirmed object ({'order_id': '149b5549-3903-459e-9127-731266372472', 'confirmed_at': '2024-06-10 22:53:25.827101+00:00'})>,
<OrderDiscountApplied: OrderDiscountApplied object ({'order_id': '149b5549-3903-459e-9127-731266372472', 'customer_id': '1'})>]
In [4]: domain.publish(order._events)
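As a rough mental model of what publishing a list of events involves, the following plain-Python sketch fans events out to subscribed handlers. `InMemoryBroker`, `subscribe`, and the dictionary-shaped events are hypothetical and are not Protean's broker API; the shortened order ID is illustrative only.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class InMemoryBroker:
    """Hypothetical broker that delivers events to subscribed handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, events: List[dict]) -> None:
        # Deliver events in the order in which they were raised.
        for event in events:
            for handler in self._handlers.get(event["type"], []):
                handler(event)


broker = InMemoryBroker()
confirmed_orders = []
broker.subscribe("OrderConfirmed", confirmed_orders.append)

pending_events = [
    {"type": "OrderConfirmed", "order_id": "149b5549"},
    {"type": "OrderDiscountApplied", "order_id": "149b5549", "customer_id": "1"},
]
broker.publish(pending_events)
print(confirmed_orders)
```

Events with no subscriber, such as `OrderDiscountApplied` here, are simply skipped in this sketch; a real broker would typically still persist or route them.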
Fact Events
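Fact events are automatically generated by Protean for aggregates that opt in with fact_events=True. Each fact event carries the complete state of the aggregate, as the output in the example below shows.
The event name is of the format <AggregateName>FactEvent, and the stream name will be <snakecase_aggregate_name>-fact-<aggregate_id>. In the example below, the stream name is additionally prefixed with the domain name (authentication::).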
import json

from protean import Domain
from protean.fields import HasOne, String
from protean.utils.mixins import Message

domain = Domain(__file__, name="Authentication", load_toml=False)


@domain.aggregate(fact_events=True)
class User:
    name = String(max_length=50, required=True)
    email = String(required=True)
    status = String(choices=["ACTIVE", "ARCHIVED"])

    account = HasOne("Account")


@domain.entity(part_of=User)
class Account:
    password_hash = String(max_length=512)


domain.init(traverse=False)
with domain.domain_context():
    user = User(name="John Doe", email="john.doe@example.com")

    # Persist the user
    domain.repository_for(User).add(user)

    event_message = domain.event_store.store.read(
        f"authentication::user-fact-{user.id}"
    )[0]
    event = Message.to_object(event_message)

    print(json.dumps(event.to_dict(), indent=4))

    """ Output:
    {
        "_metadata": {
            "id": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96-0.1",
            "type": "Authentication.UserFactEvent.v1",
            "fqn": "protean.container.UserFactEvent",
            "kind": "EVENT",
            "stream": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96",
            "origin_stream": null,
            "timestamp": "2024-07-18 22:01:02.364078+00:00",
            "version": "v1",
            "sequence_id": "0.1",
            "payload_hash": 2754382941688457931
        },
        "_version": 0,
        "name": "John Doe",
        "email": "john.doe@example.com",
        "status": null,
        "account": null,
        "id": "781c4363-5e7e-4c53-a599-2cb2dc428d96"
    }
    """
The fact event for the User aggregate in the above example is UserFactEvent, and the output stream is authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96.
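The naming convention can be reproduced with a few lines of plain Python. The helpers below (`snake_case`, `fact_event_name`, `fact_stream_name`) are hypothetical and are not part of Protean; the lowercased domain prefix is an assumption inferred from the `authentication::` prefix in the example output.

```python
import re


def snake_case(name: str) -> str:
    # Insert an underscore before every capital letter except the first.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def fact_event_name(aggregate_name: str) -> str:
    return f"{aggregate_name}FactEvent"


def fact_stream_name(domain_name: str, aggregate_name: str, aggregate_id: str) -> str:
    # Assumption: the domain name is lowercased and joined with "::".
    return f"{domain_name.lower()}::{snake_case(aggregate_name)}-fact-{aggregate_id}"


user_id = "781c4363-5e7e-4c53-a599-2cb2dc428d96"
print(fact_event_name("User"))
print(fact_stream_name("Authentication", "User", user_id))
```

With the values from the example, this yields the event name `UserFactEvent` and the same stream name that appears in the `stream` field of the output.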