Error Handling

Introduction

Handling errors in message-based systems is crucial for building robust applications. Mersal provides a comprehensive error handling system with features for:

  1. Tracking and managing errors

  2. Implementing retry strategies

  3. Identifying messages that should fail immediately (fail-fast)

  4. Handling poison messages via dead letter queues

These features help ensure that your application can gracefully handle failures, improve reliability, and avoid getting stuck in endless retry loops.

Usage

Notes

Default Error Handling

Mersal comes with a default error handling strategy that provides these key capabilities:

  • Error Tracking: Records errors for each message and tracks how many times it has failed

  • Retry Logic: Automatically retries failed messages up to a configurable number of times

  • Fail-Fast Detection: Identifies exceptions that should immediately fail without retries

  • Dead Letter Queue: Forwards messages that fail repeatedly to a dedicated error queue

The default retry strategy is implemented in the DefaultRetryStrategy class, which creates a DefaultRetryStep in the message processing pipeline.
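The way these four capabilities fit together can be sketched without Mersal at all. The following is a minimal, library-independent illustration, not the actual DefaultRetryStep: the names process_with_retries, MAX_FAILURES, FAIL_FAST, and the dead_letters list are all hypothetical stand-ins chosen for this example.

```python
import asyncio

MAX_FAILURES = 3                    # stand-in for a configurable failure limit
FAIL_FAST = (KeyError, TypeError)   # stand-in for the fail-fast exception list


async def process_with_retries(message_id, message, handler, dead_letters):
    """Run handler until it succeeds or the message is dead-lettered.

    Returns the number of handler attempts that were made.
    """
    errors = []  # per-message error history (the error tracker's job)
    attempts = 0
    while True:
        attempts += 1
        try:
            await handler(message)
            return attempts  # success: nothing left to track
        except Exception as exc:
            errors.append(exc)
            # Give up on a fail-fast exception or once the limit is reached.
            if isinstance(exc, FAIL_FAST) or len(errors) >= MAX_FAILURES:
                dead_letters.append((message_id, message, errors))
                return attempts


async def demo():
    dead_letters = []  # stand-in for the dead letter queue

    async def handler(message):
        if message == "fail":
            raise ValueError("keeps failing")
        if message == "bad-type":
            raise TypeError("can never succeed")

    ok = await process_with_retries(1, "normal", handler, dead_letters)
    retried = await process_with_retries(2, "fail", handler, dead_letters)
    fast = await process_with_retries(3, "bad-type", handler, dead_letters)
    return ok, retried, fast, dead_letters


ok_attempts, retried_attempts, fast_attempts, dead = asyncio.run(demo())
```

In this sketch the ValueError message is attempted three times before being dead-lettered, while the TypeError message is dead-lettered after a single attempt. A real pipeline would typically return the message to the transport between attempts rather than loop in place.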

Customizing Error Handling

You can customize the error handling mechanism by configuring these components:

  1. Error Tracker: Tracks errors for messages and determines when they’ve failed too many times

  2. Error Handler: Handles poison messages, typically by sending them to a dead letter queue

  3. Fail-Fast Checker: Determines which exceptions should cause immediate failure without retries

Here’s an example of configuring error handling when creating a Mersal application:

from mersal.app import Mersal
from mersal.retry.error_tracking.in_memory_error_tracker import InMemoryErrorTracker
from mersal.retry.error_handling.deadletter_queue_error_handler import DeadletterQueueErrorHandler
from mersal.retry.fail_fast.default_fail_fast_checker import DefaultFailFastChecker
from mersal.retry.default_retry_strategy import DefaultRetryStrategy
from mersal.retry.retry_strategy_settings import RetryStrategySettings

# Configure error handling components
error_tracker = InMemoryErrorTracker(maximum_failure_times=5)

# Define exceptions that should fail immediately
fail_fast_exceptions = [ValueError, KeyError]
fail_fast_checker = DefaultFailFastChecker(fail_fast_exceptions)

# Create the app with custom error handling.
# `transport` is assumed to be a transport instance created elsewhere.
app = Mersal(
    # Other configuration...
    retry_strategy=DefaultRetryStrategy(
        error_tracker=error_tracker,
        error_handler=DeadletterQueueErrorHandler(transport, "error_queue"),
        fail_fast_checker=fail_fast_checker,
    ),
    retry_strategy_settings=RetryStrategySettings(
        error_queue_name="error_queue",
        max_no_of_retries=5
    )
)

Error Tracking

The error tracker component maintains the state of errors for each message:

  • Records exceptions that occur during message processing

  • Determines when a message has failed too many times

  • Supports marking messages as “final” to skip retries

  • Provides access to previous exceptions for diagnostics

Mersal includes an in-memory implementation (InMemoryErrorTracker), whose state is lost when the process restarts. To keep error history across restarts, you can implement the ErrorTracker protocol with a persistent store such as a database.

from mersal.retry.error_tracking.error_tracker import ErrorTracker
import uuid

class CustomErrorTracker(ErrorTracker):
    async def register_error(self, message_id: uuid.UUID, exception: Exception):
        # Record the error in your persistence store
        ...

    async def clean_up(self, message_id: uuid.UUID):
        # Remove error tracking for this message
        ...

    async def has_failed_too_many_times(self, message_id: uuid.UUID) -> bool:
        # Determine if message has exceeded retry limit
        ...

    async def mark_as_final(self, message_id: uuid.UUID):
        # Mark message as final (no more retries)
        ...

    async def get_exceptions(self, message_id: uuid.UUID) -> list[Exception]:
        # Return all exceptions for this message
        ...
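As one possible way to fill in that skeleton, here is a minimal sketch backed by SQLite from the Python standard library. It is standalone and does not import Mersal; it only mirrors the five method names shown above. The class name SqliteErrorTracker, the table layout, and the choice to treat a message marked as final as having failed too many times are assumptions made for this example. Exceptions are stored as text, so get_exceptions returns generic Exception objects rather than the original types, and the blocking sqlite3 calls inside async methods are a simplification.

```python
import asyncio
import sqlite3
import uuid


class SqliteErrorTracker:
    """Illustrative persistent tracker (hypothetical, not part of Mersal)."""

    def __init__(self, path: str = ":memory:", maximum_failure_times: int = 3):
        self._max = maximum_failure_times
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS errors "
            "(message_id TEXT, error_type TEXT, error_text TEXT)"
        )
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS final_messages (message_id TEXT PRIMARY KEY)"
        )
        self._db.commit()

    async def register_error(self, message_id: uuid.UUID, exception: Exception):
        # Store the exception type and message as text.
        self._db.execute(
            "INSERT INTO errors VALUES (?, ?, ?)",
            (str(message_id), type(exception).__name__, str(exception)),
        )
        self._db.commit()

    async def clean_up(self, message_id: uuid.UUID):
        key = (str(message_id),)
        self._db.execute("DELETE FROM errors WHERE message_id = ?", key)
        self._db.execute("DELETE FROM final_messages WHERE message_id = ?", key)
        self._db.commit()

    async def has_failed_too_many_times(self, message_id: uuid.UUID) -> bool:
        key = (str(message_id),)
        # Assumption: a message marked as final counts as exhausted.
        if self._db.execute(
            "SELECT 1 FROM final_messages WHERE message_id = ?", key
        ).fetchone():
            return True
        (count,) = self._db.execute(
            "SELECT COUNT(*) FROM errors WHERE message_id = ?", key
        ).fetchone()
        return count >= self._max

    async def mark_as_final(self, message_id: uuid.UUID):
        self._db.execute(
            "INSERT OR IGNORE INTO final_messages VALUES (?)", (str(message_id),)
        )
        self._db.commit()

    async def get_exceptions(self, message_id: uuid.UUID) -> list[Exception]:
        rows = self._db.execute(
            "SELECT error_type, error_text FROM errors "
            "WHERE message_id = ? ORDER BY rowid",
            (str(message_id),),
        ).fetchall()
        # Original exception objects cannot be recovered from text.
        return [Exception(f"{name}: {text}") for name, text in rows]


async def demo():
    tracker = SqliteErrorTracker(maximum_failure_times=2)
    message_id = uuid.uuid4()
    await tracker.register_error(message_id, ValueError("first"))
    after_one = await tracker.has_failed_too_many_times(message_id)
    await tracker.register_error(message_id, ValueError("second"))
    after_two = await tracker.has_failed_too_many_times(message_id)
    history = [str(e) for e in await tracker.get_exceptions(message_id)]
    await tracker.clean_up(message_id)
    after_clean_up = await tracker.has_failed_too_many_times(message_id)
    return after_one, after_two, history, after_clean_up


after_one, after_two, history, after_clean_up = asyncio.run(demo())
```

Passing a file path instead of ":memory:" would make the history survive a restart. A production tracker would likely use an async database driver so that queries do not block the event loop.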

Fail-Fast Strategies

Not all errors should be retried. Some exceptions indicate that retrying will never succeed, such as:

  • Validation errors

  • Authorization failures

  • Malformed message content

The fail-fast checker determines which exceptions should cause immediate failure:

from mersal.retry.fail_fast.default_fail_fast_checker import DefaultFailFastChecker

# Define exceptions that should never be retried
fail_fast_exceptions = [
    ValueError,  # Message validation issues
    PermissionError,  # Authorization failures
    KeyError,  # Missing required data
    TypeError  # Type conversion errors
]

fail_fast_checker = DefaultFailFastChecker(fail_fast_exceptions)
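When choosing which exception types to list, it helps to know how matching treats subclasses. This page does not state how DefaultFailFastChecker compares exceptions, so the sketch below is only an assumption-based illustration of one common approach, an isinstance check, using a hypothetical TypeListFailFastChecker class and a hypothetical should_fail_fast method.

```python
class TypeListFailFastChecker:
    """Hypothetical checker: fail fast if the exception matches a listed type."""

    def __init__(self, fail_fast_exceptions):
        # isinstance accepts a tuple of types
        self._types = tuple(fail_fast_exceptions)

    def should_fail_fast(self, exception: Exception) -> bool:
        # Matches the listed types and any of their subclasses.
        return isinstance(exception, self._types)


class InvalidOrderError(ValueError):
    """Application-specific validation error (illustrative)."""


checker = TypeListFailFastChecker([ValueError, PermissionError])
results = {
    "subclass of ValueError": checker.should_fail_fast(InvalidOrderError("bad")),
    "PermissionError": checker.should_fail_fast(PermissionError("denied")),
    "TimeoutError": checker.should_fail_fast(TimeoutError("slow")),
}
```

With isinstance-based matching, listing a broad base class such as Exception would make every failure fail fast, so narrower, application-specific exception types are usually the safer choice.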

Dead Letter Queue Handling

When a message fails repeatedly, it’s sent to a dead letter queue for later investigation. This prevents the system from getting stuck in endless retry loops.

The default implementation (DeadletterQueueErrorHandler) sends failed messages to a configured error queue:

from mersal.retry.error_handling.deadletter_queue_error_handler import DeadletterQueueErrorHandler

# Create error handler that sends to "error_queue".
# `transport` is assumed to be a transport instance created elsewhere.
error_handler = DeadletterQueueErrorHandler(
    transport,
    error_queue_name="error_queue"
)

The error handler adds the exception details to the message headers before forwarding it, making it easier to diagnose issues.
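To make the idea of enriching headers concrete, here is a minimal sketch of how exception details could be copied into a message's headers before forwarding. The function name add_error_headers and the header keys ("error-type", "error-message", "error-count", "error-details") are hypothetical; the keys Mersal actually writes are not listed on this page.

```python
import traceback


def add_error_headers(headers: dict, exceptions: list) -> dict:
    """Return a copy of headers with details of the recorded exceptions."""
    enriched = dict(headers)  # leave the original headers untouched
    last = exceptions[-1]
    enriched["error-type"] = type(last).__name__
    enriched["error-message"] = str(last)
    enriched["error-count"] = str(len(exceptions))
    # Full formatted exception, including the traceback when one is attached.
    enriched["error-details"] = "".join(
        traceback.format_exception(type(last), last, last.__traceback__)
    )
    return enriched


original = {"message-id": "42"}
failures = [ValueError("first attempt"), ValueError("second attempt")]
forwarded = add_error_headers(original, failures)
```

Keeping the details in headers rather than in the message body means the original payload can be replayed unchanged once the underlying problem is fixed.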

Examples

Complete Error Handling Configuration

from mersal.app import Mersal
from mersal.retry.error_tracking.in_memory_error_tracker import InMemoryErrorTracker
from mersal.retry.error_handling.deadletter_queue_error_handler import DeadletterQueueErrorHandler
from mersal.retry.fail_fast.default_fail_fast_checker import DefaultFailFastChecker
from mersal.retry.default_retry_strategy import DefaultRetryStrategy
from mersal.retry.retry_strategy_settings import RetryStrategySettings
from mersal.transport.in_memory import InMemoryTransport

# Define message and handler
class MyMessage:
    def __init__(self, value: str):
        self.value = value

class MyHandler:
    def __init__(self):
        self.processed = []
        self.failed = []

    async def __call__(self, message: MyMessage):
        if message.value == "fail":
            raise ValueError("Message processing failed")
        self.processed.append(message)

# Configure error handling
transport = InMemoryTransport()

# Configure which exceptions should fail immediately without retries
fail_fast_exceptions = [KeyError, TypeError]
fail_fast_checker = DefaultFailFastChecker(fail_fast_exceptions)

# Track errors for up to 3 attempts
error_tracker = InMemoryErrorTracker(maximum_failure_times=3)

# Create app with custom retry strategy
app = Mersal(
    transport=transport,
    # Configure error handling components
    retry_strategy=DefaultRetryStrategy(
        error_tracker=error_tracker,
        error_handler=DeadletterQueueErrorHandler(transport, "error_queue"),
        fail_fast_checker=fail_fast_checker,
    ),
    retry_strategy_settings=RetryStrategySettings(
        error_queue_name="error_queue",
        max_no_of_retries=3
    )
)

# Register message handler
handler = MyHandler()
app.register_handler(MyMessage, handler)

# Process error queue messages
error_handler = MyHandler()
app.subscribe("error_queue", MyMessage, error_handler)

# Main processing loop
async def process_messages():
    # This is handled successfully on the first attempt
    await app.send("myqueue", MyMessage("normal"))

    # This raises ValueError, which is not in the fail-fast list, so it is
    # retried up to 3 times and then forwarded to the error queue
    await app.send("myqueue", MyMessage("fail"))

    # This will fail fast without retries
    try:
        await app.send("myqueue", object())  # TypeError
    except TypeError:
        print("Failed fast as expected")

Summary

  • Mersal provides a comprehensive error handling system for processing messages

  • Error tracking maintains the state of failed messages

  • Fail-fast checking prevents unnecessary retries for certain exceptions

  • The dead letter queue handles persistently failing messages

  • Default implementations work out of the box, but all components are customizable