diff --git a/swh/scrubber/base_checker.py b/swh/scrubber/base_checker.py new file mode 100644 index 0000000000000000000000000000000000000000..5cf7fc39093b79446d62a2226915ea430fd1d543 --- /dev/null +++ b/swh/scrubber/base_checker.py @@ -0,0 +1,144 @@ +# Copyright (C) 2024 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from abc import ABC, abstractmethod +from itertools import count, islice +import logging +from typing import Any, Dict, Iterable, Optional + +import psycopg2 +import tenacity + +from swh.core.statsd import Statsd +from swh.model import swhids + +from .db import ConfigEntry, Datastore, ScrubberDb + +logger = logging.getLogger(__name__) + + +class BaseChecker(ABC): + """Base Checker class wrapping common features.""" + + def __init__( + self, + db: ScrubberDb, + config_id: int, + ): + self.db = db + self.config_id = config_id + self.statsd_constant_tags: Dict[str, Any] = {} + + self._config: Optional[ConfigEntry] = None + self._statsd: Optional[Statsd] = None + + @property + def config(self) -> ConfigEntry: + """Returns a :class:`ConfigEntry` instance containing checker configuration.""" + if self._config is None: + self._config = self.db.config_get(self.config_id) + + assert self._config is not None + return self._config + + @property + def datastore(self) -> Datastore: + """Returns a :class:`Datastore` instance representing the source of data + being checked.""" + return self.config.datastore + + @property + def statsd(self) -> Statsd: + """Returns a :class:`Statsd` instance to send statsd metrics.""" + if self._statsd is None: + self._statsd = Statsd( + namespace="swh_scrubber", + constant_tags=self.statsd_constant_tags, + ) + return self._statsd + + @property + def check_hashes(self) -> bool: + return self.config.check_hashes + + @property + def check_references(self) -> 
bool: + return self.config.check_references + + @abstractmethod + def run(self) -> None: + """Run the checker processing, derived classes must implement this method.""" + pass + + +class BasePartitionChecker(BaseChecker): + """Base class for checkers processing partitions of objects.""" + + def __init__( + self, + db: ScrubberDb, + config_id: int, + limit: int = 0, + ): + super().__init__(db=db, config_id=config_id) + self.limit = limit + self.statsd_constant_tags = { + "object_type": self.object_type, + "nb_partitions": self.nb_partitions, + "datastore_package": self.datastore.package, + "datastore_cls": self.datastore.cls, + } + + @property + def object_type(self) -> swhids.ObjectType: + """Returns the type of object being checked.""" + return self.config.object_type + + @property + def nb_partitions(self) -> int: + """Returns the number of partitions set in configuration.""" + return self.config.nb_partitions + + def run(self) -> None: + """Runs the check on all objects of ``object_type`` in the next partitions + to check, processing at most ``limit`` partitions if it is non-zero. + """ + counter: Iterable[int] = count() + if self.limit: + counter = islice(counter, 0, self.limit) + for _, partition_id in zip( + counter, self.db.checked_partition_iter_next(self.config_id) + ): + logger.debug( + "Processing %s partition %d/%d", + self.object_type, + partition_id, + self.nb_partitions, + ) + + self._check_partition(self.object_type, partition_id) + + self.db.checked_partition_upsert( + self.config_id, + partition_id, + ) + + @tenacity.retry( + retry=tenacity.retry_if_exception_type(psycopg2.OperationalError), + wait=tenacity.wait_random_exponential(min=10, max=180), + ) + def _check_partition( + self, object_type: swhids.ObjectType, partition_id: int + ) -> None: + "Retryable method checking objects in partition." 
+ return self.check_partition(object_type, partition_id) + + @abstractmethod + def check_partition( + self, object_type: swhids.ObjectType, partition_id: int + ) -> None: + """Abstract method that derived classes must implement to check objects + in partition.""" + pass diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py index ca2fafe6df82f21c025142bf760bcbb6a81c652f..2347a2c428748c079b4fdcb9eb3ba8244dd5760c 100644 --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -1,4 +1,4 @@ -# Copyright (C) 2022-2023 The Software Heritage developers +# Copyright (C) 2022-2024 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -531,7 +531,7 @@ def scrubber_check_journal(ctx, name, config_id) -> None: checker = JournalChecker( db=ctx.obj["db"], - journal=conf["journal"], + journal_client_config=conf["journal"], config_id=config_id, ) diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py index eabd4a6f0ab5570533f738ffd156043015aae412..6d72df6bfb52ec1710749a7921e7b981c3e4859e 100644 --- a/swh/scrubber/journal_checker.py +++ b/swh/scrubber/journal_checker.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021-2023 The Software Heritage developers +# Copyright (C) 2021-2024 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,16 +7,16 @@ import json import logging -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List import attr -from swh.core.statsd import Statsd from swh.journal.client import get_journal_client from swh.journal.serializers import kafka_to_value from swh.model import model -from .db import ConfigEntry, Datastore, ScrubberDb +from .base_checker import BaseChecker +from .db 
import Datastore, ScrubberDb logger = logging.getLogger(__name__) @@ -36,29 +36,31 @@ def get_datastore(journal_cfg) -> Datastore: ) else: raise NotImplementedError( - f"JournalChecker(journal={journal_cfg!r}).datastore()" + f"JournalChecker(journal_client_config={journal_cfg!r}).datastore()" ) return datastore -class JournalChecker: +class JournalChecker(BaseChecker): """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" - _config: Optional[ConfigEntry] = None - _datastore: Optional[Datastore] = None - - def __init__(self, db: ScrubberDb, config_id: int, journal: Dict[str, Any]): - self.db = db - self.config_id = config_id + def __init__( + self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any] + ): + super().__init__(db=db, config_id=config_id) + self.statsd_constant_tags = { + "datastore_package": self.datastore.package, + "datastore_cls": self.datastore.cls, + } if self.config.check_references: raise ValueError( - "The journal checcker cannot check for references, please set " + "The journal checker cannot check for references, please set " "the 'check_references' to False in the config entry %s.", self.config_id, ) - self.journal_client_config = journal.copy() + self.journal_client_config = journal_client_config.copy() if "object_types" in self.journal_client_config: raise ValueError( "The journal_client configuration entry should not define the " @@ -73,35 +75,8 @@ class JournalChecker: # verbatim so it can archive it with as few modifications a possible. 
value_deserializer=lambda obj_type, msg: msg, ) - self._statsd: Optional[Statsd] = None - - @property - def config(self) -> ConfigEntry: - if self._config is None: - self._config = self.db.config_get(self.config_id) - - assert self._config is not None - return self._config - - @property - def datastore(self) -> Datastore: - """Returns a :class:`Datastore` instance representing the journal instance - being checked.""" - return self.config.datastore - - @property - def statsd(self) -> Statsd: - if self._statsd is None: - self._statsd = Statsd( - namespace="swh_scrubber", - constant_tags={ - "datastore_package": self.datastore.package, - "datastore_cls": self.datastore.cls, - }, - ) - return self._statsd - def run(self): + def run(self) -> None: """Runs a journal client with the given configuration. This method does not return, unless otherwise configured (with ``stop_on_eof``). """ diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py index 4e9203d6ecfcd6a724bdfde8f3085360a2ced5ab..74f398e496ad9dc934be542fe1e082121002f9ec 100644 --- a/swh/scrubber/storage_checker.py +++ b/swh/scrubber/storage_checker.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021-2023 The Software Heritage developers +# Copyright (C) 2021-2024 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -7,15 +7,10 @@ import collections import contextlib -from itertools import count, islice import json import logging from typing import Iterable, Optional, Tuple, Union -import psycopg2 -import tenacity - -from swh.core.statsd import Statsd from swh.journal.serializers import value_to_kafka from swh.model import swhids from swh.model.model import ( @@ -35,7 +30,8 @@ from swh.storage.cassandra.storage import CassandraStorage from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import 
Storage as PostgresqlStorage -from .db import ConfigEntry, Datastore, ScrubberDb +from .base_checker import BasePartitionChecker +from .db import Datastore, ScrubberDb logger = logging.getLogger(__name__) @@ -113,94 +109,17 @@ def get_datastore(storage) -> Datastore: return datastore -class StorageChecker: +class StorageChecker(BasePartitionChecker): """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" def __init__( self, db: ScrubberDb, config_id: int, storage: StorageInterface, limit: int = 0 ): - self.db = db + super().__init__(db=db, config_id=config_id, limit=limit) self.storage = storage - self.config_id = config_id - self.limit = limit - - self._config: Optional[ConfigEntry] = None - self._statsd: Optional[Statsd] = None - - @property - def config(self) -> ConfigEntry: - if self._config is None: - self._config = self.db.config_get(self.config_id) - - assert self._config is not None - return self._config - - @property - def nb_partitions(self) -> int: - return self.config.nb_partitions - - @property - def object_type(self) -> swhids.ObjectType: - return self.config.object_type - - @property - def check_hashes(self) -> bool: - return self.config.check_hashes - - @property - def check_references(self) -> bool: - return self.config.check_references - - @property - def datastore(self) -> Datastore: - """Returns a :class:`Datastore` instance representing the swh-storage instance - being checked.""" - return self.config.datastore - - @property - def statsd(self) -> Statsd: - if self._statsd is None: - self._statsd = Statsd( - namespace="swh_scrubber", - constant_tags={ - "object_type": self.object_type, - "nb_partitions": self.nb_partitions, - "datastore_package": self.datastore.package, - "datastore_cls": self.datastore.cls, - }, - ) - return self._statsd - - def run(self) -> None: - """Runs on all objects of ``object_type`` in a partition between - ``start_partition_id`` (inclusive) and 
``end_partition_id`` (exclusive) - """ - counter: Iterable[int] = count() - if self.limit: - counter = islice(counter, 0, self.limit) - for i, partition_id in zip( - counter, self.db.checked_partition_iter_next(self.config_id) - ): - logger.debug( - "Processing %s partition %d/%d", - self.object_type, - partition_id, - self.nb_partitions, - ) - - self._check_partition(self.object_type, partition_id) - self.db.checked_partition_upsert( - self.config_id, - partition_id, - ) - - @tenacity.retry( - retry=tenacity.retry_if_exception_type(psycopg2.OperationalError), - wait=tenacity.wait_random_exponential(min=10, max=180), - ) - def _check_partition( + def check_partition( self, object_type: swhids.ObjectType, partition_id: int ) -> None: page_token = None diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py index 381f7d18207fb5af05ccb6e258c4f3a7d29d7a2a..45aecf42c50b7c8b1af377a655bd30e277b2a043 100644 --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -1,4 +1,4 @@ -# Copyright (C) 2020-2022 The Software Heritage developers +# Copyright (C) 2020-2024 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -688,7 +688,7 @@ def test_check_journal( JournalChecker.assert_called_once_with( db=scrubber_db, - journal={ + journal_client_config={ "brokers": kafka_server, "cls": "kafka", "group_id": kafka_consumer_group, diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py index e31bba3fd08e4590002a2878199552bd52c76a4c..6f5096faaa514c82bf8f7dec072a3c0a0fba1548 100644 --- a/swh/scrubber/tests/test_journal_kafka.py +++ b/swh/scrubber/tests/test_journal_kafka.py @@ -1,4 +1,4 @@ -# Copyright (C) 2022-2023 The Software Heritage developers +# Copyright (C) 2022-2024 The Software Heritage developers # See the AUTHORS file at 
the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -77,7 +77,7 @@ def test_no_corruption( jc = JournalChecker( db=scrubber_db, config_id=config_id, - journal=journal_cfg, + journal_client_config=journal_cfg, ) jc.run() jc.journal_client.close() @@ -111,7 +111,9 @@ def test_corrupt_snapshot( JournalChecker( db=scrubber_db, config_id=config_id, - journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group), + journal_client_config=journal_client_config( + kafka_server, kafka_prefix, kafka_consumer_group + ), ).run() after_date = datetime.datetime.now(tz=datetime.timezone.utc) @@ -156,7 +158,9 @@ def test_corrupt_snapshots( JournalChecker( db=scrubber_db, config_id=config_id, - journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group), + journal_client_config=journal_client_config( + kafka_server, kafka_prefix, kafka_consumer_group + ), ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -223,7 +227,9 @@ def test_duplicate_directory_entries( JournalChecker( db=scrubber_db, config_id=config_id, - journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group), + journal_client_config=journal_client_config( + kafka_server, kafka_prefix, kafka_consumer_group + ), ).run() corrupt_objects = list(scrubber_db.corrupt_object_iter()) @@ -258,5 +264,5 @@ def test_check_references_raises( JournalChecker( db=scrubber_db, config_id=config_id, - journal=journal_config, + journal_client_config=journal_config, )