diff --git a/docs/cassandra-migrations.rst b/docs/cassandra-migrations.rst new file mode 100644 index 0000000000000000000000000000000000000000..5f5f2bd738200f65618b74d8ca0b44b72b8cbe39 --- /dev/null +++ b/docs/cassandra-migrations.rst @@ -0,0 +1,83 @@ +.. _swh-storage-cassandra-migrations: + +Cassandra migrations +==================== + +While swh-storage's PostgreSQL backend relies on the generic ``swh db`` CLI provided by +:ref:`swh-core` to manage migrations, its Cassandra backend uses a specific design and CLI. + + +Migration model +--------------- + +Due to the potentially long runtime (tables need to be rewritten to apply some changes), +and non-transactionality of migrations in Cassandra, we model migrations as a DAG instead +of a sequence of updates. This allows, for example, running two migrations in parallel, +as long as they don't depend on each other. + +There are two types of migrations: those that can be done automatically (and are implemented +in Python), and those that need to be performed manually (for example, by setting up a +:ref:`replayer <architecture-overview>`, to write the content of a new table from the +:ref:`journal <swh-journal>`). + +Each migration has the following properties: + +``id`` + A unique human-readable name, starting with the date it was written. + The date does not have any impact in the order migrations are applied. + +``dependencies`` + A set of migration ids that need to be applied before this migration can start + +``min_read_version`` + What minimum version of swh-storage can read the database after this migration + started (or after it is completed). 
+ +``status`` + A string among ``pending``, ``running``, or ``completed`` + + +Version checking +---------------- + +swh-storage's :meth:`check_config <swh.storage.interface.StorageInterface.check_config>` +returns :const:`False` and prints a warning if its database meets any of the following conditions: + +* any migration with status ``running`` or ``completed`` has a ``min_read_version`` strictly + newer than the current swh-storage version, meaning the Python code was rolled back and is + no longer able to read the database +* any migration it considers "required" does not have status ``completed``. This means the database + needs to be upgraded (or its current upgrades must be finished), as the Python code no longer + supports this old version of the database + +In order to avoid long downtimes, this means that updates that need long-running migrations +happen this way: + +1. Version ``N`` is released, which adds a new migration ``X`` required by version ``N`` but + still compatible with version ``N-1``; and optionally a new migration ``Y`` which is **not** required + by version ``N`` and may not be compatible with version ``N-1`` +2. Some database clients can be updated to ``N``; migration ``X`` is applied +3. Remaining database clients must be updated to ``N`` +4. Migration ``Y`` is applied +5. Version ``N+1`` is released, which removes support for databases without migration ``X``, or even ``Y`` +6. Database clients can be updated to ``N+1`` + +Operations +---------- + +``swh storage cassandra init`` + Creates a new keyspace, types and tables, and marks all migrations known by the current + Python code as completed. + +``swh storage cassandra list-migrations`` + Prints all migrations known to the Python code and their current status. + In case the database has a migration applied that is not known to the Python code + (i.e. the Python code was rolled back), it is not displayed. + +``swh storage cassandra upgrade`` + Applies one or more migrations. 
If some migrations cannot run automatically, documentation for + how to run the migration is displayed to the user, and further migrations are not applied. + +``swh storage cassandra mark-upgraded`` + Tells the database a migration was applied. This is typically used after manually applying + a migration, as ``swh storage cassandra upgrade`` does it on its own for automated migrations. diff --git a/docs/index.rst b/docs/index.rst index 5cdae8623546e16b699fae367b768336f1f32600..f30d44b640e3ec607dea574f9805e2897558dad6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -83,6 +83,7 @@ Reference Documentation :maxdepth: 2 db-schema + cassandra-migrations cli .. only:: standalone_package_doc diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py index 09989d64a91fea2b145aafbd338e5d5c186266e0..6dcc43f38c7ff74348de86056021181a949f6c71 100644 --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -3,6 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from importlib.metadata import PackageNotFoundError, distribution from typing import TYPE_CHECKING, Any, Dict, List import warnings @@ -10,6 +11,12 @@ if TYPE_CHECKING: from .interface import StorageInterface +try: + __version__ = distribution("swh-storage").version +except PackageNotFoundError: + __version__ = "devel" + + StorageSpec = Dict[str, Any] diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py index 304bc741db1b6aa34322070a01bb00f8e854889d..eba4ffcd418b2dba6227c5befc0dfeef9f6ff78b 100644 --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -71,6 +71,7 @@ from .model import ( ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, + MigrationRow, ObjectCountRow, ObjectReferenceRow, ObjectReferencesTableRow, @@ -144,7 +145,7 @@ def get_execution_profiles( # datacenter as the client (DCAwareRoundRobinPolicy) -def create_keyspace(cql_runner: "CqlRunner", *, durable_writes=True): +def 
def mark_all_migrations_completed(cql_runner: "CqlRunner") -> None:
    """Write one ``completed`` row in the ``migration`` table for every
    migration listed in ``MIGRATIONS``.

    The rows are handed to ``cql_runner.migration_add_concurrent`` in a
    single call.
    """
    # Imported here rather than at module level: the migrations module itself
    # imports names from this module.
    from .migrations import MIGRATIONS, MigrationStatus

    completed = MigrationStatus.COMPLETED.value

    rows = []
    for known_migration in MIGRATIONS:
        row = MigrationRow(
            id=known_migration.id,
            dependencies=known_migration.dependencies,
            min_read_version=known_migration.min_read_version,
            status=completed,
        )
        rows.append(row)

    cql_runner.migration_add_concurrent(rows)
@dataclasses.dataclass
class Migration:
    """Declaration of a single Cassandra schema migration, as known to the
    Python code.

    Instances are listed in ``MIGRATIONS``. The status of a migration is not
    stored here; it is read from the ``migration`` table of the database.
    """

    id: str
    """Unique identifier of this migration.

    Should have the format: ``YYYY-MM-DD_developer_readable_name``"""

    dependencies: set[str]
    """Set of identifiers of migrations this migration depends on"""

    min_read_version: str
    """Lowest version of the Python code that should be allowed to read the database
    if this migration is applied"""

    script: Optional[Callable[[CqlRunner], None]]
    """If provided, this is a function that runs the migration.

    If not provided, the migration must be run manually, using steps described
    in the documentation"""

    help: Optional[str]
    """Documentation of the migration

    Typically describes what to do if ``script`` is :const:`None`."""

    required: bool
    """Whether this migration must be applied for the current version of the Python code
    to allow instantiating :class:`swh.storage.cassandra.CassandraStorage`."""
def apply_migrations(
    cql_runner: CqlRunner, ids_to_apply: Iterable[str]
) -> tuple[bool, Sequence[Migration], Sequence[Migration]]:
    """Applies migrations with the given ids (unless they already are).

    Only migrations whose current status is ``pending`` are considered.
    A pending migration whose dependencies are all completed is first recorded
    as ``running``; if it has a script, the script is run and the migration is
    then recorded as ``completed``.

    Args:
        cql_runner: connection used to read and write the ``migration`` table,
            and passed to migration scripts
        ids_to_apply: identifiers of the migrations to apply. Any iterable is
            accepted, including one-shot iterators.

    Returns:
        A tuple of:

        * whether any migration script was run,
        * which migrations still need to be run manually (they have no script),
        * which migrations cannot run because some of their dependencies
          are not completed.

    Raises:
        ValueError: if one of the given ids is not a known migration
    """
    applied_any = False
    remaining_manual_migrations = []
    remaining_migrations_missing_dependencies = []

    # Materialize the ids once: they are iterated for validation and then used
    # for membership tests, which would silently match nothing if a one-shot
    # iterator had already been exhausted.
    requested_ids = tuple(ids_to_apply)
    requested_ids_set = frozenset(requested_ids)

    statuses = {
        migration.id: status for (migration, status) in list_migrations(cql_runner)
    }
    for migration_id in requested_ids:
        if migration_id not in statuses:
            raise ValueError(f"Unknown migration: {migration_id}")

    migrations_to_apply = [
        migration for migration in MIGRATIONS if migration.id in requested_ids_set
    ]
    for migration in migrations_to_apply:
        status = statuses[migration.id]
        if status == MigrationStatus.PENDING:
            missing_dependencies = {
                dependency
                for dependency in migration.dependencies
                if statuses[dependency] != MigrationStatus.COMPLETED
            }
            if missing_dependencies:
                logger.warning(
                    "Cannot apply %s: depends on %s",
                    migration.id,
                    ", ".join(missing_dependencies),
                )
                remaining_migrations_missing_dependencies.append(migration)
                continue
            # Record the migration as started before doing anything else, so
            # that readers of the 'migration' table can see it is in progress.
            cql_runner.migration_add_one(
                MigrationRow(
                    id=migration.id,
                    dependencies=migration.dependencies,
                    min_read_version=migration.min_read_version,
                    status=MigrationStatus.RUNNING.value,
                )
            )
            if migration.script is None:
                # Manual migration: show its documentation and leave it in the
                # 'running' state.
                logger.info("Skipping %s", migration.id)
                if migration.help:
                    logger.info("%s", textwrap.indent(migration.help, " "))
                remaining_manual_migrations.append(migration)
            else:
                logger.info("Running %s...", migration.id)
                migration.script(cql_runner)
                cql_runner.migration_add_one(
                    MigrationRow(
                        id=migration.id,
                        dependencies=migration.dependencies,
                        min_read_version=migration.min_read_version,
                        status=MigrationStatus.COMPLETED.value,
                    )
                )
                logger.info("Done.")
                # Lets migrations later in this same call see the dependency
                # as satisfied.
                statuses[migration.id] = MigrationStatus.COMPLETED
                applied_any = True

    return (
        applied_any,
        remaining_manual_migrations,
        remaining_migrations_missing_dependencies,
    )
@dataclasses.dataclass
class MigrationRow(BaseRow):
    """Row of the ``migration`` table, holding the recorded state of one
    migration."""

    TABLE = "migration"
    PARTITION_KEY = ("id",)

    # identifier of the migration; this is the partition key
    id: str
    # identifiers of the migrations this one depends on
    dependencies: set[str]
    # version string copied from the migration declaration when the row is written
    min_read_version: str
    # stored as a plain string, not as an enum member
    status: str
    """``pending``/``running``/``completed``"""
def check_config(self, *, check_write: bool) -> bool:
    """Checks that this version of the code can use the database.

    Returns :const:`False` (after logging a warning) if either:

    * a migration recorded in the database with a status other than
      ``pending`` has a ``min_read_version`` strictly newer than the
      current version of the code, or
    * a migration declared as required does not have status ``completed``.

    Returns :const:`True` otherwise.
    """
    self._cql_runner.check_read()

    # NOTE(review): swh.storage sets __version__ to "devel" when the
    # distribution is not installed; Version() presumably rejects that
    # string -- confirm how development checkouts should be handled.
    current_version = Version(__version__)

    incompatible_migrations = []

    rows = list(self._cql_runner.migration_list())
    for row in rows:
        # row.status is a plain string, so it must be compared to the enum
        # member's value; comparing it to the member itself is never equal.
        if row.status != MigrationStatus.PENDING.value:
            min_read_version = Version(row.min_read_version)
            if min_read_version > current_version:
                incompatible_migrations.append((min_read_version, row.id))

    if incompatible_migrations:
        incompatible_migrations.sort()  # sort by min_read_version
        logger.warning(
            "Database contains unsupported migrations: %s",
            ", ".join(
                f"{id_} ({min_read_version})"
                for (min_read_version, id_) in incompatible_migrations
            ),
        )
        return False

    # list_migrations() yields MigrationStatus members, so these statuses can
    # be compared to the enum directly.
    missing_migrations: list[str] = []
    for migration, status in list_migrations(self._cql_runner, rows=rows):
        if migration.required and status != MigrationStatus.COMPLETED:
            missing_migrations.append(migration.id)

    if missing_migrations:
        logger.warning(
            "Database missing required migrations: %s",
            ", ".join(missing_migrations),
        )
        return False

    return True
@cassandra.command(name="mark-upgraded")
@click.option("--migration", "migration_ids", multiple=True)
@click.pass_context
def cassandra_mark_upgraded(ctx, migration_ids: tuple[str, ...]) -> None:
    """Marks migrations as completed, without running them.

    This is typically used after applying a migration manually.
    If no --migration option is given, every migration known to the Python
    code is marked as completed.
    """
    from swh.storage.cassandra.migrations import MIGRATIONS, MigrationStatus
    from swh.storage.cassandra.model import MigrationRow

    logger = logging.getLogger(__name__)

    cql_runner = ctx.obj["cql_runner"]

    known_migrations = {migration.id: migration for migration in MIGRATIONS}

    if migration_ids:
        # Validate against what the user asked for. Checking the already
        # filtered dict could never detect an unknown id.
        requested_ids = set(migration_ids)
        unknown_migrations = requested_ids - set(known_migrations)
        if unknown_migrations:
            raise click.ClickException(
                f"Unknown migrations: {', '.join(sorted(unknown_migrations))}"
            )
        migrations = {
            migration_id: migration
            for (migration_id, migration) in known_migrations.items()
            if migration_id in requested_ids
        }
    else:
        migrations = known_migrations

    rows = cql_runner.migration_get(list(migrations))
    for row in rows:
        # row.status is a plain string; compare it to the enum member's value
        if row.status == MigrationStatus.COMPLETED.value:
            logger.warning("Migration %s was already completed", row.id)

    new_rows = [
        MigrationRow(
            id=migration.id,
            dependencies=migration.dependencies,
            min_read_version=migration.min_read_version,
            status=MigrationStatus.COMPLETED.value,
        )
        for migration in migrations.values()
    ]

    if new_rows:
        cql_runner.migration_add_concurrent(new_rows)
        click.echo(
            f"Migrations {', '.join(row.id for row in new_rows)} marked as complete."
        )
    else:
        click.echo("Nothing to do.")
        ctx.exit(3)
yield storage_config storage = get_storage(**storage_config) diff --git a/swh/storage/tests/test_cassandra.py b/swh/storage/tests/test_cassandra.py index 3d2556952ee4acf77d696e2be4faf2511b7b7af4..dfc55f3cbd312e055d5602575a423aa9770a1a23 100644 --- a/swh/storage/tests/test_cassandra.py +++ b/swh/storage/tests/test_cassandra.py @@ -64,6 +64,8 @@ def _python_type_to_cql_type_re(ty: type) -> str: return "boolean" elif ty is dict: return "frozen<list <list<blob>> >" + elif ty == set[str]: + return "frozen<set<(ascii|text)>>" elif ty is Date: return "date" elif getattr(ty, "__origin__", None) is Union: @@ -643,6 +645,7 @@ class TestStorageDeletion(_TestStorageDeletion): return list( set(TABLES) - { + "migration", "metadata_authority", "metadata_fetcher", "extid", diff --git a/swh/storage/tests/test_cassandra_migration.py b/swh/storage/tests/test_cassandra_migration.py index 044df7b8b73098fffaa65a81f9c0595439037dc1..0c58edaca130c04cc077f57765bb6f144b5b3a75 100644 --- a/swh/storage/tests/test_cassandra_migration.py +++ b/swh/storage/tests/test_cassandra_migration.py @@ -22,6 +22,13 @@ from swh.storage.cassandra.cql import ( _prepared_insert_statement, _prepared_select_statement, ) +from swh.storage.cassandra.migrations import ( + MIGRATIONS, + Migration, + MigrationStatus, + apply_migrations, + list_migrations, +) from swh.storage.cassandra.model import ContentRow from swh.storage.cassandra.schema import CONTENT_INDEX_TEMPLATE, HASH_ALGORITHMS from swh.storage.cassandra.storage import CassandraStorage @@ -110,16 +117,69 @@ def test_add_content_column( """Adds a column to the 'content' table and a new matching index. This is a simple migration, as it does not require an update to the primary key. 
""" + cql_runner = swh_storage._cql_runner + content_xor_hash = byte_xor_hash(StorageData.content.data) # First insert some existing data swh_storage.content_add([StorageData.content, StorageData.content2]) + # declare the migration + def byte_xor_migration_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries("ALTER TABLE content ADD byte_xor blob", []) + for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"): + cql_runner.execute_with_retries( + statement.format(main_algo="byte_xor").format(table_options=""), [] + ) + + BYTE_XOR_ADD_COLUMN_MIGRATION = "2024-12-18_byte_xor_add_column" + BYTE_XOR_FILL_COLUMN_MIGRATION = "2024-12-18_byte_xor_fill_column" + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", + [ + *MIGRATIONS, + Migration( + id=BYTE_XOR_ADD_COLUMN_MIGRATION, + help="Adds a byte_xor column as a hash", + script=byte_xor_migration_script, + dependencies={"2024-12-12_init"}, + required=True, + min_read_version="2.9.0", + ), + Migration( + id=BYTE_XOR_FILL_COLUMN_MIGRATION, + help="Applied when the byte_xor column has no null values anymore", + script=None, + dependencies={BYTE_XOR_ADD_COLUMN_MIGRATION}, + required=False, + min_read_version="2.9.0", + ), + ], + ) + for migration, status in list_migrations(cql_runner): + if migration.id == BYTE_XOR_ADD_COLUMN_MIGRATION: + assert status == MigrationStatus.PENDING + break + else: + assert False, f"{BYTE_XOR_ADD_COLUMN_MIGRATION} missing from revision list" + assert not swh_storage.check_config( + check_write=False + ), "CassandraStorage does not detect it is missing a migration" + # Then update the schema - session = swh_storage._cql_runner._cluster.connect(swh_storage._cql_runner.keyspace) - session.execute("ALTER TABLE content ADD byte_xor blob") - for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"): - session.execute(statement.format(main_algo="byte_xor").format(table_options="")) + apply_migrations(cql_runner, 
[BYTE_XOR_ADD_COLUMN_MIGRATION]) + + # Check migration is marked as applied + for migration, status in list_migrations(cql_runner): + if migration.id == BYTE_XOR_ADD_COLUMN_MIGRATION: + assert status == MigrationStatus.COMPLETED + break + else: + assert False, f"{BYTE_XOR_ADD_COLUMN_MIGRATION} missing from revision list" + assert swh_storage.check_config( + check_write=False + ), "CassandraStorage does not detect the missing migration was applied" # Should not affect the running code at all: assert swh_storage.content_get([StorageData.content.sha1]) == [ @@ -245,30 +305,88 @@ def test_change_content_pk( and make this new column part of the primary key This is a complex migration, as it requires copying the whole table """ + cql_runner = swh_storage._cql_runner content_xor_hash = byte_xor_hash(StorageData.content.data) session = swh_storage._cql_runner._cluster.connect(swh_storage._cql_runner.keyspace) # First insert some existing data swh_storage.content_add([StorageData.content, StorageData.content2]) - # Then add a new table and a new index - session.execute( - """ - CREATE TABLE IF NOT EXISTS content_v2 ( - sha1 blob, - sha1_git blob, - sha256 blob, - blake2s256 blob, - byte_xor blob, - length bigint, - ctime timestamp, - -- creation time, i.e. time of (first) injection into the storage - status ascii, - PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor)) - );""" + # declare the migration, which adds a table and a new index + def byte_xor_add_table_migration_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + """ + CREATE TABLE IF NOT EXISTS content_v2 ( + sha1 blob, + sha1_git blob, + sha256 blob, + blake2s256 blob, + byte_xor blob, + length bigint, + ctime timestamp, + -- creation time, i.e. 
time of (first) injection into the storage + status ascii, + PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256, byte_xor)) + );""", + [], + ) + for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"): + cql_runner.execute_with_retries( + statement.format(main_algo="byte_xor").format(table_options=""), [] + ) + + def byte_xor_remove_table_migration_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries("DROP TABLE content", []) + + BYTE_XOR_ADD_TABLE_MIGRATION = "2024-12-18_byte_xor_add_table" + BYTE_XOR_REMOVE_TABLE_MIGRATION = "2024-12-18_byte_xor_remove_column" + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", + [ + *MIGRATIONS, + Migration( + id=BYTE_XOR_ADD_TABLE_MIGRATION, + help="Adds a byte_xor column as a hash", + script=byte_xor_add_table_migration_script, + dependencies={"2024-12-12_init"}, + required=True, + min_read_version="2.9.0", + ), + Migration( + id=BYTE_XOR_REMOVE_TABLE_MIGRATION, + help="Applied when the byte_xor column has no null values anymore", + script=byte_xor_remove_table_migration_script, + dependencies={BYTE_XOR_ADD_TABLE_MIGRATION}, + required=False, + min_read_version="999!1.0.0", # incompatible with current version + ), + ], ) - for statement in CONTENT_INDEX_TEMPLATE.split("\n\n"): - session.execute(statement.format(main_algo="byte_xor").format(table_options="")) + for migration, status in list_migrations(cql_runner): + if migration.id == BYTE_XOR_ADD_TABLE_MIGRATION: + assert status == MigrationStatus.PENDING + break + else: + assert False, f"{BYTE_XOR_ADD_TABLE_MIGRATION} missing from revision list" + assert not swh_storage.check_config( + check_write=False + ), "CassandraStorage does not detect it is missing a migration" + + # Add the a new table and a new index + apply_migrations(cql_runner, [BYTE_XOR_ADD_TABLE_MIGRATION]) + + # Check migration is marked as applied + for migration, status in list_migrations(cql_runner): + if migration.id == 
BYTE_XOR_ADD_TABLE_MIGRATION: + assert status == MigrationStatus.COMPLETED + break + else: + assert False, f"{BYTE_XOR_ADD_TABLE_MIGRATION} missing from migration list" + assert swh_storage.check_config( + check_write=False + ), "CassandraStorage does not detect the missing migration was applied" # Should not affect the running code at all: assert swh_storage.content_get([StorageData.content.sha1]) == [ @@ -329,7 +447,21 @@ def test_change_content_pk( ] # Remove the old table: - session.execute("DROP TABLE content") + apply_migrations(cql_runner, [BYTE_XOR_REMOVE_TABLE_MIGRATION]) + + # check CassandraStorage rejects the migration, because it is declared as needing + # a newer swh-storage version + assert not swh_storage.check_config(check_write=False), ( + f"CassandraStorage does not detect it is too old to support " + f"{BYTE_XOR_REMOVE_TABLE_MIGRATION}" + ) + + # pretend we upgraded the Python code + mocker.patch("swh.storage.__version__", "999!1.0.0") + + assert not swh_storage.check_config( + check_write=False + ), f"CassandraStorage believes it is too old to support {BYTE_XOR_REMOVE_TABLE_MIGRATION}" # Object is still available, because we don't use it anymore assert swh_storage.content_find({"byte_xor": content_xor_hash}) == [ # type: ignore diff --git a/swh/storage/tests/test_cli.py b/swh/storage/tests/test_cli.py index f5d43c6dd7265ff679ca7f8787c770389c6992d0..ef841b43427453b978f8880d9a45ff709eccae60 100644 --- a/swh/storage/tests/test_cli.py +++ b/swh/storage/tests/test_cli.py @@ -5,7 +5,6 @@ import copy import logging -import os import pathlib import re import tempfile @@ -17,7 +16,7 @@ import pytest import yaml from swh.journal.serializers import key_to_kafka, value_to_kafka -from swh.model.model import Origin, Snapshot, SnapshotBranch, SnapshotTargetType +from swh.model.model import Snapshot, SnapshotBranch, SnapshotTargetType from swh.storage import get_storage from swh.storage.cli import storage as cli from swh.storage.replay import OBJECT_CONVERTERS 
@@ -62,35 +61,6 @@ def invoke(*args, env=None, input=None, journal_config=None, local_config=None): return ret -@pytest.mark.cassandra -def test_create_keyspace( - swh_storage_cassandra_cluster, - cassandra_auth_provider_config, -): - (hosts, port) = swh_storage_cassandra_cluster - keyspace = "test" + os.urandom(10).hex() - - storage_config = dict( - cls="cassandra", - hosts=hosts, - port=port, - keyspace=keyspace, - journal_writer={"cls": "memory"}, - objstorage={"cls": "memory"}, - auth_provider=cassandra_auth_provider_config, - ) - - result = invoke("create-keyspace", local_config={"storage": storage_config}) - assert result.exit_code == 0, result.output - assert result.output == "Done.\n" - - # Check we can write and read to it - storage = get_storage(**storage_config) - origin = Origin(url="http://example.org") - storage.origin_add([origin]) - assert storage.origin_get([origin.url]) == [origin] - - def test_replay( swh_storage, kafka_prefix: str, diff --git a/swh/storage/tests/test_cli_cassandra.py b/swh/storage/tests/test_cli_cassandra.py new file mode 100644 index 0000000000000000000000000000000000000000..47ea63518089cec58935811ad7eca4779fbb2b88 --- /dev/null +++ b/swh/storage/tests/test_cli_cassandra.py @@ -0,0 +1,434 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import functools +import logging +import os + +from cassandra import InvalidRequest +import pytest + +from swh.model.model import Origin +from swh.storage import get_storage +from swh.storage.cassandra.migrations import MIGRATIONS, Migration + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def swh_storage_backend_config(swh_storage_cassandra_backend_config): + return swh_storage_cassandra_backend_config + + +@pytest.fixture +def invoke(swh_storage_cassandra_backend_config): + from 
.test_cli import invoke + + @functools.wraps(invoke) + def newf(*args, **kwargs): + assert "local_config" not in kwargs + return invoke( + *args, + local_config={"storage": swh_storage_cassandra_backend_config}, + **kwargs, + ) + + return newf + + +@pytest.mark.cassandra +class TestCassandraCli: + def test_init( + self, + swh_storage_cassandra_cluster, + cassandra_auth_provider_config, + ): + # not using the invoke fixture because it uses the keyspace that is already + # initialized, which would make this test pointless + from .test_cli import invoke + + (hosts, port) = swh_storage_cassandra_cluster + keyspace = "test" + os.urandom(10).hex() + + storage_config = dict( + cls="cassandra", + hosts=hosts, + port=port, + keyspace=keyspace, + journal_writer={"cls": "memory"}, + objstorage={"cls": "memory"}, + auth_provider=cassandra_auth_provider_config, + ) + + result = invoke("cassandra", "init", local_config={"storage": storage_config}) + assert result.exit_code == 0, result.output + assert result.output == "Done.\n" + + # Check we can write and read to it + storage = get_storage(**storage_config) + origin = Origin(url="http://example.org") + storage.origin_add([origin]) + assert storage.origin_get([origin.url]) == [origin] + + def test_list_migrations(self, swh_storage, invoke, mocker): + # keep only the first migration + mocker.patch("swh.storage.cassandra.migrations.MIGRATIONS", MIGRATIONS[0:1]) + + result = invoke("cassandra", "list-migrations") + assert result.exit_code == 0, result.output + assert result.output == ( + "2024-12-12_init (required): completed\n" + " Dummy migration that represents the database schema as of v2.9.0\n" + "\n" + ) + + def test_list_migrations_pending(self, swh_storage, invoke, mocker): + new_migration = Migration( + id="2024-12-19_test_migration", + dependencies=set(), + min_read_version="2.9.0", + script=None, + help="Test migration", + required=False, + ) + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", +
[MIGRATIONS[0], new_migration], + ) + + result = invoke("cassandra", "list-migrations") + assert result.exit_code == 0, result.output + assert result.output == ( + "2024-12-12_init (required): completed\n" + " Dummy migration that represents the database schema as of v2.9.0\n" + "\n" + "2024-12-19_test_migration: pending\n" + " Test migration\n" + "\n" + ) + + def test_upgrade_all( + self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker + ): + def migration_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + "CREATE TABLE IF NOT EXISTS test_table (abc blob PRIMARY KEY);", [] + ) + + new_migration = Migration( + id="2024-12-19_test_migration", + dependencies=set(), + min_read_version="2.9.0", + script=migration_script, + help="Test migration", + required=False, + ) + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", [*MIGRATIONS, new_migration] + ) + + try: + # create the table + result = invoke("cassandra", "upgrade") + assert result.exit_code == 0, result.output + assert result.output == "Done.\n" + + # check the table exists + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table", [] + ) + finally: + swh_storage._cql_runner.execute_with_retries( + f"DROP TABLE {swh_storage_cassandra_keyspace}.test_table", [] + ) + + def test_upgrade_all_from_v2_9( + self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker + ): + """Tests upgrading from v2.9.x, which did not have a 'migrations' table.""" + assert len(MIGRATIONS) == 1, ( + "This test won't work correctly after we make more changes to the schema, " + "as it relies on the schema being v2.9's plus only the migrations table." 
+ ) + cql_runner = swh_storage._cql_runner + + cql_runner.execute_with_retries( + f"DROP TABLE {cql_runner.keyspace}.migration", [] + ) + + def migration_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + "CREATE TABLE IF NOT EXISTS test_table (abc blob PRIMARY KEY);", [] + ) + + new_migration = Migration( + id="2024-12-19_test_migration", + dependencies=set(), + min_read_version="2.9.0", + script=migration_script, + help="Test migration", + required=False, + ) + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", [*MIGRATIONS, new_migration] + ) + + try: + # create the table + result = invoke("cassandra", "upgrade") + assert result.exit_code == 0, result.output + assert result.output == "Done.\n" + + # check the table exists + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table", [] + ) + finally: + swh_storage._cql_runner.execute_with_retries( + f"DROP TABLE {swh_storage_cassandra_keyspace}.test_table", [] + ) + + def test_upgrade_crashing( + self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker + ): + class TestException(Exception): + pass + + def migration_script(cql_runner): + raise TestException("Oh no, a crash!") + + new_migration = Migration( + id="2024-12-19_test_migration", + dependencies=set(), + min_read_version="2.9.0", + script=migration_script, + help="Test migration", + required=False, + ) + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", + [MIGRATIONS[0], new_migration], + ) + + # run the buggy migration + result = invoke("cassandra", "upgrade") + assert result.exit_code == 1, result.output + assert result.output == "" # raised an exception, traceback on stderr + + result = invoke("cassandra", "list-migrations") + assert result.exit_code == 0, result.output + assert result.output == ( + "2024-12-12_init (required): completed\n" + " Dummy migration that represents the database schema 
as of v2.9.0\n" + "\n" + "2024-12-19_test_migration: running\n" + " Test migration\n" + "\n" + ) + + def test_upgrade_partial( + self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker + ): + def create_test_table_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + "CREATE TABLE IF NOT EXISTS test_table (abc blob PRIMARY KEY);", [] + ) + + def drop_test_table_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries("DROP TABLE test_table;", []) + + new_migration1 = Migration( + id="2024-12-19_create_test_table", + dependencies=set(), + min_read_version="2.9.0", + script=create_test_table_script, + help="Test migration", + required=False, + ) + new_migration2 = Migration( + id="2024-12-19_drop_test_table", + dependencies={"2024-12-19_create_test_table"}, + min_read_version="2.9.0", + script=drop_test_table_script, + help="Test migration", + required=False, + ) + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", + [*MIGRATIONS, new_migration1, new_migration2], + ) + + # create the table + result = invoke( + "cassandra", "upgrade", "--migration", "2024-12-19_create_test_table" + ) + assert result.exit_code == 0, result.output + assert result.output == "Done.\n" + + # check the table exists + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table", [] + ) + + # drop the table + result = invoke( + "cassandra", "upgrade", "--migration", "2024-12-19_drop_test_table" + ) + assert result.exit_code == 0, result.output + assert result.output == "Done.\n" + + # check the table does not exist anymore + with pytest.raises(InvalidRequest): + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table", [] + ) + + def test_upgrade_disordered( + self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker + ): + 
"""Tries to apply a migration before its dependency""" + + def create_test_table1_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + "CREATE TABLE IF NOT EXISTS test_table1 (abc blob PRIMARY KEY);", [] + ) + + def create_test_table2_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + "CREATE TABLE IF NOT EXISTS test_table2 (abc blob PRIMARY KEY);", [] + ) + + new_migration1 = Migration( + id="2024-12-19_create_test_table1", + dependencies=set(), + min_read_version="2.9.0", + script=create_test_table1_script, + help="Test migration", + required=False, + ) + new_migration2 = Migration( + id="2024-12-19_create_test_table2", + dependencies={"2024-12-19_create_test_table1"}, + min_read_version="2.9.0", + script=create_test_table2_script, + help="Test migration", + required=False, + ) + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", + [*MIGRATIONS, new_migration1, new_migration2], + ) + + # run migration before its dependency + result = invoke( + "cassandra", "upgrade", "--migration", "2024-12-19_create_test_table2" + ) + assert result.exit_code == 2, result.output + assert result.output == ( + "Some migrations could not be applied because a dependency is missing: " + "2024-12-19_create_test_table2\n" + ) + + # check the migration was not marked as applied + result = invoke("cassandra", "list-migrations") + assert result.exit_code == 0, result.output + assert result.output == ( + "2024-12-12_init (required): completed\n" + " Dummy migration that represents the database schema as of v2.9.0\n" + "\n" + "2024-12-19_create_test_table1: pending\n" + " Test migration\n" + "\n" + "2024-12-19_create_test_table2: pending\n" + " Test migration\n" + "\n" + ) + + # check the tables still do not exist + with pytest.raises(InvalidRequest): + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM
{swh_storage_cassandra_keyspace}.test_table1", [] + ) + with pytest.raises(InvalidRequest): + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table2", [] + ) + + def test_mark_upgraded( + self, swh_storage, swh_storage_cassandra_keyspace, invoke, mocker + ): + def create_test_table1_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + "CREATE TABLE IF NOT EXISTS test_table1 (abc blob PRIMARY KEY);", [] + ) + + def create_test_table2_script(cql_runner): + cql_runner.execute_with_retries(f"USE {cql_runner.keyspace}", []) + cql_runner.execute_with_retries( + "CREATE TABLE IF NOT EXISTS test_table2 (abc blob PRIMARY KEY);", [] + ) + + new_migration1 = Migration( + id="2024-12-19_create_test_table1", + dependencies=set(), + min_read_version="2.9.0", + script=create_test_table1_script, + help="Test migration", + required=False, + ) + new_migration2 = Migration( + id="2024-12-19_create_test_table2", + dependencies={"2024-12-19_create_test_table1"}, + min_read_version="2.9.0", + script=create_test_table2_script, + help="Test migration", + required=False, + ) + mocker.patch( + "swh.storage.cassandra.migrations.MIGRATIONS", + [*MIGRATIONS, new_migration1, new_migration2], + ) + # Pretend the first migration was applied + result = invoke( + "cassandra", "mark-upgraded", "--migration", "2024-12-19_create_test_table1" + ) + assert result.exit_code == 0, result.output + assert ( + result.output + == "Migrations 2024-12-19_create_test_table1 marked as complete.\n" + ) + + # run dependent migration + try: + result = invoke( + "cassandra", "upgrade", "--migration", "2024-12-19_create_test_table2" + ) + assert result.exit_code == 0, result.output + assert result.output == "Done.\n" + + # check the first table still does not exist + with pytest.raises(InvalidRequest): + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM 
{swh_storage_cassandra_keyspace}.test_table1", [] + ) + + # check the second table now exists + swh_storage._cql_runner.execute_with_retries( + f"SELECT * FROM {swh_storage_cassandra_keyspace}.test_table2", [] + ) + finally: + try: + swh_storage._cql_runner.execute_with_retries( + f"DROP TABLE {swh_storage_cassandra_keyspace}.test_table2", [] + ) + except Exception: + pass diff --git a/swh/storage/tests/test_in_memory.py b/swh/storage/tests/test_in_memory.py index d01782beeb05c642996dbddbedd1cc0e965de5b4..92a1940dd53cc19d6dc81bb901514991e9dcecf9 100644 --- a/swh/storage/tests/test_in_memory.py +++ b/swh/storage/tests/test_in_memory.py @@ -142,6 +142,7 @@ class TestInMemoryStorageGeneratedData(_TestStorageGeneratedData): class TestStorageDeletion(_TestStorageDeletion): def _affected_tables(self) -> List[str]: cassandra_tables = set(TABLES) - { + "migration", "metadata_authority", "metadata_fetcher", "extid",