From 651f647a351dfa45997422d6e5d65ab6f8de0034 Mon Sep 17 00:00:00 2001
From: Antoine Lambert <anlambert@softwareheritage.org>
Date: Thu, 21 Mar 2024 16:50:50 +0100
Subject: [PATCH] Add an objstorage checker to detect missing and corrupted
 contents

Add class ObjectStorageChecker to detect missing and corrupted contents
in an object storage.

It iterates over the content objects referenced in a storage instance, checks
that they are available in a given object storage instance, then retrieves
their bytes from it in order to recompute checksums and detect corruption.
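
A minimal usage sketch (scrubber_db, swh_storage and swh_objstorage stand for
already-configured scrubber database, storage and objstorage instances; the
configuration name and partition count are illustrative, mirroring the new
tests):

    from swh.model.swhids import ObjectType
    from swh.scrubber.objstorage_checker import (
        ObjectStorageChecker,
        get_objstorage_datastore,
    )

    datastore = get_objstorage_datastore({"cls": "memory"})
    config_id = scrubber_db.config_add(
        "cfg_objstorage_checker", datastore, ObjectType.CONTENT, 16
    )
    checker = ObjectStorageChecker(
        scrubber_db, config_id, swh_storage, swh_objstorage
    )
    checker.run()  # results are recorded in the scrubber database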

Related to #4694.
---
 conftest.py                                   |   1 +
 swh/scrubber/db.py                            |  27 ++--
 swh/scrubber/objstorage_checker.py            | 102 ++++++++++++++
 .../tests/objstorage_checker_tests.py         | 133 ++++++++++++++++++
 swh/scrubber/tests/test_objstorage_memory.py  |  21 +++
 .../tests/test_objstorage_pathslicing.py      |  21 +++
 6 files changed, 291 insertions(+), 14 deletions(-)
 create mode 100644 swh/scrubber/objstorage_checker.py
 create mode 100644 swh/scrubber/tests/objstorage_checker_tests.py
 create mode 100644 swh/scrubber/tests/test_objstorage_memory.py
 create mode 100644 swh/scrubber/tests/test_objstorage_pathslicing.py

diff --git a/conftest.py b/conftest.py
index 8391a78..692a02b 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,4 +1,5 @@
 pytest_plugins = [
     "swh.storage.pytest_plugin",
     "swh.graph.pytest_plugin",
+    "swh.objstorage.pytest_plugin",
 ]
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
index b1ced12..0b62966 100644
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -814,8 +814,6 @@ class ScrubberDb(BaseDb):
             datastore: representation of the swh-storage/swh-journal/... instance
               containing this hole
         """
-        if not reference_ids:
-            raise ValueError("reference_ids is empty")
         config_id = self.config_get_by_name(config.name)
         with self.transaction() as cur:
             cur.execute(
@@ -826,18 +824,19 @@ class ScrubberDb(BaseDb):
                 """,
                 (str(id), config_id),
             )
-            psycopg2.extras.execute_batch(
-                cur,
-                """
-                INSERT INTO missing_object_reference (missing_id, reference_id, config_id)
-                VALUES (%s, %s, %s)
-                ON CONFLICT DO NOTHING
-                """,
-                [
-                    (str(id), str(reference_id), config_id)
-                    for reference_id in reference_ids
-                ],
-            )
+            if reference_ids:
+                psycopg2.extras.execute_batch(
+                    cur,
+                    """
+                    INSERT INTO missing_object_reference (missing_id, reference_id, config_id)
+                    VALUES (%s, %s, %s)
+                    ON CONFLICT DO NOTHING
+                    """,
+                    [
+                        (str(id), str(reference_id), config_id)
+                        for reference_id in reference_ids
+                    ],
+                )
 
     def missing_object_iter(self) -> Iterator[MissingObject]:
         """Yields all records in the 'missing_object' table."""
diff --git a/swh/scrubber/objstorage_checker.py b/swh/scrubber/objstorage_checker.py
new file mode 100644
index 0000000..de08a88
--- /dev/null
+++ b/swh/scrubber/objstorage_checker.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2024  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import logging
+from typing import Iterable, Optional
+
+from swh.journal.serializers import value_to_kafka
+from swh.model.model import Content
+from swh.model.swhids import ObjectType
+from swh.objstorage.exc import ObjNotFoundError
+from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
+from swh.storage.interface import StorageInterface
+
+from .base_checker import BasePartitionChecker
+from .db import Datastore, ScrubberDb
+
+logger = logging.getLogger(__name__)
+
+
+def get_objstorage_datastore(objstorage_config):
+    objstorage_config = dict(objstorage_config)
+    return Datastore(
+        package="objstorage",
+        cls=objstorage_config.pop("cls"),
+        instance=json.dumps(objstorage_config),
+    )
+
+
+class ObjectStorageChecker(BasePartitionChecker):
+    """A checker to detect missing and corrupted contents in an object storage.
+
+    It iterates over the content objects referenced in a storage instance, checks
+    that they are available in a given object storage instance, then retrieves
+    their bytes from it in order to recompute checksums and detect corruption."""
+
+    def __init__(
+        self,
+        db: ScrubberDb,
+        config_id: int,
+        storage: StorageInterface,
+        objstorage: Optional[ObjStorageInterface] = None,
+        limit: int = 0,
+    ):
+        super().__init__(db=db, config_id=config_id, limit=limit)
+        self.storage = storage
+        self.objstorage = (
+            objstorage if objstorage is not None else getattr(storage, "objstorage")
+        )
+        self.statsd_constant_tags["datastore_instance"] = self.datastore.instance
+
+    def check_partition(self, object_type: ObjectType, partition_id: int) -> None:
+        if object_type != ObjectType.CONTENT:
+            raise ValueError(
+                "ObjectStorageChecker can only check objects of type content,"
+                f"checking objects of type {object_type.name.lower()} is not supported."
+            )
+
+        page_token = None
+        while True:
+            page = self.storage.content_get_partition(
+                partition_id=partition_id,
+                nb_partitions=self.nb_partitions,
+                page_token=page_token,
+            )
+            contents = page.results
+
+            with self.statsd.timed(
+                "batch_duration_seconds", tags={"operation": "check_hashes"}
+            ):
+                logger.debug("Checking %s content object hashes", len(contents))
+                self.check_contents(contents)
+
+            page_token = page.next_page_token
+            if page_token is None:
+                break
+
+    def check_contents(self, contents: Iterable[Content]) -> None:
+        for content in contents:
+            content_hashes = objid_from_dict(content.hashes())
+            try:
+                content_bytes = self.objstorage.get(content_hashes)
+            except ObjNotFoundError:
+                if self.check_references:
+                    self.statsd.increment("missing_object_total")
+                    self.db.missing_object_add(
+                        id=content.swhid(), reference_ids=set(), config=self.config
+                    )
+            else:
+                if self.check_hashes:
+                    recomputed_hashes = objid_from_dict(
+                        Content.from_data(content_bytes).hashes()
+                    )
+                    if content_hashes != recomputed_hashes:
+                        self.statsd.increment("hash_mismatch_total")
+                        self.db.corrupt_object_add(
+                            id=content.swhid(),
+                            config=self.config,
+                            serialized_object=value_to_kafka(content.to_dict()),
+                        )
diff --git a/swh/scrubber/tests/objstorage_checker_tests.py b/swh/scrubber/tests/objstorage_checker_tests.py
new file mode 100644
index 0000000..73fc095
--- /dev/null
+++ b/swh/scrubber/tests/objstorage_checker_tests.py
@@ -0,0 +1,133 @@
+# Copyright (C) 2024  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime, timedelta, timezone
+
+import attr
+import pytest
+
+from swh.journal.serializers import kafka_to_value
+from swh.model.swhids import CoreSWHID, ObjectType
+from swh.model.tests import swh_model_data
+from swh.scrubber.objstorage_checker import (
+    ObjectStorageChecker,
+    get_objstorage_datastore,
+)
+
+from .storage_checker_tests import assert_checked_ranges
+
+EXPECTED_PARTITIONS = {
+    (ObjectType.CONTENT, 0, 4),
+    (ObjectType.CONTENT, 1, 4),
+    (ObjectType.CONTENT, 2, 4),
+    (ObjectType.CONTENT, 3, 4),
+}
+
+
+@pytest.fixture
+def datastore(swh_objstorage_config):
+    return get_objstorage_datastore(swh_objstorage_config)
+
+
+@pytest.fixture
+def objstorage_checker(swh_storage, swh_objstorage, scrubber_db, datastore):
+    nb_partitions = len(EXPECTED_PARTITIONS)
+    config_id = scrubber_db.config_add(
+        "cfg_objstorage_checker", datastore, ObjectType.CONTENT, nb_partitions
+    )
+    return ObjectStorageChecker(scrubber_db, config_id, swh_storage, swh_objstorage)
+
+
+def test_objstorage_checker_no_corruption(
+    swh_storage, swh_objstorage, objstorage_checker
+):
+    swh_storage.content_add(swh_model_data.CONTENTS)
+    swh_objstorage.add_batch({c.sha1: c.data for c in swh_model_data.CONTENTS})
+
+    objstorage_checker.run()
+
+    scrubber_db = objstorage_checker.db
+    assert list(scrubber_db.corrupt_object_iter()) == []
+
+    assert_checked_ranges(
+        scrubber_db,
+        [(ObjectType.CONTENT, objstorage_checker.config_id)],
+        EXPECTED_PARTITIONS,
+    )
+
+
+@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_objstorage_checker_missing_content(
+    swh_storage, swh_objstorage, objstorage_checker, missing_idx
+):
+    contents = list(swh_model_data.CONTENTS)
+    swh_storage.content_add(contents)
+    swh_objstorage.add_batch(
+        {c.sha1: c.data for i, c in enumerate(contents) if i != missing_idx}
+    )
+
+    before_date = datetime.now(tz=timezone.utc)
+    objstorage_checker.run()
+    after_date = datetime.now(tz=timezone.utc)
+
+    scrubber_db = objstorage_checker.db
+
+    missing_objects = list(scrubber_db.missing_object_iter())
+    assert len(missing_objects) == 1
+    assert missing_objects[0].id == contents[missing_idx].swhid()
+    assert missing_objects[0].config.datastore == objstorage_checker.datastore
+    assert (
+        before_date - timedelta(seconds=5)
+        <= missing_objects[0].first_occurrence
+        <= after_date + timedelta(seconds=5)
+    )
+
+    assert_checked_ranges(
+        scrubber_db,
+        [(ObjectType.CONTENT, objstorage_checker.config_id)],
+        EXPECTED_PARTITIONS,
+        before_date,
+        after_date,
+    )
+
+
+@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
+def test_objstorage_checker_corrupt_content(
+    swh_storage, swh_objstorage, objstorage_checker, corrupt_idx
+):
+    contents = list(swh_model_data.CONTENTS)
+    contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
+    swh_storage.content_add(contents)
+    swh_objstorage.add_batch({c.sha1: c.data for c in contents})
+
+    before_date = datetime.now(tz=timezone.utc)
+    objstorage_checker.run()
+    after_date = datetime.now(tz=timezone.utc)
+
+    scrubber_db = objstorage_checker.db
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 1
+    assert corrupt_objects[0].id == CoreSWHID.from_string(
+        "swh:1:cnt:0000000000000000000000000000000000000000"
+    )
+    assert corrupt_objects[0].config.datastore == objstorage_checker.datastore
+    assert (
+        before_date - timedelta(seconds=5)
+        <= corrupt_objects[0].first_occurrence
+        <= after_date + timedelta(seconds=5)
+    )
+
+    corrupted_content = contents[corrupt_idx].to_dict()
+    corrupted_content.pop("data")
+    assert kafka_to_value(corrupt_objects[0].object_) == corrupted_content
+
+    assert_checked_ranges(
+        scrubber_db,
+        [(ObjectType.CONTENT, objstorage_checker.config_id)],
+        EXPECTED_PARTITIONS,
+        before_date,
+        after_date,
+    )
diff --git a/swh/scrubber/tests/test_objstorage_memory.py b/swh/scrubber/tests/test_objstorage_memory.py
new file mode 100644
index 0000000..f929054
--- /dev/null
+++ b/swh/scrubber/tests/test_objstorage_memory.py
@@ -0,0 +1,21 @@
+# Copyright (C) 2024  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from .objstorage_checker_tests import *  # noqa
+
+# Use cassandra storage and an objstorage with memory backend to run
+# the tests
+
+
+@pytest.fixture
+def swh_storage_backend_config(swh_storage_cassandra_backend_config):
+    return swh_storage_cassandra_backend_config
+
+
+@pytest.fixture
+def swh_objstorage_config():
+    return {"cls": "memory"}
diff --git a/swh/scrubber/tests/test_objstorage_pathslicing.py b/swh/scrubber/tests/test_objstorage_pathslicing.py
new file mode 100644
index 0000000..c618068
--- /dev/null
+++ b/swh/scrubber/tests/test_objstorage_pathslicing.py
@@ -0,0 +1,21 @@
+# Copyright (C) 2024  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from .objstorage_checker_tests import *  # noqa
+
+# Use postgresql storage and an objstorage with pathslicing backend to run
+# the tests
+
+
+@pytest.fixture
+def swh_objstorage_config(tmpdir):
+    return {
+        "cls": "pathslicing",
+        "root": str(tmpdir),
+        "slicing": "0:2/2:4/4:6",
+        "compression": "gzip",
+    }
-- 
GitLab