Skip to content

Raising Events

An aggregate rarely exists in isolation - it's state changes often mean that other parts of the system of the system have to sync up. In DDD, the mechanism to accomplish this is through Domain Events.

Delta Events

When an aggregate mutates, it also (preferably) raises one or more events to record the state change in time, as well as propagate it within and beyond the bounded context.

@banking.aggregate
class Account:
    account_number = Identifier(required=True, unique=True)
    balance = Float()
    overdraft_limit = Float(default=0.0)

    @invariant.post
    def balance_must_be_greater_than_or_equal_to_overdraft_limit(self):
        if self.balance < -self.overdraft_limit:
            raise InsufficientFundsException("Balance cannot be below overdraft limit")

    def withdraw(self, amount: float):
        self.balance -= amount  # Update account state (mutation)

        self.raise_(AccountWithdrawn(account_number=self.account_number, amount=amount))

The generated events are collected in the mutated aggregate:

In [1]: account = Account(account_number="1234", balance=1000.0, overdraft_limit=50.0)

In [2]: account.withdraw(500.0)

In [3]: account.to_dict()
Out[3]: 
{'account_number': '1234',
 'balance': 500.0,
 'overdraft_limit': 50.0,
 'id': '37fc8d10-1209-41d2-a6fa-4f7312912212'}

In [4]: account._events
Out[4]: [<AccountWithdrawn: AccountWithdrawn object (
{'account_number': '1234',
 'amount': 500.0})>]

Raising Events from Entities

Any entity in the aggregate cluster can raise events. But the events are collected in the aggregate alone. Aggregates are also responisible for consuming events and performing state changes on underlying entities, as we will see in the future.

If the event is being generated by an entity, it can access the enclosing aggregate's identity with the help of _owner property:

from datetime import date

from protean import Domain
from protean.fields import Date, HasMany, Identifier, String

domain = Domain(__file__, load_toml=False)


@domain.aggregate
class Patron:
    name = String(required=True, max_length=50)
    status = String(choices=["ACTIVE", "INACTIVE"], default="ACTIVE")
    holds = HasMany("Hold")

    def cancel_hold(self, hold_id):
        self.get_one_from_holds(id=hold_id).cancel_hold()


@domain.event(part_of="Patron")
class HoldCanceled:
    hold_id = Identifier(required=True)
    book_id = Identifier(required=True)
    patron_id = Identifier(required=True)
    canceled_on = Date(default=date.today())


@domain.entity(part_of="Patron")
class Hold:
    book_id = Identifier(required=True)
    status = String(choices=["ACTIVE", "EXPIRED", "CANCELLED"], default="ACTIVE")
    placed_on = Date(default=date.today())

    def cancel_hold(self):
        self.status = "CANCELLED"
        self.raise_(
            HoldCanceled(
                hold_id=self.id,
                book_id=self.book_id,
                patron_id=self._owner.id,  # (1)
            )
        )
  1. self._owner here is the owning Patron object.

Dispatching Events

These events are dispatched automatically to registered brokers when the aggregate is persisted. We will explore this when we discuss repositories, but you can also manually publish the events to the rest of the system with domain.publish().

In [1]: order = Order(
   ...:         customer_id=1, premium_customer=True,
   ...:         items=[
   ...:             OrderItem(product_id=1, quantity=2, price=10.0),
   ...:             OrderItem(product_id=2, quantity=1, price=20.0),
   ...:         ]
   ...:     )

In [2]: order.confirm()

In [3]: order._events
Out[3]: 
[<OrderConfirmed: OrderConfirmed object ({'order_id': '149b5549-3903-459e-9127-731266372472', 'confirmed_at': '2024-06-10 22:53:25.827101+00:00'})>,
 <OrderDiscountApplied: OrderDiscountApplied object ({'order_id': '149b5549-3903-459e-9127-731266372472', 'customer_id': '1'})>]

In [4]: domain.publish(order._events)

Fact Events

Fact events are automatically generated by Protean.

The event name is of the format <AggregateName>FactEvent, and the stream name will be <snakecase_aggregate_name>-<fact>-<aggregate_-_id>.

import json

from protean import Domain
from protean.fields import HasOne, String
from protean.utils.mixins import Message

domain = Domain(__file__, name="Authentication", load_toml=False)


@domain.aggregate(fact_events=True)
class User:
    name = String(max_length=50, required=True)
    email = String(required=True)
    status = String(choices=["ACTIVE", "ARCHIVED"])

    account = HasOne("Account")


@domain.entity(part_of=User)
class Account:
    password_hash = String(max_length=512)


domain.init(traverse=False)
with domain.domain_context():
    user = User(name="John Doe", email="john.doe@example.com")

    # Persist the user
    domain.repository_for(User).add(user)

    event_message = domain.event_store.store.read(
        f"authentication::user-fact-{user.id}"
    )[0]
    event = Message.to_object(event_message)

    print(json.dumps(event.to_dict(), indent=4))

    """ Output:
    {
        "_metadata": {
            "id": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96-0.1",
            "type": "Authentication.UserFactEvent.v1",
            "fqn": "protean.container.UserFactEvent",
            "kind": "EVENT",
            "stream": "authentication::user-fact-781c4363-5e7e-4c53-a599-2cb2dc428d96",
            "origin_stream": null,
            "timestamp": "2024-07-18 22:01:02.364078+00:00",
            "version": "v1",
            "sequence_id": "0.1",
            "payload_hash": 2754382941688457931
        },
        "_version": 0,
        "name": "John Doe",
        "email": "john.doe@example.com",
        "status": null,
        "account": null,
        "id": "781c4363-5e7e-4c53-a599-2cb2dc428d96"
    }
    """

The fact event for User aggregate in the above example is UserFactEvent and the output stream is user-fact-e97cef08-f11d-43eb-8a69-251a0828bbff