Persist Aggregates

DDD CQRS ES

This guide covers how to persist aggregates through repositories -- from basic save operations and transactions to event publishing and updates.

Aggregates are saved to the configured database with the repository's add method.

from protean import Domain
from protean.fields import String

domain = Domain()


@domain.aggregate
class Person:
    name: String(required=True, max_length=50)
    email: String(required=True, max_length=254)


domain.init(traverse=False)
with domain.domain_context():
    person = Person(
        id="1",  # (1)
        name="John Doe",
        email="john.doe@localhost",
    )
    domain.repository_for(Person).add(person)

  1. Identity, by default, is a string.

In [1]: domain.repository_for(Person).get("1")
Out[1]: <Person: Person object (id: 1)>

In [2]: domain.repository_for(Person).get("1").to_dict()
Out[2]: {'name': 'John Doe', 'email': 'john.doe@localhost', 'id': '1'}

Transaction

The add method is enclosed in a Unit of Work context by default. Changes are committed to the persistence store when the Unit of Work context exits.

The following calls are equivalent in behavior:

...
# Version 1
domain.repository_for(Person).add(person)
...

...
# Version 2
from protean import UnitOfWork

with UnitOfWork():
    domain.repository_for(Person).add(person)
...

This means changes across the aggregate cluster are committed as a single transaction, provided the underlying database supports transactions.

from protean import Domain
from protean.fields import Float, HasMany, String, Text

domain = Domain()


@domain.aggregate
class Post:
    title: String(required=True, max_length=100)
    body: Text()
    comments = HasMany("Comment")


@domain.entity(part_of=Post)
class Comment:
    content: String(required=True, max_length=50)
    rating: Float(max_value=5)


domain.init(traverse=False)
with domain.domain_context():
    post = Post(
        id="1",
        title="A Great Post",
        body="This is the body of a great post",
        comments=[
            Comment(id="1", content="Amazing!", rating=5.0),
            Comment(id="2", content="Great!", rating=4.5),
        ],
    )

    # This persists one `Post` record and two `Comment` records
    domain.repository_for(Post).add(post)

Note

This is especially handy in relational databases, where each entity is stored in its own table.
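
To make the commit-on-exit behavior concrete, here is a minimal, framework-free sketch of the Unit of Work idea. It is not Protean's implementation: ToyStore, ToyUnitOfWork, and register are hypothetical names invented for this illustration, and a plain dictionary stands in for the database.

```python
class ToyStore:
    """Hypothetical in-memory stand-in for a database."""

    def __init__(self):
        self.rows = {}


class ToyUnitOfWork:
    """Hypothetical unit of work: buffers writes and commits them on a clean exit."""

    def __init__(self, store):
        self.store = store
        self.pending = {}

    def register(self, key, record):
        # Nothing reaches the store until the context exits.
        self.pending[key] = record

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            # Clean exit: apply every buffered change together.
            self.store.rows.update(self.pending)
        # On error the buffer is discarded, which acts as a rollback.
        self.pending = {}
        return False  # never swallow the exception


store = ToyStore()

# One post and two comments are committed together.
with ToyUnitOfWork(store) as uow:
    uow.register(("post", "1"), {"title": "A Great Post"})
    uow.register(("comment", "1"), {"content": "Amazing!"})
    uow.register(("comment", "2"), {"content": "Great!"})

committed = sorted(store.rows)

# A failure inside the block leaves the store untouched.
try:
    with ToyUnitOfWork(store) as uow:
        uow.register(("post", "2"), {"title": "Never saved"})
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

rolled_back = ("post", "2") not in store.rows
```

In this toy model the three records become visible only when the first with block exits, and the failed second block writes nothing. A real database transaction gives the same all-or-nothing guarantee at the storage level.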

Events

Once the aggregate has been persisted successfully, the add method also publishes the events it raised to the configured brokers.

from protean import Domain
from protean.fields import Boolean, Identifier, String, Text

domain = Domain()


@domain.aggregate
class Post:
    title: String(required=True, max_length=100)
    body: Text()
    published: Boolean(default=False)

    def publish(self):
        self.published = True
        self.raise_(PostPublished(post_id=self.id, body=self.body))


@domain.event(part_of=Post)
class PostPublished:
    post_id: Identifier(required=True)
    body: Text()

In [1]: post = Post(title="Events in Aggregates", body="Lorem ipsum dolor sit amet, consectetur adipiscing...")

In [2]: post.to_dict()
Out[2]:
{'title': 'Events in Aggregates',
 'body': 'Lorem ipsum dolor sit amet, consectetur adipiscing...',
 'published': False,
 'id': 'a9ea7763-c5b2-4c8c-9c97-43ba890517d0'}

In [3]: post.publish()

In [4]: post._events
Out[4]: [<PostPublished: PostPublished object ({
    'post_id': 'a9ea7763-c5b2-4c8c-9c97-43ba890517d0',
    'body': 'Lorem ipsum dolor sit amet, consectetur adipiscing...'
})>]

In [5]: domain.repository_for(Post).add(post)
Out[5]: <Post: Post object (id: a9ea7763-c5b2-4c8c-9c97-43ba890517d0)>

In [6]: post._events
Out[6]: []
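
The session above shows events accumulating on the aggregate and disappearing once add returns. The following framework-free sketch models that hand-off under stated assumptions: ToyPost, ToyRepository, and the list used as a broker are hypothetical stand-ins, not Protean classes, and events are plain dictionaries.

```python
class ToyPost:
    """Hypothetical aggregate that collects events until it is persisted."""

    def __init__(self, id, title):
        self.id = id
        self.title = title
        self.published = False
        self._events = []

    def raise_(self, event):
        # Events are only recorded here; nothing is published yet.
        self._events.append(event)

    def publish(self):
        self.published = True
        self.raise_({"type": "PostPublished", "post_id": self.id})


class ToyRepository:
    """Hypothetical repository that publishes events after a successful save."""

    def __init__(self, broker):
        self.rows = {}
        self.broker = broker  # a plain list standing in for a message broker

    def add(self, aggregate):
        # Persist first; if this raised, no event would be published.
        self.rows[aggregate.id] = aggregate
        # Hand the pending events to the broker, then clear them.
        self.broker.extend(aggregate._events)
        aggregate._events.clear()
        return aggregate


broker = []
repo = ToyRepository(broker)

post = ToyPost(id="1", title="Events in Aggregates")
post.publish()
events_before_save = list(post._events)

repo.add(post)
events_after_save = list(post._events)
```

The ordering is the point of the sketch: events leave the aggregate only after the write has succeeded, so a failed save does not announce a change that never happened.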

Updates

Recall that Protean repositories behave like a set collection: adding an aggregate whose identity is already stored replaces the stored copy instead of creating a duplicate. To update an aggregate, mutate it and persist it with add again.

In [1]: post = Post(
   ...:     id="1",
   ...:     title="Events in Aggregates",
   ...:     body="Lorem ipsum dolor sit amet, consectetur adipiscing..."
   ...: )

In [2]: domain.repository_for(Post).add(post)
Out[2]: <Post: Post object (id: 1)>

In [3]: domain.repository_for(Post).get("1")
Out[3]: <Post: Post object (id: 1)>

In [4]: domain.repository_for(Post).get("1").to_dict()
Out[4]:
{'title': 'Events in Aggregates',
 'body': 'Lorem ipsum dolor sit amet, consectetur adipiscing...',
 'published': False,
 'id': '1'}

In [5]: post.title = "(Updated Title) Events in Entities"

In [6]: domain.repository_for(Post).add(post)
Out[6]: <Post: Post object (id: 1)>

In [7]: domain.repository_for(Post).get("1").to_dict()
Out[7]:
{'title': '(Updated Title) Events in Entities',
 'body': 'Lorem ipsum dolor sit amet, consectetur adipiscing...',
 'published': False,
 'id': '1'}
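
The set-like behavior shown in this session can be modeled in a few lines. This is an illustrative sketch only: ToySetRepository is a hypothetical class, aggregates are represented as plain dictionaries, and nothing here reflects how Protean stores data internally.

```python
class ToySetRepository:
    """Hypothetical repository keyed by identity, so add acts as insert-or-replace."""

    def __init__(self):
        self._items = {}

    def add(self, aggregate):
        # Store a copy keyed by identity; an existing entry is overwritten.
        self._items[aggregate["id"]] = dict(aggregate)
        return aggregate

    def get(self, identity):
        # Return a copy so callers must call add to persist their changes.
        return dict(self._items[identity])

    def __len__(self):
        return len(self._items)


repo = ToySetRepository()

post = {"id": "1", "title": "Events in Aggregates", "published": False}
repo.add(post)

# Mutating the in-memory object does not change the stored copy...
post["title"] = "(Updated Title) Events in Entities"
title_before_second_add = repo.get("1")["title"]

# ...until it is added again, which replaces the entry with the same identity.
repo.add(post)
title_after_second_add = repo.get("1")["title"]
count = len(repo)
```

Keying by identity is what makes a second add an update: the repository still holds exactly one post with id "1" after both calls.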

See also

Concept overview: Repositories — The role of repositories in DDD and how Protean implements the pattern.

Related guides:

  • Repositories — Define custom repositories with domain-named query methods.
  • Retrieve Aggregates — Query, filter, sort, and paginate aggregates.
  • Unit of Work — Transactional consistency when persisting multiple changes.