Outbox

Introduction

When handling a message, it’s typical to send messages (send commands or publish events) to various Mersal apps (including the local app that is currently handling the message). There can be two scenarios where the final outcome results in a broken state.

  1. Message handling fails (e.g. data is not persisted) but the messages are sent.

  2. Message handling succeeds but the sent messages fail.

Both are undesirable states; the first can result in publishing false information to other systems (e.g. RegisterStudentMessageHandler that fails to persist data but publishes a StudentRegisteredEvent to other systems). The second will lead to broken business processes (e.g a finance system that is subscribing to StudentRegisteredEvent to complete financial registration will fail to function.)

The solution is to persist outgoing messages along the business data using the same persistence transaction. A successful transaction means the business data and the outgoing messages are stored. A failed transactions means nothing is stored.

The outgoing messages are said to be stored in an outbox hence the name “Outbox pattern”

Since there needs to be a transaction, using the outbox pattern requires a persistence mechanism that supports atomic transactions (see [1] [page 145-146] for implementing this pattern with NoSQL databases).

Usage

Notes

The outbox pattern can be used in Mersal by passing an instance of OutboxConfig during initialization. The configuration requires an outbox persistence mechanism that implements OutboxStorage.

# incomplete code

from mersal.outbox import OutboxConfig
from mersal.app import Mersal

app = Mersal(outbox_config=OutboxConfig(...))

Provided implementation of OutboxStorage:

  • SQLAlchemyOutboxStorage

  • InMemoryOutboxStorage, only used for testing since it has no concept of a transaction.

When using the outbox feature, it’s important to not commit nor close the database transaction within the message handlers. That action should be handled by the outbox storage or during the unit of work step.

Examples

Outbox with SQLAlchemy
outbox_sqlalchemy.py
  1from dataclasses import dataclass, field
  2from typing import Any, cast
  3
  4from mersal_alchemy.sqlalchemy_outbox_storage import (
  5    SQLAlchemyOutboxStorageConfig,
  6)
  7from mersal_msgspec import MsgspecSerializer
  8from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  9from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
 10
 11from mersal.activation import BuiltinHandlerActivator
 12from mersal.app import Mersal
 13from mersal.outbox import OutboxConfig
 14from mersal.pipeline import MessageContext
 15from mersal.transport.in_memory import (
 16    InMemoryNetwork,
 17    InMemoryTransportConfig,
 18)
 19
 20__all__ = (
 21    "AddUser",
 22    "AddUserMessageHandler",
 23    "Base",
 24    "PromoteUser",
 25    "PromoteUserMessageHandler",
 26    "PromotedUsersRepo",
 27    "User",
 28    "app_factory",
 29)
 30
 31
 32@dataclass
 33class AddUser:
 34    username: str
 35
 36
 37@dataclass
 38class PromoteUser:
 39    username: str
 40
 41
 42NETWORK = InMemoryNetwork()
 43
 44
 45@dataclass
 46class PromotedUsersRepo:
 47    users: list[str] = field(default_factory=list)
 48
 49
 50PROMOTED_USERS = PromotedUsersRepo()
 51
 52
 53class Base(DeclarativeBase):
 54    pass
 55
 56
 57class User(Base):
 58    __tablename__ = "users"
 59
 60    username: Mapped[str] = mapped_column(primary_key=True)
 61
 62
 63class AddUserMessageHandler:
 64    def __init__(
 65        self,
 66        mersal: Mersal,
 67        message_context: MessageContext,
 68        session_factory: async_sessionmaker,
 69    ) -> None:
 70        self.mersal = mersal
 71        self.session = session_factory()
 72        transaction_context = message_context.transaction_context
 73        transaction_context.items["db-session"] = self.session
 74
 75    async def __call__(self, message: AddUser) -> None:
 76        user = User(username=message.username)
 77        self.session.add(user)
 78
 79        # Remember, do not commit session
 80
 81        await self.mersal.send_local(PromoteUser(username=message.username))
 82
 83
 84class PromoteUserMessageHandler:
 85    async def __call__(self, message: PromoteUser) -> Any:
 86        # please don't do this in production
 87        PROMOTED_USERS.users.append(message.username)
 88
 89
 90async def app_factory() -> Mersal:
 91    engine = create_async_engine("sqlite+aiosqlite://", echo=True)
 92    async with engine.begin() as conn:
 93        await conn.run_sync(Base.metadata.create_all)
 94    session_factory = async_sessionmaker(engine)
 95
 96    outbox_storage = SQLAlchemyOutboxStorageConfig(
 97        async_session_factory=session_factory,
 98        table_name="outbox",
 99        session_extractor=lambda transaction_context: cast("AsyncSession", transaction_context.items.get("db-session")),
100    ).storage
101    queue_address = "test-queue"
102    transport = InMemoryTransportConfig(network=NETWORK, input_queue_address=queue_address).transport
103    activator = BuiltinHandlerActivator()
104    activator.register(
105        AddUser,
106        lambda message_context, mersal: AddUserMessageHandler(
107            mersal=mersal,
108            message_context=message_context,
109            session_factory=session_factory,
110        ),
111    )
112    activator.register(
113        PromoteUser,
114        lambda _, __: PromoteUserMessageHandler(),
115    )
116    return Mersal(
117        "m1",
118        activator,
119        transport=transport,
120        outbox=OutboxConfig(storage=outbox_storage),
121        message_body_serializer=MsgspecSerializer(object_types={AddUser, PromoteUser}),
122    )

Looking at the emphasized lines, the database session is created upon handling the message. This same session needs to be shared with the outbox storage. This is done using the transaction context as shown. Other configurations are possible.

Outbox with Unit of Work (SQLAlchemy Unit of Work)

TODO

Outbox with Idempotency

TODO

Summary

  • Don’t close the database transaction in the message handler.

  • Use Mersal idempotency feature with the outbox feature. Set it to completely skip message handling.

  • Preferable to use the outbox feature with the unit of work pattern.

Internal Implementation

Once the outgoing messages are persisted in the outbox. A relay checks the outbox at a fixed interval for any stored messages (currently set at 1 second).

Ref [1] and [2] discuss the implementation and alternatives. One alternative is a push based relay. This type of relay needs to be supported by the database.

Road Map

See Outbox Project

References

Further Reading