From 7ef7db47d628c589049342772096f6cfab89ec79 Mon Sep 17 00:00:00 2001
From: Antoine Lambert <anlambert@softwareheritage.org>
Date: Mon, 25 Mar 2024 16:35:13 +0100
Subject: [PATCH] journal_checker: Add support for checking content object
 integrity

Allow the JournalChecker class to process content objects in order to check
both their presence in a given object storage and their integrity, by
fetching their bytes and recomputing checksums.

To do such processing, the journal checker must be configured to process the
content object type, and an object storage configuration must be provided
along with the journal client one.
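
As an illustration, a minimal usage sketch (assuming scrubber_db is an
already-connected ScrubberDb instance; the broker, consumer group and object
storage values below are placeholders, not actual deployment settings):

    from swh.model.swhids import ObjectType
    from swh.scrubber.journal_checker import JournalChecker, get_datastore

    journal_client_config = {
        "cls": "kafka",
        "brokers": ["kafka1.example.org:9092"],  # placeholder broker
        "group_id": "swh-scrubber-content",  # placeholder consumer group
        "prefix": "swh.journal.objects",
    }
    objstorage_config = {
        "cls": "pathslicing",  # any supported objstorage backend
        "root": "/srv/softwareheritage/objects",
        "slicing": "0:2/2:4/4:6",
    }

    # register a scrubber config entry for the content object type
    datastore = get_datastore(journal_client_config, objstorage_config)
    config_id = scrubber_db.config_add(
        name="check_content",
        datastore=datastore,
        object_type=ObjectType.CONTENT,
        nb_partitions=1,
    )

    # consume the content topic and check each object against the objstorage
    JournalChecker(
        db=scrubber_db,
        config_id=config_id,
        journal_client_config=journal_client_config,
        objstorage_config=objstorage_config,
    ).run()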

Related to #4694.
---
 swh/scrubber/base_checker.py             |  23 +--
 swh/scrubber/journal_checker.py          |  74 ++++++---
 swh/scrubber/objstorage_checker.py       |  53 +++---
 swh/scrubber/tests/test_journal_kafka.py | 200 ++++++++++++++++++++++-
 4 files changed, 292 insertions(+), 58 deletions(-)

diff --git a/swh/scrubber/base_checker.py b/swh/scrubber/base_checker.py
index 5cf7fc3..3e7c36f 100644
--- a/swh/scrubber/base_checker.py
+++ b/swh/scrubber/base_checker.py
@@ -33,6 +33,12 @@ class BaseChecker(ABC):
 
         self._config: Optional[ConfigEntry] = None
         self._statsd: Optional[Statsd] = None
+        self.statsd_constant_tags = {
+            "object_type": self.object_type.name.lower(),
+            "datastore_package": self.datastore.package,
+            "datastore_cls": self.datastore.cls,
+            "datastore_instance": self.datastore.instance,
+        }
 
     @property
     def config(self) -> ConfigEntry:
@@ -59,6 +65,11 @@ class BaseChecker(ABC):
             )
         return self._statsd
 
+    @property
+    def object_type(self) -> swhids.ObjectType:
+        """Returns the type of object being checked."""
+        return self.config.object_type
+
     @property
     def check_hashes(self) -> bool:
         return self.config.check_hashes
@@ -84,17 +95,7 @@ class BasePartitionChecker(BaseChecker):
     ):
         super().__init__(db=db, config_id=config_id)
         self.limit = limit
-        self.statsd_constant_tags = {
-            "object_type": self.object_type,
-            "nb_partitions": self.nb_partitions,
-            "datastore_package": self.datastore.package,
-            "datastore_cls": self.datastore.cls,
-        }
-
-    @property
-    def object_type(self) -> swhids.ObjectType:
-        """Returns the type of object being checked."""
-        return self.config.object_type
+        self.statsd_constant_tags["nb_partitions"] = self.nb_partitions
 
     @property
     def nb_partitions(self) -> int:
diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py
index dae1aae..229e918 100644
--- a/swh/scrubber/journal_checker.py
+++ b/swh/scrubber/journal_checker.py
@@ -7,32 +7,40 @@
 
 import json
 import logging
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional
 
 import attr
 
 from swh.journal.client import get_journal_client
 from swh.journal.serializers import kafka_to_value
 from swh.model import model
+from swh.objstorage.factory import get_objstorage
 
 from .base_checker import BaseChecker
 from .db import Datastore, ScrubberDb
+from .objstorage_checker import check_content
 
 logger = logging.getLogger(__name__)
 
 
-def get_datastore(journal_cfg) -> Datastore:
+def get_datastore(
+    journal_cfg: Dict[str, Any], objstorage_cfg: Optional[Dict[str, Any]] = None
+) -> Datastore:
     if journal_cfg.get("cls") == "kafka":
+        package = "journal"
+        cls = "kafka"
+        instance = {
+            "brokers": journal_cfg["brokers"],
+            "group_id": journal_cfg["group_id"],
+            "prefix": journal_cfg["prefix"],
+        }
+        if objstorage_cfg is not None:
+            # content read from the journal is then checked against the objstorage
+            instance = {"journal": instance, "objstorage": objstorage_cfg}
         datastore = Datastore(
-            package="journal",
-            cls="kafka",
-            instance=json.dumps(
-                {
-                    "brokers": journal_cfg["brokers"],
-                    "group_id": journal_cfg["group_id"],
-                    "prefix": journal_cfg["prefix"],
-                }
-            ),
+            package=package,
+            cls=cls,
+            instance=json.dumps(instance),
         )
     else:
         raise NotImplementedError(
@@ -46,15 +54,17 @@ class JournalChecker(BaseChecker):
     reports errors in a separate database."""
 
     def __init__(
-        self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any]
+        self,
+        db: ScrubberDb,
+        config_id: int,
+        journal_client_config: Dict[str, Any],
+        objstorage_config: Optional[Dict[str, Any]] = None,
     ):
         super().__init__(db=db, config_id=config_id)
-        self.statsd_constant_tags = {
-            "datastore_package": self.datastore.package,
-            "datastore_cls": self.datastore.cls,
-        }
 
-        if self.config.check_references:
+        object_type = self.object_type.name.lower()
+
+        if self.config.check_references and object_type != "content":
             raise ValueError(
                 "The journal checker cannot check for references, please set "
                 "the 'check_references' to False in the config entry %s.",
@@ -66,15 +76,23 @@ class JournalChecker(BaseChecker):
                 "The journal_client configuration entry should not define the "
                 "object_types field; this is handled by the scrubber configuration entry"
             )
-        self.journal_client_config["object_types"] = [
-            self.config.object_type.name.lower()
-        ]
+        self.journal_client_config["object_types"] = [object_type]
         self.journal_client = get_journal_client(
             **self.journal_client_config,
             # Remove default deserializer; so process_kafka_values() gets the message
             # verbatim so it can archive it with as few modifications a possible.
             value_deserializer=lambda obj_type, msg: msg,
         )
+        self.objstorage = (
+            get_objstorage(**objstorage_config)
+            if objstorage_config is not None
+            else None
+        )
+        if object_type == "content" and self.objstorage is None:
+            raise ValueError(
+                "The journal checker cannot check content objects if an objstorage "
+                "configuration is not provided."
+            )
 
     def run(self) -> None:
         """Runs a journal client with the given configuration.
@@ -106,9 +124,19 @@ class JournalChecker(BaseChecker):
                             "duplicate_directory_entries_total",
                             tags={"object_type": "directory"},
                         )
+                elif object_type == "content":
+                    content = model.Content.from_dict(kafka_to_value(message))
+                    assert self.objstorage is not None
+                    check_content(
+                        content, self.objstorage, self.db, self.config, self.statsd
+                    )
                 else:
                     object_ = cls.from_dict(kafka_to_value(message))
                     has_duplicate_dir_entries = False
-                real_id = object_.compute_hash()
-                if object_.id != real_id or has_duplicate_dir_entries:
-                    self.db.corrupt_object_add(object_.swhid(), self.config, message)
+
+                if object_type != "content":
+                    real_id = object_.compute_hash()
+                    if object_.id != real_id or has_duplicate_dir_entries:
+                        self.db.corrupt_object_add(
+                            object_.swhid(), self.config, message
+                        )
diff --git a/swh/scrubber/objstorage_checker.py b/swh/scrubber/objstorage_checker.py
index de08a88..cbbebdb 100644
--- a/swh/scrubber/objstorage_checker.py
+++ b/swh/scrubber/objstorage_checker.py
@@ -7,6 +7,7 @@ import json
 import logging
 from typing import Iterable, Optional
 
+from swh.core.statsd import Statsd
 from swh.journal.serializers import value_to_kafka
 from swh.model.model import Content
 from swh.model.swhids import ObjectType
@@ -15,7 +16,7 @@ from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
 from swh.storage.interface import StorageInterface
 
 from .base_checker import BasePartitionChecker
-from .db import Datastore, ScrubberDb
+from .db import ConfigEntry, Datastore, ScrubberDb
 
 logger = logging.getLogger(__name__)
 
@@ -29,6 +30,34 @@ def get_objstorage_datastore(objstorage_config):
     )
 
 
+def check_content(
+    content: Content,
+    objstorage: ObjStorageInterface,
+    db: ScrubberDb,
+    config: ConfigEntry,
+    statsd: Statsd,
+) -> None:
+    content_hashes = objid_from_dict(content.hashes())
+    try:
+        content_bytes = objstorage.get(content_hashes)
+    except ObjNotFoundError:
+        if config.check_references:
+            statsd.increment("missing_object_total")
+            db.missing_object_add(id=content.swhid(), reference_ids={}, config=config)
+    else:
+        if config.check_hashes:
+            recomputed_hashes = objid_from_dict(
+                Content.from_data(content_bytes).hashes()
+            )
+            if content_hashes != recomputed_hashes:
+                statsd.increment("hash_mismatch_total")
+                db.corrupt_object_add(
+                    id=content.swhid(),
+                    config=config,
+                    serialized_object=value_to_kafka(content.to_dict()),
+                )
+
+
 class ObjectStorageChecker(BasePartitionChecker):
     """A checker to detect missing and corrupted contents in an object storage.
 
@@ -79,24 +108,4 @@ class ObjectStorageChecker(BasePartitionChecker):
 
     def check_contents(self, contents: Iterable[Content]) -> None:
         for content in contents:
-            content_hashes = objid_from_dict(content.hashes())
-            try:
-                content_bytes = self.objstorage.get(content_hashes)
-            except ObjNotFoundError:
-                if self.check_references:
-                    self.statsd.increment("missing_object_total")
-                    self.db.missing_object_add(
-                        id=content.swhid(), reference_ids={}, config=self.config
-                    )
-            else:
-                if self.check_hashes:
-                    recomputed_hashes = objid_from_dict(
-                        Content.from_data(content_bytes).hashes()
-                    )
-                    if content_hashes != recomputed_hashes:
-                        self.statsd.increment("hash_mismatch_total")
-                        self.db.corrupt_object_add(
-                            id=content.swhid(),
-                            config=self.config,
-                            serialized_object=value_to_kafka(content.to_dict()),
-                        )
+            check_content(content, self.objstorage, self.db, self.config, self.statsd)
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
index 6f5096f..5c14560 100644
--- a/swh/scrubber/tests/test_journal_kafka.py
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -5,6 +5,7 @@
 
 import datetime
 import hashlib
+import json
 
 import attr
 import pytest
@@ -40,9 +41,21 @@ def journal_writer(kafka_server: str, kafka_prefix: str):
     )
 
 
+@pytest.fixture
+def swh_objstorage_config(tmpdir):
+    return {
+        "cls": "pathslicing",
+        "root": str(tmpdir),
+        "slicing": "0:2/2:4/4:6",
+        "compression": "gzip",
+    }
+
+
 @pytest.fixture
 def datastore(
-    kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
+    kafka_server: str,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
 ) -> Datastore:
     journal_config = journal_client_config(
         kafka_server, kafka_prefix, kafka_consumer_group
@@ -51,8 +64,26 @@ def datastore(
     return datastore
 
 
+@pytest.fixture
+def objstorage_datastore(
+    kafka_server: str,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    swh_objstorage_config,
+) -> Datastore:
+    journal_config = journal_client_config(
+        kafka_server, kafka_prefix, kafka_consumer_group
+    )
+    datastore = get_datastore(journal_config, swh_objstorage_config)
+    return datastore
+
+
 def test_no_corruption(
-    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    datastore,
 ):
     writer = journal_writer(kafka_server, kafka_prefix)
     writer.write_additions("directory", swh_model_data.DIRECTORIES)
@@ -85,6 +116,67 @@ def test_no_corruption(
     assert list(scrubber_db.corrupt_object_iter()) == []
 
 
+def test_contents_check_missing_objstorage_config(
+    scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore
+):
+    journal_cfg = journal_client_config(
+        kafka_server, kafka_prefix, kafka_consumer_group
+    )
+    gid = journal_cfg["group_id"] + "_"
+
+    object_type = "content"
+    journal_cfg["group_id"] = gid + object_type
+    config_id = scrubber_db.config_add(
+        name=f"cfg_{object_type}",
+        datastore=datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    with pytest.raises(
+        ValueError, match="journal checker cannot check content objects"
+    ):
+        JournalChecker(
+            db=scrubber_db, config_id=config_id, journal_client_config=journal_cfg
+        )
+
+
+def test_contents_no_corruption(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    objstorage_datastore,
+    swh_objstorage_config,
+):
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("content", swh_model_data.CONTENTS)
+
+    journal_cfg = journal_client_config(
+        kafka_server, kafka_prefix, kafka_consumer_group
+    )
+    gid = journal_cfg["group_id"] + "_"
+
+    object_type = "content"
+    journal_cfg["group_id"] = gid + object_type
+    config_id = scrubber_db.config_add(
+        name=f"cfg_{object_type}",
+        datastore=objstorage_datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    jc = JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal_client_config=journal_cfg,
+        objstorage_config=swh_objstorage_config,
+    )
+    jc.objstorage.add_batch({c.sha1: c.data for c in swh_model_data.CONTENTS})
+    jc.run()
+    jc.journal_client.close()
+
+    assert list(scrubber_db.corrupt_object_iter()) == []
+
+
 @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
 def test_corrupt_snapshot(
     scrubber_db,
@@ -174,6 +266,110 @@ def test_corrupt_snapshots(
     }
 
 
+@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_corrupt_content(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    objstorage_datastore,
+    corrupt_idx,
+    swh_objstorage_config,
+):
+    config_id = scrubber_db.config_add(
+        name="cfg_content",
+        datastore=objstorage_datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    contents = list(swh_model_data.CONTENTS)
+    contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("content", contents)
+
+    before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    jc = JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal_client_config=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+        objstorage_config=swh_objstorage_config,
+    )
+    jc.objstorage.add_batch({c.sha1: c.data for c in contents})
+    jc.run()
+    after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 1
+    assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
+        "swh:1:cnt:0000000000000000000000000000000000000000"
+    )
+    assert corrupt_objects[0].config.datastore.package == "journal"
+    assert corrupt_objects[0].config.datastore.cls == "kafka"
+    assert '"journal":' in corrupt_objects[0].config.datastore.instance
+    assert (
+        json.dumps(swh_objstorage_config)
+        in corrupt_objects[0].config.datastore.instance
+    )
+    assert (
+        before_date - datetime.timedelta(seconds=5)
+        <= corrupt_objects[0].first_occurrence
+        <= after_date + datetime.timedelta(seconds=5)
+    )
+    corrupted_content = contents[corrupt_idx].to_dict()
+    corrupted_content.pop("data")
+    assert kafka_to_value(corrupt_objects[0].object_) == corrupted_content
+
+
+@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_missing_content(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    objstorage_datastore,
+    missing_idx,
+    swh_objstorage_config,
+):
+    config_id = scrubber_db.config_add(
+        name="cfg_content",
+        datastore=objstorage_datastore,
+        object_type=ObjectType.CONTENT,
+        nb_partitions=1,
+    )
+    contents = list(swh_model_data.CONTENTS)
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.write_additions("content", contents)
+
+    before_date = datetime.datetime.now(tz=datetime.timezone.utc)
+    jc = JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal_client_config=journal_client_config(
+            kafka_server, kafka_prefix, kafka_consumer_group
+        ),
+        objstorage_config=swh_objstorage_config,
+    )
+    jc.objstorage.add_batch(
+        {c.sha1: c.data for i, c in enumerate(contents) if i != missing_idx}
+    )
+    jc.run()
+    after_date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    missing_objects = list(scrubber_db.missing_object_iter())
+    assert len(missing_objects) == 1
+    assert missing_objects[0].id == contents[missing_idx].swhid()
+    assert missing_objects[0].config.datastore == objstorage_datastore
+    assert (
+        before_date - datetime.timedelta(seconds=5)
+        <= missing_objects[0].first_occurrence
+        <= after_date + datetime.timedelta(seconds=5)
+    )
+
+
 def test_duplicate_directory_entries(
     scrubber_db,
     kafka_server,
-- 
GitLab