Skip to content

UnitOfWork

Transaction boundary for persistence operations. Used as a context manager to group multiple repository operations into a single atomic transaction.

See Unit of Work guide for practical usage.

Transaction boundary for persistence operations.

Groups one or more repository operations into an atomic unit. Use as a context manager to ensure that all changes within the block are committed together or rolled back on error::

with UnitOfWork():
    repo = domain.repository_for(Order)
    order = repo.get(order_id)
    order.confirm()
    repo.add(order)

Command handlers and the @use_case decorator wrap their execution in a UnitOfWork automatically, so explicit usage is typically only needed in application services or scripts.

The UnitOfWork maintains an identity map to track loaded aggregates and collects domain events raised during the transaction. On commit, events are persisted to the outbox and dispatched to brokers/event store.

Source code in src/protean/core/unit_of_work.py
42
43
44
45
46
47
48
def __init__(self) -> None:
    self.domain = current_domain
    self._in_progress = False

    self._sessions = {}
    self._messages_to_dispatch = []
    self._identity_map = defaultdict(dict)

start

start() -> None

Begin the transaction and push this UnitOfWork onto the context stack.

Source code in src/protean/core/unit_of_work.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def start(self) -> None:
    """Begin the transaction and push this UnitOfWork onto the context stack."""
    # Log transaction capability warnings for each configured provider
    for provider_name, provider in self.domain.providers.items():
        if not provider.has_capability(DatabaseCapabilities.TRANSACTIONS):
            if provider.has_capability(DatabaseCapabilities.SIMULATED_TRANSACTIONS):
                logger.debug(
                    "Provider '%s' uses simulated transactions. "
                    "Rollback will not undo persisted changes.",
                    provider_name,
                )
            else:
                logger.warning(
                    "Provider '%s' does not support transactions. "
                    "UoW will manage identity map and events "
                    "but commit/rollback are not atomic.",
                    provider_name,
                )

    self._in_progress = True
    _uow_context_stack.push(self)

commit

commit() -> None

Commit all changes, persist outbox messages, and dispatch events.

RAISES DESCRIPTION
InvalidOperationError

If the UnitOfWork is not in progress.

ExpectedVersionError

On optimistic concurrency conflict.

TransactionError

If the underlying database commit fails.

Source code in src/protean/core/unit_of_work.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def commit(self) -> None:  # noqa: C901
    """Commit all changes, persist outbox messages, and dispatch events.

    Raises:
        InvalidOperationError: If the UnitOfWork is not in progress.
        ExpectedVersionError: On optimistic concurrency conflict.
        TransactionError: If the underlying database commit fails.
    """
    # Raise error if there the Unit Of Work is not active
    logger.debug(f"Committing {self}...")
    if not self._in_progress:
        raise InvalidOperationError("UnitOfWork is not in progress")

    tracer = self.domain.tracer

    with tracer.start_as_current_span(
        "protean.uow.commit",
        record_exception=False,
        set_status_on_exception=False,
    ) as span:
        # Propagate correlation and causation IDs from the message being processed
        from protean.utils.globals import g

        msg = g.get("message_in_context")
        if msg is not None and hasattr(msg, "metadata") and msg.metadata:
            domain_meta = getattr(msg.metadata, "domain", None)
            if domain_meta is not None:
                correlation_id = getattr(domain_meta, "correlation_id", None)
                if correlation_id:
                    span.set_attribute("protean.correlation_id", correlation_id)

                causation_id = getattr(domain_meta, "causation_id", None)
                if causation_id:
                    span.set_attribute("protean.causation_id", causation_id)

        self._do_commit(span)

rollback

rollback() -> None

Roll back all changes and close sessions.

RAISES DESCRIPTION
InvalidOperationError

If the UnitOfWork is not in progress.

Source code in src/protean/core/unit_of_work.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def rollback(self) -> None:
    """Roll back all changes and close sessions.

    Raises:
        InvalidOperationError: If the UnitOfWork is not in progress.
    """
    # Raise error if the Unit Of Work is not active
    if not self._in_progress:
        raise InvalidOperationError("UnitOfWork is not in progress")

    # Exit from Unit of Work
    _uow_context_stack.pop()

    try:
        for session in self._sessions.values():
            session.rollback()

        logger.debug("Transaction rolled back")
    except Exception as exc:
        logger.error(f"Error during Transaction rollback: {str(exc)}")

    self._reset()

get_session

get_session(provider_name)

Get session for provider, initializing one if it doesn't exist

Source code in src/protean/core/unit_of_work.py
386
387
388
389
390
391
def get_session(self, provider_name):
    """Get session for provider, initializing one if it doesn't exist"""
    if provider_name in self._sessions:
        return self._sessions[provider_name]
    else:
        return self._initialize_session(provider_name)