Commit 5b9b6a5b authored by vlorentz, committed by vlorentz

journal_checker: Fix crash on duplicate directory entries

parent a84e8048
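In short: `model.Directory.from_dict` raises on directories whose entries reuse a name, so the journal checker used to crash on such messages. With this change, directory messages go through `Directory.from_possibly_duplicated_entries` instead, the occurrence is counted in statsd, and the object is recorded as corrupt. The sketch below is not part of the commit; it only illustrates the classmethod the checker now relies on, with made-up entry values:

    from swh.model import model

    # Two entries sharing the same name, as can appear in corrupt archived directories.
    entries = (
        model.DirectoryEntry(name=b"filename", type="file", target=b"\x01" * 20, perms=0),
        model.DirectoryEntry(name=b"filename", type="file", target=b"\x02" * 20, perms=0),
    )
    # Returns a flag telling whether duplicated names were found, plus a Directory
    # built from the given entries (instead of raising like Directory.from_dict does).
    has_duplicates, directory = model.Directory.from_possibly_duplicated_entries(
        entries=entries, raw_manifest=None
    )
    assert has_duplicates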
-# Copyright (C) 2021-2022 The Software Heritage developers
+# Copyright (C) 2021-2023 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -9,6 +9,9 @@ import json
 import logging
 from typing import Any, Dict, List, Optional

+import attr
+
+from swh.core.statsd import Statsd
 from swh.journal.client import get_journal_client
 from swh.journal.serializers import kafka_to_value
 from swh.model import model
@@ -70,6 +73,7 @@ class JournalChecker:
             # verbatim so it can archive it with as few modifications a possible.
             value_deserializer=lambda obj_type, msg: msg,
         )
+        self._statsd: Optional[Statsd] = None

     @property
     def config(self) -> ConfigEntry:
@@ -85,6 +89,18 @@
         being checked."""
         return self.config.datastore

+    @property
+    def statsd(self) -> Statsd:
+        if self._statsd is None:
+            self._statsd = Statsd(
+                namespace="swh_scrubber",
+                constant_tags={
+                    "datastore_package": self.datastore.package,
+                    "datastore_cls": self.datastore.cls,
+                },
+            )
+        return self._statsd
+
     def run(self):
         """Runs a journal client with the given configuration.
         This method does not return, unless otherwise configured (with ``stop_on_eof``).
@@ -96,7 +112,26 @@
             logger.debug("Processing %s %s", len(messages), object_type)
             cls = getattr(model, object_type.capitalize())
             for message in messages:
-                object_ = cls.from_dict(kafka_to_value(message))
+                if object_type == "directory":
+                    d = kafka_to_value(message)
+                    (
+                        has_duplicate_dir_entries,
+                        object_,
+                    ) = cls.from_possibly_duplicated_entries(
+                        entries=tuple(
+                            map(model.DirectoryEntry.from_dict, d["entries"])
+                        ),
+                        raw_manifest=d.get("raw_manifest"),
+                    )
+                    object_ = attr.evolve(object_, id=d["id"])
+                    if has_duplicate_dir_entries:
+                        self.statsd.increment(
+                            "duplicate_directory_entries_total",
+                            tags={"object_type": "directory"},
+                        )
+                else:
+                    object_ = cls.from_dict(kafka_to_value(message))
+                    has_duplicate_dir_entries = False
                 real_id = object_.compute_hash()
-                if object_.id != real_id:
+                if object_.id != real_id or has_duplicate_dir_entries:
                     self.db.corrupt_object_add(object_.swhid(), self.config, message)
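The hunk above also wires the duplicate counter through a lazily-built `swh.core.statsd.Statsd` client (the new `statsd` property), namespaced `swh_scrubber` and tagged with the datastore being checked. Before the test changes below, here is a rough standalone sketch of what an increment amounts to; the datastore tag values are placeholders, not taken from the commit:

    from swh.core.statsd import Statsd

    # Roughly what the checker's `statsd` property builds; the tag values below
    # are illustrative, the real ones come from the configured datastore.
    statsd = Statsd(
        namespace="swh_scrubber",
        constant_tags={"datastore_package": "storage", "datastore_cls": "postgresql"},
    )
    # Each directory with duplicated entry names bumps this counter, emitted under
    # the swh_scrubber namespace as duplicate_directory_entries_total, with the
    # constant tags above plus object_type:directory.
    statsd.increment("duplicate_directory_entries_total", tags={"object_type": "directory"})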
-# Copyright (C) 2022 The Software Heritage developers
+# Copyright (C) 2022-2023 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
+import hashlib

 import attr
 import pytest

 from swh.journal.serializers import kafka_to_value
 from swh.journal.writer import get_journal_writer
-from swh.model import swhids
+from swh.model import model, swhids
 from swh.model.swhids import ObjectType
 from swh.model.tests import swh_model_data

 from swh.scrubber.db import Datastore
@@ -169,6 +170,73 @@ def test_corrupt_snapshots(
     }


+def test_duplicate_directory_entries(
+    scrubber_db,
+    kafka_server,
+    kafka_prefix,
+    kafka_consumer_group,
+    datastore,
+):
+    config_id = scrubber_db.config_add(
+        name="cfg_directory",
+        datastore=datastore,
+        object_type=ObjectType.DIRECTORY,
+        nb_partitions=1,
+        check_references=False,
+    )
+
+    directory = model.Directory(
+        entries=(
+            model.DirectoryEntry(
+                name=b"filename", type="file", target=b"\x01" * 20, perms=0
+            ),
+        )
+    )
+
+    # has duplicate entries and wrong hash
+    corrupt_directory = {
+        "id": b"\x00" * 20,
+        "entries": [
+            {"name": b"filename", "type": "file", "target": b"\x01" * 20, "perms": 0},
+            {"name": b"filename", "type": "file", "target": b"\x02" * 20, "perms": 0},
+        ],
+    }
+
+    # has duplicate entries but correct hash
+    raw_manifest = (
+        b"tree 62\x00"
+        + b"0 filename\x00"
+        + b"\x01" * 20
+        + b"0 filename\x00"
+        + b"\x02" * 20
+    )
+    dupe_directory = {
+        "id": hashlib.sha1(raw_manifest).digest(),
+        "entries": corrupt_directory["entries"],
+        "raw_manifest": raw_manifest,
+    }
+
+    writer = journal_writer(kafka_server, kafka_prefix)
+    writer.send(f"{kafka_prefix}.directory", directory.id, directory.to_dict())
+    writer.send(f"{kafka_prefix}.directory", corrupt_directory["id"], corrupt_directory)
+    writer.send(f"{kafka_prefix}.directory", dupe_directory["id"], dupe_directory)
+
+    JournalChecker(
+        db=scrubber_db,
+        config_id=config_id,
+        journal=journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group),
+    ).run()
+
+    corrupt_objects = list(scrubber_db.corrupt_object_iter())
+    assert len(corrupt_objects) == 2
+    assert {co.id for co in corrupt_objects} == {
+        swhids.CoreSWHID.from_string(swhid)
+        for swhid in [
+            "swh:1:dir:0000000000000000000000000000000000000000",
+            f"swh:1:dir:{dupe_directory['id'].hex()}",
+        ]
+    }
+
+
 def test_check_references_raises(
     scrubber_db,
     kafka_server,
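A side note on the `raw_manifest` built in the new test: it is a git tree object with its header, and its sha1 is what the "correct hash" case stores as the directory id. The length announced in the header checks out, as the small computation below shows (it only reuses values already present in the test):

    import hashlib

    # Each entry serializes as b"0 filename\x00" (11 bytes) followed by a
    # 20-byte target, i.e. 31 bytes; two entries give the 62 announced by
    # the b"tree 62\x00" header.
    entry_1 = b"0 filename\x00" + b"\x01" * 20
    entry_2 = b"0 filename\x00" + b"\x02" * 20
    assert len(entry_1) == len(entry_2) == 31

    raw_manifest = b"tree 62\x00" + entry_1 + entry_2
    # The test uses this digest as the id of its "duplicate entries but correct
    # hash" directory, so only the duplication (not the hash mismatch) flags it.
    print(hashlib.sha1(raw_manifest).hexdigest())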