From 8459c079508d6ed50bef5cf03a4ea5e299c98d8e Mon Sep 17 00:00:00 2001 From: Valentin Lorentz <vlorentz@softwareheritage.org> Date: Wed, 19 Mar 2025 12:05:54 +0100 Subject: [PATCH 1/2] Make migration row insertion/update atomic --- swh/storage/cassandra/cql.py | 57 ++++++++++++++++++++----- swh/storage/cassandra/migrations.py | 57 ++++++++++++++++++++----- swh/storage/tests/test_cli_cassandra.py | 9 ++-- 3 files changed, 99 insertions(+), 24 deletions(-) diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py index eba4ffcd..9738edbb 100644 --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -224,23 +224,23 @@ TArg = TypeVar("TArg") TSelf = TypeVar("TSelf") -def _insert_query(row_class: Type[BaseRow]) -> str: +def _insert_query(row_class: Type[BaseRow], if_not_exists: bool = False) -> str: columns = row_class.cols() return ( f"INSERT INTO {{keyspace}}.{row_class.TABLE} ({', '.join(columns)}) " f"VALUES ({', '.join('?' for _ in columns)})" - ) + ) + (" IF NOT EXISTS" if if_not_exists else "") def _prepared_insert_statement( - row_class: Type[BaseRow], + row_class: Type[BaseRow], if_not_exists: bool = False ) -> Callable[ [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa Callable[[TSelf, TArg], TRet], ]: """Shorthand for using `_prepared_statement` for `INSERT INTO` statements.""" - return _prepared_statement(_insert_query(row_class)) + return _prepared_statement(_insert_query(row_class, if_not_exists=if_not_exists)) def _prepared_exists_statement( @@ -435,8 +435,8 @@ class CqlRunner: except (ReadTimeout, WriteTimeout) as e: raise QueryTimeout(*e.args) from None - def _add_one(self, statement, obj: "DataclassInstance") -> None: - self.execute_with_retries(statement, dataclasses.astuple(obj)) + def _add_one(self, statement, obj: "DataclassInstance") -> ResultSet: + return self.execute_with_retries(statement, dataclasses.astuple(obj)) def _add_many(self, statement, objs: Sequence[BaseRow]) -> None: tables = 
{obj.TABLE for obj in objs} @@ -503,19 +503,56 @@ class CqlRunner: # 'migration' table ########################## - @_prepared_insert_statement(MigrationRow) - def migration_add_one(self, migration: MigrationRow, *, statement) -> None: - self._add_one(statement, migration) + @_prepared_insert_statement(MigrationRow, if_not_exists=True) + def migration_add_one_if_not_exists( + self, migration: MigrationRow, *, statement + ) -> None: + """Adds a :class:`MigrationRow`. + + Raises: + swh.storage.migrations.MigrationAlreadyExists: if there is already a known + migration with that id. + """ + from .migrations import MigrationAlreadyExists + + (result,) = self._add_one(statement, migration) + if not result["[applied]"]: + raise MigrationAlreadyExists([migration.id]) - @_prepared_insert_statement(MigrationRow) + @_prepared_insert_statement(MigrationRow, if_not_exists=True) def migration_add_concurrent( self, migrations: List[MigrationRow], *, statement ) -> None: + """Inserts or updates the given migration rows""" if len(migrations) == 0: # nothing to do return self._add_many(statement, migrations) + @_prepared_statement( + f"UPDATE {{keyspace}}.{MigrationRow.TABLE} " + f"SET dependencies=?, min_read_version=?, status=? " + f"WHERE id=? " + f"IF status = ?" + ) + def migration_update_one( + self, migration: MigrationRow, expected_status: str, *, statement + ) -> None: + """Updates the dependencies/min_read_version/status of an existing migration. + Errors if the migration is not already in the database, or does not have the + ``expected_status``. 
+ """ + (row,) = self.execute_with_retries( + statement, + ( + migration.dependencies, + migration.min_read_version, + migration.status, + migration.id, + expected_status, + ), + ) + @_prepared_select_statement(MigrationRow, "WHERE id IN ?") def migration_get(self, migration_ids, *, statement) -> Iterable[MigrationRow]: return map( diff --git a/swh/storage/cassandra/migrations.py b/swh/storage/cassandra/migrations.py index 424536d9..00c54eef 100644 --- a/swh/storage/cassandra/migrations.py +++ b/swh/storage/cassandra/migrations.py @@ -16,6 +16,28 @@ from .model import MigrationRow logger = logging.getLogger(__name__) +class BaseMigrationException(Exception): + """Base class for exceptions related to migrations.""" + + +class MigrationAlreadyExists(BaseMigrationException): + """Raised when trying to insert a row in the ``migration`` table, but the id + already exists. + + Typically happens when ``swh storage cassandra upgrade`` runs twice at the same time. + """ + + +class UnexpectedMigrationStatusExists(BaseMigrationException): + """Raised when trying to change a migration status from state A to state B, + but it was not in state A. + + Typically happens when ``swh storage cassandra upgrade`` runs twice at the same time, + or when ``swh storage cassandra mark-upgraded`` was used while + ``swh storage cassandra upgrade`` is running. 
+ """ + + class MigrationStatus(enum.Enum): PENDING = "pending" """The migration was not applied yet""" @@ -128,14 +150,20 @@ def apply_migrations( ) remaining_migrations_missing_dependencies.append(migration) continue - cql_runner.migration_add_one( - MigrationRow( - id=migration.id, - dependencies=migration.dependencies, - min_read_version=migration.min_read_version, - status=MigrationStatus.RUNNING.value, - ) + + row = MigrationRow( + id=migration.id, + dependencies=migration.dependencies, + min_read_version=migration.min_read_version, + status=MigrationStatus.RUNNING.value, ) + try: + cql_runner.migration_add_one_if_not_exists(row) + except MigrationAlreadyExists: + cql_runner.migration_update_one( + row, expected_status=MigrationStatus.PENDING.value + ) + if migration.script is None: logger.info("Skipping %s", migration.id) if migration.help: @@ -144,13 +172,14 @@ def apply_migrations( else: logger.info("Running %s...", migration.id) migration.script(cql_runner) - cql_runner.migration_add_one( + cql_runner.migration_update_one( MigrationRow( id=migration.id, dependencies=migration.dependencies, min_read_version=migration.min_read_version, status=MigrationStatus.COMPLETED.value, - ) + ), + expected_status=MigrationStatus.RUNNING.value, ) logger.info("Done.") statuses[migration.id] = MigrationStatus.COMPLETED @@ -193,5 +222,11 @@ def create_migrations_table_if_needed(cql_runner: CqlRunner) -> None: min_read_version=migration.min_read_version, status=MigrationStatus.COMPLETED.value, ) - cql_runner.migration_add_concurrent([migration_row]) - logger.info("'migrations' table created.") + try: + cql_runner.migration_add_one_if_not_exists(migration_row) + except MigrationAlreadyExists: + logger.warning( + "'migration' table was created by an other process at the same time" + ) + else: + logger.info("'migrations' table created.") diff --git a/swh/storage/tests/test_cli_cassandra.py b/swh/storage/tests/test_cli_cassandra.py index 0d348cc3..83d66f7a 100644 --- 
a/swh/storage/tests/test_cli_cassandra.py +++ b/swh/storage/tests/test_cli_cassandra.py @@ -141,9 +141,12 @@ class TestCassandraCli: f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table", [] ) finally: - swh_storage._cql_runner.execute_with_retries( - f"DROP TABLE {swh_storage_cassandra_keyspace}.test_table", [] - ) + try: + swh_storage._cql_runner.execute_with_retries( + f"DROP TABLE {swh_storage_cassandra_keyspace}.test_table", [] + ) + except Exception: + pass def test_upgrade_all_from_v2_9( self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker -- GitLab From 8019cd5290de913344fb55c4b1c03c38f21d46da Mon Sep 17 00:00:00 2001 From: Valentin Lorentz <vlorentz@softwareheritage.org> Date: Wed, 19 Mar 2025 12:20:37 +0100 Subject: [PATCH 2/2] Finish implementing migration_update_one + add tests --- swh/storage/cassandra/cql.py | 17 ++++++++++----- swh/storage/cassandra/migrations.py | 2 +- swh/storage/tests/test_cassandra.py | 33 ++++++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py index 9738edbb..33007c5b 100644 --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -510,8 +510,8 @@ class CqlRunner: """Adds a :class:`MigrationRow`. Raises: - swh.storage.migrations.MigrationAlreadyExists: if there is already a known - migration with that id. + swh.storage.cassandra.migrations.MigrationAlreadyExists: if there is already + a migration with that id in the database. """ from .migrations import MigrationAlreadyExists @@ -539,10 +539,15 @@ class CqlRunner: self, migration: MigrationRow, expected_status: str, *, statement ) -> None: """Updates the dependencies/min_read_version/status of an existing migration. - Errors if the migration is not already in the database, or does not have the - ``expected_status``. 
+ + Raises: + ValueError: the migration is not already in the database + swh.storage.cassandra.migrations.UnexpectedMigrationStatus: the migration + in the database does not have the ``expected_status``. """ - (row,) = self.execute_with_retries( + from .migrations import UnexpectedMigrationStatus + + (result,) = self.execute_with_retries( statement, ( migration.dependencies, @@ -552,6 +557,8 @@ class CqlRunner: expected_status, ), ) + if not result["[applied]"]: + raise UnexpectedMigrationStatus([migration.id]) @_prepared_select_statement(MigrationRow, "WHERE id IN ?") def migration_get(self, migration_ids, *, statement) -> Iterable[MigrationRow]: diff --git a/swh/storage/cassandra/migrations.py b/swh/storage/cassandra/migrations.py index 00c54eef..bb2b3484 100644 --- a/swh/storage/cassandra/migrations.py +++ b/swh/storage/cassandra/migrations.py @@ -28,7 +28,7 @@ class MigrationAlreadyExists(BaseMigrationException): """ -class UnexpectedMigrationStatusExists(BaseMigrationException): +class UnexpectedMigrationStatus(BaseMigrationException): """Raised when trying to change a migration status from state A to state B, but it was not in state A. 
diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py index dfc55f3c..6c76b7cf 100644 --- a/swh/storage/tests/test_cassandra.py +++ b/swh/storage/tests/test_cassandra.py @@ -26,8 +26,13 @@ from swh.model.model import ( ) from swh.storage import get_storage from swh.storage.cassandra.cql import BATCH_INSERT_MAX_SIZE +from swh.storage.cassandra.migrations import ( + MIGRATIONS, + MigrationAlreadyExists, + UnexpectedMigrationStatus, +) import swh.storage.cassandra.model -from swh.storage.cassandra.model import BaseRow, ContentRow, ExtIDRow +from swh.storage.cassandra.model import BaseRow, ContentRow, ExtIDRow, MigrationRow from swh.storage.cassandra.schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS, TABLES from swh.storage.cassandra.storage import ( DIRECTORY_ENTRIES_INSERT_ALGOS, @@ -665,6 +670,32 @@ class TestStorageDeletion(_TestStorageDeletion): return row["count"] +@pytest.mark.cassandra +class TestCqlRunner: + def test_insert_known_migration(self, swh_storage): + with pytest.raises(MigrationAlreadyExists): + swh_storage._cql_runner.migration_add_one_if_not_exists( + MigrationRow( + id=MIGRATIONS[0].id, + dependencies=set(), + min_read_version="1.0.0", + status="completed", + ) + ) + + def test_update_migration_unexpected_status(self, swh_storage): + with pytest.raises(UnexpectedMigrationStatus): + swh_storage._cql_runner.migration_update_one( + MigrationRow( + id=MIGRATIONS[0].id, + dependencies=set(), + min_read_version="1.0.0", + status="completed", + ), + expected_status="running", + ) + + @pytest.mark.cassandra @pytest.mark.parametrize( "allow_overwrite,object_type", -- GitLab