Commit 0030d605 authored by Antoine Lambert

journal_checker: Add support for checking content objects integrity

Allow the JournalChecker class to process content objects in order to check
their presence in a given object storage and to verify their integrity by
fetching their bytes and recomputing checksums.

To enable this processing, the journal checker must be configured to process
the content object type, and an object storage configuration must be provided
alongside the journal client one.

Related to #4694.
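For illustration, here is a minimal sketch of how a content-checking JournalChecker could be instantiated with both configurations, modeled on the tests added in this commit. The broker address, object storage root, and the scrubber_db / config_id variables are hypothetical placeholders, and the import path is assumed to be swh.scrubber.journal_checker.

from swh.scrubber.journal_checker import JournalChecker

# Journal client configuration (hypothetical broker and group id values).
journal_client_config = {
    "cls": "kafka",
    "brokers": ["kafka1.example.org:9092"],
    "group_id": "swh.scrubber.journal_checker.content",
    "prefix": "swh.journal.objects",
}

# Object storage configuration, required when checking content objects.
objstorage_config = {
    "cls": "pathslicing",
    "root": "/srv/objstorage",  # hypothetical path
    "slicing": "0:2/2:4/4:6",
}

checker = JournalChecker(
    db=scrubber_db,        # an existing ScrubberDb instance (assumed)
    config_id=config_id,   # id of a scrubber config entry with object_type "content"
    journal_client_config=journal_client_config,
    objstorage_config=objstorage_config,
)
checker.run()  # consumes "content" messages and checks them against the objstorage

Whether missing objects and hash mismatches are reported depends on the check_references and check_hashes flags of the referenced scrubber configuration entry, as implemented in check_content() below.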
parent 13181d59
......@@ -33,6 +33,12 @@ class BaseChecker(ABC):
self._config: Optional[ConfigEntry] = None
self._statsd: Optional[Statsd] = None
self.statsd_constant_tags = {
"object_type": self.object_type.name.lower(),
"datastore_package": self.datastore.package,
"datastore_cls": self.datastore.cls,
"datastore_instance": self.datastore.instance,
}
@property
def config(self) -> ConfigEntry:
......@@ -59,6 +65,11 @@ class BaseChecker(ABC):
)
return self._statsd
@property
def object_type(self) -> swhids.ObjectType:
"""Returns the type of object being checked."""
return self.config.object_type
@property
def check_hashes(self) -> bool:
return self.config.check_hashes
......@@ -84,17 +95,7 @@ class BasePartitionChecker(BaseChecker):
):
super().__init__(db=db, config_id=config_id)
self.limit = limit
self.statsd_constant_tags = {
"object_type": self.object_type,
"nb_partitions": self.nb_partitions,
"datastore_package": self.datastore.package,
"datastore_cls": self.datastore.cls,
}
@property
def object_type(self) -> swhids.ObjectType:
"""Returns the type of object being checked."""
return self.config.object_type
self.statsd_constant_tags["nb_partitions"] = self.nb_partitions
@property
def nb_partitions(self) -> int:
......
......@@ -7,32 +7,41 @@
import json
import logging
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
import attr
from swh.journal.client import get_journal_client
from swh.journal.serializers import kafka_to_value
from swh.model import model
from swh.objstorage.factory import get_objstorage
from .base_checker import BaseChecker
from .db import Datastore, ScrubberDb
from .objstorage_checker import check_content
logger = logging.getLogger(__name__)
def get_datastore(journal_cfg) -> Datastore:
def get_datastore(
journal_cfg: Dict[str, Any], objstorage_cfg: Optional[Dict[str, Any]] = None
) -> Datastore:
if journal_cfg.get("cls") == "kafka":
package = "journal"
cls = "kafka"
instance = {
"brokers": journal_cfg["brokers"],
"group_id": journal_cfg["group_id"],
"prefix": journal_cfg["prefix"],
}
if objstorage_cfg is not None:
# in that case, objstorage integrity is checked by reading the journal content topic
instance = {"journal": instance, "objstorage": objstorage_cfg}
cls = objstorage_cfg.get("cls", "")
datastore = Datastore(
package="journal",
cls="kafka",
instance=json.dumps(
{
"brokers": journal_cfg["brokers"],
"group_id": journal_cfg["group_id"],
"prefix": journal_cfg["prefix"],
}
),
package=package,
cls=cls,
instance=json.dumps(instance),
)
else:
raise NotImplementedError(
......@@ -46,15 +55,17 @@ class JournalChecker(BaseChecker):
reports errors in a separate database."""
def __init__(
self, db: ScrubberDb, config_id: int, journal_client_config: Dict[str, Any]
self,
db: ScrubberDb,
config_id: int,
journal_client_config: Dict[str, Any],
objstorage_config: Optional[Dict[str, Any]] = None,
):
super().__init__(db=db, config_id=config_id)
self.statsd_constant_tags = {
"datastore_package": self.datastore.package,
"datastore_cls": self.datastore.cls,
}
if self.config.check_references:
object_type = self.object_type.name.lower()
if self.config.check_references and object_type != "content":
raise ValueError(
"The journal checker cannot check for references, please set "
"the 'check_references' to False in the config entry %s.",
......@@ -66,15 +77,23 @@ class JournalChecker(BaseChecker):
"The journal_client configuration entry should not define the "
"object_types field; this is handled by the scrubber configuration entry"
)
self.journal_client_config["object_types"] = [
self.config.object_type.name.lower()
]
self.journal_client_config["object_types"] = [object_type]
self.journal_client = get_journal_client(
**self.journal_client_config,
# Remove the default deserializer so that process_kafka_values() gets the
# message verbatim and can archive it with as few modifications as possible.
value_deserializer=lambda obj_type, msg: msg,
)
self.objstorage = (
get_objstorage(**objstorage_config)
if objstorage_config is not None
else None
)
if object_type == "content" and self.objstorage is None:
raise ValueError(
"The journal checker cannot check content objects if an objstorage "
"configuration is not provided."
)
def run(self) -> None:
"""Runs a journal client with the given configuration.
......@@ -106,9 +125,19 @@ class JournalChecker(BaseChecker):
"duplicate_directory_entries_total",
tags={"object_type": "directory"},
)
elif object_type == "content":
content = model.Content.from_dict(kafka_to_value(message))
assert self.objstorage is not None
check_content(
content, self.objstorage, self.db, self.config, self.statsd
)
else:
object_ = cls.from_dict(kafka_to_value(message))
has_duplicate_dir_entries = False
real_id = object_.compute_hash()
if object_.id != real_id or has_duplicate_dir_entries:
self.db.corrupt_object_add(object_.swhid(), self.config, message)
if object_type != "content":
real_id = object_.compute_hash()
if object_.id != real_id or has_duplicate_dir_entries:
self.db.corrupt_object_add(
object_.swhid(), self.config, message
)
......@@ -7,6 +7,7 @@ import json
import logging
from typing import Iterable, Optional
from swh.core.statsd import Statsd
from swh.journal.serializers import value_to_kafka
from swh.model.model import Content
from swh.model.swhids import ObjectType
......@@ -15,7 +16,7 @@ from swh.objstorage.interface import ObjStorageInterface, objid_from_dict
from swh.storage.interface import StorageInterface
from .base_checker import BasePartitionChecker
from .db import Datastore, ScrubberDb
from .db import ConfigEntry, Datastore, ScrubberDb
logger = logging.getLogger(__name__)
......@@ -29,6 +30,34 @@ def get_objstorage_datastore(objstorage_config):
)
def check_content(
content: Content,
objstorage: ObjStorageInterface,
db: ScrubberDb,
config: ConfigEntry,
statsd: Statsd,
) -> None:
content_hashes = objid_from_dict(content.hashes())
try:
content_bytes = objstorage.get(content_hashes)
except ObjNotFoundError:
if config.check_references:
statsd.increment("missing_object_total")
db.missing_object_add(id=content.swhid(), reference_ids={}, config=config)
else:
if config.check_hashes:
recomputed_hashes = objid_from_dict(
Content.from_data(content_bytes).hashes()
)
if content_hashes != recomputed_hashes:
statsd.increment("hash_mismatch_total")
db.corrupt_object_add(
id=content.swhid(),
config=config,
serialized_object=value_to_kafka(content.to_dict()),
)
class ObjectStorageChecker(BasePartitionChecker):
"""A checker to detect missing and corrupted contents in an object storage.
......@@ -79,24 +108,4 @@ class ObjectStorageChecker(BasePartitionChecker):
def check_contents(self, contents: Iterable[Content]) -> None:
for content in contents:
content_hashes = objid_from_dict(content.hashes())
try:
content_bytes = self.objstorage.get(content_hashes)
except ObjNotFoundError:
if self.check_references:
self.statsd.increment("missing_object_total")
self.db.missing_object_add(
id=content.swhid(), reference_ids={}, config=self.config
)
else:
if self.check_hashes:
recomputed_hashes = objid_from_dict(
Content.from_data(content_bytes).hashes()
)
if content_hashes != recomputed_hashes:
self.statsd.increment("hash_mismatch_total")
self.db.corrupt_object_add(
id=content.swhid(),
config=self.config,
serialized_object=value_to_kafka(content.to_dict()),
)
check_content(content, self.objstorage, self.db, self.config, self.statsd)
......@@ -40,9 +40,21 @@ def journal_writer(kafka_server: str, kafka_prefix: str):
)
@pytest.fixture
def swh_objstorage_config(tmpdir):
return {
"cls": "pathslicing",
"root": str(tmpdir),
"slicing": "0:2/2:4/4:6",
"compression": "gzip",
}
@pytest.fixture
def datastore(
kafka_server: str, kafka_prefix: str, kafka_consumer_group: str
kafka_server: str,
kafka_prefix: str,
kafka_consumer_group: str,
) -> Datastore:
journal_config = journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
......@@ -51,8 +63,26 @@ def datastore(
return datastore
@pytest.fixture
def objstorage_datastore(
kafka_server: str,
kafka_prefix: str,
kafka_consumer_group: str,
swh_objstorage_config,
) -> Datastore:
journal_config = journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
)
datastore = get_datastore(journal_config, swh_objstorage_config)
return datastore
def test_no_corruption(
scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore
scrubber_db,
kafka_server,
kafka_prefix,
kafka_consumer_group,
datastore,
):
writer = journal_writer(kafka_server, kafka_prefix)
writer.write_additions("directory", swh_model_data.DIRECTORIES)
......@@ -85,6 +115,67 @@ def test_no_corruption(
assert list(scrubber_db.corrupt_object_iter()) == []
def test_contents_check_missing_objstorage_config(
scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group, datastore
):
journal_cfg = journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
)
gid = journal_cfg["group_id"] + "_"
object_type = "content"
journal_cfg["group_id"] = gid + object_type
config_id = scrubber_db.config_add(
name=f"cfg_{object_type}",
datastore=datastore,
object_type=ObjectType.CONTENT,
nb_partitions=1,
)
with pytest.raises(
ValueError, match="journal checker cannot check content objects"
):
JournalChecker(
db=scrubber_db, config_id=config_id, journal_client_config=journal_cfg
)
def test_contents_no_corruption(
scrubber_db,
kafka_server,
kafka_prefix,
kafka_consumer_group,
objstorage_datastore,
swh_objstorage_config,
):
writer = journal_writer(kafka_server, kafka_prefix)
writer.write_additions("content", swh_model_data.CONTENTS)
journal_cfg = journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
)
gid = journal_cfg["group_id"] + "_"
object_type = "content"
journal_cfg["group_id"] = gid + object_type
config_id = scrubber_db.config_add(
name=f"cfg_{object_type}",
datastore=objstorage_datastore,
object_type=ObjectType.CONTENT,
nb_partitions=1,
)
jc = JournalChecker(
db=scrubber_db,
config_id=config_id,
journal_client_config=journal_cfg,
objstorage_config=swh_objstorage_config,
)
jc.objstorage.add_batch({c.sha1: c.data for c in swh_model_data.CONTENTS})
jc.run()
jc.journal_client.close()
assert list(scrubber_db.corrupt_object_iter()) == []
@pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS)))
def test_corrupt_snapshot(
scrubber_db,
......@@ -174,6 +265,107 @@ def test_corrupt_snapshots(
}
@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
def test_corrupt_content(
scrubber_db,
kafka_server,
kafka_prefix,
kafka_consumer_group,
objstorage_datastore,
corrupt_idx,
swh_objstorage_config,
):
config_id = scrubber_db.config_add(
name="cfg_content",
datastore=objstorage_datastore,
object_type=ObjectType.CONTENT,
nb_partitions=1,
)
contents = list(swh_model_data.CONTENTS)
contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
writer = journal_writer(kafka_server, kafka_prefix)
writer.write_additions("content", contents)
before_date = datetime.datetime.now(tz=datetime.timezone.utc)
jc = JournalChecker(
db=scrubber_db,
config_id=config_id,
journal_client_config=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
objstorage_config=swh_objstorage_config,
)
jc.objstorage.add_batch({c.sha1: c.data for c in contents})
jc.run()
after_date = datetime.datetime.now(tz=datetime.timezone.utc)
corrupt_objects = list(scrubber_db.corrupt_object_iter())
assert len(corrupt_objects) == 1
assert corrupt_objects[0].id == swhids.CoreSWHID.from_string(
"swh:1:cnt:0000000000000000000000000000000000000000"
)
assert corrupt_objects[0].config.datastore.package == "journal"
assert corrupt_objects[0].config.datastore.cls == swh_objstorage_config["cls"]
assert '"journal":' in corrupt_objects[0].config.datastore.instance
assert '"objstorage":' in corrupt_objects[0].config.datastore.instance
assert (
before_date - datetime.timedelta(seconds=5)
<= corrupt_objects[0].first_occurrence
<= after_date + datetime.timedelta(seconds=5)
)
corrupted_content = contents[corrupt_idx].to_dict()
corrupted_content.pop("data")
assert kafka_to_value(corrupt_objects[0].object_) == corrupted_content
@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
def test_missing_content(
scrubber_db,
kafka_server,
kafka_prefix,
kafka_consumer_group,
objstorage_datastore,
missing_idx,
swh_objstorage_config,
):
config_id = scrubber_db.config_add(
name="cfg_content",
datastore=objstorage_datastore,
object_type=ObjectType.CONTENT,
nb_partitions=1,
)
contents = list(swh_model_data.CONTENTS)
writer = journal_writer(kafka_server, kafka_prefix)
writer.write_additions("content", contents)
before_date = datetime.datetime.now(tz=datetime.timezone.utc)
jc = JournalChecker(
db=scrubber_db,
config_id=config_id,
journal_client_config=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
objstorage_config=swh_objstorage_config,
)
jc.objstorage.add_batch(
{c.sha1: c.data for i, c in enumerate(contents) if i != missing_idx}
)
jc.run()
after_date = datetime.datetime.now(tz=datetime.timezone.utc)
missing_objects = list(scrubber_db.missing_object_iter())
assert len(missing_objects) == 1
assert missing_objects[0].id == contents[missing_idx].swhid()
assert missing_objects[0].config.datastore == objstorage_datastore
assert (
before_date - datetime.timedelta(seconds=5)
<= missing_objects[0].first_occurrence
<= after_date + datetime.timedelta(seconds=5)
)
def test_duplicate_directory_entries(
scrubber_db,
kafka_server,
......