ADR-0013: Atomic Find-and-Claim Contract for Concurrent Consumers
Status: Accepted
Date: June 2026
Context
The outbox is the framework default for reliable message delivery, so it sits on
the hot path of every Protean deployment that publishes events. The
OutboxProcessor historically pulled work in two steps per poll:
- find_unprocessed(limit=N): a SELECT returning up to N eligible rows.
- For each row, claim_for_processing(row, worker_id): an
UPDATE … WHERE id = :id AND status IN (pending, failed) that marks the row
processing and locks it.
This has two problems on a contended table:
- A TOCTOU window. Between the SELECT and the per-row UPDATE, another worker
can claim the same row. The guarded UPDATE detects the loss (the WHERE no
longer matches, rowcount is 0) but does not prevent it: the loser has already
paid for a wasted round trip and produces "already claimed" log noise.
- 1 + N round trips per batch. One SELECT plus N UPDATEs. At high throughput
the per-row latency dominates the poll cost.
The same shape recurs for any high-throughput consumer that pulls a bounded batch of rows and marks them in-flight (process-manager correlation lookups, projection rebuild cursors, retention sweeps). The need is broader than the outbox, so the contract belongs at the DAO layer, not buried in the outbox repository.
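The lost race described above can be reproduced deterministically. The sketch below is illustrative only: it uses Python's sqlite3 module with a hypothetical outbox table, and the two helper functions are stand-ins named after the methods above, not the Protean implementation. The two workers are interleaved by hand so that both read before either writes.

```python
import sqlite3

# Hypothetical schema for illustration; not the real outbox table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, status TEXT, locked_by TEXT)")
conn.executemany("INSERT INTO outbox (id, status) VALUES (?, 'pending')", [(1,), (2,)])
conn.commit()


def find_unprocessed(limit):
    """Step 1: plain SELECT of eligible ids (takes no lock)."""
    rows = conn.execute(
        "SELECT id FROM outbox WHERE status IN ('pending', 'failed') "
        "ORDER BY id LIMIT ?",
        (limit,),
    ).fetchall()
    return [row[0] for row in rows]


def claim_for_processing(row_id, worker_id):
    """Step 2: guarded UPDATE; True only if this caller won the row."""
    cur = conn.execute(
        "UPDATE outbox SET status = 'processing', locked_by = ? "
        "WHERE id = ? AND status IN ('pending', 'failed')",
        (worker_id, row_id),
    )
    conn.commit()
    return cur.rowcount == 1


# Both workers read before either writes: this is the TOCTOU window.
seen_by_a = find_unprocessed(limit=2)
seen_by_b = find_unprocessed(limit=2)

won_by_a = [claim_for_processing(i, "worker-a") for i in seen_by_a]
won_by_b = [claim_for_processing(i, "worker-b") for i in seen_by_b]
```

Worker B sees the same ids as worker A, yet every one of its guarded updates matches zero rows, so its whole batch is wasted round trips.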
Decision
We introduce a DAO-level primitive, BaseDAO._claim(criteria,
claim_fields, limit, order_by=None), that selects up to limit rows matching
criteria, applies claim_fields as an update, and returns the claimed rows in
post-claim state. Implementations must guarantee that no two callers observe the
same row as claimed (no double-claim) and that the returned rows reflect the
applied claim_fields. Non-blocking is not part of the contract: only the
FOR UPDATE SKIP LOCKED fast path steps over locked rows without waiting; the
portable default may briefly block on a contended row before its guard rejects
the claim.
Portable default (BaseDAO). Reads candidates via the entity query API,
then issues a guarded UPDATE … WHERE id = :id AND <criteria> per row. The
re-asserted criteria make a row another worker already claimed fail to match.
This is 1 + N round trips — identical in cost to the old flow — and is the
correctness floor, not the performance target. It prevents double-claim on every
backend, but the failure mode of a lost race differs: client-server relational
backends (PostgreSQL, MySQL, SQL Server) re-evaluate the guard under the row
lock and match zero rows (graceful skip); SQLite serializes writers (a contended
write may raise SQLITE_BUSY); Elasticsearch relies on document versioning, so
the second writer hits a version conflict and the call raises rather than skipping.
Elasticsearch is therefore not recommended as a concurrently-consumed claim
store.
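As a rough illustration of the portable default, the sketch below reads candidates with one SELECT, issues one guarded UPDATE per row, keeps only the rows whose guard matched, and returns them in post-claim state. It is a minimal sketch under stated assumptions, not the BaseDAO code: the outbox table, the ELIGIBLE criteria string, and the claim function are hypothetical, and sqlite3 stands in for whichever backend is in use.

```python
import sqlite3

# Hypothetical criteria, re-asserted in every guarded UPDATE.
ELIGIBLE = "status IN ('pending', 'failed')"


def claim(conn, worker_id, limit):
    """One SELECT for candidates, then one guarded UPDATE per candidate."""
    candidates = conn.execute(
        f"SELECT id FROM outbox WHERE {ELIGIBLE} ORDER BY id LIMIT ?", (limit,)
    ).fetchall()

    claimed_ids = []
    for (row_id,) in candidates:
        cur = conn.execute(
            "UPDATE outbox SET status = 'processing', locked_by = ? "
            f"WHERE id = ? AND {ELIGIBLE}",
            (worker_id, row_id),
        )
        if cur.rowcount == 1:  # guard matched, so this caller owns the row
            claimed_ids.append(row_id)
    conn.commit()

    if not claimed_ids:
        return []
    # Re-read so the caller sees the applied claim fields.
    marks = ",".join("?" * len(claimed_ids))
    return conn.execute(
        f"SELECT id, status, locked_by FROM outbox WHERE id IN ({marks}) ORDER BY id",
        claimed_ids,
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, status TEXT, locked_by TEXT)")
conn.executemany(
    "INSERT INTO outbox (id, status) VALUES (?, 'pending')", [(1,), (2,), (3,)]
)
conn.commit()

first_batch = claim(conn, "worker-a", limit=2)
second_batch = claim(conn, "worker-b", limit=2)
```

The two batches are disjoint, and each returned row already carries the claiming worker's fields.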
SQLAlchemy fast path (PostgreSQL only). A single statement:
UPDATE <table> SET <claim_fields>
WHERE id IN (
    SELECT id FROM <table> WHERE <criteria>
    ORDER BY <order> LIMIT <n> FOR UPDATE SKIP LOCKED
)
RETURNING *
The inner FOR UPDATE SKIP LOCKED locks eligible rows and steps over rows other
workers hold; the enclosing UPDATE claims exactly those rows; RETURNING
hands them back — one round trip, no TOCTOU window. RETURNING does not
preserve the inner ORDER BY, so the returned batch is re-sorted in Python to
honour order_by.
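The re-sort step might look like the sketch below. It is an assumption-laden illustration rather than the adapter's code: rows are modelled as plain dicts, resort is a hypothetical helper, and the leading "-" for descending order is an assumed convention for order_by entries.

```python
from operator import itemgetter


def resort(rows, order_by):
    """Re-apply a multi-field ordering to rows returned in arbitrary order.

    order_by is a list such as ["-priority", "created_at"]; a leading "-"
    (an assumed convention) marks a field as descending.
    """
    ordered = list(rows)
    # Python's sort is stable, so sorting by the least significant field
    # first and the most significant last yields the combined ordering.
    for field in reversed(order_by):
        descending = field.startswith("-")
        ordered.sort(key=itemgetter(field.lstrip("-")), reverse=descending)
    return ordered


# Rows as they might come back from RETURNING, in no particular order.
returned = [
    {"id": 3, "priority": 1, "created_at": 30},
    {"id": 2, "priority": 2, "created_at": 20},
    {"id": 1, "priority": 2, "created_at": 10},
]
batch = resort(returned, ["-priority", "created_at"])
```

Sorting in Python is cheap here because the batch is bounded by limit.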
Memory adapter. Holds the provider's threading.Lock across the whole
read-and-claim section and delegates to the portable default. The lock
serializes concurrent claimers in-process, which is the strongest guarantee the
single-process store can offer.
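A minimal sketch of the lock-across-the-whole-section idea follows. MemoryStore and its fields are hypothetical and much simpler than the real memory provider; the point is only that the read and the claim happen under one threading.Lock, so concurrent claimers cannot interleave between them.

```python
import threading


class MemoryStore:
    """Toy in-process store; a hypothetical stand-in for the memory adapter."""

    def __init__(self, rows):
        self._lock = threading.Lock()
        self._rows = rows

    def claim(self, worker_id, limit):
        # Hold the lock across both the read and the write.
        with self._lock:
            eligible = [r for r in self._rows if r["status"] == "pending"][:limit]
            for row in eligible:
                row["status"] = "processing"
                row["locked_by"] = worker_id
            # Return copies that reflect the post-claim state.
            return [dict(row) for row in eligible]


store = MemoryStore(
    [{"id": i, "status": "pending", "locked_by": None} for i in range(100)]
)
names = [f"worker-{n}" for n in range(4)]
claimed = {name: [] for name in names}  # one result list per worker


def drain(name):
    """Claim batches until the store has nothing left to hand out."""
    while True:
        batch = store.claim(name, limit=7)
        if not batch:
            return
        claimed[name].extend(row["id"] for row in batch)


threads = [threading.Thread(target=drain, args=(name,)) for name in names]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

all_ids = sorted(i for ids in claimed.values() for i in ids)
```

Every id is claimed exactly once across the four threads, however the scheduler interleaves them.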
MySQL/MariaDB, SQL Server, SQLite, Elasticsearch, and other dialects use the
portable default. MySQL and MariaDB have SKIP LOCKED but no
UPDATE … RETURNING, so the single-statement form does not compile there; SQL
Server uses OUTPUT with different table-hint semantics; SQLite serializes
writers; Elasticsearch is non-relational. Among these, PostgreSQL is the only dialect that
offers FOR UPDATE SKIP LOCKED and UPDATE … RETURNING together, so it is the
sole fast-path dialect. A dedicated MySQL or MSSQL fast path can be added later
(two-statement lock-then-update, or OUTPUT) if profiling justifies it.
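The dispatch this implies can be sketched as an allow-list lookup. The name _SKIP_LOCKED_DIALECTS comes from this ADR; its contents, the dialect name strings, and the claim_strategy helper are assumptions made for illustration.

```python
# Assumed contents: PostgreSQL is described above as the sole fast-path dialect.
_SKIP_LOCKED_DIALECTS = frozenset({"postgresql"})


def claim_strategy(dialect_name):
    """Pick the claim implementation for a dialect (hypothetical helper).

    Anything not explicitly allow-listed falls back to the portable default,
    which is slower but still prevents double-claim.
    """
    if dialect_name in _SKIP_LOCKED_DIALECTS:
        return "fast_path"
    return "portable_default"


strategies = {
    name: claim_strategy(name)
    for name in ("postgresql", "mysql", "mssql", "sqlite")
}
```

An allow-list rather than a capability probe means that adding a dialect is a deliberate change, made once a fast path for it has actually been written and tested.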
Transaction boundary. _claim commits the claim through the
DAO's standalone-commit path, so the lock and state change are durable the
moment it returns. It must therefore be called outside an active Unit of
Work — inside a UoW the write would not commit until the UoW does, so other
workers would not see the claim and the lock-then-return guarantee would not
hold. OutboxProcessor.get_next_batch_of_messages calls it with no surrounding
UoW, which is correct.
OutboxRepository.claim_batch(worker_id, limit, target_broker=None) is the new
public entry point, built on _claim, and OutboxProcessor uses it.
The claim now happens once when the batch is fetched, not per message during
processing. OutboxRepository is framework-internal plumbing (not part of the
public protean.* surface user code builds on), so no deprecation cycle
applies: claim_for_processing — the racy per-message claim that
claim_batch supersedes — is removed outright, while find_unprocessed is
kept as a read-only inspection query alongside its siblings (find_failed,
find_abandoned, find_published, find_processing).
This complements, rather than replaces, the optimistic-concurrency work from
epic 5.1 (BaseDAO._validate_and_update_version): version checking guards
aggregate updates against lost writes, while _claim guards
queue-style claims against double processing.
Consequences
- The outbox poll path drops from 1 + N round trips to one (fast path) or stays
at 1 + N (portable default), with the TOCTOU window closed in both.
- "Already claimed" contention log noise disappears on the fast path: workers
never select rows they cannot claim.
- Because the claim now happens once at batch fetch (not per message), a worker
that dies after claiming but before publishing leaves the whole claimed
batch in processing until locked_until expires. The eligibility criteria
include PROCESSING rows whose lock has expired, so those rows are then
reclaimed automatically (an actively-locked PROCESSING row is still excluded,
so in-flight messages are never stolen). This is a wider stall window than the
previous per-message claim (which stranded only the in-flight message), but it
is still at-least-once and self-healing; size locked_until against
messages_per_tick and expected processing time.
- New adapters get correct behaviour for free via the portable default;
implementing the fast path is an opt-in optimization, not a requirement.
- The fast path depends on the SQL backend supporting SKIP LOCKED and
RETURNING. The dialect allow-list (_SKIP_LOCKED_DIALECTS) keeps the decision
explicit; an unlisted dialect silently and correctly uses the portable default.
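The reclaim rule in the consequences above can be written as a small predicate. This is a sketch under assumptions: the lowercase status values, the is_claimable helper, and the choice to treat a lock as expired when locked_until is at or before the current time are all illustrative, not taken from the outbox code.

```python
from datetime import datetime, timedelta, timezone


def is_claimable(status, locked_until, now):
    """Return True if a row may be claimed at time `now` (hypothetical helper)."""
    if status in ("pending", "failed"):
        return True
    if status == "processing":
        # Only an expired lock makes a processing row eligible again;
        # an actively locked row is never stolen.
        return locked_until is not None and locked_until <= now
    return False  # e.g. already published


now = datetime(2026, 6, 1, 12, 0, tzinfo=timezone.utc)
active_lock = now + timedelta(minutes=5)
expired_lock = now - timedelta(seconds=1)

checks = {
    "pending": is_claimable("pending", None, now),
    "in_flight": is_claimable("processing", active_lock, now),
    "crashed_worker": is_claimable("processing", expired_lock, now),
    "published": is_claimable("published", None, now),
}
```

A row left behind by a crashed worker becomes claimable again only after its lock lapses, which is why the lock duration bounds the stall window.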
Alternatives Considered
- Ship only the portable default. Rejected: it is the same 1 + N cost as today;
the round-trip win is the whole point.
- SELECT … FOR UPDATE SKIP LOCKED followed by a separate UPDATE. Rejected:
observed double-claims in testing because the row locks taken by the SELECT
were not reliably held across the Python round trip to the separate UPDATE, and
it is still two statements. The single UPDATE … RETURNING with the locking
sub-select is atomic and avoids the window entirely.
- Hand-rolled raw SQL per dialect (CTEs, OUTPUT for MSSQL, two-statement for
MySQL). Rejected for now: PostgreSQL renders the whole claim from one
SQLAlchemy Core construct (UPDATE … FOR UPDATE SKIP LOCKED … RETURNING), which
is far less to maintain and get wrong, and it is the dialect most likely to run
a high-throughput outbox. MySQL/MariaDB (no UPDATE … RETURNING) and MSSQL
(OUTPUT + different table hints) would each need bespoke SQL, so they use the
portable default for now; dedicated fast paths can be added later if profiling
justifies them.
- Put dialect dispatch in OutboxRepository. Rejected: the contract is broader
than the outbox and belongs at the DAO layer where other consumers can reuse it.