From bd8e324c9e31524b131f47403b56af8a2198dcb2 Mon Sep 17 00:00:00 2001 From: David Douard <david.douard@sdfa3.org> Date: Fri, 15 Sep 2023 14:14:42 +0200 Subject: [PATCH] Refactor data model to use config_id instead of datastore in xxx_object tables These tables used to reference the datastore the invalid/missing object was found in, but did not keep the config entry, i.e. the checking session during which the invalid/missing object was found, which can be an issue when more than one checking session is executed on a given datastore. This replaces the `datastore` field of the `corrupt_object`, `missing_object` and `missing_object_reference` tables with `config_id`. Adapt all the code accordingly. Note that this changes the CLI usage a bit: the kafka checker now needs a config entry, so a kafka checking session can only target a given object type (i.e. one kafka topic). The migration script will fill the config_id column for corrupt_object using the check_config entry that matches the object_type (of corrupt_object) and datastore. For missing_object and missing_object_reference, it will use the latter table to identify the check_config entry corresponding to the object type of the reference_id and datastore, since it is a checking session on this object type that will generate a missing object entry (which is generally not of the same type). For the missing_object table, the config_id will be the one extracted from missing_object_reference (joining on the missing_id column). Note that the migration script will fail if there are rows in one of these tables for which more than one possible check_config entry exists (i.e. with the same object_type and datastore). --- swh/scrubber/cli.py | 81 +++- swh/scrubber/db.py | 215 ++++++---- swh/scrubber/journal_checker.py | 77 ++-- swh/scrubber/sql/30-schema.sql | 18 +- swh/scrubber/sql/60-indexes.sql | 20 +- swh/scrubber/sql/upgrades/7.sql | 139 ++++++ swh/scrubber/storage_checker.py | 30 +- swh/scrubber/tests/conftest.py | 72 +++- .../tests/data/cli/sql/10-init-all.sql | 401 ++++++++++++++++++ swh/scrubber/tests/data/cli/sql/20-data.sql | 77 ++++ .../tests/data/cli/sql/upgrades/006.sql | 1 + .../tests/data/cli/sql/upgrades/007.sql | 1 + swh/scrubber/tests/storage_checker_tests.py | 4 +- swh/scrubber/tests/test_cli.py | 34 +- swh/scrubber/tests/test_fixer.py | 119 ++---- swh/scrubber/tests/test_journal_kafka.py | 75 +++- swh/scrubber/tests/test_migration.py | 258 +++++++++++ swh/scrubber/tests/test_origin_locator.py | 69 ++- swh/scrubber/utils.py | 10 +- 19 files changed, 1397 insertions(+), 304 deletions(-) create mode 100644 swh/scrubber/sql/upgrades/7.sql create mode 100644 swh/scrubber/tests/data/cli/sql/10-init-all.sql create mode 100644 swh/scrubber/tests/data/cli/sql/20-data.sql create mode 120000 swh/scrubber/tests/data/cli/sql/upgrades/006.sql create mode 120000 swh/scrubber/tests/data/cli/sql/upgrades/007.sql create mode 100644 swh/scrubber/tests/test_migration.py diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py index 5fbca07..71fdb8b 100644 --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -103,6 +103,7 @@ def scrubber_check_cli_group(ctx): @scrubber_check_cli_group.command(name="init") +@click.argument("backend", type=click.Choice(["storage", "journal"])) @click.option( "--object-type", type=click.Choice( @@ -124,6 +125,7 @@ def scrubber_check_cli_group(ctx): @click.pass_context def scrubber_check_init( ctx, + backend: str, object_type: str, nb_partitions: int, name: Optional[str], @@ -147,18 +149,30 @@ def scrubber_check_init( ) conf 
= ctx.obj["config"] - if "storage" not in conf: - raise click.ClickException( - "You must have a storage configured in your config file." - ) - db = ctx.obj["db"] - from swh.storage import get_storage - from .storage_checker import get_datastore + if backend == "storage": + if "storage" not in conf: + raise click.ClickException( + "You must have a storage configured in your config file." + ) + from swh.storage import get_storage + + from .storage_checker import get_datastore as get_storage_datastore + + datastore = get_storage_datastore(storage=get_storage(**conf["storage"])) + db.datastore_get_or_add(datastore) + elif backend == "journal": + if "journal" not in conf: + raise click.ClickException( + "You must have a journal configured in your config file." + ) + from .journal_checker import get_datastore as get_journal_datastore - datastore = get_datastore(storage=get_storage(**conf["storage"])) - db.datastore_get_or_add(datastore) + datastore = get_journal_datastore(journal_cfg=conf["journal"]) + db.datastore_get_or_add(datastore) + else: + raise click.ClickException(f"Backend type {backend} is not supported") if db.config_get_by_name(name): raise click.ClickException(f"Configuration {name} already exists") @@ -185,7 +199,7 @@ def scrubber_check_list( db = ctx.obj["db"] for id_, cfg in db.config_iter(): - ds = db.datastore_get(cfg.datastore_id) + ds = cfg.datastore if not ds: click.echo( f"[{id_}] {cfg.name}: Invalid configuration entry; datastore not found" @@ -246,7 +260,8 @@ def scrubber_check_stalled( in_flight = list(db.checked_partition_get_stuck(config_id, delay_td)) if in_flight: click.echo( - f"Stuck partitions for {cfg.name} [id={config_id}, type={cfg.object_type}]:" + f"Stuck partitions for {cfg.name} [id={config_id}, " + f"type={cfg.object_type.name.lower()}]:" ) now = datetime.datetime.now(tz=datetime.timezone.utc) for partition, stuck_since in in_flight: @@ -262,7 +277,8 @@ def scrubber_check_stalled( else: click.echo( - f"No stuck partition found for {cfg.name} [id={config_id}, type={cfg.object_type}]" + f"No stuck partition found for {cfg.name} [id={config_id}, " + f"type={cfg.object_type.name.lower()}]" ) @@ -276,6 +292,7 @@ def scrubber_check_stalled( @click.option( "--config-id", type=int, + help="Config ID (is config name is not given as argument)", ) @click.option("--limit", default=0, type=int) @click.pass_context @@ -287,15 +304,15 @@ def scrubber_check_storage( ): """Reads a swh-storage instance, and reports corrupt objects to the scrubber DB. - This runs a single thread; parallelism is achieved by running this command multiple - times. + This runs a single thread; parallelism is achieved by running this command + multiple times. This command references an existing scrubbing configuration (either by name or by id); the configuration holds the object type, number of partitions and the storage configuration this scrubbing session will check on. - All objects of type ``object_type`` are ordered, and split into the given number - of partitions. + All objects of type ``object_type`` are ordered, and split into the given + number of partitions. Then, this process will check all partitions. 
The status of the ongoing check session is stored in the database, so the number of concurrent @@ -312,10 +329,10 @@ def scrubber_check_storage( from .storage_checker import StorageChecker if name and config_id is None: - from .storage_checker import get_datastore + from .storage_checker import get_datastore as get_storage_datastore cfg = conf["storage"] - datastore = get_datastore(storage=get_storage(**cfg)) + datastore = get_storage_datastore(storage=get_storage(**cfg)) datastore_id = db.datastore_get_or_add(datastore) config_id = db.config_get_by_name(name, datastore_id) elif name is None and config_id is not None: @@ -335,19 +352,45 @@ def scrubber_check_storage( @scrubber_check_cli_group.command(name="journal") +@click.argument( + "name", + type=str, + default=None, + required=False, # can be given by config_id instead +) +@click.option( + "--config-id", + type=int, + help="Config ID (is config name is not given as argument)", +) @click.pass_context -def scrubber_check_journal(ctx) -> None: +def scrubber_check_journal(ctx, name, config_id) -> None: """Reads a complete kafka journal, and reports corrupt objects to the scrubber DB.""" conf = ctx.obj["config"] if "journal" not in conf: ctx.fail("You must have a journal configured in your config file.") + db = ctx.obj["db"] + + if name and config_id is None: + from .journal_checker import get_datastore as get_journal_datastore + + cfg = conf["journal"] + datastore = get_journal_datastore(journal_cfg=cfg) + datastore_id = db.datastore_get_or_add(datastore) + config_id = db.config_get_by_name(name, datastore_id) + elif name is None and config_id is not None: + assert db.config_get(config_id) is not None + + if config_id is None: + raise click.ClickException("A valid configuration name/id must be set") from .journal_checker import JournalChecker checker = JournalChecker( db=ctx.obj["db"], journal=conf["journal"], + config_id=config_id, ) checker.run() diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py index 3de6472..c77e417 100644 --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -32,10 +32,20 @@ class Datastore: """Human readable string.""" +@dataclasses.dataclass(frozen=True) +class ConfigEntry: + """Represents a datastore being scrubbed; eg. swh-storage or swh-journal.""" + + name: str + datastore: Datastore + object_type: ObjectType + nb_partitions: int + + @dataclasses.dataclass(frozen=True) class CorruptObject: id: CoreSWHID - datastore: Datastore + config: ConfigEntry first_occurrence: datetime.datetime object_: bytes @@ -43,7 +53,7 @@ class CorruptObject: @dataclasses.dataclass(frozen=True) class MissingObject: id: CoreSWHID - datastore: Datastore + config: ConfigEntry first_occurrence: datetime.datetime @@ -51,7 +61,7 @@ class MissingObject: class MissingObjectReference: missing_id: CoreSWHID reference_id: CoreSWHID - datastore: Datastore + config: ConfigEntry first_occurrence: datetime.datetime @@ -63,18 +73,8 @@ class FixedObject: recovery_date: Optional[datetime.datetime] = None -@dataclasses.dataclass(frozen=True) -class ConfigEntry: - """Represents a datastore being scrubbed; eg. 
swh-storage or swh-journal.""" - - name: str - datastore_id: int - object_type: str - nb_partitions: int - - class ScrubberDb(BaseDb): - current_version = 6 + current_version = 7 #################################### # Shared tables @@ -179,9 +179,12 @@ class ScrubberDb(BaseDb): with self.transaction() as cur: cur.execute( """ - SELECT name, datastore, object_type, nb_partitions - FROM check_config - WHERE id=%(config_id)s + SELECT + cc.name, cc.object_type, cc.nb_partitions, + ds.package, ds.class, ds.instance + FROM check_config AS cc + INNER JOIN datastore As ds ON (cc.datastore=ds.id) + WHERE cc.id=%(config_id)s """, { "config_id": config_id, @@ -190,11 +193,13 @@ class ScrubberDb(BaseDb): res = cur.fetchone() if res is None: raise ValueError(f"No config with id {config_id}") - (name, datastore_id, object_type, nb_partitions) = res + (name, object_type, nb_partitions, ds_package, ds_class, ds_instance) = res return ConfigEntry( name=name, - datastore_id=datastore_id, - object_type=object_type, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + object_type=getattr(ObjectType, object_type.upper()), nb_partitions=nb_partitions, ) @@ -226,18 +231,31 @@ class ScrubberDb(BaseDb): with self.transaction() as cur: cur.execute( """ - SELECT id, name, datastore, object_type, nb_partitions - FROM check_config + SELECT + cc.id, cc.name, cc.object_type, cc.nb_partitions, + ds.package, ds.class, ds.instance + FROM check_config AS cc + INNER JOIN datastore AS ds ON (cc.datastore=ds.id) """, ) for row in cur: assert row is not None - (id_, name, datastore_id, object_type, nb_partitions) = row + ( + id_, + name, + object_type, + nb_partitions, + ds_package, + ds_class, + ds_instance, + ) = row yield ( id_, ConfigEntry( name=name, - datastore_id=datastore_id, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), object_type=object_type, nb_partitions=nb_partitions, ), @@ -464,18 +482,48 @@ class ScrubberDb(BaseDb): def corrupt_object_add( self, id: CoreSWHID, - datastore: Datastore, + config: ConfigEntry, serialized_object: bytes, ) -> None: - datastore_id = self.datastore_get_or_add(datastore) + config_id = self.config_get_by_name(config.name) + assert config_id is not None with self.transaction() as cur: cur.execute( """ - INSERT INTO corrupt_object (id, datastore, object) + INSERT INTO corrupt_object (id, config_id, object) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, - (str(id), datastore_id, serialized_object), + (str(id), config_id, serialized_object), + ) + + def _corrupt_object_list_from_cursor( + self, cur: psycopg2.extensions.cursor + ) -> Iterator[CorruptObject]: + for row in cur: + ( + id, + first_occurrence, + object_, + cc_object_type, + cc_nb_partitions, + cc_name, + ds_package, + ds_class, + ds_instance, + ) = row + yield CorruptObject( + id=CoreSWHID.from_string(id), + first_occurrence=first_occurrence, + object_=object_, + config=ConfigEntry( + name=cc_name, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + object_type=cc_object_type, + nb_partitions=cc_nb_partitions, + ), ) def corrupt_object_iter(self) -> Iterator[CorruptObject]: @@ -485,48 +533,21 @@ class ScrubberDb(BaseDb): """ SELECT co.id, co.first_occurrence, co.object, + cc.object_type, cc.nb_partitions, cc.name, ds.package, ds.class, ds.instance FROM corrupt_object AS co - INNER JOIN datastore AS ds ON (ds.id=co.datastore) + INNER JOIN check_config AS cc ON (cc.id=co.config_id) + INNER JOIN datastore AS ds ON 
(ds.id=cc.datastore) """ ) - - for row in cur: - (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row - yield CorruptObject( - id=CoreSWHID.from_string(id), - first_occurrence=first_occurrence, - object_=object_, - datastore=Datastore( - package=ds_package, cls=ds_class, instance=ds_instance - ), - ) - - def _corrupt_object_list_from_cursor( - self, cur: psycopg2.extensions.cursor - ) -> List[CorruptObject]: - results = [] - for row in cur: - (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row - results.append( - CorruptObject( - id=CoreSWHID.from_string(id), - first_occurrence=first_occurrence, - object_=object_, - datastore=Datastore( - package=ds_package, cls=ds_class, instance=ds_instance - ), - ) - ) - - return results + yield from self._corrupt_object_list_from_cursor(cur) def corrupt_object_get( self, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, - ) -> List[CorruptObject]: + ) -> Iterator[CorruptObject]: """Yields a page of records in the 'corrupt_object' table, ordered by id. Arguments: @@ -540,9 +561,11 @@ class ScrubberDb(BaseDb): """ SELECT co.id, co.first_occurrence, co.object, + cc.object_type, cc.nb_partitions, cc.name, ds.package, ds.class, ds.instance FROM corrupt_object AS co - INNER JOIN datastore AS ds ON (ds.id=co.datastore) + INNER JOIN check_config AS cc ON (cc.id=co.config_id) + INNER JOIN datastore AS ds ON (ds.id=cc.datastore) WHERE co.id >= %s AND co.id <= %s @@ -551,7 +574,7 @@ class ScrubberDb(BaseDb): """, (str(start_id), str(end_id), limit), ) - return self._corrupt_object_list_from_cursor(cur) + yield from self._corrupt_object_list_from_cursor(cur) def corrupt_object_grab_by_id( self, @@ -559,7 +582,7 @@ class ScrubberDb(BaseDb): start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, - ) -> List[CorruptObject]: + ) -> Iterator[CorruptObject]: """Returns a page of records in the 'corrupt_object' table for a fixer, ordered by id @@ -575,9 +598,11 @@ class ScrubberDb(BaseDb): """ SELECT co.id, co.first_occurrence, co.object, + cc.object_type, cc.nb_partitions, cc.name, ds.package, ds.class, ds.instance FROM corrupt_object AS co - INNER JOIN datastore AS ds ON (ds.id=co.datastore) + INNER JOIN check_config AS cc ON (cc.id=co.config_id) + INNER JOIN datastore AS ds ON (ds.id=cc.datastore) WHERE co.id >= %(start_id)s AND co.id <= %(end_id)s @@ -592,7 +617,7 @@ class ScrubberDb(BaseDb): limit=limit, ), ) - return self._corrupt_object_list_from_cursor(cur) + yield from self._corrupt_object_list_from_cursor(cur) def corrupt_object_grab_by_origin( self, @@ -601,7 +626,7 @@ class ScrubberDb(BaseDb): start_id: Optional[CoreSWHID] = None, end_id: Optional[CoreSWHID] = None, limit: int = 100, - ) -> List[CorruptObject]: + ) -> Iterator[CorruptObject]: """Returns a page of records in the 'corrupt_object' table for a fixer, ordered by id @@ -616,9 +641,11 @@ class ScrubberDb(BaseDb): """ SELECT co.id, co.first_occurrence, co.object, + cc.object_type, cc.nb_partitions, cc.name, ds.package, ds.class, ds.instance FROM corrupt_object AS co - INNER JOIN datastore AS ds ON (ds.id=co.datastore) + INNER JOIN check_config AS cc ON (cc.id=co.config_id) + INNER JOIN datastore AS ds ON (ds.id=cc.datastore) INNER JOIN object_origin AS oo ON (oo.object_id=co.id) WHERE (co.id >= %(start_id)s OR %(start_id)s IS NULL) @@ -636,13 +663,13 @@ class ScrubberDb(BaseDb): limit=limit, ), ) - return self._corrupt_object_list_from_cursor(cur) + yield from self._corrupt_object_list_from_cursor(cur) def missing_object_add( self, id: CoreSWHID, 
reference_ids: Iterable[CoreSWHID], - datastore: Datastore, + config: ConfigEntry, ) -> None: """ Adds a "hole" to the inventory, ie. an object missing from a datastore @@ -661,25 +688,25 @@ class ScrubberDb(BaseDb): """ if not reference_ids: raise ValueError("reference_ids is empty") - datastore_id = self.datastore_get_or_add(datastore) + config_id = self.config_get_by_name(config.name) with self.transaction() as cur: cur.execute( """ - INSERT INTO missing_object (id, datastore) + INSERT INTO missing_object (id, config_id) VALUES (%s, %s) ON CONFLICT DO NOTHING """, - (str(id), datastore_id), + (str(id), config_id), ) psycopg2.extras.execute_batch( cur, """ - INSERT INTO missing_object_reference (missing_id, reference_id, datastore) + INSERT INTO missing_object_reference (missing_id, reference_id, config_id) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, [ - (str(id), str(reference_id), datastore_id) + (str(id), str(reference_id), config_id) for reference_id in reference_ids ], ) @@ -691,19 +718,35 @@ class ScrubberDb(BaseDb): """ SELECT mo.id, mo.first_occurrence, + cc.name, cc.object_type, cc.nb_partitions, ds.package, ds.class, ds.instance FROM missing_object AS mo - INNER JOIN datastore AS ds ON (ds.id=mo.datastore) + INNER JOIN check_config AS cc ON (cc.id=mo.config_id) + INNER JOIN datastore AS ds ON (ds.id=cc.datastore) """ ) for row in cur: - (id, first_occurrence, ds_package, ds_class, ds_instance) = row + ( + id, + first_occurrence, + cc_name, + cc_object_type, + cc_nb_partitions, + ds_package, + ds_class, + ds_instance, + ) = row yield MissingObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, - datastore=Datastore( - package=ds_package, cls=ds_class, instance=ds_instance + config=ConfigEntry( + name=cc_name, + object_type=cc_object_type, + nb_partitions=cc_nb_partitions, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), ), ) @@ -716,9 +759,11 @@ class ScrubberDb(BaseDb): """ SELECT mor.reference_id, mor.first_occurrence, + cc.name, cc.object_type, cc.nb_partitions, ds.package, ds.class, ds.instance FROM missing_object_reference AS mor - INNER JOIN datastore AS ds ON (ds.id=mor.datastore) + INNER JOIN check_config AS cc ON (cc.id=mor.config_id) + INNER JOIN datastore AS ds ON (ds.id=cc.datastore) WHERE mor.missing_id=%s """, (str(missing_id),), @@ -728,6 +773,9 @@ class ScrubberDb(BaseDb): ( reference_id, first_occurrence, + cc_name, + cc_object_type, + cc_nb_partitions, ds_package, ds_class, ds_instance, @@ -736,8 +784,13 @@ class ScrubberDb(BaseDb): missing_id=missing_id, reference_id=CoreSWHID.from_string(reference_id), first_occurrence=first_occurrence, - datastore=Datastore( - package=ds_package, cls=ds_class, instance=ds_instance + config=ConfigEntry( + name=cc_name, + object_type=cc_object_type, + nb_partitions=cc_nb_partitions, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), ), ) diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py index f251bdf..1132864 100644 --- a/swh/scrubber/journal_checker.py +++ b/swh/scrubber/journal_checker.py @@ -5,53 +5,79 @@ """Reads all objects in a swh-storage instance and recomputes their checksums.""" +import json import logging -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from swh.journal.client import get_journal_client from swh.journal.serializers import kafka_to_value from swh.model import model -from .db import Datastore, ScrubberDb +from .db import ConfigEntry, Datastore, 
ScrubberDb logger = logging.getLogger(__name__) +def get_datastore(journal_cfg) -> Datastore: + if journal_cfg.get("cls") == "kafka": + datastore = Datastore( + package="journal", + cls="kafka", + instance=json.dumps( + { + "brokers": journal_cfg["brokers"], + "group_id": journal_cfg["group_id"], + "prefix": journal_cfg["prefix"], + } + ), + ) + else: + raise NotImplementedError( + f"JournalChecker(journal={journal_cfg!r}).datastore()" + ) + return datastore + + class JournalChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" - _datastore = None + _config: Optional[ConfigEntry] = None + _datastore: Optional[Datastore] = None - def __init__(self, db: ScrubberDb, journal: Dict[str, Any]): + def __init__(self, db: ScrubberDb, config_id: int, journal: Dict[str, Any]): self.db = db - self.journal_client_config = journal + self.config_id = config_id + + self.journal_client_config = journal.copy() + if "object_types" in self.journal_client_config: + raise ValueError( + "The journal_client configuration entry should not define the " + "object_types field; this is handled by the scrubber configuration entry" + ) + self.journal_client_config["object_types"] = [ + self.config.object_type.name.lower() + ] self.journal_client = get_journal_client( - **journal, + **self.journal_client_config, # Remove default deserializer; so process_kafka_values() gets the message # verbatim so it can archive it with as few modifications a possible. value_deserializer=lambda obj_type, msg: msg, ) - def datastore_info(self) -> Datastore: + @property + def config(self) -> ConfigEntry: + if self._config is None: + self._config = self.db.config_get(self.config_id) + + assert self._config is not None + return self._config + + @property + def datastore(self) -> Datastore: """Returns a :class:`Datastore` instance representing the journal instance being checked.""" - if self._datastore is None: - config = self.journal_client_config - if config["cls"] == "kafka": - self._datastore = Datastore( - package="journal", - cls="kafka", - instance=( - f"brokers={config['brokers']!r} prefix={config['prefix']!r}" - ), - ) - else: - raise NotImplementedError( - f"StorageChecker(journal_client={self.journal_client_config!r})" - f".datastore()" - ) - return self._datastore + return self.config.datastore def run(self): """Runs a journal client with the given configuration. 
@@ -61,11 +87,10 @@ class JournalChecker: def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]): for (object_type, messages) in all_messages.items(): + logger.debug("Processing %s %s", len(messages), object_type) cls = getattr(model, object_type.capitalize()) for message in messages: object_ = cls.from_dict(kafka_to_value(message)) real_id = object_.compute_hash() if object_.id != real_id: - self.db.corrupt_object_add( - object_.swhid(), self.datastore_info(), message - ) + self.db.corrupt_object_add(object_.swhid(), self.config, message) diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql index c19802c..c6d6001 100644 --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -63,13 +63,13 @@ comment on column checked_partition.end_date is 'Date the last scrub ended of th create table corrupt_object ( id swhid not null, - datastore int not null, object bytea not null, - first_occurrence timestamptz not null default now() + first_occurrence timestamptz not null default now(), + config_id int not null ); comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt'; -comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.'; +comment on column corrupt_object.config_id is 'The check configuration the corrupt object was found in.'; comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)'; comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; @@ -77,26 +77,26 @@ comment on column corrupt_object.first_occurrence is 'Moment the object was foun create table missing_object ( id swhid not null, - datastore int not null, - first_occurrence timestamptz not null default now() + first_occurrence timestamptz not null default now(), + config_id int not null ); comment on table missing_object is 'Each row identifies an object that are missing but referenced by another object (aka "holes")'; -comment on column missing_object.datastore is 'Datastore where the hole is.'; +comment on column missing_object.config_id is 'Check configuration where the hole was found.'; comment on column missing_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; create table missing_object_reference ( missing_id swhid not null, reference_id swhid not null, - datastore int not null, - first_occurrence timestamptz not null default now() + first_occurrence timestamptz not null default now(), + config_id int not null ); comment on table missing_object_reference is 'Each row identifies an object that points to an object that does not exist (aka a "hole")'; comment on column missing_object_reference.missing_id is 'SWHID of the missing object.'; comment on column missing_object_reference.reference_id is 'SWHID of the object referencing the missing object.'; -comment on column missing_object_reference.datastore is 'Datastore where the referencing object is.'; +comment on column missing_object_reference.config_id is 'Check configuration for which the referencing object is.'; comment on column missing_object_reference.first_occurrence is 'Moment the object was found to reference a missing object'; diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql index 152691b..2ddca9f 100644 --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -30,29 +30,29 @@ alter table 
checked_partition add primary key using index checked_partition_pkey -- corrupt_object -alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; -alter table corrupt_object validate constraint corrupt_object_datastore_fkey; +alter table corrupt_object add constraint corrupt_object_config_fkey foreign key (config_id) references check_config(id) not valid; +alter table corrupt_object validate constraint corrupt_object_config_fkey; -create unique index corrupt_object_pkey on corrupt_object(id, datastore); +create unique index corrupt_object_pkey on corrupt_object(id, config_id); alter table corrupt_object add primary key using index corrupt_object_pkey; -- missing_object -alter table missing_object add constraint missing_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; -alter table missing_object validate constraint missing_object_datastore_fkey; +alter table missing_object add constraint missing_object_config_fkey foreign key (config_id) references check_config(id) not valid; +alter table missing_object validate constraint missing_object_config_fkey; -create unique index missing_object_pkey on missing_object(id, datastore); +create unique index missing_object_pkey on missing_object(id, config_id); alter table missing_object add primary key using index missing_object_pkey; -- missing_object_reference -alter table missing_object_reference add constraint missing_object_reference_datastore_fkey foreign key (datastore) references datastore(id) not valid; -alter table missing_object_reference validate constraint missing_object_reference_datastore_fkey; +alter table missing_object_reference add constraint missing_object_reference_config_fkey foreign key (config_id) references check_config(id) not valid; +alter table missing_object_reference validate constraint missing_object_reference_config_fkey; -create unique index missing_object_reference_missing_id_reference_id_datastore on missing_object_reference(missing_id, reference_id, datastore); -create unique index missing_object_reference_reference_id_missing_id_datastore on missing_object_reference(reference_id, missing_id, datastore); +create unique index missing_object_reference_missing_id_reference_id_config on missing_object_reference(missing_id, reference_id, config_id); +create unique index missing_object_reference_reference_id_missing_id_config on missing_object_reference(reference_id, missing_id, config_id); ------------------------------------- -- Issue resolution diff --git a/swh/scrubber/sql/upgrades/7.sql b/swh/scrubber/sql/upgrades/7.sql new file mode 100644 index 0000000..5aa4e11 --- /dev/null +++ b/swh/scrubber/sql/upgrades/7.sql @@ -0,0 +1,139 @@ +-- SWH Scrubber DB schema upgrade +-- from_version: 6 +-- to_version: 7 +-- description: Replace datastore column by a config_id one in missing_object, +-- corrupt_object and missing_object_reference + +--- First, we look if there are datastores used by several check_config entries +--- (for a given object type); If there are, we cannot automatically upgrade +--- the DB, since we cannot choose the config_id to use in missing_object, +--- corrupt_object and missing_object_reference tables (in place of datastore) +create temporary table invalid_configs ( + datastore integer, + object_type text, + n_cfg bigint, + n_rows bigint + ); + +with + m as ( + select * from (values ('swh:1:rev:', 'revision'), + ('swh:1:rel:', 'release'), + ('swh:1:dir:', 'directory'), + ('swh:1:cnt:', 'content')) + 
as m (prefix, object_type) + ), + dup_cfg as ( + select count(*) as n, datastore, object_type + from check_config + group by 2, 3 + having (count(*) > 1) + ), + mo as ( + select id, substring(id, 0, 11) as prefix, datastore + from missing_object + union + select reference_id as id, substring(reference_id, 0, 11) as prefix, datastore + from missing_object_reference + union + select id, substring(id, 0, 11) as prefix, datastore + from corrupt_object + ) + +insert into invalid_configs +select mo.datastore, m.object_type, dup_cfg.n as n_configs, count(mo.id) +from mo + inner join m on ( mo.prefix = m.prefix) + inner join dup_cfg on (mo.datastore=dup_cfg.datastore) + +group by 1, 2, 3; + + +select count(*)>0 as found_invalid from invalid_configs \gset + +select * from invalid_configs; + +\if :found_invalid +\warn 'Found datastores used by several config check sessions.' +\warn 'Sorry, you need to sort this by hand...' +--- Did not find an elegant way of stopping the migration script here... +-- so let's generate a syntax error... +fail +\else +\echo 'Seems each datastore is used in only one config_check, let''s continue' +\endif + + +--- Now we should be ok to do the actual migration: +--- 1. add config_id columns to the xxx_object tables +--- 2. fill the column (there should be only one possible config_id per row now) +--- 3. drop the datastore column + +create temporary table swhid_map (prefix text, object_type object_type); +insert into swhid_map values + ('swh:1:rev:', 'revision'), + ('swh:1:rel:', 'release'), + ('swh:1:dir:', 'directory'), + ('swh:1:cnt:', 'content'), + ('swh:1:snp:', 'snapshot') +; + +alter table corrupt_object +add column config_id int; + +update corrupt_object as upd +set config_id = cc.id +from check_config as cc +inner join swhid_map on (swhid_map.object_type = cc.object_type) +where substring(upd.id, 0, 11) = swhid_map.prefix + and cc.datastore = upd.datastore +; + +alter table missing_object_reference +add column config_id int; + +update missing_object_reference as upd +set config_id = cc.id +from check_config as cc +inner join swhid_map on (swhid_map.object_type = cc.object_type) +where substring(upd.reference_id, 0, 11) = swhid_map.prefix + and cc.datastore = upd.datastore +; + +alter table missing_object +add column config_id int; + +update missing_object as upd +set config_id = mor.config_id +from missing_object_reference as mor +where upd.id = mor.missing_id and upd.datastore = mor.datastore +; + +-- now we can remove the datastore column for these tables + +alter table corrupt_object +drop column datastore; +alter table missing_object +drop column datastore; +alter table missing_object_reference +drop column datastore; + +-- and restore indexes and foreign key validation + +alter table corrupt_object add constraint corrupt_object_config_fkey foreign key (config_id) references check_config(id) not valid; +alter table corrupt_object validate constraint corrupt_object_config_fkey; + +create unique index corrupt_object_pkey on corrupt_object(id, config_id); +alter table corrupt_object add primary key using index corrupt_object_pkey; + +alter table missing_object add constraint missing_object_config_fkey foreign key (config_id) references check_config(id) not valid; +alter table missing_object validate constraint missing_object_config_fkey; + +create unique index missing_object_pkey on missing_object(id, config_id); +alter table missing_object add primary key using index missing_object_pkey; + +alter table missing_object_reference add constraint 
missing_object_reference_config_fkey foreign key (config_id) references check_config(id) not valid; +alter table missing_object_reference validate constraint missing_object_reference_config_fkey; + +create unique index missing_object_reference_missing_id_reference_id_config on missing_object_reference(missing_id, reference_id, config_id); +create unique index missing_object_reference_reference_id_missing_id_config on missing_object_reference(reference_id, missing_id, config_id); diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py index aae2805..33b7fc3 100644 --- a/swh/scrubber/storage_checker.py +++ b/swh/scrubber/storage_checker.py @@ -126,7 +126,6 @@ class StorageChecker: self.limit = limit self._config: Optional[ConfigEntry] = None - self._datastore: Optional[Datastore] = None self._statsd: Optional[Statsd] = None @property @@ -142,18 +141,14 @@ class StorageChecker: return self.config.nb_partitions @property - def object_type(self) -> str: + def object_type(self) -> swhids.ObjectType: return self.config.object_type @property def datastore(self) -> Datastore: """Returns a :class:`Datastore` instance representing the swh-storage instance being checked.""" - if self._datastore is None: - self._datastore = get_datastore(self.storage) - datastore_id = self.db.datastore_get_or_add(self._datastore) - assert self.config.datastore_id == datastore_id - return self._datastore + return self.config.datastore @property def statsd(self) -> Statsd: @@ -173,7 +168,6 @@ class StorageChecker: """Runs on all objects of ``object_type`` in a partition between ``start_partition_id`` (inclusive) and ``end_partition_id`` (exclusive) """ - object_type = getattr(swhids.ObjectType, self.object_type.upper()) counter: Iterable[int] = count() if self.limit: counter = islice(counter, 0, self.limit) @@ -187,7 +181,7 @@ class StorageChecker: self.nb_partitions, ) - self._check_partition(object_type, partition_id) + self._check_partition(self.object_type, partition_id) self.db.checked_partition_upsert( self.config_id, @@ -204,7 +198,9 @@ class StorageChecker: page_token = None while True: if object_type in (swhids.ObjectType.RELEASE, swhids.ObjectType.REVISION): - method = getattr(self.storage, f"{self.object_type}_get_partition") + method = getattr( + self.storage, f"{self.object_type.name.lower()}_get_partition" + ) page = method(partition_id, self.nb_partitions, page_token=page_token) objects = page.results elif object_type == swhids.ObjectType.DIRECTORY: @@ -225,7 +221,7 @@ class StorageChecker: self.statsd.increment("duplicate_directory_entries_total") self.db.corrupt_object_add( object_.swhid(), - self.datastore, + self.config, value_to_kafka(object_.to_dict()), ) objects.append(object_) @@ -271,7 +267,7 @@ class StorageChecker: self.statsd.increment("hash_mismatch_total") self.db.corrupt_object_add( object_.swhid(), - self.datastore, + self.config, value_to_kafka(object_.to_dict()), ) if count: @@ -380,7 +376,7 @@ class StorageChecker: object_type=swhids.ObjectType.CONTENT, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, cnt_references[missing_id], self.datastore + missing_swhid, cnt_references[missing_id], self.config ) for missing_id in missing_dirs: @@ -388,7 +384,7 @@ class StorageChecker: object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, dir_references[missing_id], self.datastore + missing_swhid, dir_references[missing_id], self.config ) for missing_id in missing_revs: @@ -396,7 +392,7 @@ class 
StorageChecker: object_type=swhids.ObjectType.REVISION, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, rev_references[missing_id], self.datastore + missing_swhid, rev_references[missing_id], self.config ) for missing_id in missing_rels: @@ -404,7 +400,7 @@ class StorageChecker: object_type=swhids.ObjectType.RELEASE, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, rel_references[missing_id], self.datastore + missing_swhid, rel_references[missing_id], self.config ) for missing_id in missing_snps: @@ -412,5 +408,5 @@ class StorageChecker: object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, snp_references[missing_id], self.datastore + missing_swhid, snp_references[missing_id], self.config ) diff --git a/swh/scrubber/tests/conftest.py b/swh/scrubber/tests/conftest.py index b398838..c8997fb 100644 --- a/swh/scrubber/tests/conftest.py +++ b/swh/scrubber/tests/conftest.py @@ -3,14 +3,20 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime from functools import partial +import attr import pytest from pytest_postgresql import factories from swh.core.db.db_utils import initialize_database_for_module +from swh.journal.serializers import value_to_kafka +from swh.model.hashutil import hash_to_bytes +from swh.model.model import Directory, DirectoryEntry from swh.model.swhids import ObjectType -from swh.scrubber.db import Datastore, ScrubberDb +from swh.model.tests.swh_model_data import DIRECTORIES +from swh.scrubber.db import ConfigEntry, CorruptObject, Datastore, ScrubberDb scrubber_postgresql_proc = factories.postgresql_proc( load=[partial(initialize_database_for_module, modname="scrubber", version=6)], @@ -21,6 +27,53 @@ postgresql_scrubber = factories.postgresql("scrubber_postgresql_proc") OBJECT_TYPE = ObjectType.DIRECTORY PARTITION_ID = 2 NB_PARTITIONS = 64 +DIRECTORY = [dir_ for dir_ in DIRECTORIES if len(dir_.entries) > 1][0] +ORIGINAL_DIRECTORY = Directory( + entries=( + DirectoryEntry( + name=b"dir1", + type="dir", + target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), + perms=0o040755, + ), + DirectoryEntry( + name=b"file1.ext", + type="file", + target=hash_to_bytes("86bc6b377e9d25f9d26777a4a28d08e63e7c5779"), + perms=0o644, + ), + DirectoryEntry( + name=b"subprepo1", + type="rev", + target=hash_to_bytes("c7f96242d73c267adc77c2908e64e0c1cb6a4431"), + perms=0o160000, + ), + ), + raw_manifest=( + b"tree 102\x00" + b"160000 subprepo1\x00\xc7\xf9bB\xd7<&z\xdcw\xc2\x90\x8ed\xe0\xc1\xcbjD1" + b"644 file1.ext\x00\x86\xbck7~\x9d%\xf9\xd2gw\xa4\xa2\x8d\x08\xe6>|Wy" + b"40755 dir1\x00K\x82]\xc6B\xcbn\xb9\xa0`\xe5K\xf8\xd6\x92\x88\xfb\xeeI\x04" + ), +) + +# A directory with its entries in canonical order, but a hash computed as if +# computed in the reverse order. 
+# This happens when entries get normalized (either by the loader or accidentally +# in swh-storage) +CORRUPT_DIRECTORY = attr.evolve(ORIGINAL_DIRECTORY, raw_manifest=None) +assert ORIGINAL_DIRECTORY != CORRUPT_DIRECTORY +assert ( + hash_to_bytes("61992617462fff81509bda4a24b54c96ea74a007") + == ORIGINAL_DIRECTORY.id + == CORRUPT_DIRECTORY.id +) +assert ( + hash_to_bytes("81fda5b242e65fc81201e590d0f0ce5f582fbcdd") + == CORRUPT_DIRECTORY.compute_hash() + != CORRUPT_DIRECTORY.id +) +assert ORIGINAL_DIRECTORY.entries == CORRUPT_DIRECTORY.entries @pytest.fixture @@ -38,7 +91,22 @@ def scrubber_db(postgresql_scrubber): @pytest.fixture -def config_id(scrubber_db, datastore): +def config_id(scrubber_db, datastore) -> int: return scrubber_db.config_add( f"cfg_{OBJECT_TYPE}_{NB_PARTITIONS}", datastore, OBJECT_TYPE, NB_PARTITIONS ) + + +@pytest.fixture +def config_entry(scrubber_db, config_id) -> ConfigEntry: + return scrubber_db.config_get(config_id) + + +@pytest.fixture +def corrupt_object(scrubber_db, config_entry): + return CorruptObject( + id=ORIGINAL_DIRECTORY.swhid(), + config=config_entry, + first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), + object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()), + ) diff --git a/swh/scrubber/tests/data/cli/sql/10-init-all.sql b/swh/scrubber/tests/data/cli/sql/10-init-all.sql new file mode 100644 index 0000000..f2ce92c --- /dev/null +++ b/swh/scrubber/tests/data/cli/sql/10-init-all.sql @@ -0,0 +1,401 @@ +-- +-- PostgreSQL database dump +-- + +-- Dumped from database version 16.0 (Debian 16.0-1.pgdg120+1) +-- Dumped by pg_dump version 16.0 (Debian 16.0-1.pgdg120+1) + +SET statement_timeout = 0; +SET lock_timeout = 0; +SET idle_in_transaction_session_timeout = 0; +SET client_encoding = 'UTF8'; +SET standard_conforming_strings = on; +SELECT pg_catalog.set_config('search_path', '', false); +SET check_function_bodies = false; +SET xmloption = content; +SET client_min_messages = warning; +SET row_security = off; + +-- +-- Name: datastore_type; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.datastore_type AS ENUM ( + 'storage', + 'journal', + 'objstorage' +); + + +-- +-- Name: object_type; Type: TYPE; Schema: public; Owner: - +-- + +CREATE TYPE public.object_type AS ENUM ( + 'content', + 'directory', + 'revision', + 'release', + 'snapshot', + 'extid', + 'raw_extrinsic_metadata' +); + + +-- +-- Name: swhid; Type: DOMAIN; Schema: public; Owner: - +-- + +CREATE DOMAIN public.swhid AS text + CONSTRAINT swhid_check CHECK ((VALUE ~ '^swh:[0-9]+:.*'::text)); + + +SET default_tablespace = ''; + +SET default_table_access_method = heap; + +-- +-- Name: check_config; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.check_config ( + id integer NOT NULL, + datastore integer NOT NULL, + object_type public.object_type NOT NULL, + nb_partitions bigint NOT NULL, + name text, + comment text +); + + +-- +-- Name: check_config_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.check_config_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: check_config_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.check_config_id_seq OWNED BY public.check_config.id; + + +-- +-- Name: checked_partition; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.checked_partition ( + config_id integer NOT NULL, + partition_id bigint NOT NULL, + start_date timestamp with time zone, + end_date timestamp with time zone +); + + +-- 
+-- Name: corrupt_object; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.corrupt_object ( + id public.swhid NOT NULL, + datastore integer NOT NULL, + object bytea NOT NULL, + first_occurrence timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: datastore; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.datastore ( + id integer NOT NULL, + package public.datastore_type NOT NULL, + class text, + instance text +); + + +-- +-- Name: datastore_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.datastore_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: datastore_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.datastore_id_seq OWNED BY public.datastore.id; + + +-- +-- Name: fixed_object; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.fixed_object ( + id public.swhid NOT NULL, + object bytea NOT NULL, + method text, + recovery_date timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: missing_object; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.missing_object ( + id public.swhid NOT NULL, + datastore integer NOT NULL, + first_occurrence timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: missing_object_reference; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.missing_object_reference ( + missing_id public.swhid NOT NULL, + reference_id public.swhid NOT NULL, + datastore integer NOT NULL, + first_occurrence timestamp with time zone DEFAULT now() NOT NULL +); + + +-- +-- Name: object_origin; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.object_origin ( + object_id public.swhid NOT NULL, + origin_url text NOT NULL, + last_attempt timestamp with time zone +); + + +-- +-- Name: check_config id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.check_config ALTER COLUMN id SET DEFAULT nextval('public.check_config_id_seq'::regclass); + + +-- +-- Name: datastore id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.datastore ALTER COLUMN id SET DEFAULT nextval('public.datastore_id_seq'::regclass); + + +-- +-- Data for Name: check_config; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.check_config (id, datastore, object_type, nb_partitions, name, comment) FROM stdin; +\. + + +-- +-- Data for Name: checked_partition; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.checked_partition (config_id, partition_id, start_date, end_date) FROM stdin; +\. + + +-- +-- Data for Name: corrupt_object; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.corrupt_object (id, datastore, object, first_occurrence) FROM stdin; +\. + + +-- +-- Data for Name: datastore; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.datastore (id, package, class, instance) FROM stdin; +\. + + +-- +-- Data for Name: fixed_object; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.fixed_object (id, object, method, recovery_date) FROM stdin; +\. + + +-- +-- Data for Name: missing_object; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.missing_object (id, datastore, first_occurrence) FROM stdin; +\. + + +-- +-- Data for Name: missing_object_reference; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.missing_object_reference (missing_id, reference_id, datastore, first_occurrence) FROM stdin; +\. 
+ + +-- +-- Data for Name: object_origin; Type: TABLE DATA; Schema: public; Owner: - +-- + +COPY public.object_origin (object_id, origin_url, last_attempt) FROM stdin; +\. + + +-- +-- Name: check_config_id_seq; Type: SEQUENCE SET; Schema: public; Owner: - +-- + +SELECT pg_catalog.setval('public.check_config_id_seq', 1, false); + + +-- +-- Name: datastore_id_seq; Type: SEQUENCE SET; Schema: public; Owner: - +-- + +SELECT pg_catalog.setval('public.datastore_id_seq', 1, false); + + +-- +-- Name: check_config check_config_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.check_config + ADD CONSTRAINT check_config_pkey PRIMARY KEY (id); + + +-- +-- Name: checked_partition checked_partition_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.checked_partition + ADD CONSTRAINT checked_partition_pkey PRIMARY KEY (config_id, partition_id); + + +-- +-- Name: corrupt_object corrupt_object_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.corrupt_object + ADD CONSTRAINT corrupt_object_pkey PRIMARY KEY (id, datastore); + + +-- +-- Name: datastore datastore_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.datastore + ADD CONSTRAINT datastore_pkey PRIMARY KEY (id); + + +-- +-- Name: fixed_object fixed_object_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.fixed_object + ADD CONSTRAINT fixed_object_pkey PRIMARY KEY (id); + + +-- +-- Name: missing_object missing_object_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.missing_object + ADD CONSTRAINT missing_object_pkey PRIMARY KEY (id, datastore); + + +-- +-- Name: check_config_unicity_idx; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX check_config_unicity_idx ON public.check_config USING btree (datastore, object_type, nb_partitions); + + +-- +-- Name: datastore_package_class_instance; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX datastore_package_class_instance ON public.datastore USING btree (package, class, instance); + + +-- +-- Name: missing_object_reference_missing_id_reference_id_datastore; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX missing_object_reference_missing_id_reference_id_datastore ON public.missing_object_reference USING btree (missing_id, reference_id, datastore); + + +-- +-- Name: missing_object_reference_reference_id_missing_id_datastore; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX missing_object_reference_reference_id_missing_id_datastore ON public.missing_object_reference USING btree (reference_id, missing_id, datastore); + + +-- +-- Name: object_origin_by_origin; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX object_origin_by_origin ON public.object_origin USING btree (origin_url, object_id); + + +-- +-- Name: object_origin_pkey; Type: INDEX; Schema: public; Owner: - +-- + +CREATE UNIQUE INDEX object_origin_pkey ON public.object_origin USING btree (object_id, origin_url); + + +-- +-- Name: corrupt_object corrupt_object_datastore_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.corrupt_object + ADD CONSTRAINT corrupt_object_datastore_fkey FOREIGN KEY (datastore) REFERENCES public.datastore(id); + + +-- +-- Name: missing_object missing_object_datastore_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.missing_object + ADD CONSTRAINT missing_object_datastore_fkey FOREIGN KEY (datastore) REFERENCES 
public.datastore(id); + + +-- +-- Name: missing_object_reference missing_object_reference_datastore_fkey; Type: FK CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.missing_object_reference + ADD CONSTRAINT missing_object_reference_datastore_fkey FOREIGN KEY (datastore) REFERENCES public.datastore(id); + + +-- +-- PostgreSQL database dump complete +-- diff --git a/swh/scrubber/tests/data/cli/sql/20-data.sql b/swh/scrubber/tests/data/cli/sql/20-data.sql new file mode 100644 index 0000000..b36fac7 --- /dev/null +++ b/swh/scrubber/tests/data/cli/sql/20-data.sql @@ -0,0 +1,77 @@ +-- + +insert into datastore(package, class, instance) values + ('storage', 'postgresql', '5432'), + ('storage', 'postgresql', '5433'), + ('storage', 'postgresql', '5434') +; + +insert into check_config(datastore, object_type, nb_partitions, name) values + (1, 'release', 16, 'ds1-release-16'), + (1, 'revision', 16, 'ds1-revision-16'), + (1, 'snapshot', 16, 'ds1-snapshot-16'), + (2, 'release', 16, 'ds2-release-16'), + (2, 'snapshot', 16, 'ds2-snapshot-16'), + (3, 'release', 16, 'ds3-release-16'), + (3, 'release', 32, 'ds3-release-32') +; + +insert into corrupt_object(id, datastore, object) values + ('swh:1:rel:0000000000000000000000000000000000000000', 1, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000001', 1, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000002', 1, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000003', 1, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000004', 1, '\x00'), + + ('swh:1:rev:0000000000000000000000000000000000000000', 1, '\x00'), + ('swh:1:rev:0000000000000000000000000000000000000001', 1, '\x00'), + ('swh:1:rev:0000000000000000000000000000000000000002', 1, '\x00'), + ('swh:1:rev:0000000000000000000000000000000000000003', 1, '\x00'), + ('swh:1:rev:0000000000000000000000000000000000000004', 1, '\x00'), + + ('swh:1:rel:0000000000000000000000000000000000000000', 2, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000001', 2, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000002', 2, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000003', 2, '\x00'), + ('swh:1:rel:0000000000000000000000000000000000000004', 2, '\x00') +; + +insert into missing_object(id, datastore) values + ('swh:1:rel:0000000000000000000000000000000000000000', 1), + ('swh:1:rel:0000000000000000000000000000000000000001', 1), + ('swh:1:rel:0000000000000000000000000000000000000002', 1), + ('swh:1:rel:0000000000000000000000000000000000000003', 1), + ('swh:1:rel:0000000000000000000000000000000000000004', 1), + + ('swh:1:rev:0000000000000000000000000000000000000000', 1), + ('swh:1:rev:0000000000000000000000000000000000000001', 1), + ('swh:1:rev:0000000000000000000000000000000000000002', 1), + ('swh:1:rev:0000000000000000000000000000000000000003', 1), + ('swh:1:rev:0000000000000000000000000000000000000004', 1), + + ('swh:1:rel:0000000000000000000000000000000000000000', 2), + ('swh:1:rel:0000000000000000000000000000000000000001', 2), + ('swh:1:rel:0000000000000000000000000000000000000002', 2), + ('swh:1:rel:0000000000000000000000000000000000000003', 2), + ('swh:1:rel:0000000000000000000000000000000000000004', 2) +; + +insert into missing_object_reference(missing_id, reference_id, datastore) values + ('swh:1:rel:0000000000000000000000000000000000000000', 'swh:1:snp:0000000000000000000000000000000000000000', 1), + ('swh:1:rel:0000000000000000000000000000000000000001', 'swh:1:snp:0000000000000000000000000000000000000001', 1), + 
('swh:1:rel:0000000000000000000000000000000000000002', 'swh:1:snp:0000000000000000000000000000000000000002', 1), + ('swh:1:rel:0000000000000000000000000000000000000003', 'swh:1:snp:0000000000000000000000000000000000000003', 1), + ('swh:1:rel:0000000000000000000000000000000000000004', 'swh:1:snp:0000000000000000000000000000000000000004', 1), + + ('swh:1:rev:0000000000000000000000000000000000000000', 'swh:1:snp:0000000000000000000000000000000000000000', 1), + ('swh:1:rev:0000000000000000000000000000000000000001', 'swh:1:snp:0000000000000000000000000000000000000001', 1), + ('swh:1:rev:0000000000000000000000000000000000000002', 'swh:1:snp:0000000000000000000000000000000000000002', 1), + ('swh:1:rev:0000000000000000000000000000000000000003', 'swh:1:snp:0000000000000000000000000000000000000003', 1), + ('swh:1:rev:0000000000000000000000000000000000000004', 'swh:1:snp:0000000000000000000000000000000000000004', 1), + + ('swh:1:rel:0000000000000000000000000000000000000000', 'swh:1:snp:0000000000000000000000000000000000000000', 2), + ('swh:1:rel:0000000000000000000000000000000000000001', 'swh:1:snp:0000000000000000000000000000000000000001', 2), + ('swh:1:rel:0000000000000000000000000000000000000002', 'swh:1:snp:0000000000000000000000000000000000000002', 2), + ('swh:1:rel:0000000000000000000000000000000000000003', 'swh:1:snp:0000000000000000000000000000000000000003', 2), + ('swh:1:rel:0000000000000000000000000000000000000004', 'swh:1:snp:0000000000000000000000000000000000000004', 2) +; diff --git a/swh/scrubber/tests/data/cli/sql/upgrades/006.sql b/swh/scrubber/tests/data/cli/sql/upgrades/006.sql new file mode 120000 index 0000000..20dc989 --- /dev/null +++ b/swh/scrubber/tests/data/cli/sql/upgrades/006.sql @@ -0,0 +1 @@ +../../../../../sql/upgrades/6.sql \ No newline at end of file diff --git a/swh/scrubber/tests/data/cli/sql/upgrades/007.sql b/swh/scrubber/tests/data/cli/sql/upgrades/007.sql new file mode 120000 index 0000000..56a7d8b --- /dev/null +++ b/swh/scrubber/tests/data/cli/sql/upgrades/007.sql @@ -0,0 +1 @@ +../../../../../sql/upgrades/7.sql \ No newline at end of file diff --git a/swh/scrubber/tests/storage_checker_tests.py b/swh/scrubber/tests/storage_checker_tests.py index 66e5afe..6150f10 100644 --- a/swh/scrubber/tests/storage_checker_tests.py +++ b/swh/scrubber/tests/storage_checker_tests.py @@ -149,7 +149,7 @@ def test_corrupt_snapshot(scrubber_db, datastore, swh_storage, corrupt_idx): assert corrupt_objects[0].id == swhids.CoreSWHID.from_string( "swh:1:snp:0000000000000000000000000000000000000000" ) - assert corrupt_objects[0].datastore == datastore + assert corrupt_objects[0].config.datastore == datastore assert ( before_date - datetime.timedelta(seconds=5) <= corrupt_objects[0].first_occurrence @@ -291,7 +291,7 @@ def test_directory_duplicate_entries(scrubber_db, datastore, swh_storage): corrupt_objects = list(scrubber_db.corrupt_object_iter()) assert len(corrupt_objects) == 1 assert corrupt_objects[0].id == invalid_directory.swhid() - assert corrupt_objects[0].datastore == datastore + assert corrupt_objects[0].config.datastore == datastore assert ( before_date - datetime.timedelta(seconds=5) <= corrupt_objects[0].first_occurrence diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py index b8b21b7..06a2b52 100644 --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -119,6 +119,7 @@ def test_check_init(mocker, scrubber_db, swh_storage): [ "check", "init", + "storage", "--object-type", "snapshot", "--nb-partitions", @@ -138,6 +139,7 @@ 
def test_check_init(mocker, scrubber_db, swh_storage): [ "check", "init", + "storage", "--object-type", "snapshot", "--nb-partitions", @@ -165,6 +167,7 @@ def test_check_storage(mocker, scrubber_db, swh_storage): [ "check", "init", + "storage", "--object-type", "snapshot", "--nb-partitions", @@ -219,6 +222,7 @@ def test_check_list(mocker, scrubber_db, swh_storage): [ "check", "init", + "storage", "--object-type", "snapshot", "--nb-partitions", @@ -247,6 +251,7 @@ def test_check_stalled(mocker, scrubber_db, swh_storage): [ "check", "init", + "storage", "--object-type", "snapshot", "--nb-partitions", @@ -326,6 +331,7 @@ def test_check_reset(mocker, scrubber_db, swh_storage): [ "check", "init", + "storage", "--object-type", "snapshot", "--nb-partitions", @@ -406,7 +412,28 @@ def test_check_journal( ) result = invoke( scrubber_db, - ["check", "journal"], + [ + "check", + "init", + "journal", + "--object-type", + "snapshot", + "--nb-partitions", + "4", + "--name", + "cfg1", + ], + kafka_server=kafka_server, + kafka_prefix=kafka_prefix, + kafka_consumer_group=kafka_consumer_group, + ) + assert result.exit_code == 0, result.output + msg = "Created configuration cfg1 [1] for checking snapshot in kafka journal" + assert result.output.strip() == msg + + result = invoke( + scrubber_db, + ["check", "journal", "cfg1"], kafka_server=kafka_server, kafka_prefix=kafka_prefix, kafka_consumer_group=kafka_consumer_group, @@ -414,7 +441,9 @@ def test_check_journal( assert result.exit_code == 0, result.output assert result.output == "" - get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn) + assert get_scrubber_db.call_count == 2 + get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn) + JournalChecker.assert_called_once_with( db=scrubber_db, journal={ @@ -424,6 +453,7 @@ def test_check_journal( "prefix": kafka_prefix, "stop_on_eof": True, }, + config_id=1, ) assert journal_checker.method_calls == [call.run()] diff --git a/swh/scrubber/tests/test_fixer.py b/swh/scrubber/tests/test_fixer.py index ab8ebef..98956b7 100644 --- a/swh/scrubber/tests/test_fixer.py +++ b/swh/scrubber/tests/test_fixer.py @@ -11,76 +11,31 @@ from unittest.mock import MagicMock import zlib import attr +import pytest from swh.journal.serializers import kafka_to_value, value_to_kafka from swh.model.hashutil import hash_to_bytes from swh.model.model import Directory, DirectoryEntry -from swh.model.tests.swh_model_data import DIRECTORIES -from swh.scrubber.db import CorruptObject, Datastore, FixedObject, ScrubberDb -from swh.scrubber.fixer import Fixer - -(DIRECTORY,) = [dir_ for dir_ in DIRECTORIES if len(dir_.entries) > 1] - -# ORIGINAL_DIRECTORY represents a directory with entries in non-canonical order, -# and a consistent hash. Its entries' were canonically reordered, but the original -# order is still present in the raw manifest. 
-_DIR = Directory(entries=tuple(reversed(DIRECTORY.entries))) -ORIGINAL_DIRECTORY = Directory( - entries=( - DirectoryEntry( - name=b"dir1", - type="dir", - target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), - perms=0o040755, - ), - DirectoryEntry( - name=b"file1.ext", - type="file", - target=hash_to_bytes("86bc6b377e9d25f9d26777a4a28d08e63e7c5779"), - perms=0o644, - ), - DirectoryEntry( - name=b"subprepo1", - type="rev", - target=hash_to_bytes("c7f96242d73c267adc77c2908e64e0c1cb6a4431"), - perms=0o160000, - ), - ), - raw_manifest=( - b"tree 102\x00" - b"160000 subprepo1\x00\xc7\xf9bB\xd7<&z\xdcw\xc2\x90\x8ed\xe0\xc1\xcbjD1" - b"644 file1.ext\x00\x86\xbck7~\x9d%\xf9\xd2gw\xa4\xa2\x8d\x08\xe6>|Wy" - b"40755 dir1\x00K\x82]\xc6B\xcbn\xb9\xa0`\xe5K\xf8\xd6\x92\x88\xfb\xeeI\x04" - ), +from swh.scrubber.db import ( + ConfigEntry, + CorruptObject, + Datastore, + FixedObject, + ScrubberDb, ) +from swh.scrubber.fixer import Fixer -# A directory with its entries in canonical order, but a hash computed as if -# computed in the reverse order. -# This happens when entries get normalized (either by the loader or accidentally -# in swh-storage) -CORRUPT_DIRECTORY = attr.evolve(ORIGINAL_DIRECTORY, raw_manifest=None) +from .conftest import CORRUPT_DIRECTORY, ORIGINAL_DIRECTORY -assert ORIGINAL_DIRECTORY != CORRUPT_DIRECTORY -assert ( - hash_to_bytes("61992617462fff81509bda4a24b54c96ea74a007") - == ORIGINAL_DIRECTORY.id - == CORRUPT_DIRECTORY.id -) -assert ( - hash_to_bytes("81fda5b242e65fc81201e590d0f0ce5f582fbcdd") - == CORRUPT_DIRECTORY.compute_hash() - != CORRUPT_DIRECTORY.id -) -assert ORIGINAL_DIRECTORY.entries == CORRUPT_DIRECTORY.entries - -DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh") -CORRUPT_OBJECT = CorruptObject( - id=ORIGINAL_DIRECTORY.swhid(), - datastore=DATASTORE, - first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), - object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()), -) +@pytest.fixture +def corrupt_object(config_entry) -> CorruptObject: + return CorruptObject( + id=ORIGINAL_DIRECTORY.swhid(), + config=config_entry, + first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), + object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()), + ) def test_no_object(scrubber_db: ScrubberDb, mocker) -> None: @@ -93,10 +48,12 @@ def test_no_object(scrubber_db: ScrubberDb, mocker) -> None: assert cur.fetchone() == (0,) -def test_no_origin(scrubber_db: ScrubberDb, mocker) -> None: +def test_no_origin( + scrubber_db: ScrubberDb, corrupt_object: CorruptObject, mocker +) -> None: """There is no origin to recover objects from -> nothing happens""" scrubber_db.corrupt_object_add( - CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) fixer = Fixer(db=scrubber_db) @@ -107,18 +64,20 @@ def test_no_origin(scrubber_db: ScrubberDb, mocker) -> None: assert cur.fetchone() == (0,) -def test_already_fixed(scrubber_db: ScrubberDb, mocker) -> None: +def test_already_fixed( + scrubber_db: ScrubberDb, corrupt_object: CorruptObject, datastore: Datastore, mocker +) -> None: """All corrupt objects are already fixed -> nothing happens""" fixed_object = FixedObject( - id=CORRUPT_OBJECT.id, + id=corrupt_object.id, object_=value_to_kafka(ORIGINAL_DIRECTORY.to_dict()), method="whatever means necessary", ) scrubber_db.corrupt_object_add( - CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) 
with scrubber_db.cursor() as cur: - scrubber_db.object_origin_add(cur, CORRUPT_OBJECT.id, ["http://example.org/"]) + scrubber_db.object_origin_add(cur, corrupt_object.id, ["http://example.org/"]) scrubber_db.fixed_object_add(cur, [fixed_object]) subprocess_run = mocker.patch("subprocess.run") @@ -147,7 +106,7 @@ def _run_fixer_with_clone( adds a corrupt object and an origin to the DB, mocks subprocess.run with the given function, and runs the fixer with caplog""" scrubber_db.corrupt_object_add( - corrupt_object.id, corrupt_object.datastore, corrupt_object.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) with scrubber_db.cursor() as cur: scrubber_db.object_origin_add(cur, corrupt_object.id, ["http://example.org/"]) @@ -164,7 +123,9 @@ def _run_fixer_with_clone( subprocess_run.assert_called() -def test_failed_clone(scrubber_db: ScrubberDb, mocker, caplog) -> None: +def test_failed_clone( + scrubber_db: ScrubberDb, corrupt_object: CorruptObject, mocker, caplog +) -> None: """Corrupt object found with an origin, but the origin's clone is broken somehow""" scrubber_db = MagicMock(wraps=scrubber_db) @@ -172,7 +133,7 @@ def test_failed_clone(scrubber_db: ScrubberDb, mocker, caplog) -> None: scrubber_db, mocker, caplog, - corrupt_object=CORRUPT_OBJECT, + corrupt_object=corrupt_object, subprocess_run_side_effect=subprocess.CalledProcessError(1, "foo"), ) @@ -188,7 +149,9 @@ def test_failed_clone(scrubber_db: ScrubberDb, mocker, caplog) -> None: ) in caplog.record_tuples -def test_empty_origin(scrubber_db: ScrubberDb, mocker, caplog) -> None: +def test_empty_origin( + scrubber_db: ScrubberDb, corrupt_object: CorruptObject, mocker, caplog +) -> None: """Corrupt object found with an origin, but the origin's clone is missing the object""" scrubber_db = MagicMock(wraps=scrubber_db) @@ -203,7 +166,7 @@ def test_empty_origin(scrubber_db: ScrubberDb, mocker, caplog) -> None: scrubber_db, mocker, caplog, - corrupt_object=CORRUPT_OBJECT, + corrupt_object=corrupt_object, subprocess_run_side_effect=subprocess_run, ) @@ -220,7 +183,7 @@ def test_empty_origin(scrubber_db: ScrubberDb, mocker, caplog) -> None: def test_parseable_directory_from_origin( - scrubber_db: ScrubberDb, mocker, caplog + scrubber_db: ScrubberDb, corrupt_object: CorruptObject, mocker, caplog ) -> None: """Corrupt object found with an origin, and the object is found in the origin's clone as expected.""" @@ -241,7 +204,7 @@ def test_parseable_directory_from_origin( scrubber_db, mocker, caplog, - corrupt_object=CORRUPT_OBJECT, + corrupt_object=corrupt_object, subprocess_run_side_effect=subprocess_run, ) @@ -259,7 +222,9 @@ def test_parseable_directory_from_origin( assert caplog.record_tuples == [] -def test_unparseable_directory(scrubber_db: ScrubberDb, mocker, caplog) -> None: +def test_unparseable_directory( + scrubber_db: ScrubberDb, config_entry: ConfigEntry, mocker, caplog +) -> None: """Corrupt object found with an origin, and the object is found in the origin's clone as expected; but Dulwich cannot parse it. 
It was probably loaded by an old version of the loader that was more permissive, @@ -286,7 +251,7 @@ def test_unparseable_directory(scrubber_db: ScrubberDb, mocker, caplog) -> None: corrupt_directory = attr.evolve(original_directory, raw_manifest=None) corrupt_object = CorruptObject( id=original_directory.swhid(), - datastore=DATASTORE, + config=config_entry, object_=value_to_kafka(corrupt_directory.to_dict()), first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), ) diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py index e0b334d..7628d06 100644 --- a/swh/scrubber/tests/test_journal_kafka.py +++ b/swh/scrubber/tests/test_journal_kafka.py @@ -11,11 +11,15 @@ import pytest from swh.journal.serializers import kafka_to_value from swh.journal.writer import get_journal_writer from swh.model import swhids +from swh.model.swhids import ObjectType from swh.model.tests import swh_model_data -from swh.scrubber.journal_checker import JournalChecker +from swh.scrubber.db import Datastore +from swh.scrubber.journal_checker import JournalChecker, get_datastore -def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group): +def journal_client_config( + kafka_server: str, kafka_prefix: str, kafka_consumer_group: str +): return dict( cls="kafka", brokers=kafka_server, @@ -25,7 +29,7 @@ def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group): ) -def journal_writer(kafka_server, kafka_prefix): +def journal_writer(kafka_server: str, kafka_prefix: str): return get_journal_writer( cls="kafka", brokers=[kafka_server], @@ -35,25 +39,59 @@ def journal_writer(kafka_server, kafka_prefix): ) -def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group): +@pytest.fixture +def datastore( + kafka_server: str, kafka_prefix: str, kafka_consumer_group: str +) -> Datastore: + journal_config = journal_client_config( + kafka_server, kafka_prefix, kafka_consumer_group + ) + datastore = get_datastore(journal_config) + return datastore + + +def test_no_corruption( + scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore +): writer = journal_writer(kafka_server, kafka_prefix) writer.write_additions("directory", swh_model_data.DIRECTORIES) writer.write_additions("revision", swh_model_data.REVISIONS) writer.write_additions("release", swh_model_data.RELEASES) writer.write_additions("snapshot", swh_model_data.SNAPSHOTS) - JournalChecker( - db=scrubber_db, - journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group), - ).run() + journal_cfg = journal_client_config( + kafka_server, kafka_prefix, kafka_consumer_group + ) + gid = journal_cfg["group_id"] + "_" + + for object_type in ("directory", "revision", "release", "snapshot"): + journal_cfg["group_id"] = gid + object_type + config_id = scrubber_db.config_add( + f"cfg_{object_type}", datastore, getattr(ObjectType, object_type.upper()), 1 + ) + jc = JournalChecker( + db=scrubber_db, + config_id=config_id, + journal=journal_cfg, + ) + jc.run() + jc.journal_client.close() assert list(scrubber_db.corrupt_object_iter()) == [] @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) def test_corrupt_snapshot( - scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, corrupt_idx + scrubber_db, + kafka_server, + kafka_prefix, + kafka_consumer_group, + datastore, + corrupt_idx, ): + config_id = scrubber_db.config_add( + "cfg_snapshot", datastore, ObjectType.SNAPSHOT, 1 + ) snapshots = 
list(swh_model_data.SNAPSHOTS) snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) @@ -63,6 +101,7 @@ def test_corrupt_snapshot( before_date = datetime.datetime.now(tz=datetime.timezone.utc) JournalChecker( db=scrubber_db, + config_id=config_id, journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group), ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) @@ -72,12 +111,8 @@ def test_corrupt_snapshot( assert corrupt_objects[0].id == swhids.CoreSWHID.from_string( "swh:1:snp:0000000000000000000000000000000000000000" ) - assert corrupt_objects[0].datastore.package == "journal" - assert corrupt_objects[0].datastore.cls == "kafka" - assert ( - corrupt_objects[0].datastore.instance - == f"brokers='{kafka_server}' prefix='{kafka_prefix}'" - ) + assert corrupt_objects[0].config.datastore.package == "journal" + assert corrupt_objects[0].config.datastore.cls == "kafka" assert ( before_date - datetime.timedelta(seconds=5) <= corrupt_objects[0].first_occurrence @@ -89,8 +124,15 @@ def test_corrupt_snapshot( def test_corrupt_snapshots( - scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group + scrubber_db, + kafka_server, + kafka_prefix, + kafka_consumer_group, + datastore, ): + config_id = scrubber_db.config_add( + "cfg_snapshot", datastore, ObjectType.SNAPSHOT, 1 + ) snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) @@ -100,6 +142,7 @@ def test_corrupt_snapshots( JournalChecker( db=scrubber_db, + config_id=config_id, journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group), ).run() diff --git a/swh/scrubber/tests/test_migration.py b/swh/scrubber/tests/test_migration.py new file mode 100644 index 0000000..895f620 --- /dev/null +++ b/swh/scrubber/tests/test_migration.py @@ -0,0 +1,258 @@ +# Copyright (C) 2023 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information +import os +from subprocess import CalledProcessError + +from click.testing import CliRunner +import pytest + +from swh.core.cli.db import db as swhdb +from swh.core.db import BaseDb +from swh.core.db.db_utils import import_swhmodule, swh_db_upgrade, swh_db_version +from swh.core.db.tests.test_cli import craft_conninfo +from swh.model.swhids import ObjectType +from swh.scrubber.db import ScrubberDb + + +@pytest.fixture +def cli_runner(): + return CliRunner() + + +@pytest.fixture() +def current_version(): + return 6 + + +def test_datadir(datadir): + assert datadir.endswith("/swh/scrubber/tests/data") + + +@pytest.fixture() +def mock_import_swhmodule(mocker, datadir, current_version): + """This bypasses the module manipulation to make import_swhmodule return a mock + object suitable for data test files listing via get_sql_for_package. + + For a given module `test.<mod>`, return a MagicMock object with a __name__ + set to `<mod>` and __file__ pointing to `data/<mod>/__init__.py`. + + The Mock object also defines a `get_datastore()` attribute on which the + `current_version` attribute is set to `current_version`. 
+ + Typical usage:: + + def test_xxx(cli_runner, mock_import_swhmodule): + conninfo = craft_conninfo(test_db, "new-db") + module_name = "test.cli" + # the command below will use sql scripts from + # swh/scrubber/tests/data/cli/sql/*.sql + cli_runner.invoke(swhdb, ["init", module_name, "--dbname", conninfo]) + + """ + + def import_swhmodule_mock(modname): + if modname.startswith("test."): + dirname = modname.split(".", 1)[1] + + def get_datastore(*args, **kw): + return mocker.MagicMock(current_version=current_version) + + return mocker.MagicMock( + __name__=modname, + __file__=os.path.join(datadir, dirname, "__init__.py"), + get_datastore=get_datastore, + ) + else: + return import_swhmodule(modname) + + return mocker.patch("swh.core.db.db_utils.import_swhmodule", import_swhmodule_mock) + + +def test_upgrade_6_to_7( + cli_runner, postgresql, mock_import_swhmodule, datadir, current_version +): + """Check 6 to 7 migration""" + module = "test.cli" + conninfo = craft_conninfo(postgresql) + result = cli_runner.invoke(swhdb, ["init-admin", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + result = cli_runner.invoke(swhdb, ["init", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + assert swh_db_version(conninfo) == current_version + + new_version = swh_db_upgrade(conninfo, module, 7) + assert new_version == 7 + assert swh_db_version(conninfo) == 7 + + tmap = {otype.name.lower(): otype.value for otype in ObjectType} + db = ScrubberDb(postgresql) + + # corrupt objects + corrupt_objects = list(db.corrupt_object_iter()) + assert len(corrupt_objects) == 15 + + got = [ + ( + o.config.datastore.instance, + o.config.object_type, + o.config.name, + str(o.id), + ) + for o in corrupt_objects + ] + + # expected result for corrupt objects + expected = [] + for otype, n, ds, port in ( + ("release", 16, 1, 5432), + ("revision", 16, 1, 5432), + ("release", 16, 2, 5433), + ): + expected += [ + (str(port), otype, f"ds{ds}-{otype}-{n}", f"swh:1:{tmap[otype]}:{i:040X}") + for i in range(5) + ] + + assert set(got) == set(expected) + + # missing objects + missing_objects = list(db.missing_object_iter()) + assert len(missing_objects) == 15 + + got = [ + (o.config.datastore.instance, o.config.object_type, o.config.name, str(o.id)) + for o in missing_objects + ] + + # expected result for missing objects + # For those, the object type of the check_config entry is the one of the + # reference object (not the one of the missing object). + # E.g. the check_config for snapshot generates release missing object entries.
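The comment above boils down to following each missing_object_reference row back to the configuration that reported it. A minimal SQL sketch of that backfill, assuming config_id and missing_id column names inferred from these assertions rather than copied from the actual upgrades/7.sql script:

    -- give each missing object the config of a reference that reported it
    UPDATE missing_object AS mo
    SET config_id = mor.config_id
    FROM missing_object_reference AS mor
    WHERE mor.missing_id = mo.id;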
+ expected = [] + for checked_otype, missing_otype, n, ds, port in ( + ("snapshot", "release", 16, 1, 5432), + ("snapshot", "revision", 16, 1, 5432), + ("snapshot", "release", 16, 2, 5433), + ): + expected += [ + ( + str(port), + checked_otype, + f"ds{ds}-{checked_otype}-{n}", + f"swh:1:{tmap[missing_otype]}:{i:040X}", + ) + for i in range(5) + ] + + assert set(got) == set(expected) + + # missing_object_references + for mo in missing_objects: + mo_refs = list(db.missing_object_reference_iter(mo.id)) + assert mo.config in [mor.config for mor in mo_refs] + # for each missing object, there is only one reference from the same + # check session (same check_config) + assert len([mor for mor in mo_refs if mor.config == mo.config]) == 1 + + +def test_upgrade_6_to_7_fails_corrupt( + cli_runner, postgresql, mock_import_swhmodule, datadir, current_version +): + """Check 6 to 7 migration fails + + in case there is a corrupt_object row with a datastore that matches 2 + check_configs for the object type + + """ + + module = "test.cli" + conninfo = craft_conninfo(postgresql) + result = cli_runner.invoke(swhdb, ["init-admin", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + result = cli_runner.invoke(swhdb, ["init", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + assert swh_db_version(conninfo) == current_version + + cnx = BaseDb.connect(conninfo) + with cnx.transaction() as cur: + # datastore 3 have 2 check_config entries for the release object_type + cur.execute( + "insert into corrupt_object(id, datastore, object) " "values (%s, %s, %s)", + ("swh:1:rel:0000000000000000000000000000000000000000", 3, b"\x00"), + ) + with pytest.raises(CalledProcessError): + swh_db_upgrade(conninfo, module, 7) + + assert swh_db_version(conninfo) == 6 + + +def test_upgrade_6_to_7_fails_missing_reference( + cli_runner, postgresql, mock_import_swhmodule, datadir, current_version +): + """Check 6 to 7 migration fails + + in case there is a missing_object_reference row with a datastore that matches 2 + check_configs for the object (reference_id) type. + """ + module = "test.cli" + conninfo = craft_conninfo(postgresql) + result = cli_runner.invoke(swhdb, ["init-admin", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + result = cli_runner.invoke(swhdb, ["init", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + assert swh_db_version(conninfo) == current_version + + cnx = BaseDb.connect(conninfo) + with cnx.transaction() as cur: + # datastore 3 have 2 check_config entries for the release object_type + cur.execute( + "insert into missing_object_reference(missing_id, reference_id, datastore) " + "values (%s, %s, %s)", + ( + "swh:1:dir:0000000000000000000000000000000000000000", + "swh:1:rel:0000000000000000000000000000000000000000", + 3, + ), + ) + + with pytest.raises(CalledProcessError): + swh_db_upgrade(conninfo, module, 7) + + assert swh_db_version(conninfo) == 6 + + +def test_upgrade_6_to_7_fails_missing( + cli_runner, postgresql, mock_import_swhmodule, datadir, current_version +): + """Check 6 to 7 migration fails + + in case there is a missing_object row with a datastore that matches 2 + check_configs for the object type. 
+ """ + + module = "test.cli" + conninfo = craft_conninfo(postgresql) + result = cli_runner.invoke(swhdb, ["init-admin", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + result = cli_runner.invoke(swhdb, ["init", module, "--dbname", conninfo]) + assert result.exit_code == 0, f"Unexpected output: {result.output}" + + assert swh_db_version(conninfo) == current_version + + cnx = BaseDb.connect(conninfo) + with cnx.transaction() as cur: + # datastore 3 have 2 check_config entries for the release object_type + cur.execute( + "insert into missing_object(id, datastore) " "values (%s, %s)", + ("swh:1:rel:0000000000000000000000000000000000000000", 3), + ) + + with pytest.raises(CalledProcessError): + swh_db_upgrade(conninfo, module, 7) + + assert swh_db_version(conninfo) == 6 diff --git a/swh/scrubber/tests/test_origin_locator.py b/swh/scrubber/tests/test_origin_locator.py index 88ad613..52702db 100644 --- a/swh/scrubber/tests/test_origin_locator.py +++ b/swh/scrubber/tests/test_origin_locator.py @@ -3,7 +3,6 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime import logging from unittest.mock import MagicMock @@ -12,22 +11,14 @@ import pytest from swh.graph.http_naive_client import NaiveClient as NaiveGraphClient from swh.model.model import Origin from swh.model.swhids import CoreSWHID -from swh.scrubber.db import CorruptObject, Datastore from swh.scrubber.origin_locator import OriginLocator -CORRUPT_OBJECT = CorruptObject( - id=CoreSWHID.from_string("swh:1:cnt:" + "f" * 40), - datastore=Datastore(package="storage", cls="postgresql", instance="service=swh"), - first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), - object_=b"blah", -) - @pytest.mark.parametrize("insert", [False, True]) -def test_no_objects(scrubber_db, insert): +def test_no_objects(scrubber_db, corrupt_object, insert): if insert: scrubber_db.corrupt_object_add( - CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) graph = MagicMock() @@ -37,8 +28,8 @@ def test_no_objects(scrubber_db, insert): graph=graph, storage=storage, # this range does not contain the object above - start_object=CoreSWHID.from_string("swh:1:cnt:00" + "00" * 19), - end_object=CoreSWHID.from_string("swh:1:cnt:f0" + "00" * 19), + start_object=CoreSWHID.from_string("swh:1:dir:00" + "00" * 19), + end_object=CoreSWHID.from_string("swh:1:dir:60" + "00" * 19), ) locator.run() @@ -51,9 +42,9 @@ def test_no_objects(scrubber_db, insert): assert cur.fetchone() == (0,) -def test_object_not_in_graph(scrubber_db): +def test_object_not_in_graph(scrubber_db, corrupt_object): scrubber_db.corrupt_object_add( - CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) graph = NaiveGraphClient(nodes=[], edges=[]) @@ -62,8 +53,8 @@ def test_object_not_in_graph(scrubber_db): db=scrubber_db, graph=graph, storage=storage, - start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), - end_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + start_object=CoreSWHID.from_string("swh:1:dir:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:dir:" + "00" * 20), ) locator.run() @@ -75,23 +66,23 @@ def test_object_not_in_graph(scrubber_db): assert cur.fetchone() == (0,) -def test_origin_not_in_storage(scrubber_db, swh_storage, caplog): +def 
test_origin_not_in_storage(scrubber_db, swh_storage, corrupt_object, caplog): scrubber_db.corrupt_object_add( - CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) origin = Origin(url="http://example.org") graph = NaiveGraphClient( - nodes=[CORRUPT_OBJECT.id, origin.swhid()], - edges=[(origin.swhid(), CORRUPT_OBJECT.id)], + nodes=[corrupt_object.id, origin.swhid()], + edges=[(origin.swhid(), corrupt_object.id)], ) locator = OriginLocator( db=scrubber_db, graph=graph, storage=swh_storage, - start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), - end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + start_object=CoreSWHID.from_string("swh:1:dir:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:dir:" + "ff" * 20), ) with caplog.at_level(logging.ERROR, logger="swh.scrubber.origin_locator"): @@ -107,9 +98,9 @@ def test_origin_not_in_storage(scrubber_db, swh_storage, caplog): ) -def test_two_origins(scrubber_db, swh_storage): +def test_two_origins(scrubber_db, corrupt_object, swh_storage): scrubber_db.corrupt_object_add( - CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) origin1 = Origin(url="http://example.org") @@ -117,18 +108,18 @@ def test_two_origins(scrubber_db, swh_storage): swh_storage.origin_add([origin1, origin2]) graph = NaiveGraphClient( - nodes=[CORRUPT_OBJECT.id, origin1.swhid(), origin2.swhid()], + nodes=[corrupt_object.id, origin1.swhid(), origin2.swhid()], edges=[ - (origin1.swhid(), CORRUPT_OBJECT.id), - (origin2.swhid(), CORRUPT_OBJECT.id), + (origin1.swhid(), corrupt_object.id), + (origin2.swhid(), corrupt_object.id), ], ) locator = OriginLocator( db=scrubber_db, graph=graph, storage=swh_storage, - start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), - end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + start_object=CoreSWHID.from_string("swh:1:dir:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:dir:" + "ff" * 20), ) locator.run() @@ -136,29 +127,29 @@ def test_two_origins(scrubber_db, swh_storage): with scrubber_db.conn.cursor() as cur: cur.execute("SELECT object_id, origin_url FROM object_origin") assert set(cur) == { - (str(CORRUPT_OBJECT.id), origin1.url), - (str(CORRUPT_OBJECT.id), origin2.url), + (str(corrupt_object.id), origin1.url), + (str(corrupt_object.id), origin2.url), } -def test_many_origins(scrubber_db, swh_storage): +def test_many_origins(scrubber_db, corrupt_object, swh_storage): scrubber_db.corrupt_object_add( - CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + corrupt_object.id, corrupt_object.config, corrupt_object.object_ ) origins = [Origin(url=f"http://example.org/{i}") for i in range(1000)] swh_storage.origin_add(origins) graph = NaiveGraphClient( - nodes=[CORRUPT_OBJECT.id] + [origin.swhid() for origin in origins], - edges=[(origin.swhid(), CORRUPT_OBJECT.id) for origin in origins], + nodes=[corrupt_object.id] + [origin.swhid() for origin in origins], + edges=[(origin.swhid(), corrupt_object.id) for origin in origins], ) locator = OriginLocator( db=scrubber_db, graph=graph, storage=swh_storage, - start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), - end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + start_object=CoreSWHID.from_string("swh:1:dir:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:dir:" + "ff" * 20), ) locator.run() @@ -166,5 +157,5 @@ def 
test_many_origins(scrubber_db, swh_storage): with scrubber_db.conn.cursor() as cur: cur.execute("SELECT object_id, origin_url FROM object_origin") rows = set(cur) - assert rows <= {(str(CORRUPT_OBJECT.id), origin.url) for origin in origins} + assert rows <= {(str(corrupt_object.id), origin.url) for origin in origins} assert len(rows) == 100 diff --git a/swh/scrubber/utils.py b/swh/scrubber/utils.py index 5a4daaf..c38f9f6 100644 --- a/swh/scrubber/utils.py +++ b/swh/scrubber/utils.py @@ -26,12 +26,14 @@ def iter_corrupt_objects( while True: with db.conn, db.cursor() as cur: if origin_url: - corrupt_objects = db.corrupt_object_grab_by_origin( - cur, origin_url, start_object, end_object + corrupt_objects = list( + db.corrupt_object_grab_by_origin( + cur, origin_url, start_object, end_object + ) ) else: - corrupt_objects = db.corrupt_object_grab_by_id( - cur, start_object, end_object + corrupt_objects = list( + db.corrupt_object_grab_by_id(cur, start_object, end_object) ) if corrupt_objects and corrupt_objects[0].id == start_object: # TODO: don't needlessly fetch duplicate objects -- GitLab
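The three *_fails_* migration tests above rely on the same precondition: the upgrade refuses to pick a configuration when a row's datastore matches more than one check_config entry for the relevant object type. A hypothetical pre-flight query in the same spirit, assuming the check_config table exposes datastore and object_type columns:

    -- list (datastore, object_type) pairs with more than one candidate configuration
    SELECT datastore, object_type, count(*) AS nb_candidates
    FROM check_config
    GROUP BY datastore, object_type
    HAVING count(*) > 1;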