OpenTelemetry integration
Protean ships with native OpenTelemetry (OTel) support for distributed tracing and metrics. When enabled, command processing, event handling, repository operations, and server message dispatch automatically emit OTel spans and metrics -- plugging into any APM backend (Datadog, Jaeger, Grafana Tempo, etc.) with zero user code changes.
Installation
Install the telemetry extras alongside Protean:
pip install "protean[telemetry]"
This pulls in:
opentelemetry-apiandopentelemetry-sdkopentelemetry-exporter-otlp-proto-grpc(OTLP exporter)opentelemetry-instrumentation-fastapi(HTTP span auto-instrumentation)opentelemetry-exporter-prometheus(/metricsconvergence)
Configuration
Enable telemetry in your domain.toml:
[telemetry]
enabled = true
service_name = "my-service" # Defaults to the domain's normalized name
exporter = "otlp" # "otlp" (default) or "console"
endpoint = "http://localhost:4317" # OTLP collector endpoint (optional)
Configuration keys
| Key | Type | Default | Description |
|---|---|---|---|
enabled |
bool | false |
Master switch for all OTel instrumentation |
service_name |
string | domain name | Populates the service.name resource attribute |
exporter |
string | "otlp" |
Span and metric exporter: "otlp" or "console" |
endpoint |
string | SDK default | OTLP collector endpoint (gRPC) |
resource_attributes |
table | {} |
Additional OTel resource attributes merged into the Resource |
Resource attributes
Add arbitrary resource attributes for your APM:
[telemetry]
enabled = true
[telemetry.resource_attributes]
"deployment.environment" = "production"
"service.version" = "1.2.0"
"team.name" = "platform"
These attributes appear on every span and metric exported by the domain.
Zero-overhead guarantee
When telemetry is disabled (the default) or the opentelemetry packages are
not installed, all instrumentation is a no-op:
- No imports at module level -- the
opentelemetrypackage is imported lazily insidesrc/protean/utils/telemetry.pyand only when enabled. - No-op fallbacks --
domain.traceranddomain.meterreturn lightweight no-op objects (_NoOpTracer,_NoOpMeter) whose methods do nothing. - Context managers still work -- every
with tracer.start_as_current_span()call works identically whether real or no-op, so instrumented code never needs conditional guards.
The rest of the codebase never imports opentelemetry directly -- all OTel
interaction flows through protean.utils.telemetry.
Automatic tracing
Protean instruments key operations across every layer of the stack. Each instrumentation point creates an OTel span with relevant attributes.
Span catalog
Command processing
| Span name | Emitted by | Description |
|---|---|---|
protean.command.enrich |
CommandProcessor.enrich() |
Command enrichment (identity, metadata, TraceParent injection) |
protean.command.process |
CommandProcessor.process() |
Full command processing lifecycle |
protean.command.enrich attributes:
| Attribute | Type | Description |
|---|---|---|
protean.command.type |
string | Command class type identifier |
protean.command.process attributes:
| Attribute | Type | Description |
|---|---|---|
protean.command.type |
string | Command class type identifier |
protean.command.id |
string | Command message ID (set after enrichment) |
protean.stream |
string | Target stream name (set after enrichment) |
protean.correlation_id |
string | Correlation ID (when present) |
Handler execution
| Span name | Emitted by | Description |
|---|---|---|
protean.handler.execute |
HandlerMixin._handle() |
Individual handler method execution |
Attributes:
| Attribute | Type | Description |
|---|---|---|
protean.handler.name |
string | Handler class name |
protean.handler.type |
string | Element type (command_handler, event_handler, projector, etc.) |
Query dispatch
| Span name | Emitted by | Description |
|---|---|---|
protean.query.dispatch |
QueryProcessor.dispatch() |
Query handler dispatch |
Attributes:
| Attribute | Type | Description |
|---|---|---|
protean.query.type |
string | Query class type identifier |
protean.handler.name |
string | Query handler class name |
Data layer
| Span name | Emitted by | Description |
|---|---|---|
protean.repository.add |
Repository / EventSourcedRepository | Persist an aggregate |
protean.repository.get |
Repository / EventSourcedRepository | Load an aggregate by identity |
protean.event_store.append |
EventStore port | Append events/commands to the event store |
protean.uow.commit |
UnitOfWork | Commit a Unit of Work transaction |
Repository attributes:
| Attribute | Type | Description |
|---|---|---|
protean.aggregate.type |
string | Aggregate class name |
protean.provider |
string | Database provider name (CRUD and ES repositories) |
protean.repository.kind |
string | "event_sourced" (ES repositories only) |
Event store attributes:
| Attribute | Type | Description |
|---|---|---|
protean.event_store.stream |
string | Stream name |
protean.event_store.message_type |
string | Event/command type |
protean.event_store.position |
int | Resulting stream position |
UoW attributes:
| Attribute | Type | Description |
|---|---|---|
protean.uow.event_count |
int | Total events gathered |
protean.uow.session_count |
int | Number of database sessions committed |
Server and outbox
| Span name | Emitted by | Description |
|---|---|---|
protean.engine.handle_message |
Engine | Top-level message processing in the server |
protean.outbox.process |
OutboxProcessor | Batch outbox processing tick |
protean.outbox.publish |
OutboxProcessor | Individual outbox message publish |
Engine attributes:
| Attribute | Type | Description |
|---|---|---|
protean.handler.name |
string | Handler class name |
protean.message.type |
string | Message class name |
protean.message.id |
string | Message UUID |
protean.stream_category |
string | Stream category |
protean.worker_id |
string | Worker process ID (multi-worker mode) |
protean.subscription_type |
string | command_dispatcher, event_handler, command_handler, or process_manager |
Outbox attributes:
| Attribute | Type | Description |
|---|---|---|
protean.outbox.batch_size |
int | Number of messages in the batch |
protean.outbox.processor_id |
string | Subscription identifier |
protean.outbox.is_external |
bool | Whether this is an external broker dispatch |
protean.outbox.message_id |
string | Individual message ID |
protean.outbox.stream_category |
string | Target stream |
protean.outbox.message_type |
string | Message class name |
protean.outbox.successful_count |
int | Successfully published messages in batch |
protean.outbox.skipped |
bool | Message skipped (already claimed by another worker) |
Span hierarchy
When the server processes a message, spans form a clean parent-child tree:
protean.engine.handle_message (Engine)
└── protean.handler.execute (HandlerMixin)
├── protean.repository.get (Repository)
├── protean.repository.add (Repository)
└── protean.uow.commit (UnitOfWork)
└── protean.event_store.append (EventStore)
For command dispatch:
protean.command.process (CommandProcessor)
└── protean.handler.execute (HandlerMixin)
├── protean.repository.add (Repository)
└── protean.uow.commit (UnitOfWork)
Most spans (command process, handler execute, UoW commit, repository, event
store, engine, outbox) are created with record_exception=False and
set_status_on_exception=False so that Protean can record errors with precise
context using set_span_error(). Lightweight spans like command.enrich and
query.dispatch use OTel defaults for exception handling.
Metrics
When telemetry is enabled, Protean records domain-operation metrics through
OTel counters and histograms. These are created lazily per domain via the
DomainMetrics class.
Metrics catalog
Counters
| Metric name | Unit | Description |
|---|---|---|
protean.command.processed |
{command} |
Commands processed |
protean.handler.invocations |
{invocation} |
Handler invocations |
protean.uow.commits |
{commit} |
UoW commits |
protean.outbox.published |
{message} |
Outbox messages published |
protean.outbox.failed |
{message} |
Outbox publish failures |
Histograms
| Metric name | Unit | Description |
|---|---|---|
protean.command.duration |
s |
Command processing latency |
protean.handler.duration |
s |
Handler execution latency |
protean.uow.events_per_commit |
{event} |
Events gathered per UoW commit |
protean.outbox.latency |
s |
Time from outbox write to publish |
Metric labels
Different metrics carry different label sets:
| Metric | Labels |
|---|---|
protean.command.processed |
command_type, status (ok, error, enqueued) |
protean.command.duration |
command_type, status |
protean.handler.invocations |
handler_name, handler_type, status (ok, error) |
protean.handler.duration |
handler_name, handler_type, status |
protean.uow.commits |
(none) |
protean.uow.events_per_commit |
(none) |
protean.outbox.published |
(none) |
protean.outbox.failed |
(none) |
protean.outbox.latency |
(none) |
TraceParent propagation
Protean propagates distributed trace context across message boundaries using
the W3C traceparent header format.
How it works
-
Injection -- When a command is enriched (
CommandProcessor.enrich()), the current OTel span context is serialized into aTraceParentvalue object and stored inmessage.metadata.headers.traceparent. -
Storage -- The
TraceParentheader travels with the message through the event store or broker, surviving serialization/deserialization. -
Extraction -- When the Engine processes a message (
handle_message()), it extracts theTraceParentfrom message headers and passes it as the parent OTel context totracer.start_as_current_span().
This means a single distributed trace can span:
HTTP request (FastAPI)
└── protean.command.process (API server)
└── protean.handler.execute (API server, sync)
└── protean.uow.commit (API server)
... message persisted, picked up by server ...
protean.engine.handle_message (Server, links to same trace)
└── protean.handler.execute (Server)
└── protean.uow.commit (Server)
The key functions are in protean.utils.telemetry:
inject_traceparent_from_context()-- captures the current span as aTraceParentvalue objectextract_context_from_traceparent(traceparent)-- converts aTraceParentback to an OTelContext
FastAPI auto-instrumentation
Protean provides a one-line integration to instrument your FastAPI application with OpenTelemetry:
from protean.integrations.fastapi import instrument_app
instrument_app(app, domain)
This wraps opentelemetry-instrumentation-fastapi and uses the domain's
tracer and meter providers, so HTTP request spans automatically become parents
of any command/event processing spans created during the request.
What gets instrumented
- Every HTTP request creates a span with standard semantic conventions
(
http.method,http.route,http.status_code) - These HTTP spans parent
protean.command.processandprotean.handler.executespans via OTel context propagation - Incoming
traceparentheaders from upstream services are respected
Options
instrument_app(
app,
domain,
excluded_urls="health,ready", # Skip health check endpoints
)
The call is safe even when opentelemetry is not installed -- it returns
False and logs a warning.
Excluding Observatory endpoints
When the Observatory runs alongside your application, you may want to exclude its endpoints from tracing to avoid noise:
instrument_app(app, domain, excluded_urls="metrics,stream,api/health")
/metrics endpoint convergence
The Observatory's /metrics endpoint (Prometheus text exposition format) is
aware of OTel. When telemetry is enabled and opentelemetry-exporter-prometheus
is installed:
- A
PrometheusMetricReaderis attached to the domain'sMeterProvider - All OTel counters, histograms, and infrastructure gauges are served via
prometheus_client.generate_latest() - Infrastructure metrics (outbox, broker health, subscription lag) are
registered as
ObservableGaugecallbacks on the OTel meter
When OTel is not enabled, the endpoint falls back to the original hand-rolled Prometheus implementation with identical behavior. No configuration change is needed -- the convergence is automatic.
Observatory vs OpenTelemetry
Protean offers two complementary observability paths:
| Observatory | OpenTelemetry | |
|---|---|---|
| Audience | Developer running protean server locally |
Ops team shipping telemetry to Grafana/Datadog |
| Infrastructure | Zero-config (Redis already present) | Requires collector + backend + visualization |
| Data model | Flat MessageTrace events in time-ordered stream |
Parent-child span trees with context propagation |
| Query patterns | XRANGE by handler/stream/time window | TraceQL/PromQL in external tool |
| Real-time | SSE via Redis Pub/Sub | Not applicable (batch export) |
| Install | Built-in (needs Redis broker) | pip install "protean[telemetry]" |
When to use which
- Observatory -- Local development and debugging. Real-time SSE dashboard, REST API for trace history, zero configuration beyond having Redis.
- OpenTelemetry -- Production monitoring. Vendor-agnostic spans and metrics exported to your APM platform. Distributed tracing across service boundaries.
They share instrumentation callsites but have independent emission paths. Both can run simultaneously -- the Observatory's trace emitter and OTel spans are emitted from the same code points in the Engine and handlers without interference.
APM setup guides
Jaeger
Run Jaeger all-in-one with OTLP support:
docker run -d --name jaeger \
-p 16686:16686 \
-p 4317:4317 \
jaegertracing/all-in-one:latest
Configure your domain:
[telemetry]
enabled = true
exporter = "otlp"
endpoint = "http://localhost:4317"
Open http://localhost:16686 to view traces.
Grafana Tempo + Grafana
# docker-compose.yml (excerpt)
services:
tempo:
image: grafana/tempo:latest
ports:
- "4317:4317" # OTLP gRPC
- "3200:3200" # Tempo API
command: ["-config.file=/etc/tempo.yml"]
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
Point Protean at Tempo's OTLP endpoint:
[telemetry]
enabled = true
endpoint = "http://localhost:4317"
Datadog
Use the Datadog Agent's OTLP ingest:
[telemetry]
enabled = true
exporter = "otlp"
endpoint = "http://localhost:4317"
[telemetry.resource_attributes]
"deployment.environment" = "production"
Configure the Datadog Agent to accept OTLP gRPC on port 4317.
Console exporter (debugging)
For quick local debugging without an APM backend:
[telemetry]
enabled = true
exporter = "console"
Spans and metrics are printed to stdout.
Contributor conventions
When adding new instrumentation to Protean, follow these established patterns to keep the telemetry codebase consistent.
Import rule
Never import opentelemetry outside of src/protean/utils/telemetry.py.
All OTEL interaction must go through the gateway module's public API.
If you need a new OTEL capability (e.g., span links, baggage), add a
helper to telemetry.py and call that helper from your instrumented code.
See ADR-0008 for the rationale.
Adding a new span
Use this pattern at every instrumented callsite:
from protean.utils.telemetry import get_tracer, set_span_error
tracer = get_tracer(domain) # or self._domain.tracer, current_domain.tracer
with tracer.start_as_current_span(
"protean.<subsystem>.<operation>",
record_exception=False,
set_status_on_exception=False,
) as span:
span.set_attribute("protean.<subsystem>.<attribute>", value)
try:
# ... actual work ...
except Exception as exc:
set_span_error(span, exc)
raise
Key rules:
- Span name:
protean.<subsystem>.<operation>(e.g.,protean.cache.get,protean.broker.publish). - Attribute prefix:
protean.<subsystem>.<attribute>(e.g.,protean.cache.key,protean.broker.stream). - Always disable automatic exception recording with
record_exception=False, set_status_on_exception=False. Then callset_span_error(span, exc)explicitly in your exception handler. This gives precise control over what gets recorded and keeps theStatusCodeimport centralized in the gateway module. - Never import
StatusCode,Status, or anyopentelemetrysymbol in the instrumented file. Useset_span_error()instead.
Why explicit error recording?
OTEL's default behavior records every exception that escapes the span's
context manager, including exceptions that are expected and handled
upstream (e.g., ObjectNotFoundError in a repository get that returns
None). By disabling automatic recording and calling set_span_error()
only in the appropriate except block, Protean ensures:
- Only genuine failures are marked as errors in traces.
- The
StatusCode.ERRORimport stays intelemetry.py, preserving the import boundary. - Exception messages and stack traces are attached at the point where the error is best understood, not where it happens to propagate.
Adding a new metric
Metrics are defined as a fixed set of instruments in the DomainMetrics
class (src/protean/utils/telemetry.py). To add a new metric:
-
Add the instrument to
DomainMetrics.__init__():self.my_new_counter = meter.create_counter( name="protean.subsystem.metric_name", unit="{item}", # OTEL unit string description="What this measures", ) -
Record at the callsite using
get_domain_metrics():from protean.utils.telemetry import get_domain_metrics metrics = get_domain_metrics(domain) metrics.my_new_counter.add(1, {"label_name": label_value})
Key rules:
- Metric name:
protean.<subsystem>.<metric>(e.g.,protean.cache.hits). - Units: Use OTEL conventions --
"s"for seconds,"{item}"for counts,"By"for bytes. - Labels at recording time, not at creation time. The instrument is
created once; labels are passed with each
.add()or.record()call. - Counters for events that happened (commands processed, messages published). Histograms for distributions (latency, batch sizes).
- The
DomainMetricsinstance is cached per domain -- created on first access viaget_domain_metrics(domain)and cleared onshutdown_telemetry(). Never create instruments outside this class.
Adding TraceParent propagation to a new message pathway
If you add a new message pathway (e.g., a new subscription type or a new way commands enter the system), ensure trace continuity:
- On message creation: call
inject_traceparent_from_context()and store the result in the message'sMessageHeaders.traceparent. - On message arrival: call
extract_context_from_traceparent(msg.metadata.headers.traceparent)and pass the result ascontext=tostart_as_current_span().
If either side is missed, the distributed trace will break at that boundary. See the three existing injection/extraction points in ADR-0008 for reference.
Next steps
- Observability reference -- Full Observatory API, trace events, and Prometheus metric reference
- Monitoring -- Observatory setup, key metrics, alerting
- FastAPI Integration -- Domain context middleware and exception handlers
- Engine Architecture -- How the engine manages subscriptions and lifecycle