From 5b9b6a5b9ad9adb139ae44d234d438550556ba26 Mon Sep 17 00:00:00 2001
From: Valentin Lorentz <vlorentz@softwareheritage.org>
Date: Wed, 15 Nov 2023 11:21:18 +0100
Subject: [PATCH] journal_checker: Fix crash on duplicate directory entries

---
 swh/scrubber/journal_checker.py          | 41 +++++++++++++-
 swh/scrubber/tests/test_journal_kafka.py | 72 +++++++++++++++++++++++-
 2 files changed, 108 insertions(+), 5 deletions(-)

diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py
index 5ae8de4..1ce441c 100644
--- a/swh/scrubber/journal_checker.py
+++ b/swh/scrubber/journal_checker.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021-2022  The Software Heritage developers
+# Copyright (C) 2021-2023  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -9,6 +9,9 @@ import json
 import logging
 from typing import Any, Dict, List, Optional
 
+import attr
+
+from swh.core.statsd import Statsd
 from swh.journal.client import get_journal_client
 from swh.journal.serializers import kafka_to_value
 from swh.model import model
@@ -70,6 +73,7 @@ class JournalChecker:
             # verbatim so it can archive it with as few modifications a possible.
             value_deserializer=lambda obj_type, msg: msg,
         )
+        self._statsd: Optional[Statsd] = None
 
     @property
     def config(self) -> ConfigEntry:
@@ -85,6 +89,18 @@ class JournalChecker:
         being checked."""
         return self.config.datastore
 
+    @property
+    def statsd(self) -> Statsd:
+        if self._statsd is None:
+            self._statsd = Statsd(
+                namespace="swh_scrubber",
+                constant_tags={
+                    "datastore_package": self.datastore.package,
+                    "datastore_cls": self.datastore.cls,
+                },
+            )
+        return self._statsd
+
     def run(self):
         """Runs a journal client with the given configuration.
         This method does not return, unless otherwise configured (with ``stop_on_eof``).
@@ -96,7 +112,26 @@ class JournalChecker:
             logger.debug("Processing %s %s", len(messages), object_type)
             cls = getattr(model, object_type.capitalize())
             for message in messages:
-                object_ = cls.from_dict(kafka_to_value(message))
+                if object_type == "directory":
+                    d = kafka_to_value(message)
+                    (
+                        has_duplicate_dir_entries,
+                        object_,
+                    ) = cls.from_possibly_duplicated_entries(
+                        entries=tuple(
+                            map(model.DirectoryEntry.from_dict, d["entries"])
+                        ),
+                        raw_manifest=d.get("raw_manifest"),
+                    )
+                    object_ = attr.evolve(object_, id=d["id"])
+                    if has_duplicate_dir_entries:
+                        self.statsd.increment(
+                            "duplicate_directory_entries_total",
+                            tags={"object_type": "directory"},
+                        )
+                else:
+                    object_ = cls.from_dict(kafka_to_value(message))
+                    has_duplicate_dir_entries = False
                 real_id = object_.compute_hash()
-                if object_.id != real_id:
+                if object_.id != real_id or has_duplicate_dir_entries:
                     self.db.corrupt_object_add(object_.swhid(), self.config, message)
diff --git a/swh/scrubber/tests/test_journal_kafka.py b/swh/scrubber/tests/test_journal_kafka.py
index d32363b..e31bba3 100644
--- a/swh/scrubber/tests/test_journal_kafka.py
+++ b/swh/scrubber/tests/test_journal_kafka.py
@@ -1,16 +1,17 @@
-# Copyright (C) 2022  The Software Heritage developers
+# Copyright (C) 2022-2023  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
+import hashlib
 
 import attr
 import pytest
 
 from swh.journal.serializers import kafka_to_value
 from swh.journal.writer import get_journal_writer
-from swh.model import swhids
+from swh.model import model, swhids
 from swh.model.swhids import ObjectType
 from swh.model.tests import swh_model_data
 from swh.scrubber.db import Datastore
@@ -169,6 +170,73 @@ def test_corrupt_snapshots(
     }
 
 
+def test_duplicate_directory_entries(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    datastore,
+):
+    config_id = scrubber_db.config_add(
+        name="cfg_directory",
+        datastore=datastore,
+        object_type=ObjectType.DIRECTORY,
+        nb_partitions=1,
+        check_references=False,
+    )
+    directory = model.Directory(
+        entries=(
+            model.DirectoryEntry(
+                name=b"filename", type="file", target=b"\x01" * 20, perms=0
+            ),
+        )
+    )
+
+    # has duplicate entries and wrong hash
+    corrupt_directory = {
+        "id": b"\x00" * 20,
+        "entries": [
+            {"name": b"filename", "type": "file", "target": b"\x01" * 20, "perms": 0},
+            {"name": b"filename", "type": "file", "target": b"\x02" * 20, "perms": 0},
+        ],
+    }
+
+    # has duplicate entries but correct hash
+    raw_manifest = (
+        b"tree 62\x00"
+        + b"0 filename\x00"
+        + b"\x01" * 20
+        + b"0 filename\x00"
+        + b"\x02" * 20
+    )
+    dupe_directory = {
+        "id": hashlib.sha1(raw_manifest).digest(),
+        "entries": corrupt_directory["entries"],
+        "raw_manifest": raw_manifest,
+    }
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.send(f"{kafka_prefix}.directory", directory.id, directory.to_dict())
+    writer.send(f"{kafka_prefix}.directory", corrupt_directory["id"], corrupt_directory)
+    writer.send(f"{kafka_prefix}.directory", dupe_directory["id"], dupe_directory)
+
+    JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group),
+    ).run()
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 2
+    assert {co.id for co in corrupt_objects} == {
+        swhids.CoreSWHID.from_string(swhid)
+        for swhid in [
+            "swh:1:dir:0000000000000000000000000000000000000000",
+            f"swh:1:dir:{dupe_directory['id'].hex()}",
+        ]
+    }
+
+
 def test_check_references_raises(
     scrubber_db,
     kafka_server,
-- 
GitLab