From 1e389b17ef2e40371a35a2383e14d8b754e7727d Mon Sep 17 00:00:00 2001 From: Ugur Yilmaz Date: Thu, 16 Apr 2026 18:44:36 -0400 Subject: [PATCH] optimistic concurrency --- .python-version | 1 + src/allocation/adapters/orm.py | 1 + src/allocation/service_layer/unit_of_work.py | 1 - tests/integration/test_uow.py | 39 +++++++++++++++++++- tests/unit/test_services.py | 32 ++++++---------- 5 files changed, 51 insertions(+), 23 deletions(-) create mode 100644 .python-version diff --git a/.python-version b/.python-version new file mode 100644 index 00000000..cc1923a4 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.8 diff --git a/src/allocation/adapters/orm.py b/src/allocation/adapters/orm.py index ea76eed5..8780c954 100644 --- a/src/allocation/adapters/orm.py +++ b/src/allocation/adapters/orm.py @@ -67,6 +67,7 @@ def start_mappers(): model.Product, products, properties={"batches": relationship(batches_mapper)}, + version_id_col=products.c.version_number, ) diff --git a/src/allocation/service_layer/unit_of_work.py b/src/allocation/service_layer/unit_of_work.py index 2cb76e52..c1f2a121 100644 --- a/src/allocation/service_layer/unit_of_work.py +++ b/src/allocation/service_layer/unit_of_work.py @@ -42,7 +42,6 @@ def rollback(self): DEFAULT_SESSION_FACTORY = sessionmaker( bind=create_engine( config.get_postgres_uri(), - isolation_level="REPEATABLE READ", ) ) diff --git a/tests/integration/test_uow.py b/tests/integration/test_uow.py index a95907cf..a409e6ab 100644 --- a/tests/integration/test_uow.py +++ b/tests/integration/test_uow.py @@ -3,10 +3,13 @@ import time import traceback from typing import List + import pytest +from sqlalchemy.orm.exc import StaleDataError + from allocation.domain import model from allocation.service_layer import unit_of_work -from ..random_refs import random_sku, random_batchref, random_orderid +from ..random_refs import random_batchref, random_orderid, random_sku def insert_batch(session, ref, sku, qty, eta, product_version=1): @@ -46,10 +49,42 @@ def test_uow_can_retrieve_a_batch_and_allocate_to_it(session_factory): product.allocate(line) uow.commit() + [[version_after]] = session.execute( + "SELECT version_number FROM products WHERE sku=:sku", + dict(sku="HIPSTER-WORKBENCH"), + ) + assert version_after == 2 + batchref = get_allocated_batch_ref(session, "o1", "HIPSTER-WORKBENCH") assert batchref == "batch1" +def test_commit_fails_with_stale_version_when_row_changed_elsewhere(session_factory): + sku = "OPTIMISTIC-SKU" + session = session_factory() + insert_batch(session, "batch1", sku, 100, None) + session.commit() + + uow_a = unit_of_work.SqlAlchemyUnitOfWork(session_factory) + uow_b = unit_of_work.SqlAlchemyUnitOfWork(session_factory) + uow_a.__enter__() + uow_b.__enter__() + try: + product_a = uow_a.products.get(sku=sku) + product_b = uow_b.products.get(sku=sku) + assert product_a.version_number == product_b.version_number == 1 + + product_a.allocate(model.OrderLine("order-a", sku, 10)) + uow_a.commit() + + product_b.allocate(model.OrderLine("order-b", sku, 10)) + with pytest.raises(StaleDataError): + uow_b.commit() + finally: + uow_b.__exit__(None, None, None) + uow_a.__exit__(None, None, None) + + def test_rolls_back_uncommitted_work_by_default(session_factory): uow = unit_of_work.SqlAlchemyUnitOfWork(session_factory) with uow: @@ -111,7 +146,7 @@ def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory) ) assert version == 2 [exception] = exceptions - assert "could not serialize access due to concurrent update" in str(exception) + assert isinstance(exception, StaleDataError) orders = session.execute( "SELECT orderid FROM allocations" diff --git a/tests/unit/test_services.py b/tests/unit/test_services.py index 0366fb2b..814957ed 100644 --- a/tests/unit/test_services.py +++ b/tests/unit/test_services.py @@ -1,19 +1,22 @@ -from unittest import mock import pytest from allocation.adapters import repository from allocation.service_layer import services, unit_of_work class FakeRepository(repository.AbstractRepository): - def __init__(self, products): - super().__init__() - self._products = set(products) + def __init__(self, batches): + self._batches = set(batches) + self.seen = set() - def _add(self, product): - self._products.add(product) + def add(self, batch): + self._batches.add(batch) + self.seen.add(batch) - def _get(self, sku): - return next((p for p in self._products if p.sku == sku), None) + def get(self, sku): + return next((b for b in self._batches if b.sku == sku), None) + + def list(self): + return list(self._batches) class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork): @@ -28,6 +31,7 @@ def rollback(self): pass + def test_add_batch_for_new_product(): uow = FakeUnitOfWork() services.add_batch("b1", "CRUNCHY-ARMCHAIR", 100, None, uow) @@ -62,15 +66,3 @@ def test_allocate_commits(): services.add_batch("b1", "OMINOUS-MIRROR", 100, None, uow) services.allocate("o1", "OMINOUS-MIRROR", 10, uow) assert uow.committed - - -def test_sends_email_on_out_of_stock_error(): - uow = FakeUnitOfWork() - services.add_batch("b1", "POPULAR-CURTAINS", 9, None, uow) - - with mock.patch("allocation.adapters.email.send_mail") as mock_send_mail: - services.allocate("o1", "POPULAR-CURTAINS", 10, uow) - assert mock_send_mail.call_args == mock.call( - "stock@made.com", - f"Out of stock for POPULAR-CURTAINS", - )