From 8459c079508d6ed50bef5cf03a4ea5e299c98d8e Mon Sep 17 00:00:00 2001
From: Valentin Lorentz <vlorentz@softwareheritage.org>
Date: Wed, 19 Mar 2025 12:05:54 +0100
Subject: [PATCH 1/2] Make migration row insertion/update atomic

---
 swh/storage/cassandra/cql.py            | 57 ++++++++++++++++++++-----
 swh/storage/cassandra/migrations.py     | 57 ++++++++++++++++++++-----
 swh/storage/tests/test_cli_cassandra.py |  9 ++--
 3 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
index eba4ffcd..9738edbb 100644
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -224,23 +224,23 @@ TArg = TypeVar("TArg")
 TSelf = TypeVar("TSelf")
 
 
-def _insert_query(row_class: Type[BaseRow]) -> str:
+def _insert_query(row_class: Type[BaseRow], if_not_exists: bool = False) -> str:
     columns = row_class.cols()
     return (
         f"INSERT INTO {{keyspace}}.{row_class.TABLE} ({', '.join(columns)}) "
         f"VALUES ({', '.join('?' for _ in columns)})"
-    )
+    ) + (" IF NOT EXISTS" if if_not_exists else "")
 
 
 def _prepared_insert_statement(
-    row_class: Type[BaseRow],
+    row_class: Type[BaseRow], if_not_exists: bool = False
 ) -> Callable[
     [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]],  # noqa
     Callable[[TSelf, TArg], TRet],
 ]:
     """Shorthand for using `_prepared_statement` for `INSERT INTO`
     statements."""
-    return _prepared_statement(_insert_query(row_class))
+    return _prepared_statement(_insert_query(row_class, if_not_exists=if_not_exists))
 
 
 def _prepared_exists_statement(
@@ -435,8 +435,8 @@ class CqlRunner:
         except (ReadTimeout, WriteTimeout) as e:
             raise QueryTimeout(*e.args) from None
 
-    def _add_one(self, statement, obj: "DataclassInstance") -> None:
-        self.execute_with_retries(statement, dataclasses.astuple(obj))
+    def _add_one(self, statement, obj: "DataclassInstance") -> ResultSet:
+        return self.execute_with_retries(statement, dataclasses.astuple(obj))
 
     def _add_many(self, statement, objs: Sequence[BaseRow]) -> None:
         tables = {obj.TABLE for obj in objs}
@@ -503,19 +503,56 @@ class CqlRunner:
     # 'migration' table
     ##########################
 
-    @_prepared_insert_statement(MigrationRow)
-    def migration_add_one(self, migration: MigrationRow, *, statement) -> None:
-        self._add_one(statement, migration)
+    @_prepared_insert_statement(MigrationRow, if_not_exists=True)
+    def migration_add_one_if_not_exists(
+        self, migration: MigrationRow, *, statement
+    ) -> None:
+        """Adds a :class:`MigrationRow`.
+
+        Raises:
+            swh.storage.migrations.MigrationAlreadyExists: if there is already a known
+                migration with that id.
+        """
+        from .migrations import MigrationAlreadyExists
+
+        (result,) = self._add_one(statement, migration)
+        if not result["[applied]"]:
+            raise MigrationAlreadyExists([migration.id])
 
-    @_prepared_insert_statement(MigrationRow)
+    @_prepared_insert_statement(MigrationRow, if_not_exists=True)
     def migration_add_concurrent(
         self, migrations: List[MigrationRow], *, statement
     ) -> None:
+        """Inserts the given migration rows, skipping ids that already exist"""
         if len(migrations) == 0:
             # nothing to do
             return
         self._add_many(statement, migrations)
 
+    @_prepared_statement(
+        f"UPDATE {{keyspace}}.{MigrationRow.TABLE} "
+        f"SET dependencies=?, min_read_version=?, status=? "
+        f"WHERE id=? "
+        f"IF status = ?"
+    )
+    def migration_update_one(
+        self, migration: MigrationRow, expected_status: str, *, statement
+    ) -> None:
+        """Updates the dependencies/min_read_version/status of an existing migration.
+        Errors if the migration is not already in the database, or does not have the
+        ``expected_status``.
+        """
+        (row,) = self.execute_with_retries(
+            statement,
+            (
+                migration.dependencies,
+                migration.min_read_version,
+                migration.status,
+                migration.id,
+                expected_status,
+            ),
+        )
+
     @_prepared_select_statement(MigrationRow, "WHERE id IN ?")
     def migration_get(self, migration_ids, *, statement) -> Iterable[MigrationRow]:
         return map(
diff --git a/swh/storage/cassandra/migrations.py b/swh/storage/cassandra/migrations.py
index 424536d9..00c54eef 100644
--- a/swh/storage/cassandra/migrations.py
+++ b/swh/storage/cassandra/migrations.py
@@ -16,6 +16,28 @@ from .model import MigrationRow
 logger = logging.getLogger(__name__)
 
 
+class BaseMigrationException(Exception):
+    """Base class for exceptions related to migrations."""
+
+
+class MigrationAlreadyExists(BaseMigrationException):
+    """Raised when trying to insert a row in the ``migration`` table, but the id
+    already exists.
+
+    Typically happens when ``swh storage cassandra upgrade`` runs twice at the same time.
+    """
+
+
+class UnexpectedMigrationStatusExists(BaseMigrationException):
+    """Raised when trying to change a migration status from state A to state B,
+    but it was not in state A.
+
+    Typically happens when ``swh storage cassandra upgrade`` runs twice at the same time,
+    or when ``swh storage cassandra mark-upgraded`` was used while
+    ``swh storage cassandra upgrade`` is running.
+    """
+
+
 class MigrationStatus(enum.Enum):
     PENDING = "pending"
     """The migration was not applied yet"""
@@ -128,14 +150,20 @@ def apply_migrations(
                 )
                 remaining_migrations_missing_dependencies.append(migration)
                 continue
-            cql_runner.migration_add_one(
-                MigrationRow(
-                    id=migration.id,
-                    dependencies=migration.dependencies,
-                    min_read_version=migration.min_read_version,
-                    status=MigrationStatus.RUNNING.value,
-                )
+
+            row = MigrationRow(
+                id=migration.id,
+                dependencies=migration.dependencies,
+                min_read_version=migration.min_read_version,
+                status=MigrationStatus.RUNNING.value,
             )
+            try:
+                cql_runner.migration_add_one_if_not_exists(row)
+            except MigrationAlreadyExists:
+                cql_runner.migration_update_one(
+                    row, expected_status=MigrationStatus.PENDING.value
+                )
+
             if migration.script is None:
                 logger.info("Skipping %s", migration.id)
                 if migration.help:
@@ -144,13 +172,14 @@ def apply_migrations(
             else:
                 logger.info("Running %s...", migration.id)
                 migration.script(cql_runner)
-                cql_runner.migration_add_one(
+                cql_runner.migration_update_one(
                     MigrationRow(
                         id=migration.id,
                         dependencies=migration.dependencies,
                         min_read_version=migration.min_read_version,
                         status=MigrationStatus.COMPLETED.value,
-                    )
+                    ),
+                    expected_status=MigrationStatus.RUNNING.value,
                 )
                 logger.info("Done.")
                 statuses[migration.id] = MigrationStatus.COMPLETED
@@ -193,5 +222,11 @@ def create_migrations_table_if_needed(cql_runner: CqlRunner) -> None:
             min_read_version=migration.min_read_version,
             status=MigrationStatus.COMPLETED.value,
         )
-        cql_runner.migration_add_concurrent([migration_row])
-        logger.info("'migrations' table created.")
+        try:
+            cql_runner.migration_add_one_if_not_exists(migration_row)
+        except MigrationAlreadyExists:
+            logger.warning(
+                "'migration' table was created by an other process at the same time"
+            )
+        else:
+            logger.info("'migrations' table created.")
diff --git a/swh/storage/tests/test_cli_cassandra.py b/swh/storage/tests/test_cli_cassandra.py
index 0d348cc3..83d66f7a 100644
--- a/swh/storage/tests/test_cli_cassandra.py
+++ b/swh/storage/tests/test_cli_cassandra.py
@@ -141,9 +141,12 @@ class TestCassandraCli:
                 f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table", []
             )
         finally:
-            swh_storage._cql_runner.execute_with_retries(
-                f"DROP TABLE {swh_storage_cassandra_keyspace}.test_table", []
-            )
+            try:
+                swh_storage._cql_runner.execute_with_retries(
+                    f"DROP TABLE {swh_storage_cassandra_keyspace}.test_table", []
+                )
+            except Exception:
+                pass
 
     def test_upgrade_all_from_v2_9(
         self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker
-- 
GitLab


From 8019cd5290de913344fb55c4b1c03c38f21d46da Mon Sep 17 00:00:00 2001
From: Valentin Lorentz <vlorentz@softwareheritage.org>
Date: Wed, 19 Mar 2025 12:20:37 +0100
Subject: [PATCH 2/2] Finish implementing migration_update_one + add tests

---
 swh/storage/cassandra/cql.py        | 17 ++++++++++-----
 swh/storage/cassandra/migrations.py |  2 +-
 swh/storage/tests/test_cassandra.py | 33 ++++++++++++++++++++++++++++-
 3 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
index 9738edbb..33007c5b 100644
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -510,8 +510,8 @@ class CqlRunner:
         """Adds a :class:`MigrationRow`.
 
         Raises:
-            swh.storage.migrations.MigrationAlreadyExists: if there is already a known
-                migration with that id.
+            swh.storage.cassandra.migrations.MigrationAlreadyExists: if there is already
+                a migration with that id in the database.
         """
         from .migrations import MigrationAlreadyExists
 
@@ -539,10 +539,15 @@ class CqlRunner:
         self, migration: MigrationRow, expected_status: str, *, statement
     ) -> None:
         """Updates the dependencies/min_read_version/status of an existing migration.
-        Errors if the migration is not already in the database, or does not have the
-        ``expected_status``.
+
+        Raises:
+            ValueError: the query did not return exactly one result row
+            swh.storage.cassandra.migrations.UnexpectedMigrationStatus: the update was
+                not applied, ie. the stored status is not the ``expected_status``.
         """
-        (row,) = self.execute_with_retries(
+        from .migrations import UnexpectedMigrationStatus
+
+        (result,) = self.execute_with_retries(
             statement,
             (
                 migration.dependencies,
@@ -552,6 +557,8 @@ class CqlRunner:
                 expected_status,
             ),
         )
+        if not result["[applied]"]:
+            raise UnexpectedMigrationStatus([migration.id])
 
     @_prepared_select_statement(MigrationRow, "WHERE id IN ?")
     def migration_get(self, migration_ids, *, statement) -> Iterable[MigrationRow]:
diff --git a/swh/storage/cassandra/migrations.py b/swh/storage/cassandra/migrations.py
index 00c54eef..bb2b3484 100644
--- a/swh/storage/cassandra/migrations.py
+++ b/swh/storage/cassandra/migrations.py
@@ -28,7 +28,7 @@ class MigrationAlreadyExists(BaseMigrationException):
     """
 
 
-class UnexpectedMigrationStatusExists(BaseMigrationException):
+class UnexpectedMigrationStatus(BaseMigrationException):
     """Raised when trying to change a migration status from state A to state B,
     but it was not in state A.
 
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py
index dfc55f3c..6c76b7cf 100644
--- a/swh/storage/tests/test_cassandra.py
+++ b/swh/storage/tests/test_cassandra.py
@@ -26,8 +26,13 @@ from swh.model.model import (
 )
 from swh.storage import get_storage
 from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE
+from swh.storage.cassandra.migrations import (
+    MIGRATIONS,
+    MigrationAlreadyExists,
+    UnexpectedMigrationStatus,
+)
 import swh.storage.cassandra.model
-from swh.storage.cassandra.model import BaseRow, ContentRow, ExtIDRow
+from swh.storage.cassandra.model import BaseRow, ContentRow, ExtIDRow, MigrationRow
 from swh.storage.cassandra.schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS, TABLES
 from swh.storage.cassandra.storage import (
     DIRECTORY_ENTRIES_INSERT_ALGOS,
@@ -665,6 +670,32 @@ class TestStorageDeletion(_TestStorageDeletion):
         return row["count"]
 
 
+@pytest.mark.cassandra
+class TestCqlRunner:
+    def test_insert_known_migration(self, swh_storage):
+        with pytest.raises(MigrationAlreadyExists):
+            swh_storage._cql_runner.migration_add_one_if_not_exists(
+                MigrationRow(
+                    id=MIGRATIONS[0].id,
+                    dependencies=set(),
+                    min_read_version="1.0.0",
+                    status="completed",
+                )
+            )
+
+    def test_update_migration_unexpected_status(self, swh_storage):
+        with pytest.raises(UnexpectedMigrationStatus):
+            swh_storage._cql_runner.migration_update_one(
+                MigrationRow(
+                    id=MIGRATIONS[0].id,
+                    dependencies=set(),
+                    min_read_version="1.0.0",
+                    status="completed",
+                ),
+                expected_status="running",
+            )
+
+
 @pytest.mark.cassandra
 @pytest.mark.parametrize(
     "allow_overwrite,object_type",
-- 
GitLab