Skip to content
Snippets Groups Projects

objstorage_checker: Add support for consuming content ids from journal

Merged Antoine Lambert requested to merge anlambert/swh-scrubber:objstorage-checker-journal into master
2 files
+ 246
58
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
import json
import attr
import pytest
@@ -12,7 +13,8 @@ from swh.journal.serializers import kafka_to_value
from swh.model.swhids import CoreSWHID, ObjectType
from swh.model.tests import swh_model_data
from swh.scrubber.objstorage_checker import (
ObjectStorageChecker,
ObjectStorageCheckerFromJournal,
ObjectStorageCheckerFromStoragePartition,
get_objstorage_datastore,
)
@@ -32,35 +34,49 @@ def datastore(swh_objstorage_config):
@pytest.fixture
def objstorage_checker(swh_storage, swh_objstorage, scrubber_db, datastore):
def objstorage_checker_partition(swh_storage, swh_objstorage, scrubber_db, datastore):
nb_partitions = len(EXPECTED_PARTITIONS)
config_id = scrubber_db.config_add(
"cfg_objstorage_checker", datastore, ObjectType.CONTENT, nb_partitions
"cfg_objstorage_checker_partition", datastore, ObjectType.CONTENT, nb_partitions
)
return ObjectStorageCheckerFromStoragePartition(
scrubber_db, config_id, swh_storage, swh_objstorage
)
@pytest.fixture
def objstorage_checker_journal(
journal_client_config, swh_objstorage, scrubber_db, datastore
):
config_id = scrubber_db.config_add(
"cfg_objstorage_checker_journal", datastore, ObjectType.CONTENT, nb_partitions=1
)
return ObjectStorageCheckerFromJournal(
scrubber_db, config_id, journal_client_config, swh_objstorage
)
return ObjectStorageChecker(scrubber_db, config_id, swh_storage, swh_objstorage)
def test_objstorage_checker_no_corruption(
swh_storage, swh_objstorage, objstorage_checker
def test_objstorage_checker_partition_no_corruption(
swh_storage, swh_objstorage, objstorage_checker_partition
):
swh_storage.content_add(swh_model_data.CONTENTS)
swh_objstorage.add_batch({c.sha1: c.data for c in swh_model_data.CONTENTS})
objstorage_checker.run()
objstorage_checker_partition.run()
scrubber_db = objstorage_checker.db
scrubber_db = objstorage_checker_partition.db
assert list(scrubber_db.corrupt_object_iter()) == []
assert_checked_ranges(
scrubber_db,
[(ObjectType.CONTENT, objstorage_checker.config_id)],
[(ObjectType.CONTENT, objstorage_checker_partition.config_id)],
EXPECTED_PARTITIONS,
)
@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
def test_objstorage_checker_missing_content(
swh_storage, swh_objstorage, objstorage_checker, missing_idx
def test_objstorage_checker_partition_missing_content(
swh_storage, swh_objstorage, objstorage_checker_partition, missing_idx
):
contents = list(swh_model_data.CONTENTS)
swh_storage.content_add(contents)
@@ -69,15 +85,15 @@ def test_objstorage_checker_missing_content(
)
before_date = datetime.now(tz=timezone.utc)
objstorage_checker.run()
objstorage_checker_partition.run()
after_date = datetime.now(tz=timezone.utc)
scrubber_db = objstorage_checker.db
scrubber_db = objstorage_checker_partition.db
missing_objects = list(scrubber_db.missing_object_iter())
assert len(missing_objects) == 1
assert missing_objects[0].id == contents[missing_idx].swhid()
assert missing_objects[0].config.datastore == objstorage_checker.datastore
assert missing_objects[0].config.datastore == objstorage_checker_partition.datastore
assert (
before_date - timedelta(seconds=5)
<= missing_objects[0].first_occurrence
@@ -86,7 +102,7 @@ def test_objstorage_checker_missing_content(
assert_checked_ranges(
scrubber_db,
[(ObjectType.CONTENT, objstorage_checker.config_id)],
[(ObjectType.CONTENT, objstorage_checker_partition.config_id)],
EXPECTED_PARTITIONS,
before_date,
after_date,
@@ -94,8 +110,8 @@ def test_objstorage_checker_missing_content(
@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
def test_objstorage_checker_corrupt_content(
swh_storage, swh_objstorage, objstorage_checker, corrupt_idx
def test_objstorage_checker_partition_corrupt_content(
swh_storage, swh_objstorage, objstorage_checker_partition, corrupt_idx
):
contents = list(swh_model_data.CONTENTS)
contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
@@ -103,17 +119,17 @@ def test_objstorage_checker_corrupt_content(
swh_objstorage.add_batch({c.sha1: c.data for c in contents})
before_date = datetime.now(tz=timezone.utc)
objstorage_checker.run()
objstorage_checker_partition.run()
after_date = datetime.now(tz=timezone.utc)
scrubber_db = objstorage_checker.db
scrubber_db = objstorage_checker_partition.db
corrupt_objects = list(scrubber_db.corrupt_object_iter())
assert len(corrupt_objects) == 1
assert corrupt_objects[0].id == CoreSWHID.from_string(
"swh:1:cnt:0000000000000000000000000000000000000000"
)
assert corrupt_objects[0].config.datastore == objstorage_checker.datastore
assert corrupt_objects[0].config.datastore == objstorage_checker_partition.datastore
assert (
before_date - timedelta(seconds=5)
<= corrupt_objects[0].first_occurrence
@@ -126,8 +142,99 @@ def test_objstorage_checker_corrupt_content(
assert_checked_ranges(
scrubber_db,
[(ObjectType.CONTENT, objstorage_checker.config_id)],
[(ObjectType.CONTENT, objstorage_checker_partition.config_id)],
EXPECTED_PARTITIONS,
before_date,
after_date,
)
def test_objstorage_checker_journal_contents_no_corruption(
scrubber_db,
journal_writer,
journal_client_config,
objstorage_checker_journal,
):
journal_writer.write_additions("content", swh_model_data.CONTENTS)
gid = journal_client_config["group_id"] + "_"
object_type = "content"
journal_client_config["group_id"] = gid + object_type
objstorage_checker_journal.objstorage.add_batch(
{c.sha1: c.data for c in swh_model_data.CONTENTS}
)
objstorage_checker_journal.run()
objstorage_checker_journal.journal_client.close()
assert list(scrubber_db.corrupt_object_iter()) == []
@pytest.mark.parametrize("corrupt_idx", range(0, len(swh_model_data.CONTENTS), 5))
def test_objstorage_checker_journal_corrupt_content(
scrubber_db,
journal_writer,
objstorage_checker_journal,
swh_objstorage_config,
corrupt_idx,
):
contents = list(swh_model_data.CONTENTS)
contents[corrupt_idx] = attr.evolve(contents[corrupt_idx], sha1_git=b"\x00" * 20)
journal_writer.write_additions("content", contents)
before_date = datetime.now(tz=timezone.utc)
objstorage_checker_journal.objstorage.add_batch({c.sha1: c.data for c in contents})
objstorage_checker_journal.run()
after_date = datetime.now(tz=timezone.utc)
corrupt_objects = list(scrubber_db.corrupt_object_iter())
assert len(corrupt_objects) == 1
assert corrupt_objects[0].id == CoreSWHID.from_string(
"swh:1:cnt:0000000000000000000000000000000000000000"
)
assert corrupt_objects[0].config.datastore.package == "objstorage"
assert corrupt_objects[0].config.datastore.cls == swh_objstorage_config.pop("cls")
assert corrupt_objects[0].config.datastore.instance == json.dumps(
swh_objstorage_config
)
assert (
before_date - timedelta(seconds=5)
<= corrupt_objects[0].first_occurrence
<= after_date + timedelta(seconds=5)
)
corrupted_content = contents[corrupt_idx].to_dict()
corrupted_content.pop("data")
assert kafka_to_value(corrupt_objects[0].object_) == corrupted_content
@pytest.mark.parametrize("missing_idx", range(0, len(swh_model_data.CONTENTS), 5))
def test_objstorage_checker_journal_missing_content(
scrubber_db,
journal_writer,
objstorage_checker_journal,
missing_idx,
):
contents = list(swh_model_data.CONTENTS)
journal_writer.write_additions("content", contents)
before_date = datetime.now(tz=timezone.utc)
objstorage_checker_journal.objstorage.add_batch(
{c.sha1: c.data for i, c in enumerate(contents) if i != missing_idx}
)
objstorage_checker_journal.run()
after_date = datetime.now(tz=timezone.utc)
missing_objects = list(scrubber_db.missing_object_iter())
assert len(missing_objects) == 1
assert missing_objects[0].id == contents[missing_idx].swhid()
assert missing_objects[0].config.datastore == objstorage_checker_journal.datastore
assert (
before_date - timedelta(seconds=5)
<= missing_objects[0].first_occurrence
<= after_date + timedelta(seconds=5)
)
Loading