Unit of Work

Introduction

The usual preferred outcome after handling a message is to either commit all the changes or rollback all the changes. This is to ensure atomic operations.

While some database systems provide this via atomic transactions, the concept of “committing” is not just relevant to databases.

A message handler might have some other non-db side-effect that is executed immediately but then has to be reversed if a rollback happens. Another example would be a message that is handled by multiple handlers, having the unit of work ensures the work done by all handlers to be committed altogether or none at all (this can be done other ways as well, for example; sharing the database transaction between the handlers which is what the uow is doing anyway!)

There can be other scenarios but in short; the unit of work pattern allows message handling to be wrapped in a transaction. If the transaction is committed/rolledback then certain actions can be executed (usually database related actions).

Usage

Notes

Initialization

The unit of work pattern can be used in Mersal by passing an instance of UnitOfWorkConfig during initialization.

# incomplete code

from mersal.unit_of_work import UnitOfWorkConfig
from mersal.app import Mersal

app = Mersal(unit_of_work=UnitOfWorkConfig(...))

The config requires several inputs, how to create the unit of work object, what to do on commit/rollback/close. The examples below will show a usecase of SQLAlchemyUnitOfWork that is provided and ready to be used.

Accessing the unit of work

The unit of work object will be injected in TransactionContext using the key “uow”. This is useful as it can be retrieved within message handlers.

# incomplete code,

from mersal.transport import TransactionContext
from mersal.pipeline import MessageContext

class MessageHandler:
    def __init__(message_context: MessageContext):
        self.message_context = message_context

    async def __call__(self, message):
        transaction_context = self.message_context.transaction_context
        uow = transaction_context.items["uow"]

It’s important to not commit the unit of work within a message handler or commit/cancel the database connection it may contain.

commit_with_transaction

It is worth noting what the commit_with_transaction property means in UnitOfWorkConfig (this property should probably be clarified/renamed, too many “transactions” words being thrown around). commit_with_transaction refers to the point at which the unit of work commit/rollback happens. By default, this is set to False which means that the unit of work commit will be called after all the message handlers are invoked but before the TransactionContext is committed. If this is set to True, the unit of work will only commit as part of the TransactionContext commit actions.

Without including any other features, changing commit_with_transaction makes no difference. However, when using Outbox or Idempotency, then commit_with_transaction must be set to True. Those features are implemented using the TransactionContext commit actions. If the unit of works commits early (i.e when commit_with_transaction is False), then those features will not work!

Examples

Unit of Work with SQLAlchemy
unit_of_work_sqlalchemy.py
 1from dataclasses import dataclass
 2
 3from mersal_alchemy import (
 4    SQLAlchemyUnitOfWork,
 5    default_sqlalchemy_close_action,
 6    default_sqlalchemy_commit_action,
 7    default_sqlalchemy_rollback_action,
 8)
 9from mersal_msgspec import MsgspecSerializer
10from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
11from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
12
13from mersal.activation import BuiltinHandlerActivator
14from mersal.app import Mersal
15from mersal.pipeline import MessageContext
16from mersal.transport.in_memory import (
17    InMemoryNetwork,
18    InMemoryTransportConfig,
19)
20from mersal.unit_of_work import UnitOfWorkConfig
21
22__all__ = (
23    "AddUser",
24    "AddUserMessageHandler",
25    "Base",
26    "User",
27    "app_factory",
28)
29
30
31@dataclass
32class AddUser:
33    username: str
34
35
36NETWORK = InMemoryNetwork()
37
38
39class Base(DeclarativeBase):
40    pass
41
42
43class User(Base):
44    __tablename__ = "users"
45
46    username: Mapped[str] = mapped_column(primary_key=True)
47
48
49class AddUserMessageHandler:
50    def __init__(
51        self,
52        mersal: Mersal,
53        message_context: MessageContext,
54    ) -> None:
55        self.mersal = mersal
56        transaction_context = message_context.transaction_context
57        uow: SQLAlchemyUnitOfWork = transaction_context.items["uow"]
58        self.session = uow.session
59
60    async def __call__(self, message: AddUser) -> None:
61        user = User(username=message.username)
62        self.session.add(user)
63        # Remember, do not commit uow or session here
64
65
66engine = create_async_engine("sqlite+aiosqlite://", echo=True)
67
68
69async def app_factory() -> Mersal:
70    async with engine.begin() as conn:
71        await conn.run_sync(Base.metadata.create_all)
72    session_factory = async_sessionmaker(engine)
73
74    async def uow_factory(_: MessageContext) -> SQLAlchemyUnitOfWork:
75        return SQLAlchemyUnitOfWork(async_session_maker=session_factory)
76
77    unit_of_work = UnitOfWorkConfig(
78        uow_factory=uow_factory,
79        commit_action=default_sqlalchemy_commit_action,
80        rollback_action=default_sqlalchemy_rollback_action,
81        close_action=default_sqlalchemy_close_action,
82    )
83    queue_address = "test-queue"
84    transport = InMemoryTransportConfig(network=NETWORK, input_queue_address=queue_address).transport
85    activator = BuiltinHandlerActivator()
86    activator.register(
87        AddUser,
88        lambda message_context, mersal: AddUserMessageHandler(
89            mersal=mersal,
90            message_context=message_context,
91        ),
92    )
93    return Mersal(
94        "m1",
95        activator,
96        transport=transport,
97        unit_of_work=unit_of_work,
98        message_body_serializer=MsgspecSerializer(object_types={AddUser}),
99    )

Looking at the emphasized lines, the database session is created inside the unit of work. This unit of work is then accessed in the message handler. It contains the session to interact with the database.

Summary

  • Remember to set commit_with_transaction=True when using the outbox or idempotency features.

Internal Implementation

N/A

Road Map

See Unit of Work Project

References

Further Reading