Skip to content

Redis PubSub Broker

The Redis PubSub broker uses Redis Lists for simple queuing with consumer groups. Despite its name, it doesn't use Redis's native Pub/Sub mechanism but implements a queue-based messaging system.

Overview

This broker uses Redis Lists as queues where:

  • Publishers append messages to Redis lists using rpush
  • Subscribers read messages from lists using lindex with position tracking
  • Messages are persisted in Redis lists until Redis is flushed
  • Consumer groups track their position in each list independently

Installation

The Redis PubSub broker requires the redis Python package:

# Install Protean with Redis support
pip install "protean[redis]"

# Or install Redis package separately
pip install redis>=2.0.0

Configuration

[brokers.notifications]
provider = "redis_pubsub"
URI = "redis://localhost:6379/0"
IS_ASYNC = true  # Optional: Use async processing (default: false)

Configuration Options

Option Default Description
provider Required Must be "redis_pubsub" for Redis PubSub broker
URI Required Redis connection string
IS_ASYNC false Enable asynchronous message processing

Capabilities

The Redis PubSub broker provides simple queuing capabilities:

  • BASIC_PUBSUB - Not supported
  • SIMPLE_QUEUING - Consumer groups with position tracking
  • RELIABLE_MESSAGING - No acknowledgments (ack/nack not supported)
  • ORDERED_MESSAGING - No ordering guarantees
  • ENTERPRISE_STREAMING - Not supported

Usage Examples

Basic Publishing

from protean import Domain

domain = Domain(__name__)
domain.config['brokers'] = {
    'notifications': {
        'provider': 'redis_pubsub',
        'URI': 'redis://localhost:6379/0'
    }
}
domain.init()

# Publish a notification
domain.brokers['notifications'].publish(
    stream="user:notifications",  # Channel name in Redis
    message={
        "type": "notification",
        "user_id": "123",
        "title": "New Message",
        "body": "You have a new message!"
    }
)

Subscribing to Channels

@domain.subscriber(
    stream="user:notifications",
    broker="notifications"
)
class NotificationSubscriber:
    @handle("notification")
    def send_push_notification(self, message):
        # Send push notification to user's device
        push_service.send(
            user_id=message["user_id"],
            title=message["title"],
            body=message["body"]
        )

Message Distribution

Useful for distributing messages across consumer groups:

# WebSocket handler example
class ChatRoom:
    def __init__(self, room_id: str):
        self.room_id = room_id
        self.channel = f"chat:{room_id}"

    def send_message(self, user_id: str, message: str):
        # Broadcast to all connected clients
        domain.brokers['notifications'].publish(
            stream=self.channel,
            message={
                "type": "chat.message",
                "user_id": user_id,
                "message": message,
                "timestamp": datetime.utcnow().isoformat()
            }
        )

# Subscribe to chat messages
@domain.subscriber(
    stream="chat:*",
    broker="notifications"
)
class ChatSubscriber:
    @handle("chat.message")
    async def relay_to_websocket(self, message):
        # Send to WebSocket clients
        await websocket_manager.broadcast(
            room=message["stream"].split(":")[1],
            data=message
        )

Use Cases

1. Real-time Notifications

# User notification system
class NotificationService:
    def notify_user(self, user_id: str, notification: dict):
        domain.brokers['notifications'].publish(
            stream=f"user:{user_id}:notifications",
            message={
                "type": "notification",
                **notification
            }
        )

    def notify_followers(self, user_id: str, update: dict):
        # Notify all followers about user activity
        for follower_id in get_followers(user_id):
            self.notify_user(follower_id, update)

2. Cache Invalidation

# Coordinate cache invalidation across services
@domain.subscriber(stream="cache:invalidation", broker="notifications")
class CacheInvalidator:
    @handle("invalidate")
    def clear_cache(self, message):
        cache_key = message["key"]
        cache.delete(cache_key)
        logger.info(f"Invalidated cache key: {cache_key}")

# Trigger cache invalidation
def update_user_profile(user_id: str, data: dict):
    # Update database
    user_repo.update(user_id, data)

    # Broadcast cache invalidation
    domain.brokers['notifications'].publish(
        stream="cache:invalidation",
        message={
            "type": "invalidate",
            "key": f"user:{user_id}"
        }
    )

3. Metrics and Monitoring

# Collect metrics from distributed services
@domain.subscriber(stream="metrics:*", broker="notifications")
class MetricsCollector:
    @handle("metric")
    def collect_metric(self, message):
        metric_name = message["name"]
        value = message["value"]
        tags = message.get("tags", {})

        # Send to monitoring system
        statsd.gauge(metric_name, value, tags=tags)

# Emit metrics
def track_api_latency(endpoint: str, duration: float):
    domain.brokers['notifications'].publish(
        stream="metrics:api",
        message={
            "type": "metric",
            "name": "api.latency",
            "value": duration,
            "tags": {"endpoint": endpoint}
        }
    )

Limitations and Considerations

Limited Persistence

Messages are stored in Redis lists but have no durability guarantees:

# Messages persist in Redis lists, but lost if Redis restarts without persistence
domain.brokers['notifications'].publish(
    stream="important-events",
    message={"type": "critical", "data": "Stored in Redis list"}
)

# For better durability, use Redis Streams with AOF persistence
domain.brokers['reliable'].publish(  # Redis Stream broker
    stream="important-events",
    message={"type": "critical", "data": "More durable with Redis Streams"}
)

No Acknowledgment Support

# Messages cannot be acknowledged or rejected
# The broker doesn't track if messages were successfully processed
result = domain.brokers['notifications'].publish(
    stream="notifications",
    message={"type": "alert"}
)
# result is just a message identifier, no delivery confirmation

Simple Consumer Groups

Consumer groups are supported but with limitations:

# Consumer groups track position in the list independently
@domain.subscriber(stream="orders", broker="notifications", consumer_group="group1")
class OrderProcessor1:
    @handle("order.created")
    def process(self, msg):
        print("Group 1 processing")

@domain.subscriber(stream="orders", broker="notifications", consumer_group="group2")  
class OrderProcessor2:
    @handle("order.created")
    def process(self, msg):
        print("Group 2 processing")  # Different group, processes same messages

Position Tracking

Consumer groups track their position, but positions can be lost:

# Implement reconnection logic
class ResilientSubscriber:
    def __init__(self):
        self.connected = False
        self.reconnect_attempts = 0

    def connect(self):
        while self.reconnect_attempts < 5:
            try:
                # Subscribe to channel
                self.subscribe()
                self.connected = True
                break
            except ConnectionError:
                self.reconnect_attempts += 1
                time.sleep(2 ** self.reconnect_attempts)

Performance Considerations

Message Size

Keep messages small for optimal performance:

# Bad: Large message
domain.brokers['notifications'].publish(
    stream="updates",
    message={
        "type": "update",
        "data": large_binary_data  # Avoid!
    }
)

# Good: Reference to data
domain.brokers['notifications'].publish(
    stream="updates",
    message={
        "type": "update",
        "data_url": "s3://bucket/key",  # Reference instead
        "size_bytes": 1024
    }
)

Channel Naming

Use hierarchical channel names for efficient pattern matching:

# Good channel naming
"user:123:notifications"     # User-specific
"chat:room:456"              # Chat room
"system:alerts:critical"     # System alerts
"metrics:api:latency"        # API metrics

# Pattern subscriptions
"user:*:notifications"       # All user notifications
"chat:room:*"                # All chat rooms
"system:alerts:*"            # All system alerts
"metrics:*"                  # All metrics

Message Processing

Messages are processed sequentially per consumer group. Each group maintains its own position counter in Redis.

Monitoring and Debugging

Redis CLI Commands

Monitor Pub/Sub activity:

# List active channels
redis-cli PUBSUB CHANNELS

# Count subscribers for a channel
redis-cli PUBSUB NUMSUB user:123:notifications

# Monitor all Pub/Sub activity
redis-cli MONITOR | grep -E "PUBLISH|SUBSCRIBE"

# Subscribe to a channel for debugging
redis-cli SUBSCRIBE user:notifications

Logging

Enable debug logging:

import logging

# Enable Redis PubSub broker logging
logging.getLogger('protean.adapters.broker.redis_pubsub').setLevel(logging.DEBUG)

# Log all published messages
class LoggingPubSubBroker(RedisPubSubBroker):
    def _publish(self, stream: str, message: dict) -> int:
        logger.debug(f"Publishing to {stream}: {message}")
        result = super()._publish(stream, message)
        logger.debug(f"Delivered to {result} subscribers")
        return result

Health Checks

Monitor broker health:

def health_check():
    broker = domain.brokers['notifications']

    # Test connectivity
    if not broker.ping():
        return {"status": "unhealthy", "error": "Connection failed"}

    # Get health statistics
    stats = broker.health_stats()

    if stats.get('healthy', False):
        return {
            "status": "healthy",
            "connected_clients": stats.get('connected_clients', 0),
            "used_memory": stats.get('used_memory_human', 'unknown')
        }
    else:
        return {"status": "degraded", "error": stats.get('error', 'Unknown')}

Migration Strategies

To Redis Streams

When you need persistence and reliability:

# Before: Redis PubSub
[brokers.notifications]
provider = "redis_pubsub"
URI = "redis://localhost:6379/0"

# After: Redis Streams
[brokers.notifications]
provider = "redis"  # Redis Streams
URI = "redis://localhost:6379/0"
MAXLEN = 10000  # Keep last 10k messages

Key differences to handle: 1. Add consumer group parameter to subscribers 2. Implement message acknowledgment 3. Handle message IDs returned from publish

From Redis PubSub

When migrating from Redis PubSub to other brokers:

# Compatibility layer
class BrokerCompatibilityLayer:
    def __init__(self, old_broker, new_broker):
        self.old_broker = old_broker
        self.new_broker = new_broker
        self.migration_mode = True

    def publish(self, stream: str, message: dict):
        # Publish to both during migration
        if self.migration_mode:
            self.old_broker.publish(stream, message)
        return self.new_broker.publish(stream, message)

    def complete_migration(self):
        self.migration_mode = False

Best Practices

1. Use for Simple Queuing

Redis PubSub broker is suitable for: - Simple message distribution - Development and testing - Scenarios where ack/nack isn't needed - Basic consumer group functionality

Not suitable for: - Critical business events requiring acknowledgment - Complex message routing - Scenarios requiring message replay - High-throughput production systems

2. Implement Circuit Breakers

from circuit_breaker import CircuitBreaker

class ResilientPubSubBroker:
    def __init__(self, broker):
        self.broker = broker
        self.breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60
        )

    @circuit_breaker
    def publish(self, stream: str, message: dict):
        return self.broker.publish(stream, message)

3. Keep Messages Small

# Compress large messages
import zlib
import json

def publish_compressed(broker, stream: str, message: dict):
    serialized = json.dumps(message)
    if len(serialized) > 1024:  # Compress if > 1KB
        compressed = zlib.compress(serialized.encode())
        broker.publish(stream, {
            "compressed": True,
            "data": compressed.hex()
        })
    else:
        broker.publish(stream, message)

4. Monitor Subscriber Count

def ensure_subscribers(broker, stream: str, min_subscribers: int = 1):
    """Ensure minimum subscribers before publishing critical messages."""
    subscriber_count = broker.get_subscriber_count(stream)
    if subscriber_count < min_subscribers:
        logger.warning(
            f"Only {subscriber_count} subscribers for {stream}, "
            f"expected at least {min_subscribers}"
        )
        return False
    return True

Comparison with Other Brokers

Feature Redis PubSub Redis Streams Inline
Persistence Redis Lists Yes (durable) No
Delivery Guarantee None At-least-once Best-effort
Consumer Groups Basic Advanced Yes
Message Ordering No Yes No
Acknowledgments No Yes Yes
Performance High High Very High
Use Case Simple Queuing Event Streaming Development

Next Steps