Outbox
Introduction
While handling a message, a handler typically sends further messages (sending commands or publishing events) to various Mersal apps, including the local app that is currently handling the message. Two scenarios can leave the system in a broken state:
1. Message handling fails (e.g. data is not persisted) but the messages are sent.
2. Message handling succeeds but sending the messages fails.
Both are undesirable states. The first can result in publishing false information to other systems (e.g. a RegisterStudentMessageHandler that fails to persist data but still publishes a StudentRegisteredEvent to other systems). The second leads to broken business processes (e.g. a finance system that subscribes to StudentRegisteredEvent to complete financial registration never receives the event and cannot proceed).
The solution is to persist outgoing messages alongside the business data, using the same persistence transaction. A successful transaction means that both the business data and the outgoing messages are stored. A failed transaction means that nothing is stored.
The outgoing messages are said to be stored in an outbox, hence the name “Outbox pattern”.
Since there needs to be a transaction, using the outbox pattern requires a persistence mechanism that supports atomic transactions (see [1], pages 145-146, for implementing this pattern with NoSQL databases).
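As a rough illustration of the idea, and not of how Mersal implements it, the sketch below uses Python's standard sqlite3 module to write a business row and an outgoing event in a single transaction. The table names (students, outbox), the function names, and the fail_after_write flag are all hypothetical; the flag only simulates a failure late in message handling.

```python
import json
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with a business table and an outbox table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE students (name TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, body TEXT NOT NULL)")
    conn.commit()
    return conn


def register_student(conn: sqlite3.Connection, name: str, fail_after_write: bool = False) -> None:
    """Store the student and the outgoing event atomically."""
    # "with conn" commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.execute("INSERT INTO students (name) VALUES (?)", (name,))
        event = {"type": "StudentRegisteredEvent", "name": name}
        conn.execute("INSERT INTO outbox (body) VALUES (?)", (json.dumps(event),))
        if fail_after_write:
            # Hypothetical failure that happens after both rows were written.
            raise RuntimeError("simulated handler failure")


def count_rows(conn: sqlite3.Connection, table: str) -> int:
    """Return the number of rows in one of the two demo tables."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

When the simulated failure occurs, the rollback discards both the student row and the outbox row, so neither of the two broken states described above can arise.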
Usage
Notes
The outbox pattern can be used in Mersal by passing an instance of OutboxConfig during initialization. The configuration requires an outbox persistence mechanism that implements OutboxStorage.
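# Incomplete code: the OutboxConfig arguments are elided.
# The outbox keyword follows the full SQLAlchemy example in the Examples section.
from mersal.app import Mersal
from mersal.outbox import OutboxConfig

app = Mersal(outbox=OutboxConfig(...))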
Provided implementations of OutboxStorage:
- SQLAlchemyOutboxStorage
- InMemoryOutboxStorage, only used for testing since it has no concept of a transaction.
When using the outbox feature, it is important that message handlers neither commit nor close the database transaction. That action should be handled by the outbox storage or during the unit of work step.
Examples
Outbox with SQLAlchemy
from dataclasses import dataclass, field
from typing import Any, cast

from mersal_alchemy.sqlalchemy_outbox_storage import (
    SQLAlchemyOutboxStorageConfig,
)
from mersal_msgspec import MsgspecSerializer
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from mersal.activation import BuiltinHandlerActivator
from mersal.app import Mersal
from mersal.outbox import OutboxConfig
from mersal.pipeline import MessageContext
from mersal.transport.in_memory import (
    InMemoryNetwork,
    InMemoryTransportConfig,
)

__all__ = (
    "AddUser",
    "AddUserMessageHandler",
    "Base",
    "PromoteUser",
    "PromoteUserMessageHandler",
    "PromotedUsersRepo",
    "User",
    "app_factory",
)


@dataclass
class AddUser:
    username: str


@dataclass
class PromoteUser:
    username: str


NETWORK = InMemoryNetwork()


@dataclass
class PromotedUsersRepo:
    users: list[str] = field(default_factory=list)


PROMOTED_USERS = PromotedUsersRepo()


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    username: Mapped[str] = mapped_column(primary_key=True)


class AddUserMessageHandler:
    def __init__(
        self,
        mersal: Mersal,
        message_context: MessageContext,
        session_factory: async_sessionmaker,
    ) -> None:
        self.mersal = mersal
        self.session = session_factory()
        # Share the session with the outbox storage through the transaction context.
        transaction_context = message_context.transaction_context
        transaction_context.items["db-session"] = self.session

    async def __call__(self, message: AddUser) -> None:
        user = User(username=message.username)
        self.session.add(user)

        # Remember: do not commit the session here.

        await self.mersal.send_local(PromoteUser(username=message.username))


class PromoteUserMessageHandler:
    async def __call__(self, message: PromoteUser) -> Any:
        # Please don't do this in production.
        PROMOTED_USERS.users.append(message.username)


async def app_factory() -> Mersal:
    engine = create_async_engine("sqlite+aiosqlite://", echo=True)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    session_factory = async_sessionmaker(engine)

    outbox_storage = SQLAlchemyOutboxStorageConfig(
        async_session_factory=session_factory,
        table_name="outbox",
        # Read back the session that the handler stored in the transaction context.
        session_extractor=lambda transaction_context: cast("AsyncSession", transaction_context.items.get("db-session")),
    ).storage
    queue_address = "test-queue"
    transport = InMemoryTransportConfig(network=NETWORK, input_queue_address=queue_address).transport
    activator = BuiltinHandlerActivator()
    activator.register(
        AddUser,
        lambda message_context, mersal: AddUserMessageHandler(
            mersal=mersal,
            message_context=message_context,
            session_factory=session_factory,
        ),
    )
    activator.register(
        PromoteUser,
        lambda _, __: PromoteUserMessageHandler(),
    )
    return Mersal(
        "m1",
        activator,
        transport=transport,
        outbox=OutboxConfig(storage=outbox_storage),
        message_body_serializer=MsgspecSerializer(object_types={AddUser, PromoteUser}),
    )
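The database session is created when the message is handled (see AddUserMessageHandler.__init__). This same session needs to be shared with the outbox storage. In this example the handler stores the session in the transaction context under the "db-session" key, and the session_extractor passed to SQLAlchemyOutboxStorageConfig reads it back. Other configurations are possible.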
Outbox with Unit of Work (SQLAlchemy Unit of Work)
TODO
Outbox with Idempotency
TODO
Summary
- Don’t commit or close the database transaction in the message handler.
- Use Mersal’s idempotency feature together with the outbox feature, and set it to skip message handling completely.
- Prefer using the outbox feature together with the unit of work pattern.
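The idempotency recommendation can be motivated with a small sketch. A relay that retries delivery may hand the same message to a handler more than once, so the receiving side should recognise duplicates and skip them. The wrapper below is a generic illustration and not Mersal's idempotency API: IdempotentHandler, the message_id argument, and the in-memory set are assumptions made for this example.

```python
from collections.abc import Callable


class IdempotentHandler:
    """Wrap a handler so that an already processed message ID is skipped."""

    def __init__(self, handler: Callable[[dict], None]) -> None:
        self._handler = handler
        # Assumption: kept in memory for brevity; a real system would persist
        # these IDs in the same transaction as the business data.
        self._processed_ids: set[str] = set()

    def __call__(self, message_id: str, message: dict) -> bool:
        """Return True if the message was handled, False if it was skipped."""
        if message_id in self._processed_ids:
            return False  # duplicate delivery: skip handling entirely
        self._handler(message)
        self._processed_ids.add(message_id)
        return True
```

The ID is recorded only after the wrapped handler returns, so a message whose handling raised an exception is handled again on redelivery.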
Internal Implementation
Once the outgoing messages are persisted in the outbox, a relay checks the outbox for stored messages at a fixed interval (currently set at 1 second).
Refs. [1] and [2] discuss the implementation and its alternatives. One alternative is a push-based relay. This type of relay needs to be supported by the database.
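A polling relay of this kind could look roughly like the following. This is a minimal sketch under stated assumptions, not Mersal's relay: it uses the standard sqlite3 module for storage, and the outbox table layout, relay_once, run_relay, and the send callback are hypothetical names. The default interval mirrors the 1 second mentioned above.

```python
import sqlite3
import time
from collections.abc import Callable
from typing import Optional


def relay_once(conn: sqlite3.Connection, send: Callable[[str], None]) -> int:
    """Forward stored messages, oldest first; return how many were sent."""
    rows = conn.execute("SELECT id, body FROM outbox ORDER BY id").fetchall()
    sent = 0
    for row_id, body in rows:
        try:
            send(body)
        except Exception:
            # Leave this row and all later rows in place; the next poll retries them.
            break
        # Delete only after a successful send, so a crash between the two steps
        # causes a repeated delivery rather than a lost message.
        with conn:
            conn.execute("DELETE FROM outbox WHERE id = ?", (row_id,))
        sent += 1
    return sent


def run_relay(
    conn: sqlite3.Connection,
    send: Callable[[str], None],
    interval_seconds: float = 1.0,
    max_polls: Optional[int] = None,
) -> None:
    """Poll the outbox at a fixed interval; max_polls=None means poll forever."""
    polls = 0
    while max_polls is None or polls < max_polls:
        relay_once(conn, send)
        polls += 1
        time.sleep(interval_seconds)
```

Because a row is deleted only after send returns, this design can deliver a message more than once but never drops one, which is why receivers are usually made idempotent.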
Road Map
See Outbox Project