Engine Architecture

The Engine class is the core of Protean's async message processing system. It manages subscriptions, coordinates message handling, and provides graceful lifecycle management.

Responsibilities

The Engine is responsible for:

  1. Registering handler subscriptions - Creating subscriptions for event handlers, command handlers, and projectors
  2. Managing broker subscriptions - Handling external message consumers
  3. Running outbox processors - Publishing messages from the transactional outbox
  4. Coordinating lifecycle - Starting, running, and gracefully shutting down all components
  5. Emitting trace events - Publishing structured MessageTrace events at each stage of message processing for real-time observability

Engine Initialization

When you create an Engine instance, it automatically discovers and registers all handlers from your domain:

from protean.server import Engine

# Create an engine for your domain
engine = Engine(domain)

# Or with options
engine = Engine(
    domain,
    test_mode=False,  # Set True for deterministic testing
    debug=False,      # Set True for verbose logging
)

Subscription Registration

During initialization, the Engine creates a subscription for each handler. Its subscription factory determines the subscription type from the configuration hierarchy.

Handler Subscriptions

For each event handler and projector, the Engine creates one subscription per handler in three steps:

  1. Infers the stream category from the handler's metadata or its associated aggregate
  2. Resolves the subscription configuration using the priority hierarchy
  3. Initializes the matching subscription type for the handler, either a StreamSubscription or an EventStoreSubscription

# Example: How the engine registers an event handler
handler_cls = OrderEventHandler
stream_category = engine._infer_stream_category(handler_cls)  # e.g., "orders"

subscription = engine.subscription_factory.create_subscription(
    handler=handler_cls,
    stream_category=stream_category,
)

Command dispatch

When multiple command handlers belong to the same aggregate (and therefore the same stream category), the Engine consolidates them into a single subscription using a CommandDispatcher. Instead of creating N separate subscriptions that would compete for the same messages, the dispatcher reads each command once and routes it to the correct handler based on the command type:

# Two command handlers for the same aggregate
@domain.command_handler(part_of=User)
class RegisterUserHandler:
    @handle(RegisterUser)
    def register(self, command): ...

@domain.command_handler(part_of=User)
class DeactivateUserHandler:
    @handle(DeactivateUser)
    def deactivate(self, command): ...

# The engine creates ONE subscription keyed as "commands:{stream_category}"
# that routes RegisterUser → RegisterUserHandler
# and DeactivateUser → DeactivateUserHandler

The CommandDispatcher caches the deserialized domain object between handler resolution and execution to avoid double deserialization.
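
The routing and caching behavior described above can be illustrated with a small, self-contained sketch. It is not Protean's implementation: SketchDispatcher, its routes and command_types mappings, and the dictionary-shaped messages are all hypothetical names chosen for this example.

```python
from dataclasses import dataclass


@dataclass
class RegisterUser:
    email: str


@dataclass
class DeactivateUser:
    user_id: str


class RegisterUserHandler:
    def handle(self, command):
        return f"registered {command.email}"


class DeactivateUserHandler:
    def handle(self, command):
        return f"deactivated {command.user_id}"


class SketchDispatcher:
    """Hypothetical stand-in for CommandDispatcher (illustration only)."""

    def __init__(self, routes, command_types):
        self.routes = routes                # command class -> handler class
        self.command_types = command_types  # type name -> command class
        self.deserialize_calls = 0          # counts how often we deserialize
        self._cached = None                 # (message id, deserialized command)

    def _deserialize(self, message):
        self.deserialize_calls += 1
        return self.command_types[message["type"]](**message["data"])

    def resolve(self, message):
        # Deserialize once, remember the object, and pick the handler by type.
        command = self._deserialize(message)
        self._cached = (message["id"], command)
        return self.routes[type(command)]

    def dispatch(self, message):
        handler_cls = self.resolve(message)
        _, command = self._cached  # reuse the object built during resolution
        self._cached = None
        return handler_cls().handle(command)


dispatcher = SketchDispatcher(
    routes={RegisterUser: RegisterUserHandler, DeactivateUser: DeactivateUserHandler},
    command_types={"RegisterUser": RegisterUser, "DeactivateUser": DeactivateUser},
)
```

Each message is read and deserialized once, regardless of how many handlers share the stream category.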

Broker Subscriptions

For external message subscribers, the Engine creates BrokerSubscription instances that connect to the configured message broker:

@domain.subscriber(broker="default", stream="external_events")
class ExternalEventSubscriber:
    @handle("OrderCreated")
    def handle_order_created(self, event):
        ...

Outbox Processors

When the outbox is enabled (via default_subscription_type = "stream"), the Engine creates an OutboxProcessor for each database provider to publish messages to the configured broker:

# Configuration in domain.toml
[server]
default_subscription_type = "stream"   # Enables outbox

[outbox]
broker = "default"
messages_per_tick = 10
tick_interval = 1

External Outbox Processors

When external_brokers is configured, the Engine creates additional OutboxProcessor instances — one per external broker per database provider. These processors only pick up outbox rows tagged with their target broker and publish using a stripped metadata envelope suitable for external consumers:

[outbox]
broker = "default"
external_brokers = ["partner_events"]   # Adds external processors

With one database provider, this creates two processors:

  • outbox-processor-default-to-default — internal events
  • outbox-processor-default-to-partner_events-external — published events to external broker

External processors emit outbox.external_published and outbox.external_failed trace events (distinct from internal outbox.published / outbox.failed), and skip priority lane routing since lanes are an internal concern.
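
As a rough illustration of the "one per external broker per database provider" rule, the sketch below reproduces the two processor names listed above. The helper outbox_processor_names is hypothetical, and the naming pattern is only inferred from those two examples, assuming the first "default" in each name is the database provider.

```python
def outbox_processor_names(providers, internal_broker, external_brokers=()):
    """Return illustrative processor names for every provider/broker pair."""
    names = []
    for provider in providers:
        # One internal processor per database provider.
        names.append(f"outbox-processor-{provider}-to-{internal_broker}")
        # One additional processor per external broker per provider.
        for broker in external_brokers:
            names.append(f"outbox-processor-{provider}-to-{broker}-external")
    return names


names = outbox_processor_names(["default"], "default", ["partner_events"])
```

With two database providers and one external broker, the same rule would yield four processors.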

See Dispatching Published Events to External Brokers for the full setup guide.

Configuration Hierarchy

When registering subscriptions, the Engine consults a hierarchy of configuration sources to determine the appropriate subscription type and options for each handler. Resolution typically follows this order of precedence, highest first:

  1. Handler-level configuration: Explicit parameters set on the handler, such as stream name, broker, or processing options.
  2. Domain element metadata: Values inferred from associated aggregate, event, or command definitions.
  3. Profile or role defaults: Settings derived from the active configuration profile (e.g., "production", "projection") or handler role (such as projector vs. handler).
  4. Domain/global configuration: Defaults specified in the domain configuration files (domain.toml or equivalent).

This means that explicit intent at the handler level takes priority, but system-wide defaults provide sensible behavior when specifics are not set.

For example, if a handler specifies a custom stream name, it will be used; otherwise, the engine will infer the relevant category from the handler’s associated aggregate or fall back to profile/domain defaults.
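
One way to picture this precedence is a layered lookup in which the first source that defines a value wins. The sketch below is an assumption-laden illustration, not the Engine's resolver: resolve_subscription_config and the option keys used here are hypothetical.

```python
from collections import ChainMap


def resolve_subscription_config(handler_opts, element_meta, profile_defaults, domain_defaults):
    """Merge four layers of options; earlier layers override later ones."""
    layers = [
        # Treat None as "not set" so a lower-priority layer can supply the value.
        {key: value for key, value in layer.items() if value is not None}
        for layer in (handler_opts, element_meta, profile_defaults, domain_defaults)
    ]
    return dict(ChainMap(*layers))


resolved = resolve_subscription_config(
    handler_opts={"stream_category": "orders_v2", "subscription_type": None},
    element_meta={"stream_category": "orders"},
    profile_defaults={"messages_per_tick": 100},
    domain_defaults={"subscription_type": "stream", "messages_per_tick": 10},
)
```

Here the handler's explicit stream category wins, the profile supplies messages_per_tick, and the subscription type falls through to the domain default.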

Tracing

The Engine initializes a TraceEmitter at startup that publishes structured MessageTrace events as messages flow through the pipeline. Trace events are emitted at three points during handle_message:

  • handler.started -- Before the handler processes the message
  • handler.completed -- After successful processing (includes duration_ms)
  • handler.failed -- When the handler raises an exception (includes error)

Additional trace events are emitted by StreamSubscription (message.acked, message.nacked, message.dlq) and OutboxProcessor (outbox.published, outbox.failed).
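
The three handler-level trace points can be sketched as a wrapper around the handler call. This is a minimal illustration under stated assumptions: handle_with_traces, the emit callback, and the dictionary-shaped trace records are hypothetical and do not mirror the MessageTrace structure.

```python
import time


def handle_with_traces(handler, message, emit):
    """Run a handler and emit started/completed/failed trace records."""
    emit({"event": "handler.started", "message_id": message["id"]})
    started = time.perf_counter()
    try:
        result = handler(message)
    except Exception as exc:
        # Failure path: record the error, then let the exception propagate.
        emit({"event": "handler.failed", "message_id": message["id"], "error": str(exc)})
        raise
    duration_ms = (time.perf_counter() - started) * 1000
    emit({"event": "handler.completed", "message_id": message["id"], "duration_ms": duration_ms})
    return result


traces = []
handle_with_traces(lambda msg: "ok", {"id": "m1"}, traces.append)
```

After the call, traces holds a handler.started record followed by a handler.completed record that carries duration_ms.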

Dual-channel output

The TraceEmitter writes to two Redis channels:

  • Pub/Sub (protean:trace) -- Real-time fan-out for SSE clients. The emitter checks subscriber count and skips when nobody is listening.
  • Stream (protean:traces) -- Time-bounded history for the Observatory dashboard and REST API. Old entries are automatically trimmed based on the configured retention period.
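
The dual-channel behavior can be sketched without a Redis server. In the example below, InMemoryBackend is a hypothetical stand-in for a Redis client and SketchTraceEmitter is not the real TraceEmitter; only the channel name protean:trace and the skip-and-trim behavior come from the description above.

```python
import json
import time


class InMemoryBackend:
    """Hypothetical stand-in for a Redis client, so the sketch stays runnable."""

    def __init__(self):
        self.subscribers = {}  # channel -> number of listeners
        self.published = []    # (channel, payload) pairs sent over Pub/Sub
        self.stream = []       # (timestamp, payload) pairs kept as history

    def subscriber_count(self, channel):
        return self.subscribers.get(channel, 0)


class SketchTraceEmitter:
    def __init__(self, backend, retention_days=7):
        self.backend = backend
        self.retention_seconds = retention_days * 86400

    def emit(self, trace, now=None):
        now = time.time() if now is None else now
        payload = json.dumps(trace)

        # Pub/Sub channel: skip publishing when nobody is listening.
        if self.backend.subscriber_count("protean:trace") > 0:
            self.backend.published.append(("protean:trace", payload))

        # Stream channel: persist only when retention is enabled, then trim.
        if self.retention_seconds > 0:
            self.backend.stream.append((now, payload))
            cutoff = now - self.retention_seconds
            self.backend.stream = [entry for entry in self.backend.stream if entry[0] >= cutoff]


backend = InMemoryBackend()
emitter = SketchTraceEmitter(backend, retention_days=1)
```

Passing now explicitly keeps the trimming behavior deterministic when experimenting with the sketch.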

Trace retention

The Engine reads trace_retention_days from the domain's [observatory] configuration (default: 7 days). When set to 0, Stream persistence is disabled and only Pub/Sub broadcasting is available. If the configuration value is missing or invalid, the Engine falls back to the 7-day default.
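
The fallback rules can be sketched as a small parsing helper. resolve_trace_retention is a hypothetical function, and what counts as "invalid" (non-numeric or negative values) is an assumption made for this example.

```python
DEFAULT_RETENTION_DAYS = 7


def resolve_trace_retention(observatory_config):
    """Return the retention period in days, falling back to the default."""
    raw = observatory_config.get("trace_retention_days", DEFAULT_RETENTION_DAYS)
    try:
        days = int(raw)
    except (TypeError, ValueError):
        return DEFAULT_RETENTION_DAYS  # missing or non-numeric value
    if days < 0:
        return DEFAULT_RETENTION_DAYS  # assumption: negative values are invalid
    return days


def stream_persistence_enabled(days):
    # A retention of 0 disables Stream persistence; Pub/Sub still works.
    return days > 0
```

For example, an empty [observatory] section resolves to 7 days, while an explicit 0 turns persistence off.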

The emitter adds negligible overhead when no monitoring tools are subscribed and persistence is disabled, since it skips publishing when nobody is listening. See Observability for the full design, the Observatory monitoring server, and trace API endpoints.

Context Propagation

When the Engine processes a message, it restores the cross-cutting context that was present when the message was originally created. This is critical for multi-tenant applications, user-scoped auditing, and any scenario where async handlers need ambient context from the original request.

How it works

  1. During the original request, enrichers inject values from g (e.g., g.tenant_id, g.user_id) into the message's metadata.extensions.
  2. The message is persisted to the event store with its extensions intact.
  3. When the Engine picks up the message for async processing, it reads metadata.extensions and passes them as keyword arguments to domain_context():

# Simplified from engine.py: what the Engine does for each message
extensions = message.metadata.extensions or {}

with self.domain.domain_context(**extensions):
    g.message_in_context = message
    # Handler runs here — g.tenant_id, g.user_id, etc. are available

This means any key injected by an enricher is automatically available as an attribute on g inside the handler. No manual extraction is needed.

g.message_in_context

In addition to restoring enricher-injected values, the Engine sets g.message_in_context to the current message being processed. Handlers can use this to access the full message metadata:

# Access the full extensions dict
tenant_id = g.message_in_context.metadata.extensions.get("tenant_id")

# Or use the restored g attribute directly (simpler)
tenant_id = g.tenant_id

After the handler completes (or fails), the domain context is torn down and g is cleaned up — extensions do not leak between messages.
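
The full round trip, from enrichment to restoration to cleanup, can be approximated with standard-library pieces. Everything in this sketch is a stand-in: g is a plain SimpleNamespace rather than Protean's g, and domain_context, enrich, and process are hypothetical helpers that only mimic the behavior described above.

```python
from contextlib import contextmanager
from types import SimpleNamespace

g = SimpleNamespace()  # stand-in for Protean's `g`


@contextmanager
def domain_context(**kwargs):
    """Hypothetical stand-in: expose kwargs on g, then clean up on exit."""
    for key, value in kwargs.items():
        setattr(g, key, value)
    try:
        yield g
    finally:
        g.__dict__.clear()  # nothing leaks into the next message


def enrich(metadata, request_context):
    # Enricher step: copy ambient request values into the message extensions.
    metadata.setdefault("extensions", {}).update(request_context)
    return metadata


def process(message, handler):
    # Engine step: restore the extensions as attributes on g for the handler.
    extensions = message["metadata"].get("extensions") or {}
    with domain_context(**extensions):
        g.message_in_context = message
        return handler()


message = {"metadata": enrich({}, {"tenant_id": "acme", "user_id": "u-42"})}
tenant_seen_by_handler = process(message, lambda: g.tenant_id)
```

The handler sees tenant_id as "acme" while it runs, and g carries no tenant_id once process returns.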

For a complete worked example of context propagation in a multi-tenant application, see Multi-Tenancy in Event-Driven Systems.


Running the Engine

The Running the Server guide covers how to start, configure, and operate the Engine, including:

  • CLI commands and options
  • Test mode for deterministic testing
  • Debug mode for troubleshooting
  • Programmatic usage
  • Production deployment strategies
  • Signal handling and graceful shutdown
  • Monitoring, health checks, and observability

Next Steps

  • Subscriptions: Learn how handlers connect to message sources and react to events.
  • Subscription Types: Compare StreamSubscription and EventStoreSubscription, and choose the right one for your workload.
  • Configuration: Dive deeper into configuring engine profiles, subscriptions, and runtime options.
  • Outbox Pattern: Understand reliable message publishing and transactional outbox processing.
  • Observability: Real-time tracing, the Observatory server, SSE streaming, and Prometheus metrics.
  • Running the Server: Explore how to deploy, operate, and monitor the server in different environments.