diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py index 85c5711028846d69438f8af6294500fad7565cbe..10c711e898e28d08126f6ec743d238adb95d6989 100644 --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -10,6 +10,7 @@ import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group +from swh.model.swhids import ObjectType @swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS) @@ -75,12 +76,14 @@ def scrubber_cli_group(ctx, config_file: Optional[str]) -> None: else: conf = {} - if "scrubber_db" not in conf: - ctx.fail("You must have a scrubber_db configured in your config file.") - ctx.ensure_object(dict) ctx.obj["config"] = conf - ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"]) + if "scrubber_db" not in conf: + click.echo( + "WARNING: You must have a scrubber_db configured in your config file.\n" + ) + else: + ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"]) @scrubber_cli_group.group(name="check") @@ -90,7 +93,7 @@ def scrubber_check_cli_group(ctx): pass -@scrubber_check_cli_group.command(name="storage") +@scrubber_check_cli_group.command(name="init") @click.option( "--object-type", type=click.Choice( @@ -107,58 +110,139 @@ def scrubber_check_cli_group(ctx): ] ), ) -@click.option("--start-partition-id", default=0, type=int) -@click.option("--end-partition-id", default=4096, type=int) @click.option("--nb-partitions", default=4096, type=int) +@click.option("--name", default=None, type=str) @click.pass_context -def scrubber_check_storage( +def scrubber_check_init( ctx, object_type: str, - start_partition_id: int, - end_partition_id: int, nb_partitions: int, + name: Optional[str], +): + """Initialise a scrubber check configuration for the datastore defined in the + configuration file and given object_type. + + A checker configuration configuration consists simply in a set of: + + - object type: the type of object being checked, + - number of partitions: the number of partitions the hash space is divided + in; must be a power of 2, + - name: an unique name for easier reference, + + linked to the storage provided in the configuration file. + + """ + if not object_type or not name: + raise click.ClickException( + "Invalid parameters: you must provide the object type and configuration name" + ) + + conf = ctx.obj["config"] + if "storage" not in conf: + raise click.ClickException( + "You must have a storage configured in your config file." 
+ ) + + db = ctx.obj["db"] + from swh.storage import get_storage + + from .storage_checker import get_datastore + + datastore = get_datastore(storage=get_storage(**conf["storage"])) + db.datastore_get_or_add(datastore) + + if db.config_get_by_name(name): + raise click.ClickException(f"Configuration {name} already exists") + + config_id = db.config_add( + name, datastore, getattr(ObjectType, object_type.upper()), nb_partitions + ) + click.echo( + f"Created configuration {name} [{config_id}] for checking {object_type} " + f"in {datastore.cls} {datastore.package}" + ) + + +@scrubber_check_cli_group.command(name="list") +@click.pass_context +def scrubber_check_list( + ctx, +): + """List the know configurations""" + conf = ctx.obj["config"] + if "storage" not in conf: + ctx.fail("You must have a storage configured in your config file.") + + db = ctx.obj["db"] + + for id_, cfg in db.config_iter(): + ds = db.datastore_get(cfg.datastore_id) + if not ds: + click.echo( + f"[{id_}] {cfg.name}: Invalid configuration entry; datastore not found" + ) + else: + click.echo( + f"[{id_}] {cfg.name}: {cfg.object_type}, {cfg.nb_partitions}, " + f"{ds.package}:{ds.cls} ({ds.instance})" + ) + + +@scrubber_check_cli_group.command(name="storage") +@click.argument( + "name", + type=str, + default=None, + required=False, # can be given by config_id instead +) +@click.option( + "--config-id", + type=int, +) +@click.option("--limit", default=0, type=int) +@click.pass_context +def scrubber_check_storage( + ctx, + name: str, + config_id: int, + limit: int, ): """Reads a swh-storage instance, and reports corrupt objects to the scrubber DB. This runs a single thread; parallelism is achieved by running this command multiple - times, on disjoint ranges. + times. + + This command references an existing scrubbing configuration (either by name + or by id); the configuration holds the object type, number of partitions + and the storage configuration this scrubbing session will check on. All objects of type ``object_type`` are ordered, and split into the given number - of partitions. When running in parallel, the number of partitions should be the - same for all workers or they may work on overlapping or non-exhaustive ranges. - - Then, this process will check all partitions in the given - ``[start_partition_id, end_partition_id)`` range. When running in parallel, these - ranges should be set so that processes over the whole ``[0, nb_partitions)`` range. - - For example in order to have 8 threads checking revisions in parallel and with 64k - checkpoints (to recover on crashes), the CLI should be ran 8 times with these - parameters:: - - --object-type revision --nb-partitions 65536 --start-partition-id 0 --end-partition-id 8192 - --object-type revision --nb-partitions 65536 --start-partition-id 8192 --end-partition-id 16384 - --object-type revision --nb-partitions 65536 --start-partition-id 16384 --end-partition-id 24576 - --object-type revision --nb-partitions 65536 --start-partition-id 24576 --end-partition-id 32768 - --object-type revision --nb-partitions 65536 --start-partition-id 32768 --end-partition-id 40960 - --object-type revision --nb-partitions 65536 --start-partition-id 40960 --end-partition-id 49152 - --object-type revision --nb-partitions 65536 --start-partition-id 49152 --end-partition-id 57344 - --object-type revision --nb-partitions 65536 --start-partition-id 57344 --end-partition-id 65536 + of partitions. + + Then, this process will check all partitions. 
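+ Several workers can be pointed at the same configuration; each of them reserves partitions the others have not started yet, e.g. by running ``swh scrubber check storage cfg1`` in as many shells as desired (``cfg1`` being an example configuration name created with ``swh scrubber check init``).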
The status of the ongoing + check session is stored in the database, so the number of concurrent + workers can be dynamically adjusted. + + """ # noqa conf = ctx.obj["config"] if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") + db = ctx.obj["db"] from swh.storage import get_storage from .storage_checker import StorageChecker + if name and config_id is None: + config_id = db.config_get_by_name(name) + + if config_id is None: + raise click.ClickException("A valid configuration name/id must be set") checker = StorageChecker( db=ctx.obj["db"], storage=get_storage(**conf["storage"]), - object_type=object_type, - start_partition_id=start_partition_id, - end_partition_id=end_partition_id, - nb_partitions=nb_partitions, + config_id=config_id, + limit=limit, ) checker.run() diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py index 01146d36d4f8d6d0174a9a533243cc7c9704e255..b20fc475fe6b208e57c8da85b5e63654c918c7e2 100644 --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -15,6 +15,10 @@ from swh.core.db import BaseDb from swh.model.swhids import CoreSWHID, ObjectType +def now(): + return datetime.datetime.now(tz=datetime.timezone.utc) + + @dataclasses.dataclass(frozen=True) class Datastore: """Represents a datastore being scrubbed; eg. swh-storage or swh-journal.""" @@ -59,6 +63,16 @@ class FixedObject: recovery_date: Optional[datetime.datetime] = None +@dataclasses.dataclass(frozen=True) +class ConfigEntry: + """Represents a check configuration: an object type and a number of partitions, bound to a given datastore.""" + + name: str + datastore_id: int + object_type: str + nb_partitions: int + + class ScrubberDb(BaseDb): current_version = 6 @@ -99,84 +113,223 @@ class ScrubberDb(BaseDb): return id_ @functools.lru_cache(1000) - def config_get_or_add( - self, datastore: Datastore, object_type: ObjectType, nb_partitions: int + def datastore_get(self, datastore_id: int) -> Datastore: + """Returns the datastore with the given id. Raises :exc:`ValueError` if it does not exist.""" + with self.transaction() as cur: + cur.execute( + """ + SELECT package, class, instance + FROM datastore + WHERE id=%s + """, + (datastore_id,), + ) + res = cur.fetchone() + if res is None: + raise ValueError(f"No datastore with id {datastore_id}") + (package, cls, instance) = res + return Datastore(package=package, cls=cls, instance=instance) + + def config_add( + self, + name: Optional[str], + datastore: Datastore, + object_type: ObjectType, + nb_partitions: int, ) -> int: - """Creates a configuration entry (and a datastore) if it does not exist, and returns its id.""" + """Creates a configuration entry (and potentially a datastore); + + Will fail if a config with the same (datastore, object_type, nb_partitions) + already exists.
+ """ + datastore_id = self.datastore_get_or_add(datastore) + if not name: + name = ( + f"check_{object_type.name.lower()}_{nb_partitions}_" + f"{datastore.package}_{datastore.cls}" + ) + args = { + "name": name, + "datastore_id": datastore_id, + "object_type": object_type.name.lower(), + "nb_partitions": nb_partitions, + } with self.transaction() as cur: cur.execute( """ WITH inserted AS ( - INSERT INTO check_config (datastore, object_type, nb_partitions) - VALUES (%(datastore_id)s, %(object_type)s, %(nb_partitions)s) - ON CONFLICT DO NOTHING + INSERT INTO check_config (name, datastore, object_type, nb_partitions) + VALUES (%(name)s, %(datastore_id)s, %(object_type)s, %(nb_partitions)s) RETURNING id ) SELECT id - FROM inserted - UNION ( - -- If the datastore already exists, we need to fetch its id - SELECT id + FROM inserted; + """, + args, + ) + res = cur.fetchone() + if res is None: + raise ValueError(f"No config matching {args}") + (id_,) = res + return id_ + + @functools.lru_cache(1000) + def config_get(self, config_id: int) -> ConfigEntry: + with self.transaction() as cur: + cur.execute( + """ + SELECT name, datastore, object_type, nb_partitions FROM check_config - WHERE - datastore=%(datastore_id)s - AND object_type=%(object_type)s - AND nb_partitions=%(nb_partitions)s - ) - LIMIT 1 + WHERE id=%(config_id)s """, { - "datastore_id": datastore_id, - "object_type": object_type.name.lower(), - "nb_partitions": nb_partitions, + "config_id": config_id, }, ) res = cur.fetchone() - assert res is not None - (id_,) = res - return id_ + if res is None: + raise ValueError(f"No config with id {config_id}") + (name, datastore_id, object_type, nb_partitions) = res + return ConfigEntry( + name=name, + datastore_id=datastore_id, + object_type=object_type, + nb_partitions=nb_partitions, + ) + + def config_get_by_name( + self, + name: str, + ) -> Optional[int]: + """Get the configuration entry for given name, if any""" + with self.transaction() as cur: + cur.execute( + """ + SELECT id + FROM check_config + WHERE + name=%s + """, + (name,), + ) + if cur.rowcount: + res = cur.fetchone() + if res: + (id_,) = res + return id_ + return None + + def config_iter(self) -> Iterator[Tuple[int, ConfigEntry]]: + with self.transaction() as cur: + cur.execute( + """ + SELECT id, name, datastore, object_type, nb_partitions + FROM check_config + """, + ) + for row in cur: + assert row is not None + (id_, name, datastore_id, object_type, nb_partitions) = row + yield ( + id_, + ConfigEntry( + name=name, + datastore_id=datastore_id, + object_type=object_type, + nb_partitions=nb_partitions, + ), + ) #################################### # Checkpointing/progress tracking #################################### + def checked_partition_iter_next( + self, + config_id: int, + ) -> Iterator[int]: + """Generates partitions to be checked for the given configuration + + At each iteration, look for the next "free" partition in the + checked_partition, for the given config_id, reserve it and return its + id. + + Reserving the partition means make sure there is a row in the table for + this partition id with the start_date column set. + + To chose a "free" partition is to select either the smaller partition + is for which the start_date is NULL, or the first partition id not yet + in the table. + + Stops the iteration when the number of partitions for the config id is + reached. 
+ + """ + while True: + start_time = now() + with self.transaction() as cur: + cur.execute( + """ + WITH next AS ( + SELECT min(partition_id) as pid + FROM checked_partition + WHERE config_id=%(config_id)s and start_date is NULL + UNION + SELECT coalesce(max(partition_id) + 1, 0) as pid + FROM checked_partition + WHERE config_id=%(config_id)s + ) + INSERT INTO checked_partition(config_id, partition_id, start_date) + select %(config_id)s, min(pid), %(start_date)s from next + where pid is not NULL + ON CONFLICT (config_id, partition_id) + DO UPDATE + SET start_date = GREATEST( + checked_partition.start_date, EXCLUDED.start_date + ) + RETURNING partition_id; + """, + {"config_id": config_id, "start_date": start_time}, + ) + res = cur.fetchone() + assert res is not None + (partition_id,) = res + if partition_id >= self.config_get(config_id).nb_partitions: + self.conn.rollback() + return + yield partition_id + def checked_partition_upsert( self, - datastore: Datastore, - object_type: ObjectType, + config_id: int, partition_id: int, - nb_partitions: int, - date: datetime.datetime, + date: Optional[datetime.datetime] = None, ) -> None: """ Records in the database the given partition was last checked at the given date. """ - config_id = self.config_get_or_add(datastore, object_type, nb_partitions) + if date is None: + date = now() + with self.transaction() as cur: cur.execute( """ - INSERT INTO checked_partition(config_id, partition_id, last_date) - VALUES (%s, %s, %s) - ON CONFLICT (config_id, partition_id) - DO UPDATE - SET last_date = GREATEST( - checked_partition.last_date, EXCLUDED.last_date - ) + UPDATE checked_partition + SET end_date = GREATEST(%(date)s, end_date) + WHERE config_id=%(config_id)s AND partition_id=%(partition_id)s """, - ( - config_id, - partition_id, - date, - ), + { + "config_id": config_id, + "partition_id": partition_id, + "date": date, + }, ) def checked_partition_get_last_date( self, - datastore: Datastore, - object_type: ObjectType, + config_id: int, partition_id: int, - nb_partitions: int, ) -> Optional[datetime.datetime]: """ Returns the last date the given partition was checked in the given datastore, @@ -185,11 +338,10 @@ class ScrubberDb(BaseDb): Currently, this matches partition id and number exactly, with no regard for partitions that contain or are contained by it. 
""" - config_id = self.config_get_or_add(datastore, object_type, nb_partitions) with self.transaction() as cur: cur.execute( """ - SELECT last_date + SELECT end_date FROM checked_partition WHERE config_id=%s AND partition_id=%s """, @@ -204,23 +356,22 @@ class ScrubberDb(BaseDb): return date def checked_partition_iter( - self, datastore: Datastore - ) -> Iterator[Tuple[ObjectType, int, int, datetime.datetime]]: - """Yields tuples of ``(partition_id, nb_partitions, last_date)``""" - datastore_id = self.datastore_get_or_add(datastore) + self, config_id: int + ) -> Iterator[Tuple[int, int, datetime.datetime, Optional[datetime.datetime]]]: + """Yields tuples of ``(partition_id, nb_partitions, start_date, end_date)``""" with self.transaction() as cur: cur.execute( """ - SELECT CC.object_type, CP.partition_id, CC.nb_partitions, CP.last_date + SELECT CP.partition_id, CC.nb_partitions, CP.start_date, CP.end_date FROM checked_partition as CP INNER JOIN check_config AS CC on (CC.id=CP.config_id) - WHERE CC.datastore=%s + WHERE CC.id=%s """, - (datastore_id,), + (config_id,), ) - for (object_type, *rest) in cur: - yield (getattr(ObjectType, object_type.upper()), *rest) # type: ignore[misc] + for row in cur: + yield tuple(row) # type: ignore[misc] #################################### # Inventory of objects with issues diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql index 9c6a2c4c3111de2dbc6ded307e47b597db84e3d2..c19802c9220015ef96c00247683d1da243c67b0c 100644 --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -46,13 +46,15 @@ create table checked_partition ( config_id int not null, partition_id bigint not null, - last_date timestamptz not null + start_date timestamptz, + end_date timestamptz ); comment on table checked_partition is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past. 
The whole set of objects of the given type is split into config.nb_partitions and partition_id is a value from 0 to config.nb_partitions-1.'; comment on column checked_partition.config_id is 'The check configuration this partition concerns.'; comment on column checked_partition.partition_id is 'Index of the partition to fetch'; -comment on column checked_partition.last_date is 'Date the last scrub of this partition *started*.'; +comment on column checked_partition.start_date is 'Date the last scrub started for this partition.'; +comment on column checked_partition.end_date is 'Date the last scrub ended of this partition.'; ------------------------------------- -- Inventory of objects with issues diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql index 3ed4584879640130b2a8bf57edfe5ab51174dc4c..152691b969ebb3df71c94f052d62f32ba5516511 100644 --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -14,6 +14,7 @@ create unique index datastore_package_class_instance on datastore(package, class ------------------------------------- create unique index check_config_pkey on check_config(id); +create unique index check_config_unicity_idx on check_config(datastore, object_type, nb_partitions); alter table check_config add primary key using index check_config_pkey; ------------------------------------- diff --git a/swh/scrubber/sql/upgrades/6.sql b/swh/scrubber/sql/upgrades/6.sql index 70c25f88edeecbfff1eaa66cad9c48672d8891ce..4d229272aadac942fa0ec0cf69526256c136d965 100644 --- a/swh/scrubber/sql/upgrades/6.sql +++ b/swh/scrubber/sql/upgrades/6.sql @@ -34,16 +34,18 @@ create table checked_partition ( config_id int not null, partition_id bigint not null, - last_date timestamptz not null + start_date timestamptz, + end_date timestamptz ); comment on table checked_partition is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past. 
The whole set of objects of the given type is split into config.nb_partitions and partition_id is a value from 0 to config.nb_partitions-1.'; comment on column checked_partition.config_id is 'The check configuration this partition concerns.'; comment on column checked_partition.partition_id is 'Index of the partition to fetch'; -comment on column checked_partition.last_date is 'Date the last scrub of this partition *started*.'; +comment on column checked_partition.start_date is 'Date the last scrub started for this partition.'; +comment on column checked_partition.end_date is 'Date the last scrub ended of this partition.'; insert into checked_partition - select CC.id, CP.partition_id, CP.last_date + select CC.id, CP.partition_id, CP.last_date, CP.last_date from old_checked_partition as CP inner join check_config as CC using (datastore, object_type, nb_partitions); diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py index 88fcfe62f7f5dbec400b8ce85e33cb65ab24a1f7..aae2805e2e03a80951af20ee518917f594ad53a5 100644 --- a/swh/scrubber/storage_checker.py +++ b/swh/scrubber/storage_checker.py @@ -7,8 +7,7 @@ import collections import contextlib -import dataclasses -import datetime +from itertools import count, islice import json import logging from typing import Iterable, Optional, Tuple, Union @@ -36,7 +35,7 @@ from swh.storage.cassandra.storage import CassandraStorage from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import Storage as PostgresqlStorage -from .db import Datastore, ScrubberDb +from .db import ConfigEntry, Datastore, ScrubberDb logger = logging.getLogger(__name__) @@ -89,67 +88,83 @@ def _get_inclusive_range_swhids( return (range_start_swhid, range_end_swhid) -@dataclasses.dataclass +def get_datastore(storage) -> Datastore: + if isinstance(storage, PostgresqlStorage): + with postgresql_storage_db(storage) as db: + datastore = Datastore( + package="storage", + cls="postgresql", + instance=db.conn.dsn, + ) + elif isinstance(storage, CassandraStorage): + datastore = Datastore( + package="storage", + cls="cassandra", + instance=json.dumps( + { + "keyspace": storage.keyspace, + "hosts": storage.hosts, + "port": storage.port, + } + ), + ) + else: + raise NotImplementedError(f"StorageChecker(storage={storage!r}).datastore()") + return datastore + + class StorageChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" - db: ScrubberDb - storage: StorageInterface - object_type: str - """``directory``/``revision``/``release``/``snapshot``""" - nb_partitions: int - """Number of partitions to split the whole set of objects into. - Must be a power of 2.""" - start_partition_id: int - """First partition id to check (inclusive). Must be in the range [0, nb_partitions). - """ - end_partition_id: int - """Last partition id to check (exclusive). 
Must be in the range - (start_partition_id, nb_partitions]""" - - _datastore = None - _statsd = None - - def datastore_info(self) -> Datastore: + def __init__( + self, db: ScrubberDb, config_id: int, storage: StorageInterface, limit: int = 0 + ): + self.db = db + self.storage = storage + self.config_id = config_id + self.limit = limit + + self._config: Optional[ConfigEntry] = None + self._datastore: Optional[Datastore] = None + self._statsd: Optional[Statsd] = None + + @property + def config(self) -> ConfigEntry: + if self._config is None: + self._config = self.db.config_get(self.config_id) + + assert self._config is not None + return self._config + + @property + def nb_partitions(self) -> int: + return self.config.nb_partitions + + @property + def object_type(self) -> str: + return self.config.object_type + + @property + def datastore(self) -> Datastore: """Returns a :class:`Datastore` instance representing the swh-storage instance being checked.""" if self._datastore is None: - if isinstance(self.storage, PostgresqlStorage): - with postgresql_storage_db(self.storage) as db: - self._datastore = Datastore( - package="storage", - cls="postgresql", - instance=db.conn.dsn, - ) - elif isinstance(self.storage, CassandraStorage): - self._datastore = Datastore( - package="storage", - cls="cassandra", - instance=json.dumps( - { - "keyspace": self.storage.keyspace, - "hosts": self.storage.hosts, - "port": self.storage.port, - } - ), - ) - else: - raise NotImplementedError( - f"StorageChecker(storage={self.storage!r}).datastore()" - ) + self._datastore = get_datastore(self.storage) + datastore_id = self.db.datastore_get_or_add(self._datastore) + assert self.config.datastore_id == datastore_id return self._datastore + @property def statsd(self) -> Statsd: if self._statsd is None: - datastore = self.datastore_info() self._statsd = Statsd( namespace="swh_scrubber", constant_tags={ "object_type": self.object_type, "nb_partitions": self.nb_partitions, - "datastore_package": datastore.package, - "datastore_cls": datastore.cls, + "datastore_package": self.datastore.package, + "datastore_cls": self.datastore.cls, }, ) return self._statsd @@ -159,27 +174,12 @@ class StorageChecker: ``start_partition_id`` (inclusive) and ``end_partition_id`` (exclusive) """ object_type = getattr(swhids.ObjectType, self.object_type.upper()) - for partition_id in range(self.start_partition_id, self.end_partition_id): - - start_time = datetime.datetime.now(tz=datetime.timezone.utc) - - # Currently, this matches range boundaries exactly, with no regard for - # ranges that contain or are contained by it. - last_check_time = self.db.checked_partition_get_last_date( - self.datastore_info(), object_type, partition_id, self.nb_partitions - ) - - if last_check_time is not None: - # TODO: re-check if 'last_check_time' was a long ago. 
- logger.debug( - "Skipping processing of %s partition %d/%d: already done at %s", - self.object_type, - partition_id, - self.nb_partitions, - last_check_time, - ) - continue - + counter: Iterable[int] = count() + if self.limit: + counter = islice(counter, 0, self.limit) + for i, partition_id in zip( + counter, self.db.checked_partition_iter_next(self.config_id) + ): logger.debug( "Processing %s partition %d/%d", self.object_type, @@ -190,11 +190,8 @@ class StorageChecker: self._check_partition(object_type, partition_id) self.db.checked_partition_upsert( - self.datastore_info(), - object_type, + self.config_id, partition_id, - self.nb_partitions, - start_time, ) @tenacity.retry( @@ -225,10 +222,10 @@ class StorageChecker: assert item is not None, f"Directory {dir_id.hex()} disappeared" (has_duplicate_entries, object_) = item if has_duplicate_entries: - self.statsd().increment("duplicate_directory_entries_total") + self.statsd.increment("duplicate_directory_entries_total") self.db.corrupt_object_add( object_.swhid(), - self.datastore_info(), + self.datastore, value_to_kafka(object_.to_dict()), ) objects.append(object_) @@ -244,13 +241,17 @@ class StorageChecker: else: assert False, f"Unexpected object type: {object_type}" - with self.statsd().timed( + with self.statsd.timed( "batch_duration_seconds", tags={"operation": "check_hashes"} ): + logger.debug("Checking %s %s object hashes", len(objects), object_type) self.check_object_hashes(objects) - with self.statsd().timed( + with self.statsd.timed( "batch_duration_seconds", tags={"operation": "check_references"} ): + logger.debug( + "Checking %s %s object references", len(objects), object_type + ) self.check_object_references(objects) page_token = page.next_page_token @@ -267,14 +268,14 @@ class StorageChecker: real_id = object_.compute_hash() count += 1 if object_.id != real_id: - self.statsd().increment("hash_mismatch_total") + self.statsd.increment("hash_mismatch_total") self.db.corrupt_object_add( object_.swhid(), - self.datastore_info(), + self.datastore, value_to_kafka(object_.to_dict()), ) if count: - self.statsd().increment("objects_hashed_total", count) + self.statsd.increment("objects_hashed_total", count) def check_object_references(self, objects: Iterable[ScrubbableObject]): """Check all objects references by these objects exist.""" @@ -348,27 +349,27 @@ class StorageChecker: missing_rels = set(self.storage.release_missing(list(rel_references))) missing_snps = set(self.storage.snapshot_missing(list(snp_references))) - self.statsd().increment( + self.statsd.increment( "missing_object_total", len(missing_cnts), tags={"target_object_type": "content"}, ) - self.statsd().increment( + self.statsd.increment( "missing_object_total", len(missing_dirs), tags={"target_object_type": "directory"}, ) - self.statsd().increment( + self.statsd.increment( "missing_object_total", len(missing_revs), tags={"target_object_type": "revision"}, ) - self.statsd().increment( + self.statsd.increment( "missing_object_total", len(missing_rels), tags={"target_object_type": "release"}, ) - self.statsd().increment( + self.statsd.increment( "missing_object_total", len(missing_snps), tags={"target_object_type": "snapshot"}, @@ -379,7 +380,7 @@ class StorageChecker: object_type=swhids.ObjectType.CONTENT, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, cnt_references[missing_id], self.datastore_info() + missing_swhid, cnt_references[missing_id], self.datastore ) for missing_id in missing_dirs: @@ -387,7 +388,7 @@ class StorageChecker: 
object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, dir_references[missing_id], self.datastore_info() + missing_swhid, dir_references[missing_id], self.datastore ) for missing_id in missing_revs: @@ -395,7 +396,7 @@ class StorageChecker: object_type=swhids.ObjectType.REVISION, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, rev_references[missing_id], self.datastore_info() + missing_swhid, rev_references[missing_id], self.datastore ) for missing_id in missing_rels: @@ -403,7 +404,7 @@ class StorageChecker: object_type=swhids.ObjectType.RELEASE, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, rel_references[missing_id], self.datastore_info() + missing_swhid, rel_references[missing_id], self.datastore ) for missing_id in missing_snps: @@ -411,5 +412,5 @@ class StorageChecker: object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id ) self.db.missing_object_add( - missing_swhid, snp_references[missing_id], self.datastore_info() + missing_swhid, snp_references[missing_id], self.datastore ) diff --git a/swh/scrubber/tests/conftest.py b/swh/scrubber/tests/conftest.py index 807628a77e07557ee485428162df393da1c47253..b39883871d516096b9f964c6abbec881a270b100 100644 --- a/swh/scrubber/tests/conftest.py +++ b/swh/scrubber/tests/conftest.py @@ -9,14 +9,24 @@ import pytest from pytest_postgresql import factories from swh.core.db.db_utils import initialize_database_for_module -from swh.scrubber.db import ScrubberDb +from swh.model.swhids import ObjectType +from swh.scrubber.db import Datastore, ScrubberDb scrubber_postgresql_proc = factories.postgresql_proc( - load=[partial(initialize_database_for_module, modname="scrubber", version=1)], + load=[partial(initialize_database_for_module, modname="scrubber", version=6)], ) postgresql_scrubber = factories.postgresql("scrubber_postgresql_proc") +OBJECT_TYPE = ObjectType.DIRECTORY +PARTITION_ID = 2 +NB_PARTITIONS = 64 + + +@pytest.fixture +def datastore(): + return Datastore(package="storage", cls="postgresql", instance="service=swh-test") + @pytest.fixture def scrubber_db(postgresql_scrubber): @@ -24,4 +34,11 @@ def scrubber_db(postgresql_scrubber): with db.conn.cursor() as cur: cur.execute("TRUNCATE TABLE corrupt_object") cur.execute("TRUNCATE TABLE datastore CASCADE") - yield db + return db + + +@pytest.fixture +def config_id(scrubber_db, datastore): + return scrubber_db.config_add( + f"cfg_{OBJECT_TYPE}_{NB_PARTITIONS}", datastore, OBJECT_TYPE, NB_PARTITIONS + ) diff --git a/swh/scrubber/tests/storage_checker_tests.py b/swh/scrubber/tests/storage_checker_tests.py index 27a31f0e83e5aa16ecd951171d4109b7ec060976..66e5afe1bb8759a4c188ede0bf7f4a7ffe74bf0e 100644 --- a/swh/scrubber/tests/storage_checker_tests.py +++ b/swh/scrubber/tests/storage_checker_tests.py @@ -64,23 +64,36 @@ EXPECTED_PARTITIONS = { (swhids.ObjectType.RELEASE, 0, 1), } +OBJECT_TYPES = ( + swhids.ObjectType.SNAPSHOT, + swhids.ObjectType.DIRECTORY, + swhids.ObjectType.REVISION, + swhids.ObjectType.RELEASE, +) + def assert_checked_ranges( - scrubber_db, datastore, expected_ranges, before_date=None, after_date=None + scrubber_db, config, expected_ranges, before_date=None, after_date=None ): - if before_date is not None: - assert all( - before_date < date < after_date - for (_, _, _, date) in scrubber_db.checked_partition_iter(datastore) - ) + checked_ranges = set() + for object_type, config_id in config: + if before_date is not None: + assert all( + before_date < date < after_date + for (_, _, 
date, _) in scrubber_db.checked_partition_iter(config_id) + ) - checked_ranges = { - (object_type, start, end) - for (object_type, start, end, date) in scrubber_db.checked_partition_iter( - datastore + checked_ranges.update( + { + (object_type, partition, nb_partitions) + for ( + partition, + nb_partitions, + start_date, + end_date, + ) in scrubber_db.checked_partition_iter(config_id) + } ) - } - assert checked_ranges == expected_ranges @@ -91,21 +104,23 @@ def test_no_corruption(scrubber_db, datastore, swh_storage): swh_storage.snapshot_add(swh_model_data.SNAPSHOTS) before_date = datetime.datetime.now(tz=datetime.timezone.utc) - for object_type in ("snapshot", "release", "revision", "directory"): + config = [] + for object_type in OBJECT_TYPES: + config_id = scrubber_db.config_add( + f"cfg_{object_type.name}", datastore, object_type, 1 + ) + config.append((object_type, config_id)) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type=object_type, - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) assert list(scrubber_db.corrupt_object_iter()) == [] assert_checked_ranges( - scrubber_db, datastore, EXPECTED_PARTITIONS, before_date, after_date + scrubber_db, config, EXPECTED_PARTITIONS, before_date, after_date ) @@ -116,14 +131,16 @@ def test_corrupt_snapshot(scrubber_db, datastore, swh_storage, corrupt_idx): swh_storage.snapshot_add(snapshots) before_date = datetime.datetime.now(tz=datetime.timezone.utc) - for object_type in ("snapshot", "release", "revision", "directory"): + config = [] + for object_type in OBJECT_TYPES: + config_id = scrubber_db.config_add( + f"cfg_{object_type.name}", datastore, object_type, 1 + ) + config.append((object_type, config_id)) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type=object_type, - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) @@ -143,7 +160,7 @@ def test_corrupt_snapshot(scrubber_db, datastore, swh_storage, corrupt_idx): ) assert_checked_ranges( - scrubber_db, datastore, EXPECTED_PARTITIONS, before_date, after_date + scrubber_db, config, EXPECTED_PARTITIONS, before_date, after_date ) @@ -153,13 +170,11 @@ def test_corrupt_snapshots_same_batch(scrubber_db, datastore, swh_storage): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) swh_storage.snapshot_add(snapshots) + config_id = scrubber_db.config_add("cfg1", datastore, swhids.ObjectType.SNAPSHOT, 1) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type="snapshot", - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -172,55 +187,57 @@ def test_corrupt_snapshots_same_batch(scrubber_db, datastore, swh_storage): ] } - assert_checked_ranges(scrubber_db, datastore, {(swhids.ObjectType.SNAPSHOT, 0, 1)}) + assert_checked_ranges( + scrubber_db, + [(swhids.ObjectType.SNAPSHOT, config_id)], + {(swhids.ObjectType.SNAPSHOT, 0, 1)}, + ) def test_corrupt_snapshots_different_batches(scrubber_db, datastore, swh_storage): - # FIXME: this is brittle, because it relies on both objects being on different + # FIXME: this is brittle, because it relies on objects being on different # partitions. In particular on Cassandra, it will break if the hashing scheme # or hash algorithm changes. 
snapshots = list(swh_model_data.SNAPSHOTS) - snapshots.extend([attr.evolve(snapshots[0], id=bytes([i]) * 20) for i in (0, 255)]) + snapshots.extend( + [attr.evolve(snapshots[0], id=bytes([17 * i]) * 20) for i in range(16)] + ) swh_storage.snapshot_add(snapshots) + config_id = scrubber_db.config_add( + "cfg1", datastore, swhids.ObjectType.SNAPSHOT, 16 + ) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type="snapshot", - start_partition_id=0, - end_partition_id=1, - nb_partitions=2, + config_id=config_id, + limit=8, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) - assert len(corrupt_objects) == 1 + assert len(corrupt_objects) == 8 - # Simulates resuming from a different process, with an empty lru_cache + # Simulates resuming from a different process scrubber_db.datastore_get_or_add.cache_clear() StorageChecker( db=scrubber_db, storage=swh_storage, - object_type="snapshot", - start_partition_id=1, - end_partition_id=2, - nb_partitions=2, + config_id=config_id, + limit=8, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) - assert len(corrupt_objects) == 2 + assert len(corrupt_objects) == 16 assert {co.id for co in corrupt_objects} == { swhids.CoreSWHID.from_string(swhid) - for swhid in [ - "swh:1:snp:0000000000000000000000000000000000000000", - "swh:1:snp:ffffffffffffffffffffffffffffffffffffffff", - ] + for swhid in ["swh:1:snp:" + f"{i:x}" * 40 for i in range(16)] } assert_checked_ranges( scrubber_db, - datastore, - {(swhids.ObjectType.SNAPSHOT, 0, 2), (swhids.ObjectType.SNAPSHOT, 1, 2)}, + [(swhids.ObjectType.SNAPSHOT, config_id)], + {(swhids.ObjectType.SNAPSHOT, i, 16) for i in range(16)}, ) @@ -260,14 +277,14 @@ def test_directory_duplicate_entries(scrubber_db, datastore, swh_storage): ) before_date = datetime.datetime.now(tz=datetime.timezone.utc) - for object_type in ("snapshot", "release", "revision", "directory"): + config = [] + for object_type in OBJECT_TYPES: + config_id = scrubber_db.config_add("cfg2", datastore, object_type, 1) + config.append((object_type, config_id)) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type=object_type, - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) @@ -285,7 +302,7 @@ def test_directory_duplicate_entries(scrubber_db, datastore, swh_storage): ) assert_checked_ranges( - scrubber_db, datastore, EXPECTED_PARTITIONS, before_date, after_date + scrubber_db, config, EXPECTED_PARTITIONS, before_date, after_date ) @@ -294,28 +311,27 @@ def test_no_recheck(scrubber_db, datastore, swh_storage): Tests that objects that were already checked are not checked again on the next run. 
""" - # Corrupt two snapshots + # check the whole (empty) storage with a given config + config_id = scrubber_db.config_add("cfg2", datastore, swhids.ObjectType.SNAPSHOT, 1) + StorageChecker( + db=scrubber_db, + storage=swh_storage, + config_id=config_id, + ).run() + + # Corrupt two snapshots and add them to the storage snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) swh_storage.snapshot_add(snapshots) - # Mark ranges as already checked - now = datetime.datetime.now(tz=datetime.timezone.utc) - for (object_type, range_start, range_end) in EXPECTED_PARTITIONS: - scrubber_db.checked_partition_upsert( - datastore, object_type, range_start, range_end, now - ) - - previous_partitions = set(scrubber_db.checked_partition_iter(datastore)) + previous_partitions = set(scrubber_db.checked_partition_iter(config_id)) + # rerun a checker for the same config, it should be a noop StorageChecker( db=scrubber_db, storage=swh_storage, - object_type="snapshot", - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -324,7 +340,7 @@ def test_no_recheck(scrubber_db, datastore, swh_storage): ), "Detected corrupt objects in ranges that should have been skipped." # Make sure the DB was not changed (in particular, that timestamps were not bumped) - assert set(scrubber_db.checked_partition_iter(datastore)) == previous_partitions + assert set(scrubber_db.checked_partition_iter(config_id)) == previous_partitions def test_no_hole(scrubber_db, datastore, swh_storage): @@ -334,19 +350,19 @@ def test_no_hole(scrubber_db, datastore, swh_storage): swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) - for object_type in ("snapshot", "release", "revision", "directory"): + config = [] + for object_type in OBJECT_TYPES: + config_id = scrubber_db.config_add("cfg2", datastore, object_type, 1) + config.append((object_type, config_id)) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type=object_type, - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() assert list(scrubber_db.missing_object_iter()) == [] - assert_checked_ranges(scrubber_db, datastore, EXPECTED_PARTITIONS) + assert_checked_ranges(scrubber_db, config, EXPECTED_PARTITIONS) @pytest.mark.parametrize( @@ -386,14 +402,16 @@ def test_one_hole(scrubber_db, datastore, swh_storage, missing_object): swh_storage.snapshot_add([SNAPSHOT1]) - for object_type in ("snapshot", "release", "revision", "directory"): + config = [] + for object_type in OBJECT_TYPES: + config_id = scrubber_db.config_add( + f"cfg_{object_type.name}", datastore, object_type, 1 + ) + config.append((object_type, config_id)) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type=object_type, - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() assert [mo.id for mo in scrubber_db.missing_object_iter()] == [missing_swhid] @@ -402,7 +420,7 @@ def test_one_hole(scrubber_db, datastore, swh_storage, missing_object): for mor in scrubber_db.missing_object_reference_iter(missing_swhid) } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids} - assert_checked_ranges(scrubber_db, datastore, EXPECTED_PARTITIONS) + assert_checked_ranges(scrubber_db, config, EXPECTED_PARTITIONS) def test_two_holes(scrubber_db, datastore, swh_storage): @@ -411,14 +429,16 @@ def test_two_holes(scrubber_db, datastore, 
swh_storage): swh_storage.release_add([RELEASE1]) swh_storage.snapshot_add([SNAPSHOT1]) - for object_type in ("snapshot", "release", "revision", "directory"): + config = [] + for object_type in OBJECT_TYPES: + config_id = scrubber_db.config_add( + f"cfg_{object_type.name}", datastore, object_type, 1 + ) + config.append((object_type, config_id)) StorageChecker( db=scrubber_db, storage=swh_storage, - object_type=object_type, - start_partition_id=0, - end_partition_id=1, - nb_partitions=1, + config_id=config_id, ).run() assert {mo.id for mo in scrubber_db.missing_object_iter()} == { @@ -434,4 +454,4 @@ def test_two_holes(scrubber_db, datastore, swh_storage): for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid()) } == {RELEASE1.swhid()} - assert_checked_ranges(scrubber_db, datastore, EXPECTED_PARTITIONS) + assert_checked_ranges(scrubber_db, config, EXPECTED_PARTITIONS) diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py index 7113a9401ea990c86dc136bf5b66db16fcb8e698..917b563935d478ac433b5ef6aa7b63dd5bd8337c 100644 --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -58,6 +58,46 @@ def invoke( return result +def test_check_init(mocker, scrubber_db, swh_storage): + mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db) + result = invoke( + scrubber_db, + [ + "check", + "init", + "--object-type", + "snapshot", + "--nb-partitions", + "4", + "--name", + "cfg1", + ], + storage=swh_storage, + ) + assert result.exit_code == 0, result.output + msg = "Created configuration cfg1 [1] for checking snapshot in postgresql storage" + assert result.output.strip() == msg + + # error: cfg name already exists + result = invoke( + scrubber_db, + [ + "check", + "init", + "--object-type", + "snapshot", + "--nb-partitions", + "8", + "--name", + "cfg1", + ], + storage=swh_storage, + ) + assert result.exit_code == 1, result.output + msg = "Error: Configuration cfg1 already exists" + assert result.output.strip() == msg + + def test_check_storage(mocker, scrubber_db, swh_storage): storage_checker = MagicMock() StorageChecker = mocker.patch( @@ -67,22 +107,73 @@ def test_check_storage(mocker, scrubber_db, swh_storage): "swh.scrubber.get_scrubber_db", return_value=scrubber_db ) result = invoke( - scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage + scrubber_db, + [ + "check", + "init", + "--object-type", + "snapshot", + "--nb-partitions", + "4", + "--name", + "cfg1", + ], + storage=swh_storage, ) assert result.exit_code == 0, result.output + msg = "Created configuration cfg1 [1] for checking snapshot in postgresql storage" + assert result.output.strip() == msg + + result = invoke(scrubber_db, ["check", "storage", "cfg1"], storage=swh_storage) + assert result.exit_code == 0, result.output assert result.output == "" - get_scrubber_db.assert_called_once_with(cls="postgresql", db=scrubber_db.conn.dsn) + get_scrubber_db.assert_called_with(cls="postgresql", db=scrubber_db.conn.dsn) StorageChecker.assert_called_once_with( db=scrubber_db, + config_id=1, storage=StorageChecker.mock_calls[0][2]["storage"], - object_type="snapshot", - start_partition_id=0, - end_partition_id=4096, - nb_partitions=4096, + limit=0, ) assert storage_checker.method_calls == [call.run()] + # using the config id instead of the config name + result = invoke( + scrubber_db, ["check", "storage", "--config-id", "1"], storage=swh_storage + ) + assert result.exit_code == 0, result.output + assert result.output == "" + + +def test_check_list(mocker, 
scrubber_db, swh_storage): + mocker.patch("swh.scrubber.get_scrubber_db", return_value=scrubber_db) + result = invoke(scrubber_db, ["check", "list"], storage=swh_storage) + assert result.exit_code == 0, result.output + assert result.output == "" + with swh_storage.db() as db: + dsn = db.conn.dsn + + result = invoke( + scrubber_db, + [ + "check", + "init", + "--object-type", + "snapshot", + "--nb-partitions", + "4", + "--name", + "cfg1", + ], + storage=swh_storage, + ) + assert result.exit_code == 0, result.output + + result = invoke(scrubber_db, ["check", "list"], storage=swh_storage) + assert result.exit_code == 0, result.output + expected = f"[1] cfg1: snapshot, 4, storage:postgresql ({dsn})\n" + assert result.output == expected, result.output + def test_check_journal( mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group diff --git a/swh/scrubber/tests/test_db.py b/swh/scrubber/tests/test_db.py index 9b5c460f2b6d1b0511cf7e00bbb53bc48390e9d2..e6cc57368c93b4db460e176fdf7cdfb5bffa5583 100644 --- a/swh/scrubber/tests/test_db.py +++ b/swh/scrubber/tests/test_db.py @@ -4,80 +4,158 @@ # See top-level LICENSE file for more information import datetime +from unittest.mock import patch + +from psycopg2.errors import UniqueViolation +import pytest from swh.model.swhids import ObjectType from swh.scrubber.db import Datastore, ScrubberDb -DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh-test") -OBJECT_TYPE = ObjectType.DIRECTORY -PARTITION_ID = 10 -NB_PARTITIONS = 64 +from .conftest import NB_PARTITIONS, OBJECT_TYPE + DATE = datetime.datetime(2022, 10, 4, 12, 1, 23, tzinfo=datetime.timezone.utc) +ONE_MINUTE = datetime.timedelta(minutes=1) +ONE_DAY = datetime.timedelta(days=1) + +def test_config_add(datastore: Datastore, scrubber_db: ScrubberDb, config_id: int): + cfg_snp = scrubber_db.config_add("cfg snp", datastore, ObjectType.SNAPSHOT, 42) + assert cfg_snp == 2 -def test_checked_partition_insert(scrubber_db: ScrubberDb): - scrubber_db.checked_partition_upsert( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, DATE + cfg_snp2 = scrubber_db.config_add("cfg snp 2", datastore, ObjectType.SNAPSHOT, 43) + assert cfg_snp2 == 3 + + cfg_snp3 = scrubber_db.config_add(None, datastore, ObjectType.SNAPSHOT, 44) + assert cfg_snp3 == 4 + assert ( + scrubber_db.config_get(cfg_snp3).name == "check_snapshot_44_storage_postgresql" ) - assert list(scrubber_db.checked_partition_iter(DATASTORE)) == [ - (OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, DATE) + # XXX this is debatable: is there a good reason to allow 2 configs for the + # same datastore and object type but different partition number, but not + # for the same number of partitions? 
+ with pytest.raises(UniqueViolation): + scrubber_db.config_add("cfg4", datastore, OBJECT_TYPE, NB_PARTITIONS) + + +def test_config_get(datastore: Datastore, scrubber_db: ScrubberDb, config_id: int): + cfg2 = scrubber_db.config_add("cfg2", datastore, ObjectType.SNAPSHOT, 42) + cfg3 = scrubber_db.config_add("cfg3", datastore, ObjectType.SNAPSHOT, 43) + + assert scrubber_db.config_get(cfg2) + assert scrubber_db.config_get(cfg3) + + with pytest.raises(ValueError): + scrubber_db.config_get(cfg3 + 1) + + +def test_checked_config_get_by_name( + datastore: Datastore, scrubber_db: ScrubberDb, config_id: int +): + cfg2 = scrubber_db.config_add("cfg2", datastore, ObjectType.SNAPSHOT, 42) + cfg3 = scrubber_db.config_add("cfg3", datastore, ObjectType.SNAPSHOT, 43) + + assert scrubber_db.config_get_by_name("cfg2") == cfg2 + assert scrubber_db.config_get_by_name("cfg3") == cfg3 + + assert scrubber_db.config_get_by_name("unknown config") is None + + +def test_datastore_get(datastore: Datastore, scrubber_db: ScrubberDb, config_id: int): + assert scrubber_db.datastore_get(1) == datastore + with pytest.raises(ValueError): + scrubber_db.datastore_get(42) + + +def test_checked_partition_insert( + datastore: Datastore, scrubber_db: ScrubberDb, config_id: int +): + with patch("swh.scrubber.db.now", return_value=DATE): + part_gen = scrubber_db.checked_partition_iter_next(config_id) + partition_id = next(part_gen) + scrubber_db.checked_partition_upsert(config_id, partition_id, DATE + ONE_MINUTE) + + assert list(scrubber_db.checked_partition_iter(config_id)) == [ + (partition_id, NB_PARTITIONS, DATE, DATE + ONE_MINUTE) ] -def test_checked_partition_insert_two(scrubber_db: ScrubberDb): - scrubber_db.checked_partition_upsert( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, DATE - ) - scrubber_db.checked_partition_upsert( - DATASTORE, ObjectType.SNAPSHOT, PARTITION_ID, NB_PARTITIONS, DATE - ) +def test_checked_partition_insert_two( + datastore: Datastore, scrubber_db: ScrubberDb, config_id: int +): + with patch("swh.scrubber.db.now", return_value=DATE): + part_gen = scrubber_db.checked_partition_iter_next(config_id) + part_id = next(part_gen) + scrubber_db.checked_partition_upsert(config_id, part_id, DATE + ONE_MINUTE) + + config_snp = scrubber_db.config_add("cfg2", datastore, ObjectType.SNAPSHOT, 42) + snp_part_gen = scrubber_db.checked_partition_iter_next(config_snp) + snp_part_id = next(snp_part_gen) + scrubber_db.checked_partition_upsert(config_snp, snp_part_id, DATE + ONE_MINUTE) - assert set(scrubber_db.checked_partition_iter(DATASTORE)) == { - (OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, DATE), - (ObjectType.SNAPSHOT, PARTITION_ID, NB_PARTITIONS, DATE), + assert set(scrubber_db.checked_partition_iter(config_id)) == { + (part_id, NB_PARTITIONS, DATE, DATE + ONE_MINUTE), + } + assert set(scrubber_db.checked_partition_iter(config_snp)) == { + (snp_part_id, 42, DATE, DATE + ONE_MINUTE), } -def test_checked_partition_update(scrubber_db: ScrubberDb): - scrubber_db.checked_partition_upsert( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, DATE - ) +def test_checked_partition_get_next( + datastore: Datastore, scrubber_db: ScrubberDb, config_id: int +): + config_snp = scrubber_db.config_add("cfg2", datastore, ObjectType.SNAPSHOT, 42) + snp_part_gen = scrubber_db.checked_partition_iter_next(config_snp) + dir_part_gen = scrubber_db.checked_partition_iter_next(config_id) + + assert next(dir_part_gen) == 0 + assert next(snp_part_gen) == 0 + assert next(dir_part_gen) == 1 + assert next(dir_part_gen) == 2 + assert 
next(snp_part_gen) == 1 + assert next(snp_part_gen) == 2 + assert next(dir_part_gen) == 3 + + date = datetime.datetime.now(tz=datetime.timezone.utc) + scrubber_db.checked_partition_upsert(config_snp, 0, date) + scrubber_db.checked_partition_upsert(config_id, 2, date) + + assert next(snp_part_gen) == 3 + assert next(dir_part_gen) == 4 - date2 = DATE + datetime.timedelta(days=1) - scrubber_db.checked_partition_upsert( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, date2 - ) - assert list(scrubber_db.checked_partition_iter(DATASTORE)) == [ - (OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, date2) +def test_checked_partition_update( + datastore: Datastore, scrubber_db: ScrubberDb, config_id: int +): + with patch("swh.scrubber.db.now", return_value=DATE): + dir_part_gen = scrubber_db.checked_partition_iter_next(config_id) + part_id = next(dir_part_gen) + scrubber_db.checked_partition_upsert(config_id, part_id, DATE + ONE_MINUTE) + + date2 = DATE + 2 * ONE_MINUTE + scrubber_db.checked_partition_upsert(config_id, part_id, date2) + + assert list(scrubber_db.checked_partition_iter(config_id)) == [ + (part_id, NB_PARTITIONS, DATE, date2) ] - date3 = DATE + datetime.timedelta(days=-1) - scrubber_db.checked_partition_upsert( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, date3 - ) + date3 = DATE - ONE_MINUTE + scrubber_db.checked_partition_upsert(config_id, part_id, date3) - assert list(scrubber_db.checked_partition_iter(DATASTORE)) == [ - (OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, date2) # newest date wins + assert list(scrubber_db.checked_partition_iter(config_id)) == [ + (part_id, NB_PARTITIONS, DATE, date2) # newest date wins ] -def test_checked_partition_get(scrubber_db: ScrubberDb): - assert ( - scrubber_db.checked_partition_get_last_date( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS - ) - is None - ) +def test_checked_partition_get( + datastore: Datastore, scrubber_db: ScrubberDb, config_id: int +): + with patch("swh.scrubber.db.now", return_value=DATE): + dir_part_gen = scrubber_db.checked_partition_iter_next(config_id) + part_id = next(dir_part_gen) + assert scrubber_db.checked_partition_get_last_date(config_id, part_id) is None - scrubber_db.checked_partition_upsert( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS, DATE - ) + scrubber_db.checked_partition_upsert(config_id, part_id, DATE) - assert ( - scrubber_db.checked_partition_get_last_date( - DATASTORE, OBJECT_TYPE, PARTITION_ID, NB_PARTITIONS - ) - == DATE - ) + assert scrubber_db.checked_partition_get_last_date(config_id, part_id) == DATE
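
For reference, a minimal sketch of how the configuration-based API introduced by this patch fits together outside of the CLI. It assumes a reachable scrubber database and swh-storage instance; the connection strings, the ``objstorage`` setting and the configuration name ``cfg1`` are placeholders, not values mandated by the patch::

    from swh.model.swhids import ObjectType
    from swh.scrubber import get_scrubber_db
    from swh.scrubber.storage_checker import StorageChecker, get_datastore
    from swh.storage import get_storage

    # Placeholders: point these at a real scrubber DB and a real storage.
    scrubber_db = get_scrubber_db(cls="postgresql", db="service=swh-scrubber")
    storage = get_storage(
        cls="postgresql", db="service=swh-storage", objstorage={"cls": "memory"}
    )

    # Register the datastore and create (or look up) a named check
    # configuration, as ``swh scrubber check init`` does.
    datastore = get_datastore(storage=storage)
    scrubber_db.datastore_get_or_add(datastore)
    config_id = scrubber_db.config_get_by_name("cfg1")
    if config_id is None:
        config_id = scrubber_db.config_add(
            "cfg1", datastore, ObjectType.SNAPSHOT, 16
        )

    # Check at most 8 partitions in this process; other processes can run the
    # same loop concurrently, since partitions are reserved one at a time
    # through ScrubberDb.checked_partition_iter_next().
    StorageChecker(db=scrubber_db, storage=storage, config_id=config_id, limit=8).run()

The CLI equivalents added by this patch are ``swh scrubber check init --object-type snapshot --nb-partitions 16 --name cfg1``, ``swh scrubber check list`` and ``swh scrubber check storage cfg1`` (or ``--config-id``), with ``--limit`` bounding the number of partitions checked by a single invocation.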