Error Handling¶
Introduction¶
Handling errors in message-based systems is crucial for building robust applications. Mersal provides a comprehensive error handling system with features for:
Tracking and managing errors
Implementing retry strategies
Identifying messages that should fail immediately (fail-fast)
Handling poison messages via dead letter queues
These features help ensure that your application can gracefully handle failures, improve reliability, and avoid getting stuck in endless retry loops.
Usage¶
Notes¶
Default Error Handling¶
Mersal comes with a default error handling strategy that provides these key capabilities:
Error Tracking: Records errors for each message and tracks how many times it has failed
Retry Logic: Automatically retries failed messages up to a configurable number of times
Fail-Fast Detection: Identifies exceptions that should immediately fail without retries
Dead Letter Queue: Forwards messages that fail repeatedly to a dedicated error queue
The default retry strategy is implemented in the DefaultRetryStrategy class, which creates a DefaultRetryStep in the message processing pipeline.
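To make the interplay of the four capabilities concrete, here is a minimal, self-contained sketch of the decision flow. It does not use Mersal and is not how DefaultRetryStep is implemented: `process_with_retries`, `FAIL_FAST`, `MAX_FAILURES`, and the list standing in for a dead letter queue are all hypothetical names chosen for illustration.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-ins; none of these names come from Mersal.
FAIL_FAST = (KeyError, TypeError)
MAX_FAILURES = 3


async def process_with_retries(
    message: str,
    handler: Callable[[str], Awaitable[None]],
    dead_letters: list,
) -> bool:
    """Return True if the message was handled, False if it was dead-lettered."""
    errors: list[Exception] = []  # error tracking for this message
    while True:
        try:
            await handler(message)
            return True  # success: nothing left to track
        except Exception as exc:
            errors.append(exc)
            fail_fast = isinstance(exc, FAIL_FAST)
            if fail_fast or len(errors) >= MAX_FAILURES:
                # Stand-in for forwarding to a dead letter queue.
                dead_letters.append((message, errors))
                return False
            # Otherwise fall through and retry.


async def run_demo():
    attempts: dict[str, int] = {}
    dead_letters: list = []

    async def handler(message: str) -> None:
        attempts[message] = attempts.get(message, 0) + 1
        if message == "flaky" and attempts[message] < 3:
            raise RuntimeError("temporary outage")  # succeeds on attempt 3
        if message == "broken":
            raise ValueError("always fails")  # retried, then dead-lettered
        if message == "malformed":
            raise KeyError("missing field")  # fail-fast, no retries

    outcomes = {}
    for message in ("flaky", "broken", "malformed"):
        outcomes[message] = await process_with_retries(message, handler, dead_letters)
    return outcomes, attempts, dead_letters


outcomes, attempts, dead_letters = asyncio.run(run_demo())
print(outcomes, attempts)
```

In this sketch the "flaky" message recovers through retries, the "broken" message exhausts its attempts, and the "malformed" message is dead-lettered after a single attempt because its exception type is in the fail-fast list.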
Customizing Error Handling¶
You can customize the error handling mechanism by configuring these components:
Error Tracker: Tracks errors for messages and determines when they’ve failed too many times
Error Handler: Handles poison messages, typically by sending them to a dead letter queue
Fail-Fast Checker: Determines which exceptions should cause immediate failure without retries
Here’s an example of configuring error handling when creating a Mersal application:
from mersal.app import Mersal
from mersal.retry.error_tracking.in_memory_error_tracker import InMemoryErrorTracker
from mersal.retry.error_handling.deadletter_queue_error_handler import DeadletterQueueErrorHandler
from mersal.retry.fail_fast.default_fail_fast_checker import DefaultFailFastChecker
from mersal.retry.default_retry_strategy import DefaultRetryStrategy
from mersal.retry.retry_strategy_settings import RetryStrategySettings

# Configure error handling components
error_tracker = InMemoryErrorTracker(maximum_failure_times=5)

# Define exceptions that should fail immediately
fail_fast_exceptions = [ValueError, KeyError]
fail_fast_checker = DefaultFailFastChecker(fail_fast_exceptions)

# Create the app with custom error handling.
# `transport` is assumed to be a transport instance created elsewhere.
app = Mersal(
    # Other configuration...
    retry_strategy=DefaultRetryStrategy(
        error_tracker=error_tracker,
        error_handler=DeadletterQueueErrorHandler(transport, "error_queue"),
        fail_fast_checker=fail_fast_checker,
    ),
    retry_strategy_settings=RetryStrategySettings(
        error_queue_name="error_queue",
        max_no_of_retries=5,
    ),
)
Error Tracking¶
The error tracker component maintains the state of errors for each message:
Records exceptions that occur during message processing
Determines when a message has failed too many times
Supports marking messages as “final” to skip retries
Provides access to previous exceptions for diagnostics
Mersal includes an in-memory implementation (InMemoryErrorTracker), whose state is lost when the process restarts. To keep failure counts across restarts, you can implement the ErrorTracker protocol with a persistent store such as a database.
import uuid

from mersal.retry.error_tracking.error_tracker import ErrorTracker


class CustomErrorTracker(ErrorTracker):
    async def register_error(self, message_id: uuid.UUID, exception: Exception):
        # Record the error in your persistence store
        ...

    async def clean_up(self, message_id: uuid.UUID):
        # Remove error tracking for this message
        ...

    async def has_failed_too_many_times(self, message_id: uuid.UUID) -> bool:
        # Determine if message has exceeded retry limit
        ...

    async def mark_as_final(self, message_id: uuid.UUID):
        # Mark message as final (no more retries)
        ...

    async def get_exceptions(self, message_id: uuid.UUID) -> list[Exception]:
        # Return all exceptions for this message
        ...
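As one possible way to fill in those method bodies, the sketch below backs the same five methods with SQLite from the standard library. It is an illustration under several assumptions rather than a recommended implementation: the class does not import Mersal (so it runs on its own), the table layout is made up, exceptions are stored as their `repr()` text, a message counts as failed once its error count reaches `maximum_failure_times`, and the blocking `sqlite3` calls run directly inside the async methods for brevity.

```python
import asyncio
import sqlite3
import uuid


class SqliteErrorTracker:
    """Persistent tracker sketch exposing the five methods listed above."""

    def __init__(self, path: str = ":memory:", maximum_failure_times: int = 5) -> None:
        self._max = maximum_failure_times
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS errors ("
            "message_id TEXT NOT NULL, detail TEXT, final INTEGER NOT NULL DEFAULT 0)"
        )

    async def register_error(self, message_id: uuid.UUID, exception: Exception) -> None:
        self._db.execute(
            "INSERT INTO errors (message_id, detail) VALUES (?, ?)",
            (str(message_id), repr(exception)),
        )
        self._db.commit()

    async def clean_up(self, message_id: uuid.UUID) -> None:
        self._db.execute("DELETE FROM errors WHERE message_id = ?", (str(message_id),))
        self._db.commit()

    async def has_failed_too_many_times(self, message_id: uuid.UUID) -> bool:
        # COUNT(detail) skips the NULL-detail marker row written by mark_as_final.
        count, final = self._db.execute(
            "SELECT COUNT(detail), COALESCE(MAX(final), 0) "
            "FROM errors WHERE message_id = ?",
            (str(message_id),),
        ).fetchone()
        return bool(final) or count >= self._max

    async def mark_as_final(self, message_id: uuid.UUID) -> None:
        # A marker row with no detail flags the message as final.
        self._db.execute(
            "INSERT INTO errors (message_id, detail, final) VALUES (?, NULL, 1)",
            (str(message_id),),
        )
        self._db.commit()

    async def get_exceptions(self, message_id: uuid.UUID) -> list[Exception]:
        rows = self._db.execute(
            "SELECT detail FROM errors "
            "WHERE message_id = ? AND detail IS NOT NULL ORDER BY rowid",
            (str(message_id),),
        ).fetchall()
        # The original exception objects are gone; return generic ones with the text.
        return [Exception(detail) for (detail,) in rows]


async def demo() -> tuple[bool, int]:
    tracker = SqliteErrorTracker(maximum_failure_times=2)
    message_id = uuid.uuid4()
    await tracker.register_error(message_id, ValueError("bad payload"))
    await tracker.register_error(message_id, ValueError("bad payload"))
    failed = await tracker.has_failed_too_many_times(message_id)
    recorded = len(await tracker.get_exceptions(message_id))
    return failed, recorded


print(asyncio.run(demo()))
```

In a real service you would likely swap the blocking driver for an async one or run the calls in a worker thread, so that database latency does not stall the event loop.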
Fail-Fast Strategies¶
Not all errors should be retried. Some exceptions indicate that retrying will never succeed, such as:
Validation errors
Authorization failures
Malformed message content
The fail-fast checker determines which exceptions should cause immediate failure:
from mersal.retry.fail_fast.default_fail_fast_checker import DefaultFailFastChecker

# Define exceptions that should never be retried
fail_fast_exceptions = [
    ValueError,       # Message validation issues
    PermissionError,  # Authorization failures
    KeyError,         # Missing required data
    TypeError,        # Type conversion errors
]
fail_fast_checker = DefaultFailFastChecker(fail_fast_exceptions)
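When choosing the list, keep Python's exception hierarchy in mind. The sketch below assumes a checker that matches with `isinstance`; `SimpleFailFastChecker` and `should_fail_fast` are hypothetical names, and whether DefaultFailFastChecker matches subclasses the same way is not stated here, so verify that before relying on it. Under that assumption, listing a base class such as ValueError also covers its subclasses, for example `json.JSONDecodeError`.

```python
import json


class SimpleFailFastChecker:
    """Hypothetical checker for illustration; not Mersal's DefaultFailFastChecker."""

    def __init__(self, fail_fast_exceptions: list[type[BaseException]]) -> None:
        self._types = tuple(fail_fast_exceptions)

    def should_fail_fast(self, exception: BaseException) -> bool:
        # isinstance also matches subclasses of the listed types.
        return isinstance(exception, self._types)


checker = SimpleFailFastChecker([ValueError, PermissionError, KeyError, TypeError])

# Produce a real JSONDecodeError, which is a subclass of ValueError.
try:
    json.loads("{not json")
except json.JSONDecodeError as exc:
    decode_error = exc

print(checker.should_fail_fast(KeyError("order_id")))    # listed directly
print(checker.should_fail_fast(decode_error))            # subclass of ValueError
print(checker.should_fail_fast(ConnectionResetError()))  # transient, worth retrying
print(checker.should_fail_fast(TimeoutError()))          # transient, worth retrying
```

The first two calls return True and the last two return False: ConnectionResetError and TimeoutError derive from OSError, but not from PermissionError, so they stay eligible for retries.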
Dead Letter Queue Handling¶
When a message fails repeatedly, it’s sent to a dead letter queue for later investigation. This prevents the system from getting stuck in endless retry loops.
The default implementation (DeadletterQueueErrorHandler) sends failed messages to a configured error queue:
from mersal.retry.error_handling.deadletter_queue_error_handler import DeadletterQueueErrorHandler

# Create error handler that sends to "error_queue".
# `transport` is assumed to be a transport instance created elsewhere.
error_handler = DeadletterQueueErrorHandler(
    transport,
    error_queue_name="error_queue",
)
The error handler adds the exception details to the message headers before forwarding it, making it easier to diagnose issues.
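The general idea of attaching exception details to headers can be sketched with the standard library alone. The function name and the header keys below (`error-type`, `error-message`, `error-traceback`) are invented for this illustration; the headers Mersal actually writes may be named and structured differently.

```python
import traceback


def build_error_headers(headers: dict[str, str], exception: BaseException) -> dict[str, str]:
    """Return a copy of `headers` enriched with details of `exception`.

    The header names are made up for this sketch.
    """
    enriched = dict(headers)  # leave the caller's dict untouched
    enriched["error-type"] = type(exception).__qualname__
    enriched["error-message"] = str(exception)
    enriched["error-traceback"] = "".join(
        traceback.format_exception(type(exception), exception, exception.__traceback__)
    )
    return enriched


try:
    raise ValueError("amount must be positive")
except ValueError as exc:
    headers = build_error_headers({"message-id": "42"}, exc)

print(headers["error-type"], "|", headers["error-message"])
```

Keeping the type, message, and traceback in separate headers lets whoever inspects the error queue filter by exception type without parsing the full traceback text.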
Examples¶
Complete Error Handling Configuration
from mersal.app import Mersal
from mersal.retry.error_tracking.in_memory_error_tracker import InMemoryErrorTracker
from mersal.retry.error_handling.deadletter_queue_error_handler import DeadletterQueueErrorHandler
from mersal.retry.fail_fast.default_fail_fast_checker import DefaultFailFastChecker
from mersal.retry.default_retry_strategy import DefaultRetryStrategy
from mersal.retry.retry_strategy_settings import RetryStrategySettings
from mersal.transport.in_memory import InMemoryTransport


# Define message and handler
class MyMessage:
    def __init__(self, value: str):
        self.value = value


class MyHandler:
    def __init__(self):
        self.processed = []
        self.failed = []

    async def __call__(self, message: MyMessage):
        if message.value == "fail":
            raise ValueError("Message processing failed")
        self.processed.append(message)


# Configure error handling
transport = InMemoryTransport()

# Configure which exceptions should fail immediately without retries
fail_fast_exceptions = [KeyError, TypeError]
fail_fast_checker = DefaultFailFastChecker(fail_fast_exceptions)

# Track errors for up to 3 attempts
error_tracker = InMemoryErrorTracker(maximum_failure_times=3)

# Create app with custom retry strategy
app = Mersal(
    transport=transport,
    # Configure error handling components
    retry_strategy=DefaultRetryStrategy(
        error_tracker=error_tracker,
        error_handler=DeadletterQueueErrorHandler(transport, "error_queue"),
        fail_fast_checker=fail_fast_checker,
    ),
    retry_strategy_settings=RetryStrategySettings(
        error_queue_name="error_queue",
        max_no_of_retries=3,
    ),
)

# Register message handler
handler = MyHandler()
app.register_handler(MyMessage, handler)

# Process error queue messages
error_handler = MyHandler()
app.subscribe("error_queue", MyMessage, error_handler)


# Main processing loop
async def process_messages():
    # This succeeds on the first attempt, so no retry is needed
    await app.send("myqueue", MyMessage("normal"))

    # This raises ValueError, which is not in the fail-fast list:
    # it is retried up to 3 times and then goes to the error queue
    await app.send("myqueue", MyMessage("fail"))

    # This will fail fast without retries
    try:
        await app.send("myqueue", object())  # TypeError
    except TypeError:
        print("Failed fast as expected")
Summary¶
Mersal provides a comprehensive error handling system for processing messages
Error tracking maintains the state of failed messages
Fail-fast checking prevents unnecessary retries for certain exceptions
The dead letter queue handles persistently failing messages
Default implementations work out of the box, but all components are customizable