From a4045264f6159b3aa9e790139a23264c8c4533f0 Mon Sep 17 00:00:00 2001
From: Antoine Lambert <anlambert@softwareheritage.org>
Date: Mon, 25 Mar 2024 16:35:13 +0100
Subject: [PATCH] objstorage_checker: Add support for consuming content ids
 from journal

Add an ObjectStorageCheckerFromJournal class that consumes content ids from a
Kafka topic in order to check their presence in a given object storage and to
verify their integrity by fetching their bytes and recomputing checksums.
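
A minimal usage sketch, assuming a reachable scrubber database; the DSN,
broker address, group id and the in-memory object storage below are
illustrative values, not a reference configuration:

    from swh.model.swhids import ObjectType
    from swh.objstorage.factory import get_objstorage
    from swh.scrubber.db import ScrubberDb
    from swh.scrubber.objstorage_checker import (
        ObjectStorageCheckerFromJournal,
        get_objstorage_datastore,
    )

    objstorage_config = {"cls": "memory"}
    db = ScrubberDb.connect("dbname=swh-scrubber")
    datastore = get_objstorage_datastore(objstorage_config)
    # register a checking session for contents; a journal checker does
    # not partition its input, hence the single partition
    config_id = db.config_add(
        "cfg_content_journal", datastore, ObjectType.CONTENT, nb_partitions=1
    )
    checker = ObjectStorageCheckerFromJournal(
        db=db,
        config_id=config_id,
        journal_client_config={
            "cls": "kafka",
            "brokers": ["broker1.journal.example.org:9092"],
            "group_id": "swh-scrubber-content-checker",
            "prefix": "swh.journal.objects",
            "on_eof": "stop",
        },
        objstorage=get_objstorage(**objstorage_config),
    )
    checker.run()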

Related to #4694.
---
 swh/scrubber/objstorage_checker.py            | 155 +++++++++++++-----
 .../tests/objstorage_checker_tests.py         | 149 ++++++++++++++---
 2 files changed, 246 insertions(+), 58 deletions(-)

diff --git a/swh/scrubber/objstorage_checker.py b/swh/scrubber/objstorage_checker.py
index 45d013b..2780e4a 100644
--- a/swh/scrubber/objstorage_checker.py
+++ b/swh/scrubber/objstorage_checker.py
@@ -5,17 +5,19 @@
 
 import json
 import logging
-from typing import Iterable, Optional
+from typing import Any, Dict, List, Optional, Protocol
 
-from swh.journal.serializers import value_to_kafka
+from swh.core.statsd import Statsd
+from swh.journal.client import get_journal_client
+from swh.journal.serializers import kafka_to_value, value_to_kafka
 from swh.model.model import Content
 from swh.model.swhids import ObjectType
 from swh.objstorage.exc import ObjNotFoundError
 from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
 from swh.storage.interface import StorageInterface
 
-from .base_checker import BasePartitionChecker
-from .db import Datastore, ScrubberDb
+from .base_checker import BaseChecker, BasePartitionChecker
+from .db import ConfigEntry, Datastore, ScrubberDb
 
 logger = logging.getLogger(__name__)
 
@@ -29,12 +31,60 @@ def get_objstorage_datastore(objstorage_config):
     )
 
 
-class ObjectStorageChecker(BasePartitionChecker):
-    """A checker to detect missing and corrupted contents in an object storage.
+class ObjectStorageCheckerProtocol(Protocol):
+    db: ScrubberDb
+    objstorage: ObjStorageInterface
 
-    It iterates on content objects referenced in a storage instance, check they
-    are available in a given object storage instance then retrieve their bytes
-    from it in order to recompute checksums and detect corruptions."""
+    @property
+    def config(self) -> ConfigEntry:
+        ...
+
+    @property
+    def statsd(self) -> Statsd:
+        ...
+
+
+class ContentCheckerMixin(ObjectStorageCheckerProtocol):
+    """Mixin class implementing content checks used by object storage checkers."""
+
+    def check_content(self, content: Content) -> None:
+        """Checks if a content exists in an object storage (if ``check_references`` is set to
+        :const:`True` in checker config) or if a content is corrupted in an object storage (if
+        ``check_hashes`` is set to :const:`True` in checker config).
+        """
+
+        content_hashes = objid_from_dict(content.hashes())
+        try:
+            content_bytes = self.objstorage.get(content_hashes)
+        except ObjNotFoundError:
+            if self.config.check_references:
+                self.statsd.increment("missing_object_total")
+                self.db.missing_object_add(
+                    id=content.swhid(), reference_ids={}, config=self.config
+                )
+        else:
+            if self.config.check_hashes:
+                recomputed_hashes = objid_from_dict(
+                    Content.from_data(content_bytes).hashes()
+                )
+                if content_hashes != recomputed_hashes:
+                    self.statsd.increment("hash_mismatch_total")
+                    self.db.corrupt_object_add(
+                        id=content.swhid(),
+                        config=self.config,
+                        serialized_object=value_to_kafka(content.to_dict()),
+                    )
+
+
+class ObjectStorageCheckerFromStoragePartition(
+    BasePartitionChecker, ContentCheckerMixin
+):
+    """A partition based checker to detect missing and corrupted contents in an object storage.
+
+    It iterates on content objects referenced in a storage instance, check they are available
+    in a given object storage instance (if ``check_references`` is set to :const:`True` in
+    checker config) then retrieve their bytes from it in order to recompute checksums and detect
+    corruptions (if ``check_hashes`` is set to :const:`True` in checker config)."""
 
     def __init__(
         self,
@@ -50,13 +100,20 @@ class ObjectStorageChecker(BasePartitionChecker):
             objstorage if objstorage is not None else getattr(storage, "objstorage")
         )
 
-    def check_partition(self, object_type: ObjectType, partition_id: int) -> None:
-        if object_type != ObjectType.CONTENT:
+        object_type = self.object_type.name.lower()
+
+        if object_type != "content":
+            raise ValueError(
+                "ObjectStorageCheckerFromStoragePartition can only check objects of type "
+                f"content, checking objects of type {object_type} is not supported."
+            )
+
+        if self.objstorage is None:
             raise ValueError(
-                "ObjectStorageChecker can only check objects of type content,"
-                f"checking objects of type {object_type.name.lower()} is not supported."
+                "An object storage must be provided to ObjectStorageCheckerFromStoragePartition."  # noqa
             )
 
+    def check_partition(self, object_type: ObjectType, partition_id: int) -> None:
         page_token = None
         while True:
             page = self.storage.content_get_partition(
@@ -70,32 +127,56 @@ class ObjectStorageChecker(BasePartitionChecker):
                 "batch_duration_seconds", tags={"operation": "check_hashes"}
             ):
                 logger.debug("Checking %s content object hashes", len(contents))
-                self.check_contents(contents)
+                for content in contents:
+                    self.check_content(content)
 
             page_token = page.next_page_token
             if page_token is None:
                 break
 
-    def check_contents(self, contents: Iterable[Content]) -> None:
-        for content in contents:
-            content_hashes = objid_from_dict(content.hashes())
-            try:
-                content_bytes = self.objstorage.get(content_hashes)
-            except ObjNotFoundError:
-                if self.check_references:
-                    self.statsd.increment("missing_object_total")
-                    self.db.missing_object_add(
-                        id=content.swhid(), reference_ids={}, config=self.config
-                    )
-            else:
-                if self.check_hashes:
-                    recomputed_hashes = objid_from_dict(
-                        Content.from_data(content_bytes).hashes()
-                    )
-                    if content_hashes != recomputed_hashes:
-                        self.statsd.increment("hash_mismatch_total")
-                        self.db.corrupt_object_add(
-                            id=content.swhid(),
-                            config=self.config,
-                            serialized_object=value_to_kafka(content.to_dict()),
-                        )
+
+class ObjectStorageCheckerFromJournal(BaseChecker, ContentCheckerMixin):
+    """A journal based checker to detect missing and corrupted contents in an object storage.
+
+    It iterates on content objects referenced in a kafka topic, check they are available
+    in a given object storage instance then retrieve their bytes from it in order to
+    recompute checksums and detect corruptions."""
+
+    def __init__(
+        self,
+        db: ScrubberDb,
+        config_id: int,
+        journal_client_config: Dict[str, Any],
+        objstorage: ObjStorageInterface,
+    ):
+        super().__init__(db=db, config_id=config_id)
+        self.objstorage = objstorage
+
+        object_type = self.object_type.name.lower()
+
+        if object_type != "content":
+            raise ValueError(
+                "ObjectStorageCheckerFromJournal can only check objects of type content,"
+                f"checking objects of type {object_type} is not supported."
+            )
+
+        self.journal_client_config = journal_client_config.copy()
+        if "object_types" in self.journal_client_config:
+            raise ValueError(
+                "The journal_client configuration entry should not define the "
+                "object_types field; this is handled by the scrubber configuration entry"
+            )
+        self.journal_client_config["object_types"] = [object_type]
+        self.journal_client = get_journal_client(
+            **self.journal_client_config,
+            # Remove the default deserializer so process_kafka_messages() gets the
+            # message verbatim and can archive it with as few modifications as possible.
+            value_deserializer=lambda obj_type, msg: msg,
+        )
+
+    def run(self) -> None:
+        self.journal_client.process(self.process_kafka_messages)
+
+    def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]) -> None:
+        for message in all_messages["content"]:
+            self.check_content(Content.from_dict(kafka_to_value(message)))
diff --git a/swh/scrubber/tests/objstorage_checker_tests.py b/swh/scrubber/tests/objstorage_checker_tests.py
index 73fc095..0a4fa0d 100644
--- a/swh/scrubber/tests/objstorage_checker_tests.py
+++ b/swh/scrubber/tests/objstorage_checker_tests.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 from datetime import datetime, timedelta, timezone
+import json
 
 import attr
 import pytest
@@ -12,7 +13,8 @@ from swh.journal.serializers import kafka_to_value
 from swh.model.swhids import CoreSWHID, ObjectType
 from swh.model.tests import swh_model_data
 from swh.scrubber.objstorage_checker import (
-    ObjectStorageChecker,
+    ObjectStorageCheckerFromJournal,
+    ObjectStorageCheckerFromStoragePartition,
     get_objstorage_datastore,
 )
 
@@ -32,35 +34,49 @@ def datastore(swh_objstorage_config):
 
 
 @pytest.fixture
-def objstorage_checker(swh_storage, swh_objstorage, scrubber_db, datastore):
+def objstorage_checker_partition(swh_storage, swh_objstorage, scrubber_db, datastore):
     nb_partitions = len(EXPECTED_PARTITIONS)
     config_id = scrubber_db.config_add(
-        "cfg_objstorage_checker", datastore, ObjectType.CONTENT, nb_partitions
+        "cfg_objstorage_checker_partition", datastore, ObjectType.CONTENT, nb_partitions
+    )
+    return ObjectStorageCheckerFromStoragePartition(
+        scrubber_db, config_id, swh_storage, swh_objstorage
+    )
+
+
+@pytest.fixture
+def objstorage_checker_journal(
+    journal_client_config, swh_objstorage, scrubber_db, datastore
+):
+    config_id = scrubber_db.config_add(
+        "cfg_objstorage_checker_journal", datastore, ObjectType.CONTENT, nb_partitions=1
+    )
+    return ObjectStorageCheckerFromJournal(
+        scrubber_db, config_id, journal_client_config, swh_objstorage
     )
-    return ObjectStorageChecker(scrubber_db, config_id, swh_storage, swh_objstorage)
 
 
-def test_objstorage_checker_no_corruption(
-    swh_storage, swh_objstorage, objstorage_checker
+def test_objstorage_checker_partition_no_corruption(
+    swh_storage, swh_objstorage, objstorage_checker_partition
 ):
     swh_storage.content_add(swh_model_data.CONTENTS)
     swh_objstorage.add_batch({c.sha1: c.data for c in swh_model_data.CONTENTS})
 
-    objstorage_checker.run()
+    objstorage_checker_partition.run()
 
-    scrubber_db = objstorage_checker.db
+    scrubber_db = objstorage_checker_partition.db
     assert list(scrubber_db.corrupt_object_iter()) == []
 
     assert_checked_ranges(
         scrubber_db,
-        [(ObjectType.CONTENT, objstorage_checker.config_id)],
+        [(ObjectType.CONTENT, objstorage_checker_partition.config_id)],
         EXPECTED_PARTITIONS,
     )
 
 
 @pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
-def test_objstorage_checker_missing_content(
-    swh_storage, swh_objstorage, objstorage_checker, missing_idx
+def test_objstorage_checker_partition_missing_content(
+    swh_storage, swh_objstorage, objstorage_checker_partition, missing_idx
 ):
     contents = list(swh_model_data.CONTENTS)
     swh_storage.content_add(contents)
@@ -69,15 +85,15 @@ def test_objstorage_checker_missing_content(
     )
 
     before_date = datetime.now(tz=timezone.utc)
-    objstorage_checker.run()
+    objstorage_checker_partition.run()
     after_date = datetime.now(tz=timezone.utc)
 
-    scrubber_db = objstorage_checker.db
+    scrubber_db = objstorage_checker_partition.db
 
     missing_objects = list(scrubber_db.missing_object_iter())
     assert len(missing_objects) == 1
     assert missing_objects[0].id == contents[missing_idx].swhid()
-    assert missing_objects[0].config.datastore == objstorage_checker.datastore
+    assert missing_objects[0].config.datastore == objstorage_checker_partition.datastore
     assert (
         before_date - timedelta(seconds=5)
         <= missing_objects[0].first_occurrence
@@ -86,7 +102,7 @@ def test_objstorage_checker_missing_content(
 
     assert_checked_ranges(
         scrubber_db,
-        [(ObjectType.CONTENT, objstorage_checker.config_id)],
+        [(ObjectType.CONTENT, objstorage_checker_partition.config_id)],
         EXPECTED_PARTITIONS,
         before_date,
         after_date,
@@ -94,8 +110,8 @@ def test_objstorage_checker_missing_content(
 
 
 @pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
-def test_objstorage_checker_corrupt_content(
-    swh_storage, swh_objstorage, objstorage_checker, corrupt_idx
+def test_objstorage_checker_partition_corrupt_content(
+    swh_storage, swh_objstorage, objstorage_checker_partition, corrupt_idx
 ):
     contents = list(swh_model_data.CONTENTS)
     contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
@@ -103,17 +119,17 @@ def test_objstorage_checker_corrupt_content(
     swh_objstorage.add_batch({c.sha1: c.data for c in contents})
 
     before_date = datetime.now(tz=timezone.utc)
-    objstorage_checker.run()
+    objstorage_checker_partition.run()
     after_date = datetime.now(tz=timezone.utc)
 
-    scrubber_db = objstorage_checker.db
+    scrubber_db = objstorage_checker_partition.db
 
     corrupt_objects = list(scrubber_db.corrupt_object_iter())
     assert len(corrupt_objects) == 1
     assert corrupt_objects[0].id == CoreSWHID.from_string(
         "swh:1:cnt:0000000000000000000000000000000000000000"
     )
-    assert corrupt_objects[0].config.datastore == objstorage_checker.datastore
+    assert corrupt_objects[0].config.datastore == objstorage_checker_partition.datastore
     assert (
         before_date - timedelta(seconds=5)
         <= corrupt_objects[0].first_occurrence
@@ -126,8 +142,99 @@ def test_objstorage_checker_corrupt_content(
 
     assert_checked_ranges(
         scrubber_db,
-        [(ObjectType.CONTENT, objstorage_checker.config_id)],
+        [(ObjectType.CONTENT, objstorage_checker_partition.config_id)],
         EXPECTED_PARTITIONS,
         before_date,
         after_date,
     )
+
+
+def test_objstorage_checker_journal_no_corruption(
+    scrubber_db,
+    journal_writer,
+    objstorage_checker_journal,
+):
+    journal_writer.write_additions("content", swh_model_data.CONTENTS)
+
+    objstorage_checker_journal.objstorage.add_batch(
+        {c.sha1: c.data for c in swh_model_data.CONTENTS}
+    )
+    objstorage_checker_journal.run()
+    objstorage_checker_journal.journal_client.close()
+
+    assert list(scrubber_db.corrupt_object_iter()) == []
+
+
+@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_objstorage_checker_journal_corrupt_content(
+    scrubber_db,
+    journal_writer,
+    objstorage_checker_journal,
+    swh_objstorage_config,
+    corrupt_idx,
+):
+    contents = list(swh_model_data.CONTENTS)
+    contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
+
+    journal_writer.write_additions("content", contents)
+
+    before_date = datetime.now(tz=timezone.utc)
+
+    objstorage_checker_journal.objstorage.add_batch({c.sha1: c.data for c in contents})
+    objstorage_checker_journal.run()
+    after_date = datetime.now(tz=timezone.utc)
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 1
+    assert corrupt_objects[0].id == CoreSWHID.from_string(
+        "swh:1:cnt:0000000000000000000000000000000000000000"
+    )
+    assert corrupt_objects[0].config.datastore.package == "objstorage"
+    assert corrupt_objects[0].config.datastore.cls == swh_objstorage_config.pop("cls")
+    assert corrupt_objects[0].config.datastore.instance == json.dumps(
+        swh_objstorage_config
+    )
+    assert (
+        before_date - timedelta(seconds=5)
+        <= corrupt_objects[0].first_occurrence
+        <= after_date + timedelta(seconds=5)
+    )
+    corrupted_content = contents[corrupt_idx].to_dict()
+    corrupted_content.pop("data")
+    assert kafka_to_value(corrupt_objects[0].object_) == corrupted_content
+
+
+@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_objstorage_checker_journal_missing_content(
+    scrubber_db,
+    journal_writer,
+    objstorage_checker_journal,
+    missing_idx,
+):
+    contents = list(swh_model_data.CONTENTS)
+
+    journal_writer.write_additions("content", contents)
+
+    before_date = datetime.now(tz=timezone.utc)
+
+    objstorage_checker_journal.objstorage.add_batch(
+        {c.sha1: c.data for i, c in enumerate(contents) if i != missing_idx}
+    )
+    objstorage_checker_journal.run()
+    after_date = datetime.now(tz=timezone.utc)
+
+    missing_objects = list(scrubber_db.missing_object_iter())
+    assert len(missing_objects) == 1
+    assert missing_objects[0].id == contents[missing_idx].swhid()
+    assert missing_objects[0].config.datastore == objstorage_checker_journal.datastore
+    assert (
+        before_date - timedelta(seconds=5)
+        <= missing_objects[0].first_occurrence
+        <= after_date + timedelta(seconds=5)
+    )
-- 
GitLab