Skip to content

UnitOfWork

Transaction boundary for persistence operations. Used as a context manager to group multiple repository operations into a single atomic transaction.

See Unit of Work guide for practical usage.

Transaction boundary for persistence operations.

Groups one or more repository operations into an atomic unit. Use as a context manager to ensure that all changes within the block are committed together or rolled back on error::

with UnitOfWork():
    repo = domain.repository_for(Order)
    order = repo.get(order_id)
    order.confirm()
    repo.add(order)

Command handlers and the @use_case decorator wrap their execution in a UnitOfWork automatically, so explicit usage is typically only needed in application services or scripts.

The UnitOfWork maintains an identity map to track loaded aggregates and collects domain events raised during the transaction. On commit, events are persisted to the outbox and dispatched to brokers/event store.

Source code in src/protean/core/unit_of_work.py
42
43
44
45
46
47
48
def __init__(self) -> None:
    self.domain = current_domain
    self._in_progress = False

    self._sessions = {}
    self._messages_to_dispatch = []
    self._identity_map = defaultdict(dict)

start

start() -> None

Begin the transaction and push this UnitOfWork onto the context stack.

Source code in src/protean/core/unit_of_work.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def start(self) -> None:
    """Begin the transaction and push this UnitOfWork onto the context stack."""
    # Log transaction capability warnings for each configured provider
    for provider_name, provider in self.domain.providers.items():
        if not provider.has_capability(DatabaseCapabilities.TRANSACTIONS):
            if provider.has_capability(DatabaseCapabilities.SIMULATED_TRANSACTIONS):
                logger.debug(
                    "Provider '%s' uses simulated transactions. "
                    "Rollback will not undo persisted changes.",
                    provider_name,
                )
            else:
                logger.warning(
                    "Provider '%s' does not support transactions. "
                    "UoW will manage identity map and events "
                    "but commit/rollback are not atomic.",
                    provider_name,
                )

    self._in_progress = True
    _uow_context_stack.push(self)

commit

commit() -> None

Commit all changes, persist outbox messages, and dispatch events.

RAISES DESCRIPTION
InvalidOperationError

If the UnitOfWork is not in progress.

ExpectedVersionError

On optimistic concurrency conflict.

TransactionError

If the underlying database commit fails.

Source code in src/protean/core/unit_of_work.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def commit(self) -> None:  # noqa: C901
    """Commit all changes, persist outbox messages, and dispatch events.

    Raises:
        InvalidOperationError: If the UnitOfWork is not in progress.
        ExpectedVersionError: On optimistic concurrency conflict.
        TransactionError: If the underlying database commit fails.
    """
    # Raise error if there the Unit Of Work is not active
    logger.debug("uow.committing", extra={"uow_id": id(self)})
    if not self._in_progress:
        raise InvalidOperationError("UnitOfWork is not in progress")

    tracer = self.domain.tracer

    with tracer.start_as_current_span(
        "protean.uow.commit",
        record_exception=False,
        set_status_on_exception=False,
    ) as span:
        # Propagate correlation and causation IDs from the message being processed
        from protean.utils.globals import g

        msg = g.get("message_in_context")
        if msg is not None and hasattr(msg, "metadata") and msg.metadata:
            domain_meta = getattr(msg.metadata, "domain", None)
            if domain_meta is not None:
                correlation_id = getattr(domain_meta, "correlation_id", None)
                if correlation_id:
                    span.set_attribute("protean.correlation_id", correlation_id)

                causation_id = getattr(domain_meta, "causation_id", None)
                if causation_id:
                    span.set_attribute("protean.causation_id", causation_id)

        self._do_commit(span)

rollback

rollback() -> None

Roll back all changes and close sessions.

RAISES DESCRIPTION
InvalidOperationError

If the UnitOfWork is not in progress.

Source code in src/protean/core/unit_of_work.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def rollback(self) -> None:
    """Roll back all changes and close sessions.

    Raises:
        InvalidOperationError: If the UnitOfWork is not in progress.
    """
    # Raise error if the Unit Of Work is not active
    if not self._in_progress:
        raise InvalidOperationError("UnitOfWork is not in progress")

    # Record UoW outcome for the access log wide event
    try:
        from protean.utils.globals import g

        g._access_log_uow_outcome = "rolled_back"
    except Exception:
        pass

    # Exit from Unit of Work
    _uow_context_stack.pop()

    try:
        for session in self._sessions.values():
            session.rollback()

        logger.debug("uow.rollback_successful")
    except Exception as exc:
        logger.exception("uow.rollback_failed")

    self._reset()

get_session

get_session(provider_name)

Get session for provider, initializing one if it doesn't exist

Source code in src/protean/core/unit_of_work.py
411
412
413
414
415
416
def get_session(self, provider_name):
    """Get session for provider, initializing one if it doesn't exist"""
    if provider_name in self._sessions:
        return self._sessions[provider_name]
    else:
        return self._initialize_session(provider_name)