outbox

class mersal.outbox.OutboxConfig

Bases: object

Configuration for the outbox feature.

__init__(storage: OutboxStorage) None
storage: OutboxStorage

Sets the Outbox storage.

class mersal.outbox.OutboxForwarder

Bases: object

Send messages stored in the outbox.

The class checks the outbox at a fixed interval. When it finds outbox messages, it sends them, retrying if a send fails.

__init__(periodic_task_factory: PeriodicAsyncTaskFactory, transport: Transport, outbox_storage: OutboxStorage, forwarding_period: float = 1) None

Initialize OutboxForwarder.

Parameters:
  • periodic_task_factory – Creates an instance of PeriodicAsyncTask. The instance is responsible for running the periodic query and send.

  • transport – The relevant Transport.

  • outbox_storage – A storage for outbox messages that implements OutboxStorage.

  • forwarding_period – Interval, in seconds, between checks of the outbox storage. Defaults to 1 second.
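The poll-and-retry behaviour described above can be sketched as follows. This is a minimal illustration and not mersal's implementation: `forward_once`, `run_forwarder`, `fetch_batch`, `send`, `max_attempts`, and `cycles` are hypothetical names, and plain callables stand in for the real Transport, OutboxStorage, and PeriodicAsyncTask objects.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical stand-ins: plain callables replace the real storage and transport.
FetchBatch = Callable[[], Awaitable[List[bytes]]]
Send = Callable[[bytes], Awaitable[None]]


async def forward_once(fetch_batch: FetchBatch, send: Send, max_attempts: int = 3) -> int:
    """Send every message in the next batch, retrying each failed send."""
    sent = 0
    for message in await fetch_batch():
        for attempt in range(1, max_attempts + 1):
            try:
                await send(message)
                sent += 1
                break
            except ConnectionError:
                if attempt == max_attempts:
                    raise  # give up after the last attempt
                await asyncio.sleep(0)  # placeholder for a real backoff delay
    return sent


async def run_forwarder(
    fetch_batch: FetchBatch, send: Send, forwarding_period: float = 1, cycles: int = 1
) -> int:
    """Check the outbox `cycles` times, waiting `forwarding_period` seconds between checks."""
    total = 0
    for _ in range(cycles):
        total += await forward_once(fetch_batch, send)
        await asyncio.sleep(forwarding_period)
    return total


async def demo() -> int:
    pending = [[b"a", b"b"]]   # one batch waiting in the fake outbox
    failures = {"left": 1}     # the first send attempt fails once

    async def fetch_batch() -> List[bytes]:
        return pending.pop() if pending else []

    async def send(message: bytes) -> None:
        if failures["left"] > 0:
            failures["left"] -= 1
            raise ConnectionError("transport unavailable")

    return await run_forwarder(fetch_batch, send, forwarding_period=0.01, cycles=2)


sent_count = asyncio.run(demo())
```

In the demo, the first send fails once and succeeds on retry, so both messages are forwarded in the first cycle and the second cycle finds an empty outbox.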

class mersal.outbox.OutboxMessage

Bases: object

Outbox message that is stored in the storage.

It is a simple wrapper around a transport message and its destination, plus a unique identifier for each outbox message.

__init__(destination_address: str, headers: MessageHeaders, body: bytes, outbox_message_id: int | None = None) None
body: bytes

Original body of the transport message.

destination_address: str

Destination address for the outbox message.

headers: MessageHeaders

Message headers for the outbox message.

outbox_message_id: int | None = None

Outbox message identifier.

The default of None exists to satisfy SQLAlchemy; see https://github.com/mersal-org/mersal/issues/17

transport_message() TransportMessage

Converts the message back to a TransportMessage.
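The shape of OutboxMessage can be illustrated with a small dataclass. This is a sketch under assumptions, not the library's source: `MessageHeaders` is approximated here as a plain dict and `TransportMessage` as a two-field dataclass, both hypothetical stand-ins for the real mersal types.

```python
from dataclasses import dataclass
from typing import Dict, Optional

# Hypothetical stand-ins; the real mersal types may look different.
MessageHeaders = Dict[str, str]


@dataclass
class TransportMessage:
    headers: MessageHeaders
    body: bytes


@dataclass
class OutboxMessage:
    destination_address: str
    headers: MessageHeaders
    body: bytes
    # Left as None until a storage backend assigns an identifier.
    outbox_message_id: Optional[int] = None

    def transport_message(self) -> TransportMessage:
        """Rebuild the transport message from the stored headers and body."""
        return TransportMessage(headers=self.headers, body=self.body)


message = OutboxMessage(
    destination_address="orders-queue",
    headers={"message_id": "abc"},
    body=b"{}",
)
restored = message.transport_message()
```

The destination address stays on the outbox message itself, since the forwarder needs it to route the rebuilt transport message.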

class mersal.outbox.OutboxMessageBatch

Bases: Sequence[OutboxMessage]

__init__(messages: Iterable[OutboxMessage], complete_action: Callable[[...], Awaitable[Any]], close_action: Callable[[...], Awaitable[Any]]) None

class mersal.outbox.OutboxStorage

Bases: Protocol

A protocol that any Outbox storage must implement.

async __call__() None

Called upon setting up the outbox feature.

Can be used to run any initialization required by the storage. For example, creating the outbox database table or making sure it already exists.

__init__(*args, **kwargs)
async get_next_message_batch() OutboxMessageBatch

Provide messages stored in the outbox.

async save(outgoing_messages: Sequence[OutgoingMessage], transaction_context: TransactionContext) None

Save outbox messages.

The TransactionContext can be used to obtain objects related to the current message being handled. For example, to obtain the database transaction/session.

Parameters:
  • outgoing_messages – A list of messages to be stored in the outbox.

  • transaction_context – The TransactionContext for the message that is currently being handled.
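As a rough illustration of the protocol, the sketch below keeps messages in a list. It is not a mersal class and makes several simplifying assumptions: `InMemoryOutboxStorage` and `StoredMessage` are hypothetical names, the transaction context is modelled as a plain dict, and `get_next_message_batch` returns a list rather than an OutboxMessageBatch.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Sequence, Tuple


@dataclass
class StoredMessage:
    """Hypothetical stand-in for the messages kept in the outbox."""
    destination_address: str
    body: bytes


class InMemoryOutboxStorage:
    """Toy storage with the same three async entry points as the protocol."""

    def __init__(self) -> None:
        self._messages: List[StoredMessage] = []
        self.initialized = False

    async def __call__(self) -> None:
        # Setup hook: a database-backed storage might create its table here.
        self.initialized = True

    async def save(
        self,
        outgoing_messages: Sequence[StoredMessage],
        transaction_context: Dict[str, Any],  # assumption: a dict replaces TransactionContext
    ) -> None:
        # A real storage would obtain its session/transaction from the context.
        self._messages.extend(outgoing_messages)

    async def get_next_message_batch(self) -> List[StoredMessage]:
        # Simplification: hand back everything and clear the outbox.
        batch, self._messages = self._messages, []
        return batch


async def demo() -> Tuple[bool, int, int]:
    storage = InMemoryOutboxStorage()
    await storage()  # run the setup hook
    await storage.save([StoredMessage("orders-queue", b"{}")], transaction_context={})
    first = await storage.get_next_message_batch()
    second = await storage.get_next_message_batch()
    return storage.initialized, len(first), len(second)


result = asyncio.run(demo())
```

A persistent implementation would write inside the same database transaction as the handler's own changes, which is what passing the TransactionContext to `save` makes possible.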