diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py
index dae1aae2c2c6bedc3c058bc53b8b70cbeb836cb0..bba4024ffcab3b3bac5e2a2b74e002ae30dbbaa5 100644
--- a/swh/scrubber/journal_checker.py
+++ b/swh/scrubber/journal_checker.py
@@ -7,32 +7,42 @@ import json
 import logging
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
 
 import attr
 
 from swh.journal.client import get_journal_client
 from swh.journal.serializers import kafka_to_value
 from swh.model import model
+from swh.objstorage.factory import get_objstorage
 
 from .base_checker import BaseChecker
 from .db import Datastore, ScrubberDb
+from .objstorage_checker import check_content
 
 logger = logging.getLogger(__name__)
 
 
-def get_datastore(journal_cfg) -> Datastore:
+def get_datastore(
+    journal_cfg: Dict[str, Any], objstorage_cfg: Optional[Dict[str, Any]] = None
+) -> Datastore:
     if journal_cfg.get("cls") == "kafka":
+        package = "journal"
+        cls = "kafka"
+        instance = {
+            "brokers": journal_cfg["brokers"],
+            "group_id": journal_cfg["group_id"],
+            "prefix": journal_cfg["prefix"],
+        }
+        if objstorage_cfg is not None:
+            # objstorage integrity is checked by reading journal content topic in that case
+            instance = {"journal": instance, "objstorage": objstorage_cfg}
+            package = "objstorage"
+            cls = objstorage_cfg.get("cls", "")
         datastore = Datastore(
-            package="journal",
-            cls="kafka",
-            instance=json.dumps(
-                {
-                    "brokers": journal_cfg["brokers"],
-                    "group_id": journal_cfg["group_id"],
-                    "prefix": journal_cfg["prefix"],
-                }
-            ),
+            package=package,
+            cls=cls,
+            instance=json.dumps(instance),
         )
     else:
         raise NotImplementedError(
@@ -46,15 +56,24 @@ class JournalChecker(BaseChecker):
     reports errors in a separate database."""
 
     def __init__(
-        self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any]
+        self,
+        db: ScrubberDb,
+        config_id: int,
+        journal_client_config: Dict[str, Any],
+        objstorage_config: Optional[Dict[str, Any]] = None,
     ):
         super().__init__(db=db, config_id=config_id)
         self.statsd_constant_tags = {
             "datastore_package": self.datastore.package,
             "datastore_cls": self.datastore.cls,
         }
+        if objstorage_config is not None:
+            # to differentiate metrics from partition based checker
+            self.statsd_constant_tags["journal_topic"] = "content"
+
+        object_type = self.config.object_type.name.lower()
 
-        if self.config.check_references:
+        if self.config.check_references and object_type != "content":
             raise ValueError(
                 "The journal checker cannot check for references, please set "
                 "the 'check_references' to False in the config entry %s.",
@@ -66,15 +85,23 @@ class JournalChecker(BaseChecker):
                 "The journal_client configuration entry should not define the "
                 "object_types field; this is handled by the scrubber configuration entry"
             )
-        self.journal_client_config["object_types"] = [
-            self.config.object_type.name.lower()
-        ]
+        self.journal_client_config["object_types"] = [object_type]
         self.journal_client = get_journal_client(
             **self.journal_client_config,
             # Remove default deserializer; so process_kafka_values() gets the message
             # verbatim so it can archive it with as few modifications a possible.
             value_deserializer=lambda obj_type, msg: msg,
         )
+        self.objstorage = (
+            get_objstorage(**objstorage_config)
+            if objstorage_config is not None
+            else None
+        )
+        if object_type == "content" and self.objstorage is None:
+            raise ValueError(
+                "The journal checker cannot check content objects if an objstorage "
+                "configuration is not provided."
+            )
 
     def run(self) -> None:
         """Runs a journal client with the given configuration.
@@ -106,9 +133,19 @@
                         "duplicate_directory_entries_total",
                         tags={"object_type": "directory"},
                     )
+                elif object_type == "content":
+                    content = model.Content.from_dict(kafka_to_value(message))
+                    assert self.objstorage is not None
+                    check_content(
+                        content, self.objstorage, self.db, self.config, self.statsd
+                    )
                 else:
                     object_ = cls.from_dict(kafka_to_value(message))
                     has_duplicate_dir_entries = False
-                real_id = object_.compute_hash()
-                if object_.id != real_id or has_duplicate_dir_entries:
-                    self.db.corrupt_object_add(object_.swhid(), self.config, message)
+
+                if object_type != "content":
+                    real_id = object_.compute_hash()
+                    if object_.id != real_id or has_duplicate_dir_entries:
+                        self.db.corrupt_object_add(
+                            object_.swhid(), self.config, message
+                        )
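Reviewer note: a minimal sketch of how the extended JournalChecker is expected to be wired up for content scrubbing. Every call below mirrors code visible in this patch and its tests; the scrubber_db handle, the broker address, the objstorage root and the topic prefix are placeholders, not values defined by the patch.

    # Sketch only -- exercises the new objstorage_config parameter of JournalChecker.
    # "scrubber_db" is assumed to be an already-initialized ScrubberDb instance.
    from swh.model.swhids import ObjectType
    from swh.scrubber.journal_checker import JournalChecker, get_datastore

    journal_cfg = {
        "cls": "kafka",
        "brokers": ["kafka1.example.org:9092"],  # placeholder
        "group_id": "swh-scrubber-content",      # placeholder
        "prefix": "swh.journal.objects",         # placeholder
    }
    objstorage_cfg = {
        "cls": "pathslicing",       # same backend as the test fixture below
        "root": "/srv/objstorage",  # placeholder
        "slicing": "0:2/2:4/4:6",
    }

    # Register a datastore/config pair, then run the checker on the content topic.
    datastore = get_datastore(journal_cfg, objstorage_cfg)
    config_id = scrubber_db.config_add(
        name="cfg_content",
        datastore=datastore,
        object_type=ObjectType.CONTENT,
        nb_partitions=1,
    )
    JournalChecker(
        db=scrubber_db,
        config_id=config_id,
        journal_client_config=journal_cfg,
        objstorage_config=objstorage_cfg,
    ).run()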
diff --git a/swh/scrubber/objstorage_checker.py b/swh/scrubber/objstorage_checker.py
index de08a88f1286a25ea565d1ec5c8c90e536a830f0..cbbebdb5133d88bfc90cc7fedba5180a44af2c4b 100644
--- a/swh/scrubber/objstorage_checker.py
+++ b/swh/scrubber/objstorage_checker.py
@@ -7,6 +7,7 @@ import json
 import logging
 from typing import Iterable, Optional
 
+from swh.core.statsd import Statsd
 from swh.journal.serializers import value_to_kafka
 from swh.model.model import Content
 from swh.model.swhids import ObjectType
@@ -15,7 +16,7 @@
 from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
 from swh.storage.interface import StorageInterface
 
 from .base_checker import BasePartitionChecker
-from .db import Datastore, ScrubberDb
+from .db import ConfigEntry, Datastore, ScrubberDb
 
 logger = logging.getLogger(__name__)
@@ -29,6 +30,34 @@ def get_objstorage_datastore(objstorage_config):
     )
 
 
+def check_content(
+    content: Content,
+    objstorage: ObjStorageInterface,
+    db: ScrubberDb,
+    config: ConfigEntry,
+    statsd: Statsd,
+) -> None:
+    content_hashes = objid_from_dict(content.hashes())
+    try:
+        content_bytes = objstorage.get(content_hashes)
+    except ObjNotFoundError:
+        if config.check_references:
+            statsd.increment("missing_object_total")
+            db.missing_object_add(id=content.swhid(), reference_ids={}, config=config)
+    else:
+        if config.check_hashes:
+            recomputed_hashes = objid_from_dict(
+                Content.from_data(content_bytes).hashes()
+            )
+            if content_hashes != recomputed_hashes:
+                statsd.increment("hash_mismatch_total")
+                db.corrupt_object_add(
+                    id=content.swhid(),
+                    config=config,
+                    serialized_object=value_to_kafka(content.to_dict()),
+                )
+
+
 class ObjectStorageChecker(BasePartitionChecker):
     """A checker to detect missing and corrupted contents in an object storage.
@@ -79,24 +108,4 @@ class ObjectStorageChecker(BasePartitionChecker):
 
     def check_contents(self, contents: Iterable[Content]) -> None:
         for content in contents:
-            content_hashes = objid_from_dict(content.hashes())
-            try:
-                content_bytes = self.objstorage.get(content_hashes)
-            except ObjNotFoundError:
-                if self.check_references:
-                    self.statsd.increment("missing_object_total")
-                    self.db.missing_object_add(
-                        id=content.swhid(), reference_ids={}, config=self.config
-                    )
-            else:
-                if self.check_hashes:
-                    recomputed_hashes = objid_from_dict(
-                        Content.from_data(content_bytes).hashes()
-                    )
-                    if content_hashes != recomputed_hashes:
-                        self.statsd.increment("hash_mismatch_total")
-                        self.db.corrupt_object_add(
-                            id=content.swhid(),
-                            config=self.config,
-                            serialized_object=value_to_kafka(content.to_dict()),
-                        )
+            check_content(content, self.objstorage, self.db, self.config, self.statsd)
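Reviewer note: with the per-content logic extracted into check_content(), the partition-based ObjectStorageChecker and the journal-based checker share one code path. A rough sketch of reusing the helper directly, assuming the objstorage, scrubber db, config entry and statsd client are already set up as in the checkers above:

    # Sketch only -- scrub a batch of Content objects with the extracted helper.
    from swh.scrubber.objstorage_checker import check_content

    def scrub_contents(contents, objstorage, scrubber_db, config, statsd):
        # Same loop ObjectStorageChecker.check_contents() now delegates to:
        # a missing object is recorded via missing_object_add() when
        # config.check_references is set, and a hash mismatch via
        # corrupt_object_add() when config.check_hashes is set.
        for content in contents:
            check_content(content, objstorage, scrubber_db, config, statsd)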
journal_cfg["group_id"] + "_" + + object_type = "content" + journal_cfg["group_id"] = gid + object_type + config_id = scrubber_db.config_add( + name=f"cfg_{object_type}", + datastore=objstorage_datastore, + object_type=ObjectType.CONTENT, + nb_partitions=1, + ) + jc = JournalChecker( + db=scrubber_db, + config_id=config_id, + journal_client_config=journal_cfg, + objstorage_config=swh_objstorage_config, + ) + jc.objstorage.add_batch({c.sha1: c.data for c in swh_model_data.CONTENTS}) + jc.run() + jc.journal_client.close() + + assert list(scrubber_db.corrupt_object_iter()) == [] + + @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) def test_corrupt_snapshot( scrubber_db, @@ -174,6 +265,106 @@ def test_corrupt_snapshots( } +@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5)) +def test_corrupt_content( + scrubber_db, + kafka_server, + kafka_prefix, + kafka_consumer_group, + objstorage_datastore, + corrupt_idx, + swh_objstorage_config, +): + config_id = scrubber_db.config_add( + name="cfg_content", + datastore=objstorage_datastore, + object_type=ObjectType.CONTENT, + nb_partitions=1, + ) + contents = list(swh_model_data.CONTENTS) + contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20) + + writer = journal_writer(kafka_server, kafka_prefix) + writer.write_additions("content", contents) + + before_date = datetime.datetime.now(tz=datetime.timezone.utc) + jc = JournalChecker( + db=scrubber_db, + config_id=config_id, + journal_client_config=journal_client_config( + kafka_server, kafka_prefix, kafka_consumer_group + ), + objstorage_config=swh_objstorage_config, + ) + jc.objstorage.add_batch({c.sha1: c.data for c in contents}) + jc.run() + after_date = datetime.datetime.now(tz=datetime.timezone.utc) + + corrupt_objects = list(scrubber_db.corrupt_object_iter()) + assert len(corrupt_objects) == 1 + assert corrupt_objects[0].id == swhids.CoreSWHID.from_string( + "swh:1:cnt:0000000000000000000000000000000000000000" + ) + assert corrupt_objects[0].config.datastore.package == "objstorage" + assert corrupt_objects[0].config.datastore.cls == swh_objstorage_config["cls"] + assert '"journal":' in corrupt_objects[0].config.datastore.instance + assert ( + before_date - datetime.timedelta(seconds=5) + <= corrupt_objects[0].first_occurrence + <= after_date + datetime.timedelta(seconds=5) + ) + corrupted_content = contents[corrupt_idx].to_dict() + corrupted_content.pop("data") + assert kafka_to_value(corrupt_objects[0].object_) == corrupted_content + + +@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5)) +def test_missing_content( + scrubber_db, + kafka_server, + kafka_prefix, + kafka_consumer_group, + objstorage_datastore, + missing_idx, + swh_objstorage_config, +): + config_id = scrubber_db.config_add( + name="cfg_content", + datastore=objstorage_datastore, + object_type=ObjectType.CONTENT, + nb_partitions=1, + ) + contents = list(swh_model_data.CONTENTS) + + writer = journal_writer(kafka_server, kafka_prefix) + writer.write_additions("content", contents) + + before_date = datetime.datetime.now(tz=datetime.timezone.utc) + jc = JournalChecker( + db=scrubber_db, + config_id=config_id, + journal_client_config=journal_client_config( + kafka_server, kafka_prefix, kafka_consumer_group + ), + objstorage_config=swh_objstorage_config, + ) + jc.objstorage.add_batch( + {c.sha1: c.data for i, c in enumerate(contents) if i != missing_idx} + ) + jc.run() + after_date = 
datetime.datetime.now(tz=datetime.timezone.utc) + + missing_objects = list(scrubber_db.missing_object_iter()) + assert len(missing_objects) == 1 + assert missing_objects[0].id == contents[missing_idx].swhid() + assert missing_objects[0].config.datastore == objstorage_datastore + assert ( + before_date - datetime.timedelta(seconds=5) + <= missing_objects[0].first_occurrence + <= after_date + datetime.timedelta(seconds=5) + ) + + def test_duplicate_directory_entries( scrubber_db, kafka_server,