# Running the Server

This guide covers how to start, configure, and operate the Protean server for processing async messages in your domain.

## CLI Command

Start the server with the `protean server` command:

```shell
protean server [OPTIONS]
```
### Options

| Option | Description | Default |
|---|---|---|
| `--domain` | Path to the domain module | `.` (current directory) |
| `--test-mode` | Run in test mode | `False` |
| `--debug` | Enable debug logging | `False` |
| `--help` | Show help message | |
## Basic Usage

### Starting the Server

```shell
# Start with the domain in the current directory
protean server

# Start with a specific domain path
protean server --domain=src/my_domain

# Start with a module path
protean server --domain=my_package.my_domain

# Start with a specific domain instance
protean server --domain=my_domain:custom_domain
```
### Domain Discovery

The server discovers your domain in this order:

- Environment variable: `PROTEAN_DOMAIN`, if set
- `--domain` parameter: the path or module specified on the command line
- Current directory: looks for `domain.py` or `subdomain.py`

Within a module, it looks for the following (a minimal example follows the list):

- A variable named `domain` or `subdomain`
- Otherwise, any variable that is a `Domain` instance
- An error is raised if multiple `Domain` instances are found
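For example, a module like the following would be picked up by any of the discovery mechanisms. The file name and domain setup are illustrative, and the `Domain` constructor arguments may differ across Protean versions:

```python
# my_domain.py  (or domain.py / subdomain.py for current-directory discovery)
from protean import Domain

# The server picks up a variable named `domain` or `subdomain`,
# or any other variable holding a Domain instance.
domain = Domain(__file__)
```

This module can then be referenced as `--domain=my_domain`, or with an explicit variable name as `--domain=my_domain:custom_domain` if the instance is bound to a different name.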
## Debug Mode

Enable verbose logging for troubleshooting:

```shell
protean server --domain=my_domain --debug
```

Debug mode logs:

- Subscription registration details
- Message processing events
- Position updates
- Configuration resolution
## Test Mode

Test mode processes all available messages and then exits, which makes it useful for integration tests:

```shell
protean server --domain=my_domain --test-mode
```

In test mode, the server:

- Starts all subscriptions and processors
- Runs multiple processing cycles
- Allows message chains to propagate
- Shuts down after processing completes
### Using Test Mode in Tests

Test mode enables deterministic testing of async message flows:

```python
import pytest

from protean.server import Engine


def test_order_creates_inventory_reservation():
    """Test that creating an order reserves inventory."""
    # Arrange: Create order (raises events)
    with domain.domain_context():
        order = Order.create(
            customer_id="123",
            items=[OrderItem(product_id="ABC", quantity=5)],
        )
        domain.repository_for(Order).add(order)

    # Act: Process events in test mode
    engine = Engine(domain, test_mode=True)
    engine.run()

    # Assert: Verify inventory was reserved
    with domain.domain_context():
        reservation = domain.repository_for(Reservation).get_by_order(order.id)
        assert reservation is not None
        assert reservation.quantity == 5
```
### Testing Multi-Step Flows

Test mode handles cascading events automatically:

```python
def test_order_fulfillment_flow():
    """Test the complete order fulfillment flow."""
    # Order created -> Inventory reserved -> Payment processed -> Order shipped
    with domain.domain_context():
        order = Order.create(...)
        domain.repository_for(Order).add(order)

    # Process all cascading events
    engine = Engine(domain, test_mode=True)
    engine.run()

    with domain.domain_context():
        order = domain.repository_for(Order).get(order.id)
        assert order.status == "shipped"
```
## Programmatic Usage

You can also start the engine programmatically:

```python
from protean.server import Engine

from my_domain import domain

# Create and run the engine
engine = Engine(domain)
engine.run()  # Blocking call
```
### With Custom Options

```python
engine = Engine(
    domain,
    test_mode=False,
    debug=True,
)
engine.run()
```
### Accessing Engine State

```python
engine = Engine(domain)

# Check subscriptions
print(f"Handler subscriptions: {len(engine._subscriptions)}")
print(f"Broker subscriptions: {len(engine._broker_subscriptions)}")
print(f"Outbox processors: {len(engine._outbox_processors)}")

# Access the subscription factory
factory = engine.subscription_factory
```
## Signal Handling

The server handles shutdown signals gracefully:

| Signal | Behavior |
|---|---|
| `SIGINT` (Ctrl+C) | Graceful shutdown |
| `SIGTERM` | Graceful shutdown |
| `SIGHUP` | Graceful shutdown |

During graceful shutdown (sketched below), the server:

- Stops accepting new messages
- Completes processing of the current batch
- Persists subscription positions
- Cleans up resources
- Exits with the appropriate code
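The flow is roughly equivalent to this standard-library sketch; it only illustrates the shutdown sequence described above and is not Protean's actual implementation:

```python
import asyncio
import signal


async def serve():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    # Translate SIGINT / SIGTERM / SIGHUP into a shutdown request
    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        loop.add_signal_handler(sig, stop.set)

    await stop.wait()  # run until a signal arrives
    # ... stop polling, finish the in-flight batch, persist positions,
    # release resources, then exit with the appropriate code ...


asyncio.run(serve())
```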
### Exit Codes
| Code | Meaning |
|---|---|
| 0 | Normal shutdown (signal or test mode completion) |
| 1 | Error during processing |
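A minimal sketch of using these exit codes in CI, combining test mode with a subprocess call (the `my_domain` module name comes from the earlier examples):

```python
import subprocess

# Run the server in test mode; it processes pending messages and exits
result = subprocess.run(
    ["protean", "server", "--domain=my_domain", "--test-mode"],
    capture_output=True,
    text=True,
)

# 0 = normal shutdown, 1 = error during processing
assert result.returncode == 0, result.stderr
```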
## Production Deployment

### Process Management

Use a process manager such as systemd, supervisord, or Docker:

```ini
# /etc/systemd/system/protean-server.service
[Unit]
Description=Protean Message Server
After=network.target

[Service]
Type=simple
User=app
WorkingDirectory=/app
Environment=PROTEAN_ENV=production
ExecStart=/app/.venv/bin/protean server --domain=my_domain
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
```
### Docker

```dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY . .
RUN pip install poetry && poetry install

ENV PROTEAN_ENV=production

CMD ["poetry", "run", "protean", "server", "--domain=my_domain"]
```
### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: protean-server
spec:
  replicas: 3  # Multiple workers for scaling
  selector:
    matchLabels:
      app: protean-server
  template:
    metadata:
      labels:
        app: protean-server
    spec:
      containers:
        - name: server
          image: my-app:latest
          command: ["protean", "server", "--domain=my_domain"]
          env:
            - name: PROTEAN_ENV
              value: "production"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
```
### Scaling Considerations

`StreamSubscription` supports horizontal scaling:

- Multiple server instances can run concurrently
- Messages are distributed across consumers via Redis consumer groups
- Each message is processed by exactly one consumer

`EventStoreSubscription` has limited scaling:

- Multiple instances will process the same messages
- Use it for projections where idempotency is guaranteed
- Consider `StreamSubscription` for scalable workloads
### Health Checks

Add health checks for production deployments:

```python
# health_check.py
import sys

from my_domain import domain


def check_health():
    try:
        # Verify the domain can activate
        with domain.domain_context():
            # Check broker connectivity
            broker = domain.brokers.get("default")
            if broker:
                broker.ping()  # If supported

            # Check event store connectivity
            if domain.event_store:
                domain.event_store.store.ping()  # If supported

        return 0
    except Exception as e:
        print(f"Health check failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(check_health())
```
## Logging

### Log Levels
| Level | What's Logged |
|---|---|
| ERROR | Exceptions, failed processing |
| WARNING | Retries, DLQ moves, deprecation warnings |
| INFO | Startup, shutdown, batch summaries |
| DEBUG | Message details, position updates, config resolution |
### Configuring Logging

```python
import logging

# Set the log level for the Protean server
logging.getLogger("protean.server").setLevel(logging.DEBUG)

# Or configure via a logging config
LOGGING_CONFIG = {
    "version": 1,
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "detailed",
        }
    },
    "formatters": {
        "detailed": {
            "format": "%(asctime)s %(levelname)s %(name)s: %(message)s"
        }
    },
    "loggers": {
        "protean.server": {
            "level": "DEBUG",
            "handlers": ["console"],
        }
    },
}
```
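To apply the dictionary-based configuration above, pass it to the standard library's `dictConfig`:

```python
import logging.config

logging.config.dictConfig(LOGGING_CONFIG)
```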
### Structured Logging

For production, consider structured logging:

```python
import structlog

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)
```
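Once configured, handlers and application code can emit key-value log entries. A brief usage sketch; the logger name and fields are illustrative, and output still flows through whatever standard-library handlers you configure:

```python
import structlog

logger = structlog.get_logger("my_app.handlers")

# Rendered as a JSON line with timestamp, level, logger name, and bound fields
logger.info("message_processed", handler="OrderEventHandler", stream="order")
```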
## Monitoring

### Key Metrics
Monitor these metrics in production:
| Metric | Description |
|---|---|
| Messages processed/sec | Throughput |
| Message processing latency | P50, P95, P99 |
| Error rate | Failed messages / total |
| Queue depth | Pending messages |
| Consumer lag | Position behind head |
### Example Prometheus Metrics

```python
from prometheus_client import Counter, Histogram, Gauge

messages_processed = Counter(
    "protean_messages_processed_total",
    "Total messages processed",
    ["handler", "status"],
)

processing_latency = Histogram(
    "protean_message_processing_seconds",
    "Message processing latency",
    ["handler"],
)

consumer_lag = Gauge(
    "protean_consumer_lag",
    "Messages behind head position",
    ["subscription"],
)
```
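One way to feed these metrics is to update them around your own handler logic. A hedged sketch, where `record` is an illustrative wrapper around a handler call and the HTTP port is arbitrary:

```python
import time

from prometheus_client import start_http_server

# Expose the metrics endpoint for Prometheus to scrape (port is illustrative)
start_http_server(9100)


def record(handler_name, func, *args, **kwargs):
    """Wrap a handler call with the counter and histogram defined above."""
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
        messages_processed.labels(handler=handler_name, status="success").inc()
        return result
    except Exception:
        messages_processed.labels(handler=handler_name, status="error").inc()
        raise
    finally:
        processing_latency.labels(handler=handler_name).observe(
            time.perf_counter() - start
        )
```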
## Next Steps
- Engine Architecture - Understand engine internals
- Configuration - Full configuration reference
- Subscription Types - Choose the right subscription