Skip to content

BaseBroker

Message broker interface. All broker adapters (Inline, Redis Stream, Redis PubSub, etc.) implement this contract.

See Broker Adapters for concrete adapter configuration.

This class outlines the base broker functions, to be satisfied by all implementing brokers.

It is also a marker interface for registering broker classes with the domain

Source code in src/protean/port/broker.py
80
81
82
83
84
85
86
87
88
89
90
def __init__(
    self, name: str, domain: "Domain", conn_info: dict[str, str | bool]
) -> None:
    self.name = name
    self.domain = domain
    self.conn_info = conn_info

    self._subscribers = defaultdict(set)
    self._last_ping_time = None
    self._last_ping_success = None
    self._start_time = time.time()

capabilities abstractmethod property

capabilities: BrokerCapabilities

Return the capabilities of this broker implementation.

RETURNS DESCRIPTION
BrokerCapabilities

The capabilities supported by this broker

TYPE: BrokerCapabilities

has_capability

has_capability(capability: BrokerCapabilities) -> bool

Check if broker has a specific capability.

PARAMETER DESCRIPTION
capability

The capability to check for

TYPE: BrokerCapabilities

RETURNS DESCRIPTION
bool

True if the broker has the capability, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
101
102
103
104
105
106
107
108
109
110
def has_capability(self, capability: BrokerCapabilities) -> bool:
    """Check if broker has a specific capability.

    Args:
        capability: The capability to check for

    Returns:
        bool: True if the broker has the capability, False otherwise
    """
    return capability in self.capabilities

has_all_capabilities

has_all_capabilities(capabilities: BrokerCapabilities) -> bool

Check if broker has all the specified capabilities.

PARAMETER DESCRIPTION
capabilities

The capabilities to check for

TYPE: BrokerCapabilities

RETURNS DESCRIPTION
bool

True if the broker has all capabilities, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
112
113
114
115
116
117
118
119
120
121
def has_all_capabilities(self, capabilities: BrokerCapabilities) -> bool:
    """Check if broker has all the specified capabilities.

    Args:
        capabilities: The capabilities to check for

    Returns:
        bool: True if the broker has all capabilities, False otherwise
    """
    return (self.capabilities & capabilities) == capabilities

has_any_capability

has_any_capability(capabilities: BrokerCapabilities) -> bool

Check if broker has any of the specified capabilities.

PARAMETER DESCRIPTION
capabilities

The capabilities to check for

TYPE: BrokerCapabilities

RETURNS DESCRIPTION
bool

True if the broker has any of the capabilities, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
123
124
125
126
127
128
129
130
131
132
def has_any_capability(self, capabilities: BrokerCapabilities) -> bool:
    """Check if broker has any of the specified capabilities.

    Args:
        capabilities: The capabilities to check for

    Returns:
        bool: True if the broker has any of the capabilities, False otherwise
    """
    return bool(self.capabilities & capabilities)

publish

publish(stream: str, message: dict) -> Optional[str]

Publish a message to the broker.

PARAMETER DESCRIPTION
stream

The stream to which the message should be published

TYPE: str

message

The message payload to be published

TYPE: dict

RETURNS DESCRIPTION
str

The identifier of the message. The content of the identifier is broker-specific.

TYPE: Optional[str]

Optional[str]

All brokers are guaranteed to provide message identifiers.

RAISES DESCRIPTION
ValidationError

If message is an empty dict

Source code in src/protean/port/broker.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def publish(self, stream: str, message: dict) -> Optional[str]:
    """Publish a message to the broker.

    Args:
        stream (str): The stream to which the message should be published
        message (dict): The message payload to be published

    Returns:
        str: The identifier of the message. The content of the identifier is broker-specific.
        All brokers are guaranteed to provide message identifiers.

    Raises:
        ValidationError: If message is an empty dict
    """
    if not message:
        raise ValidationError({"message": ["Message cannot be empty"]})

    if current_uow:
        logger.debug(f"Recording message {message} in {current_uow} for dispatch")

        current_uow.register_message(stream, message)
    else:
        try:
            identifier = self._publish(stream, message)
        except Exception as e:
            # Check if this is a connection-related error and attempt recovery
            if self._is_connection_error(e):
                logger.warning(f"Connection error during publish: {e}")
                if self._ensure_connection():
                    # Retry the operation once after reconnection
                    identifier = self._publish(stream, message)
                else:
                    raise
            else:
                raise

        if (
            self.domain.config["message_processing"] == Processing.SYNC.value
            and self._subscribers[stream]
        ):
            for subscriber_cls in self._subscribers[stream]:
                subscriber = subscriber_cls()
                subscriber(message)

        return identifier

ping

ping() -> bool

Test broker connectivity.

RETURNS DESCRIPTION
bool

True if broker is reachable and responsive, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def ping(self) -> bool:
    """Test broker connectivity.

    Returns:
        bool: True if broker is reachable and responsive, False otherwise
    """
    try:
        start_time = time.time()
        result = self._ping()
        self._last_ping_time = time.time() - start_time
        self._last_ping_success = result
        return result
    except Exception as e:
        logger.debug(f"Ping failed for broker {self.name}: {e}")
        self._last_ping_time = None
        self._last_ping_success = False
        return False

health_stats

health_stats() -> dict

Get comprehensive health statistics for the broker.

RETURNS DESCRIPTION
dict

Health statistics with the following structure: { 'status': 'healthy' | 'degraded' | 'unhealthy', 'connected': bool, 'last_ping_ms': float | None, 'uptime_seconds': float, 'details': dict # Broker-specific details }

TYPE: dict

Source code in src/protean/port/broker.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def health_stats(self) -> dict:
    """Get comprehensive health statistics for the broker.

    Returns:
        dict: Health statistics with the following structure:
            {
                'status': 'healthy' | 'degraded' | 'unhealthy',
                'connected': bool,
                'last_ping_ms': float | None,
                'uptime_seconds': float,
                'details': dict  # Broker-specific details
            }
    """
    try:
        # Get broker-specific health details
        broker_details = self._health_stats()

        # Perform a fresh ping to get current connectivity status
        is_connected = self.ping()

        # Determine overall health status
        if is_connected and broker_details.get("healthy", True):
            status = "healthy"
        elif is_connected:
            status = "degraded"  # Connected but some issues reported
        else:
            status = "unhealthy"

        # Calculate uptime since broker initialization
        uptime_seconds = time.time() - self._start_time

        return {
            "status": status,
            "connected": is_connected,
            "last_ping_ms": self._last_ping_time * 1000
            if self._last_ping_time is not None
            else None,
            "uptime_seconds": uptime_seconds,
            "details": broker_details,
        }
    except Exception as e:
        logger.error(f"Error gathering health stats for broker {self.name}: {e}")
        return {
            "status": "unhealthy",
            "connected": False,
            "last_ping_ms": None,
            "uptime_seconds": 0,
            "details": {"error": str(e)},
        }

ensure_connection

ensure_connection() -> bool

Ensure broker connection is healthy, attempt reconnection if needed.

This method can be called explicitly or is triggered automatically when connection-related exceptions are encountered.

RETURNS DESCRIPTION
bool

True if connection is healthy/restored, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
248
249
250
251
252
253
254
255
256
257
def ensure_connection(self) -> bool:
    """Ensure broker connection is healthy, attempt reconnection if needed.

    This method can be called explicitly or is triggered automatically
    when connection-related exceptions are encountered.

    Returns:
        bool: True if connection is healthy/restored, False otherwise
    """
    return self._ensure_connection()

get_next

get_next(stream: str, consumer_group: str) -> dict | None

Retrieve the next message to process from broker.

PARAMETER DESCRIPTION
stream

The stream from which to retrieve the message

TYPE: str

consumer_group

The consumer group identifier

TYPE: str

RETURNS DESCRIPTION
dict

The message payload, or None if no messages available

TYPE: dict | None

Source code in src/protean/port/broker.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def get_next(self, stream: str, consumer_group: str) -> dict | None:
    """Retrieve the next message to process from broker.

    Args:
        stream (str): The stream from which to retrieve the message
        consumer_group (str): The consumer group identifier

    Returns:
        dict: The message payload, or None if no messages available
    """
    # Check if broker supports consumer groups
    if not self.has_capability(BrokerCapabilities.CONSUMER_GROUPS):
        logger.warning(f"Broker {self.name} does not support consumer groups")
        return None

    try:
        return self._get_next(stream, consumer_group)
    except Exception as e:
        # Check if this is a connection-related error and attempt recovery
        if self._is_connection_error(e):
            logger.warning(f"Connection error during get_next: {e}")
            if self._ensure_connection():
                # Retry the operation once after reconnection
                return self._get_next(stream, consumer_group)
            else:
                raise
        else:
            raise

read

read(stream: str, consumer_group: str, no_of_messages: int) -> list[tuple[str, dict]]

Read messages from the broker.

PARAMETER DESCRIPTION
stream

The stream from which to read messages

TYPE: str

consumer_group

The consumer group identifier

TYPE: str

no_of_messages

The number of messages to read

TYPE: int

RETURNS DESCRIPTION
list[tuple[str, dict]]

list[tuple[str, dict]]: The list of (identifier, message) tuples

Source code in src/protean/port/broker.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def read(
    self, stream: str, consumer_group: str, no_of_messages: int
) -> list[tuple[str, dict]]:
    """Read messages from the broker.

    Args:
        stream (str): The stream from which to read messages
        consumer_group (str): The consumer group identifier
        no_of_messages (int): The number of messages to read

    Returns:
        list[tuple[str, dict]]: The list of (identifier, message) tuples
    """
    # Check if broker supports consumer groups
    if not self.has_capability(BrokerCapabilities.CONSUMER_GROUPS):
        logger.warning(f"Broker {self.name} does not support consumer groups")
        return []

    try:
        return self._read(stream, consumer_group, no_of_messages)
    except Exception as e:
        # Check if this is a connection-related error and attempt recovery
        if self._is_connection_error(e):
            logger.warning(f"Connection error during read: {e}")
            if self._ensure_connection():
                # Retry the operation once after reconnection
                return self._read(stream, consumer_group, no_of_messages)
            else:
                raise
        else:
            raise

ack

ack(stream: str, identifier: str, consumer_group: str) -> bool

Acknowledge successful processing of a message.

PARAMETER DESCRIPTION
stream

The stream from which the message was received

TYPE: str

identifier

The unique identifier of the message to acknowledge

TYPE: str

consumer_group

The consumer group that processed the message

TYPE: str

RETURNS DESCRIPTION
bool

True if the message was successfully acknowledged, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def ack(self, stream: str, identifier: str, consumer_group: str) -> bool:
    """Acknowledge successful processing of a message.

    Args:
        stream (str): The stream from which the message was received
        identifier (str): The unique identifier of the message to acknowledge
        consumer_group (str): The consumer group that processed the message

    Returns:
        bool: True if the message was successfully acknowledged, False otherwise
    """
    # Check if broker supports acknowledgment
    if not self.has_capability(BrokerCapabilities.ACK_NACK):
        logger.warning(
            f"Broker {self.name} does not support message acknowledgment"
        )
        return False

    try:
        return self._ack(stream, identifier, consumer_group)
    except Exception as e:
        # Check if this is a connection-related error and attempt recovery
        if self._is_connection_error(e):
            logger.warning(f"Connection error during ack: {e}")
            if self._ensure_connection():
                # Retry the operation once after reconnection
                return self._ack(stream, identifier, consumer_group)
            else:
                raise
        else:
            raise

nack

nack(stream: str, identifier: str, consumer_group: str) -> bool

Negative acknowledge - mark message for reprocessing.

PARAMETER DESCRIPTION
stream

The stream from which the message was received

TYPE: str

identifier

The unique identifier of the message to nack

TYPE: str

consumer_group

The consumer group that failed to process the message

TYPE: str

RETURNS DESCRIPTION
bool

True if the message was successfully marked for reprocessing, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def nack(self, stream: str, identifier: str, consumer_group: str) -> bool:
    """Negative acknowledge - mark message for reprocessing.

    Args:
        stream (str): The stream from which the message was received
        identifier (str): The unique identifier of the message to nack
        consumer_group (str): The consumer group that failed to process the message

    Returns:
        bool: True if the message was successfully marked for reprocessing, False otherwise
    """
    # Check if broker supports negative acknowledgment
    if not self.has_capability(BrokerCapabilities.ACK_NACK):
        logger.warning(
            f"Broker {self.name} does not support message negative acknowledgment"
        )
        return False

    try:
        return self._nack(stream, identifier, consumer_group)
    except Exception as e:
        # Check if this is a connection-related error and attempt recovery
        if self._is_connection_error(e):
            logger.warning(f"Connection error during nack: {e}")
            if self._ensure_connection():
                # Retry the operation once after reconnection
                return self._nack(stream, identifier, consumer_group)
            else:
                raise
        else:
            raise

read_blocking

read_blocking(stream: str, consumer_group: str, consumer_name: str, timeout_ms: int = 5000, count: int = 1) -> list[tuple[str, dict]]

Read messages from the broker using blocking mode.

This is an optional method that brokers can implement to support efficient blocking reads for stream-based subscriptions.

PARAMETER DESCRIPTION
stream

The stream from which to read messages

TYPE: str

consumer_group

The consumer group identifier

TYPE: str

consumer_name

The unique consumer name within the group

TYPE: str

timeout_ms

Timeout in milliseconds to wait for messages (0 = block indefinitely)

TYPE: int DEFAULT: 5000

count

Maximum number of messages to read

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
list[tuple[str, dict]]

list[tuple[str, dict]]: The list of (identifier, message) tuples

Source code in src/protean/port/broker.py
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
def read_blocking(
    self,
    stream: str,
    consumer_group: str,
    consumer_name: str,
    timeout_ms: int = 5000,
    count: int = 1,
) -> list[tuple[str, dict]]:
    """Read messages from the broker using blocking mode.

    This is an optional method that brokers can implement to support
    efficient blocking reads for stream-based subscriptions.

    Args:
        stream (str): The stream from which to read messages
        consumer_group (str): The consumer group identifier
        consumer_name (str): The unique consumer name within the group
        timeout_ms (int): Timeout in milliseconds to wait for messages (0 = block indefinitely)
        count (int): Maximum number of messages to read

    Returns:
        list[tuple[str, dict]]: The list of (identifier, message) tuples
    """
    # Check if broker supports blocking reads
    if not self.has_capability(BrokerCapabilities.BLOCKING_READ):
        # Fall back to regular read for brokers that don't support blocking
        return self._read(stream, consumer_group, count)

    try:
        return self._read_blocking(
            stream, consumer_group, consumer_name, timeout_ms, count
        )
    except Exception as e:
        # Check if this is a connection-related error and attempt recovery
        if self._is_connection_error(e):
            logger.warning(f"Connection error during read_blocking: {e}")
            if self._ensure_connection():
                # Retry the operation once after reconnection
                return self._read_blocking(
                    stream, consumer_group, consumer_name, timeout_ms, count
                )
            else:
                raise
        else:
            raise

info

info() -> dict

Get information about consumer groups and consumers in each group.

RETURNS DESCRIPTION
dict

Information about consumer groups and their consumers

TYPE: dict

Source code in src/protean/port/broker.py
557
558
559
560
561
562
563
def info(self) -> dict:
    """Get information about consumer groups and consumers in each group.

    Returns:
        dict: Information about consumer groups and their consumers
    """
    return self._info()

register

register(subscriber_cls: Type[BaseSubscriber]) -> None

Register a subscriber to this broker against its stream.

PARAMETER DESCRIPTION
subscriber_cls

The subscriber class connected to the stream.

TYPE: Type[BaseSubscriber]

Source code in src/protean/port/broker.py
573
574
575
576
577
578
579
580
581
582
583
584
585
def register(self, subscriber_cls: Type[BaseSubscriber]) -> None:
    """Register a subscriber to this broker against its stream.

    Args:
        subscriber_cls: The subscriber class connected to the stream.
    """
    stream = subscriber_cls.meta_.stream

    self._subscribers[stream].add(subscriber_cls)

    logger.debug(
        f"Broker {self.name}: Registered Subscriber {subscriber_cls.__name__} for stream {stream}"
    )