Skip to content

BaseBroker

Message broker interface. All broker adapters (Inline, Redis Stream, Redis PubSub, etc.) implement this contract.

See Broker Adapters for concrete adapter configuration.

Base class for all broker implementations.

Provides shared behavior (UoW integration, connection recovery, capability checks, subscriber registration) via the Template Method pattern. Subclasses implement the abstract underscore-prefixed hook methods (for example `_publish`, `_read`, `_ack`) with broker-specific logic.

Source code in src/protean/port/broker.py
111
112
113
114
115
116
117
118
119
120
121
def __init__(
    self, name: str, domain: "Domain", conn_info: dict[str, str | bool]
) -> None:
    self.name = name
    self.domain = domain
    self.conn_info = conn_info

    self._subscribers = defaultdict(set)
    self._last_ping_time = None
    self._last_ping_success = None
    self._start_time = time.time()

capabilities abstractmethod property

capabilities: BrokerCapabilities

Return the capabilities of this broker implementation.

RETURNS DESCRIPTION
BrokerCapabilities

The capabilities supported by this broker

TYPE: BrokerCapabilities

has_capability

has_capability(capability: BrokerCapabilities) -> bool

Check if broker has a specific capability.

PARAMETER DESCRIPTION
capability

The capability to check for

TYPE: BrokerCapabilities

RETURNS DESCRIPTION
bool

True if the broker has the capability, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
132
133
134
135
136
137
138
139
140
141
def has_capability(self, capability: BrokerCapabilities) -> bool:
    """Tell whether this broker advertises the given capability.

    Args:
        capability: The capability to look for in ``self.capabilities``.

    Returns:
        bool: Result of the membership test of ``capability`` against
        ``self.capabilities``.
    """
    advertised = self.capabilities
    return capability in advertised

has_all_capabilities

has_all_capabilities(
    capabilities: BrokerCapabilities,
) -> bool

Check if broker has all the specified capabilities.

PARAMETER DESCRIPTION
capabilities

The capabilities to check for

TYPE: BrokerCapabilities

RETURNS DESCRIPTION
bool

True if the broker has all capabilities, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
143
144
145
146
147
148
149
150
151
152
def has_all_capabilities(self, capabilities: BrokerCapabilities) -> bool:
    """Tell whether every requested capability is offered by this broker.

    Args:
        capabilities: The combined capabilities to look for.

    Returns:
        bool: True when masking ``self.capabilities`` with ``capabilities``
        leaves exactly ``capabilities``, False otherwise.
    """
    # Keep only the requested bits that the broker actually offers.
    matched = self.capabilities & capabilities
    return matched == capabilities

has_any_capability

has_any_capability(
    capabilities: BrokerCapabilities,
) -> bool

Check if broker has any of the specified capabilities.

PARAMETER DESCRIPTION
capabilities

The capabilities to check for

TYPE: BrokerCapabilities

RETURNS DESCRIPTION
bool

True if the broker has any of the capabilities, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
154
155
156
157
158
159
160
161
162
163
def has_any_capability(self, capabilities: BrokerCapabilities) -> bool:
    """Tell whether at least one requested capability is offered.

    Args:
        capabilities: The combined capabilities to look for.

    Returns:
        bool: True when ``self.capabilities`` and ``capabilities`` overlap,
        False otherwise.
    """
    # A non-empty intersection means at least one capability is shared.
    shared = self.capabilities & capabilities
    return bool(shared)

publish

publish(stream: str, message: dict) -> Optional[str]

Publish a message to the broker.

PARAMETER DESCRIPTION
stream

The stream to which the message should be published

TYPE: str

message

The message payload to be published

TYPE: dict

RETURNS DESCRIPTION
str

The identifier of the message. The content of the identifier is broker-specific.

TYPE: Optional[str]

Optional[str]

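Brokers return an identifier for every message they publish directly. When a Unit of Work is active the message is only recorded for dispatch, and None is returned.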

RAISES DESCRIPTION
ValidationError

If message is an empty dict

Source code in src/protean/port/broker.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def publish(self, stream: str, message: dict) -> Optional[str]:
    """Publish a message to the broker.

    Inside an active Unit of Work the message is only registered with the
    UoW for dispatch and nothing is sent from here. Otherwise the message is
    handed to ``_publish``; if that fails with a connection error and
    ``_ensure_connection`` succeeds, the publish is retried exactly once.
    In synchronous processing mode every subscriber registered for
    ``stream`` is then instantiated and invoked with the message.

    Args:
        stream (str): The stream to which the message should be published
        message (dict): The message payload to be published

    Returns:
        Optional[str]: The broker-specific identifier of the published
        message, or ``None`` when the message was recorded in the active
        Unit of Work instead of being published.

    Raises:
        ValidationError: If message is empty
        Exception: Errors from ``_publish`` propagate unchanged when they are
            not connection errors or when reconnection fails.
    """
    if not message:
        raise ValidationError({"message": ["Message cannot be empty"]})

    if current_uow:
        logger.debug(f"Recording message {message} in {current_uow} for dispatch")

        current_uow.register_message(stream, message, broker_name=self.name)
        # Nothing has been sent yet, so there is no identifier to report.
        return None

    try:
        identifier = self._publish(stream, message)
    except Exception as e:
        # Only connection-related failures are worth a reconnect attempt.
        if not self._is_connection_error(e):
            raise
        logger.warning(f"Connection error during publish: {e}")
        if not self._ensure_connection():
            raise
        # Retry the operation once after reconnection
        identifier = self._publish(stream, message)

    if self.domain.config["message_processing"] == Processing.SYNC.value:
        # Use .get() instead of indexing: _subscribers is a defaultdict, and
        # indexing would insert an empty set for every stream that is
        # published to without having any subscribers.
        for subscriber_cls in self._subscribers.get(stream, ()):
            subscriber = subscriber_cls()
            subscriber(message)

    return identifier

ping

ping() -> bool

Test broker connectivity.

RETURNS DESCRIPTION
bool

True if broker is reachable and responsive, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def ping(self) -> bool:
    """Test broker connectivity.

    Delegates to ``_ping`` and records the outcome on the instance:
    ``_last_ping_time`` holds the duration of the call in seconds (``None``
    when the call raised) and ``_last_ping_success`` holds its result.

    Returns:
        bool: True if broker is reachable and responsive, False otherwise.
        Any exception raised by ``_ping`` is logged at debug level and
        reported as False rather than propagated.
    """
    # perf_counter is monotonic, so the measured latency cannot be skewed or
    # turn negative when the system wall clock is adjusted mid-call.
    started = time.perf_counter()
    try:
        result = self._ping()
    except Exception as e:
        logger.debug(f"Ping failed for broker {self.name}: {e}")
        self._last_ping_time = None
        self._last_ping_success = False
        return False

    self._last_ping_time = time.perf_counter() - started
    self._last_ping_success = result
    return result

health_stats

health_stats() -> dict

Get comprehensive health statistics for the broker.

RETURNS DESCRIPTION
dict

Health statistics with the following keys: 'status' ('healthy' | 'degraded' | 'unhealthy'), 'connected' (bool), 'last_ping_ms' (float | None), 'uptime_seconds' (float), and 'details' (dict of broker-specific details).

TYPE: dict

Source code in src/protean/port/broker.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def health_stats(self) -> dict:
    """Collect a health report for this broker.

    Broker-specific details are fetched from ``_health_stats`` first, then a
    fresh ``ping`` determines connectivity. Any exception raised while
    gathering the report is logged and turned into an "unhealthy" report.

    Returns:
        dict: Health statistics with the following structure:
            {
                'status': 'healthy' | 'degraded' | 'unhealthy',
                'connected': bool,
                'last_ping_ms': float | None,
                'uptime_seconds': float,
                'details': dict  # Broker-specific details
            }
    """
    try:
        details = self._health_stats()
        reachable = self.ping()

        if not reachable:
            status = "unhealthy"
        elif details.get("healthy", True):
            # Details without a "healthy" key are treated as healthy.
            status = "healthy"
        else:
            # Reachable, but the broker itself reports a problem.
            status = "degraded"

        # Seconds elapsed since the broker instance was initialised.
        uptime_seconds = time.time() - self._start_time

        # The stored ping duration is in seconds; report milliseconds.
        last_ping_ms = None
        if self._last_ping_time is not None:
            last_ping_ms = self._last_ping_time * 1000

        report = {
            "status": status,
            "connected": reachable,
            "last_ping_ms": last_ping_ms,
            "uptime_seconds": uptime_seconds,
            "details": details,
        }
        return report
    except Exception as e:
        logger.error(f"Error gathering health stats for broker {self.name}: {e}")
        failure_report = {
            "status": "unhealthy",
            "connected": False,
            "last_ping_ms": None,
            "uptime_seconds": 0,
            "details": {"error": str(e)},
        }
        return failure_report

ensure_connection

ensure_connection() -> bool

Ensure broker connection is healthy, attempt reconnection if needed.

This method can be called explicitly or is triggered automatically when connection-related exceptions are encountered.

RETURNS DESCRIPTION
bool

True if connection is healthy/restored, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
279
280
281
282
283
284
285
286
287
288
def ensure_connection(self) -> bool:
    """Verify the broker connection, reconnecting when necessary.

    Public entry point that delegates to ``_ensure_connection``. It may be
    called explicitly; the same hook is also invoked when the broker
    operations encounter connection-related exceptions.

    Returns:
        bool: True if connection is healthy/restored, False otherwise
    """
    connection_ok = self._ensure_connection()
    return connection_ok

get_next

get_next(
    stream: str, consumer_group: str
) -> tuple[str, dict] | None

Retrieve the next message to process from broker.

PARAMETER DESCRIPTION
stream

The stream from which to retrieve the message

TYPE: str

consumer_group

The consumer group identifier

TYPE: str

RETURNS DESCRIPTION
tuple[str, dict] | None

A tuple of (identifier, message), or None if no message is available or the broker does not support consumer groups.

Source code in src/protean/port/broker.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
def get_next(self, stream: str, consumer_group: str) -> tuple[str, dict] | None:
    """Fetch the next message to process for a consumer group.

    A connection error raised by ``_get_next`` triggers one reconnection
    attempt followed by a single retry; every other error, and a failed
    reconnection, re-raises the original exception.

    Args:
        stream (str): The stream from which to retrieve the message
        consumer_group (str): The consumer group identifier

    Returns:
        tuple[str, dict] | None: A tuple of (identifier, message), or None
        when no message is available or the broker does not support
        consumer groups.
    """
    supports_groups = self.has_capability(BrokerCapabilities.CONSUMER_GROUPS)
    if not supports_groups:
        logger.warning(f"Broker {self.name} does not support consumer groups")
        return None

    try:
        return self._get_next(stream, consumer_group)
    except Exception as e:
        # Anything that is not a connection problem is passed straight on.
        if not self._is_connection_error(e):
            raise

        logger.warning(f"Connection error during get_next: {e}")

        # Give up with the original error when the connection stays down.
        if not self._ensure_connection():
            raise

        # Connection restored: retry exactly once.
        return self._get_next(stream, consumer_group)

read

read(
    stream: str, consumer_group: str, no_of_messages: int
) -> list[tuple[str, dict]]

Read messages from the broker.

PARAMETER DESCRIPTION
stream

The stream from which to read messages

TYPE: str

consumer_group

The consumer group identifier

TYPE: str

no_of_messages

The number of messages to read

TYPE: int

RETURNS DESCRIPTION
list[tuple[str, dict]]

list[tuple[str, dict]]: The list of (identifier, message) tuples

Source code in src/protean/port/broker.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def read(
    self, stream: str, consumer_group: str, no_of_messages: int
) -> list[tuple[str, dict]]:
    """Read a batch of messages for a consumer group.

    A connection error raised by ``_read`` triggers one reconnection attempt
    followed by a single retry; every other error, and a failed
    reconnection, re-raises the original exception.

    Args:
        stream (str): The stream from which to read messages
        consumer_group (str): The consumer group identifier
        no_of_messages (int): The number of messages to read

    Returns:
        list[tuple[str, dict]]: The list of (identifier, message) tuples;
        an empty list when the broker does not support consumer groups.
    """
    supports_groups = self.has_capability(BrokerCapabilities.CONSUMER_GROUPS)
    if not supports_groups:
        logger.warning(f"Broker {self.name} does not support consumer groups")
        return []

    try:
        return self._read(stream, consumer_group, no_of_messages)
    except Exception as e:
        # Anything that is not a connection problem is passed straight on.
        if not self._is_connection_error(e):
            raise

        logger.warning(f"Connection error during read: {e}")

        # Give up with the original error when the connection stays down.
        if not self._ensure_connection():
            raise

        # Connection restored: retry exactly once.
        return self._read(stream, consumer_group, no_of_messages)

ack

ack(
    stream: str, identifier: str, consumer_group: str
) -> bool

Acknowledge successful processing of a message.

PARAMETER DESCRIPTION
stream

The stream from which the message was received

TYPE: str

identifier

The unique identifier of the message to acknowledge

TYPE: str

consumer_group

The consumer group that processed the message

TYPE: str

RETURNS DESCRIPTION
bool

True if the message was successfully acknowledged, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def ack(self, stream: str, identifier: str, consumer_group: str) -> bool:
    """Confirm that a message has been processed successfully.

    A connection error raised by ``_ack`` triggers one reconnection attempt
    followed by a single retry; every other error, and a failed
    reconnection, re-raises the original exception.

    Args:
        stream (str): The stream from which the message was received
        identifier (str): The unique identifier of the message to acknowledge
        consumer_group (str): The consumer group that processed the message

    Returns:
        bool: True if the message was successfully acknowledged, False
        otherwise (including when the broker lacks ACK/NACK support).
    """
    supports_ack = self.has_capability(BrokerCapabilities.ACK_NACK)
    if not supports_ack:
        logger.warning(
            f"Broker {self.name} does not support message acknowledgment"
        )
        return False

    try:
        return self._ack(stream, identifier, consumer_group)
    except Exception as e:
        # Anything that is not a connection problem is passed straight on.
        if not self._is_connection_error(e):
            raise

        logger.warning(f"Connection error during ack: {e}")

        # Give up with the original error when the connection stays down.
        if not self._ensure_connection():
            raise

        # Connection restored: retry exactly once.
        return self._ack(stream, identifier, consumer_group)

nack

nack(
    stream: str, identifier: str, consumer_group: str
) -> bool

Negative acknowledge - mark message for reprocessing.

PARAMETER DESCRIPTION
stream

The stream from which the message was received

TYPE: str

identifier

The unique identifier of the message to nack

TYPE: str

consumer_group

The consumer group that failed to process the message

TYPE: str

RETURNS DESCRIPTION
bool

True if the message was successfully marked for reprocessing, False otherwise

TYPE: bool

Source code in src/protean/port/broker.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def nack(self, stream: str, identifier: str, consumer_group: str) -> bool:
    """Negatively acknowledge a message so that it is reprocessed.

    A connection error raised by ``_nack`` triggers one reconnection attempt
    followed by a single retry; every other error, and a failed
    reconnection, re-raises the original exception.

    Args:
        stream (str): The stream from which the message was received
        identifier (str): The unique identifier of the message to nack
        consumer_group (str): The consumer group that failed to process the message

    Returns:
        bool: True if the message was successfully marked for reprocessing,
        False otherwise (including when the broker lacks ACK/NACK support).
    """
    supports_nack = self.has_capability(BrokerCapabilities.ACK_NACK)
    if not supports_nack:
        logger.warning(
            f"Broker {self.name} does not support message negative acknowledgment"
        )
        return False

    try:
        return self._nack(stream, identifier, consumer_group)
    except Exception as e:
        # Anything that is not a connection problem is passed straight on.
        if not self._is_connection_error(e):
            raise

        logger.warning(f"Connection error during nack: {e}")

        # Give up with the original error when the connection stays down.
        if not self._ensure_connection():
            raise

        # Connection restored: retry exactly once.
        return self._nack(stream, identifier, consumer_group)

read_blocking

read_blocking(
    stream: str,
    consumer_group: str,
    consumer_name: str,
    timeout_ms: int = 5000,
    count: int = 1,
) -> list[tuple[str, dict]]

Read messages from the broker using blocking mode.

This is an optional method that brokers can implement to support efficient blocking reads for stream-based subscriptions.

PARAMETER DESCRIPTION
stream

The stream from which to read messages

TYPE: str

consumer_group

The consumer group identifier

TYPE: str

consumer_name

The unique consumer name within the group

TYPE: str

timeout_ms

Timeout in milliseconds to wait for messages (0 = block indefinitely)

TYPE: int DEFAULT: 5000

count

Maximum number of messages to read

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
list[tuple[str, dict]]

list[tuple[str, dict]]: The list of (identifier, message) tuples

Source code in src/protean/port/broker.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
def read_blocking(
    self,
    stream: str,
    consumer_group: str,
    consumer_name: str,
    timeout_ms: int = 5000,
    count: int = 1,
) -> list[tuple[str, dict]]:
    """Read messages from the broker using blocking mode.

    This is an optional method that brokers can implement to support
    efficient blocking reads for stream-based subscriptions. Brokers
    without the BLOCKING_READ capability fall back to a regular ``_read``
    of ``count`` messages; ``consumer_name`` and ``timeout_ms`` are not used
    on that path.

    Both paths share the same recovery behaviour: a connection error
    triggers one reconnection attempt followed by a single retry, while any
    other error, or a failed reconnection, re-raises the original exception.

    Args:
        stream (str): The stream from which to read messages
        consumer_group (str): The consumer group identifier
        consumer_name (str): The unique consumer name within the group
        timeout_ms (int): Timeout in milliseconds to wait for messages (0 = block indefinitely)
        count (int): Maximum number of messages to read

    Returns:
        list[tuple[str, dict]]: The list of (identifier, message) tuples
    """
    if self.has_capability(BrokerCapabilities.BLOCKING_READ):

        def fetch() -> list[tuple[str, dict]]:
            return self._read_blocking(
                stream, consumer_group, consumer_name, timeout_ms, count
            )

    else:
        # Fall back to regular read for brokers that don't support blocking

        def fetch() -> list[tuple[str, dict]]:
            return self._read(stream, consumer_group, count)

    try:
        return fetch()
    except Exception as e:
        # Only connection-related failures are worth a reconnect attempt.
        if not self._is_connection_error(e):
            raise
        logger.warning(f"Connection error during read_blocking: {e}")
        if not self._ensure_connection():
            raise
        # Retry the operation once after reconnection
        return fetch()

info

info() -> dict

Get information about consumer groups and consumers in each group.

RETURNS DESCRIPTION
dict

Information about consumer groups and their consumers

TYPE: dict

Source code in src/protean/port/broker.py
588
589
590
591
592
593
594
def info(self) -> dict:
    """Report consumer groups and the consumers within each group.

    Delegates to the broker-specific ``_info`` hook and returns its result
    unchanged.

    Returns:
        dict: Information about consumer groups and their consumers
    """
    group_info = self._info()
    return group_info

dlq_list

dlq_list(
    dlq_streams: list[str], limit: int = 100
) -> list[DLQEntry]

List DLQ messages across specified DLQ streams.

PARAMETER DESCRIPTION
dlq_streams

List of DLQ stream names to query.

TYPE: list[str]

limit

Maximum total messages to return.

TYPE: int DEFAULT: 100

RETURNS DESCRIPTION
list[DLQEntry]

List of DLQEntry objects sorted by failure time (newest first).

Source code in src/protean/port/broker.py
608
609
610
611
612
613
614
615
616
617
618
619
620
621
def dlq_list(self, dlq_streams: list[str], limit: int = 100) -> list[DLQEntry]:
    """Collect dead-letter-queue messages from the given DLQ streams.

    Args:
        dlq_streams: List of DLQ stream names to query.
        limit: Maximum total messages to return.

    Returns:
        List of DLQEntry objects sorted by failure time (newest first).
        An empty list when the broker lacks the DEAD_LETTER_QUEUE capability.
    """
    if self.has_capability(BrokerCapabilities.DEAD_LETTER_QUEUE):
        return self._dlq_list(dlq_streams, limit)

    # No DLQ support: warn and report nothing.
    logger.warning(f"Broker {self.name} does not support DLQ management")
    return []

dlq_inspect

dlq_inspect(
    dlq_stream: str, dlq_id: str
) -> DLQEntry | None

Inspect a specific DLQ message by its DLQ entry ID.

PARAMETER DESCRIPTION
dlq_stream

The DLQ stream to look in.

TYPE: str

dlq_id

The entry identifier within the DLQ stream.

TYPE: str

RETURNS DESCRIPTION
DLQEntry | None

DLQEntry if found, None otherwise.

Source code in src/protean/port/broker.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
def dlq_inspect(self, dlq_stream: str, dlq_id: str) -> DLQEntry | None:
    """Look up one dead-letter-queue message by its DLQ entry ID.

    Args:
        dlq_stream: The DLQ stream to look in.
        dlq_id: The entry identifier within the DLQ stream.

    Returns:
        DLQEntry if found, None otherwise. None is also returned when the
        broker lacks the DEAD_LETTER_QUEUE capability.
    """
    if self.has_capability(BrokerCapabilities.DEAD_LETTER_QUEUE):
        return self._dlq_inspect(dlq_stream, dlq_id)

    # No DLQ support: warn and report nothing.
    logger.warning(f"Broker {self.name} does not support DLQ management")
    return None

dlq_replay

dlq_replay(
    dlq_stream: str, dlq_id: str, target_stream: str
) -> bool

Replay a single DLQ message back to its original stream.

PARAMETER DESCRIPTION
dlq_stream

The DLQ stream the message is in.

TYPE: str

dlq_id

The entry identifier within the DLQ stream.

TYPE: str

target_stream

The stream to re-publish the message to.

TYPE: str

RETURNS DESCRIPTION
bool

True if replayed successfully, False otherwise.

Source code in src/protean/port/broker.py
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
def dlq_replay(self, dlq_stream: str, dlq_id: str, target_stream: str) -> bool:
    """Re-publish one dead-letter-queue message to a target stream.

    Args:
        dlq_stream: The DLQ stream the message is in.
        dlq_id: The entry identifier within the DLQ stream.
        target_stream: The stream to re-publish the message to.

    Returns:
        True if replayed successfully, False otherwise. False is also
        returned when the broker lacks the DEAD_LETTER_QUEUE capability.
    """
    if self.has_capability(BrokerCapabilities.DEAD_LETTER_QUEUE):
        return self._dlq_replay(dlq_stream, dlq_id, target_stream)

    # No DLQ support: warn and report failure.
    logger.warning(f"Broker {self.name} does not support DLQ management")
    return False

dlq_replay_all

dlq_replay_all(dlq_stream: str, target_stream: str) -> int

Replay all DLQ messages from a stream back to the original stream.

PARAMETER DESCRIPTION
dlq_stream

The DLQ stream to drain.

TYPE: str

target_stream

The stream to re-publish messages to.

TYPE: str

RETURNS DESCRIPTION
int

Number of messages replayed.

Source code in src/protean/port/broker.py
654
655
656
657
658
659
660
661
662
663
664
665
666
667
def dlq_replay_all(self, dlq_stream: str, target_stream: str) -> int:
    """Re-publish every message of a DLQ stream to a target stream.

    Args:
        dlq_stream: The DLQ stream to drain.
        target_stream: The stream to re-publish messages to.

    Returns:
        Number of messages replayed; 0 when the broker lacks the
        DEAD_LETTER_QUEUE capability.
    """
    if self.has_capability(BrokerCapabilities.DEAD_LETTER_QUEUE):
        return self._dlq_replay_all(dlq_stream, target_stream)

    # No DLQ support: warn and report that nothing was replayed.
    logger.warning(f"Broker {self.name} does not support DLQ management")
    return 0

dlq_purge

dlq_purge(dlq_stream: str) -> int

Purge all messages from a DLQ stream.

PARAMETER DESCRIPTION
dlq_stream

The DLQ stream to purge.

TYPE: str

RETURNS DESCRIPTION
int

Number of messages purged.

Source code in src/protean/port/broker.py
669
670
671
672
673
674
675
676
677
678
679
680
681
def dlq_purge(self, dlq_stream: str) -> int:
    """Remove every message from a DLQ stream.

    Args:
        dlq_stream: The DLQ stream to purge.

    Returns:
        Number of messages purged; 0 when the broker lacks the
        DEAD_LETTER_QUEUE capability.
    """
    if self.has_capability(BrokerCapabilities.DEAD_LETTER_QUEUE):
        return self._dlq_purge(dlq_stream)

    # No DLQ support: warn and report that nothing was purged.
    logger.warning(f"Broker {self.name} does not support DLQ management")
    return 0

close

close() -> None

Close the broker and release all connections.

Subclasses that hold external resources (connection pools, sockets, etc.) should override this to perform cleanup. The default implementation is a no-op so that adapters without external resources (e.g. the inline broker) work without changes.

Source code in src/protean/port/broker.py
703
704
705
706
707
708
709
710
def close(self) -> None:
    """Shut the broker down and release its connections.

    This base implementation deliberately does nothing, so adapters without
    external resources (e.g. the inline broker) need no changes. Subclasses
    holding external resources (connection pools, sockets, etc.) should
    override it to perform their cleanup.
    """
    return None

register

register(subscriber_cls: Type[BaseSubscriber]) -> None

Register a subscriber to this broker against its stream.

PARAMETER DESCRIPTION
subscriber_cls

The subscriber class connected to the stream.

TYPE: Type[BaseSubscriber]

Source code in src/protean/port/broker.py
712
713
714
715
716
717
718
719
720
721
722
723
724
def register(self, subscriber_cls: Type[BaseSubscriber]) -> None:
    """Attach a subscriber class to this broker under its stream.

    The stream is read from ``subscriber_cls.meta_.stream`` and the class is
    added to the set of subscribers kept for that stream.

    Args:
        subscriber_cls: The subscriber class connected to the stream.
    """
    stream = subscriber_cls.meta_.stream

    # Indexing creates the per-stream set on first use.
    stream_subscribers = self._subscribers[stream]
    stream_subscribers.add(subscriber_cls)

    logger.debug(
        f"Broker {self.name}: Registered Subscriber {subscriber_cls.__name__} for stream {stream}"
    )