Skip to content
Snippets Groups Projects
Commit 4144ac78 authored by vlorentz's avatar vlorentz
Browse files

Add doc + rename check_{journal,storage}.py to {journal,storage}_checker.py

as well as the run() method.

For consistency with a future module called fixer.py
parent f4214c2f
No related branches found
No related tags found
No related merge requests found
......@@ -115,7 +115,7 @@ def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object:
from swh.storage import get_storage
from .check_storage import StorageChecker
from .storage_checker import StorageChecker
checker = StorageChecker(
db=ctx.obj["db"],
......@@ -125,7 +125,7 @@ def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object:
end_object=end_object,
)
checker.check_storage()
checker.run()
@scrubber_check_cli_group.command(name="journal")
......@@ -137,8 +137,8 @@ def scrubber_check_journal(ctx) -> None:
if "journal_client" not in conf:
ctx.fail("You must have a journal_client configured in your config file.")
from .check_journal import JournalChecker
from .journal_checker import JournalChecker
checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
checker.check_journal()
checker.run()
......@@ -16,9 +16,15 @@ from swh.model.swhids import CoreSWHID
@dataclasses.dataclass(frozen=True)
class Datastore:
"""Represents a datastore being scrubbed; eg. swh-storage or swh-journal."""
package: str
"""'storage', 'journal', or 'objstorage'."""
cls: str
"""'postgresql'/'cassandra' for storage, 'kafka' for journal,
'pathslicer'/'winery'/... for objstorage."""
instance: str
"""Human readable string."""
@dataclasses.dataclass(frozen=True)
......@@ -66,6 +72,7 @@ class ScrubberDb(BaseDb):
)
def corrupt_object_iter(self) -> Iterator[CorruptObject]:
"""Yields all records in the 'corrupt_object' table."""
cur = self.cursor()
cur.execute(
"""
......
......@@ -18,6 +18,9 @@ logger = logging.getLogger(__name__)
class JournalChecker:
"""Reads a chunk of a swh-storage database, recomputes checksums, and
reports errors in a separate database."""
_datastore = None
def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]):
......@@ -31,6 +34,8 @@ class JournalChecker:
)
def datastore_info(self) -> Datastore:
"""Returns a :class:`Datastore` instance representing the journal instance
being checked."""
if self._datastore is None:
config = self.journal_client_config
if config["cls"] == "kafka":
......@@ -48,7 +53,10 @@ class JournalChecker:
)
return self._datastore
def check_journal(self):
def run(self):
"""Runs a journal client with the given configuration.
This method does not return, unless otherwise configured (with ``stop_on_eof``).
"""
self.journal_client.process(self.process_kafka_messages)
def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]):
......
......@@ -34,15 +34,23 @@ def storage_db(storage):
@dataclasses.dataclass
class StorageChecker:
"""Reads a chunk of a swh-storage database, recomputes checksums, and
reports errors in a separate database."""
db: ScrubberDb
storage: StorageInterface
object_type: str
"""``directory``/``revision``/``release``/``snapshot``"""
start_object: str
"""minimum value of the hexdigest of the object's sha1."""
end_object: str
"""maximum value of the hexdigest of the object's sha1."""
_datastore = None
def datastore_info(self) -> Datastore:
"""Returns a :class:`Datastore` instance representing the swh-storage instance
being checked."""
if self._datastore is None:
if isinstance(self.storage, PostgresqlStorage):
with storage_db(self.storage) as db:
......@@ -55,7 +63,10 @@ class StorageChecker:
)
return self._datastore
def check_storage(self):
def run(self):
"""Runs on all objects of ``object_type`` and with id between
``start_object`` and ``end_object``.
"""
if isinstance(self.storage, PostgresqlStorage):
with storage_db(self.storage) as db:
return self._check_postgresql(db)
......
......@@ -4,13 +4,13 @@
# See top-level LICENSE file for more information
import tempfile
from unittest.mock import MagicMock
from unittest.mock import MagicMock, call
from click.testing import CliRunner
import yaml
from swh.scrubber.check_storage import storage_db
from swh.scrubber.cli import scrubber_cli_group
from swh.scrubber.storage_checker import storage_db
def invoke(
......@@ -59,7 +59,7 @@ def invoke(
def test_check_storage(mocker, scrubber_db, swh_storage):
storage_checker = MagicMock()
StorageChecker = mocker.patch(
"swh.scrubber.check_storage.StorageChecker", return_value=storage_checker
"swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker
)
get_scrubber_db = mocker.patch(
"swh.scrubber.get_scrubber_db", return_value=scrubber_db
......@@ -78,6 +78,7 @@ def test_check_storage(mocker, scrubber_db, swh_storage):
start_object="0" * 40,
end_object="f" * 40,
)
assert storage_checker.method_calls == [call.run()]
def test_check_journal(
......@@ -85,7 +86,7 @@ def test_check_journal(
):
journal_checker = MagicMock()
JournalChecker = mocker.patch(
"swh.scrubber.check_journal.JournalChecker", return_value=journal_checker
"swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker
)
get_scrubber_db = mocker.patch(
"swh.scrubber.get_scrubber_db", return_value=scrubber_db
......@@ -111,3 +112,4 @@ def test_check_journal(
"stop_on_eof": True,
},
)
assert journal_checker.method_calls == [call.run()]
......@@ -12,7 +12,7 @@ from swh.journal.serializers import kafka_to_value
from swh.journal.writer import get_journal_writer
from swh.model import swhids
from swh.model.tests import swh_model_data
from swh.scrubber.check_journal import JournalChecker
from swh.scrubber.journal_checker import JournalChecker
def journal_client_config(kafka_server, kafka_prefix, kafka_consumer_group):
......@@ -47,7 +47,7 @@ def test_no_corruption(scrubber_db, kafka_server, kafka_prefix, kafka_consumer_g
journal_client=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
).check_journal()
).run()
assert list(scrubber_db.corrupt_object_iter()) == []
......@@ -68,7 +68,7 @@ def test_corrupt_snapshot(
journal_client=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
).check_journal()
).run()
after_date = datetime.datetime.now(tz=datetime.timezone.utc)
corrupt_objects = list(scrubber_db.corrupt_object_iter())
......@@ -107,7 +107,7 @@ def test_corrupt_snapshots(
journal_client=journal_client_config(
kafka_server, kafka_prefix, kafka_consumer_group
),
).check_journal()
).run()
corrupt_objects = list(scrubber_db.corrupt_object_iter())
assert len(corrupt_objects) == 2
......
......@@ -12,7 +12,7 @@ import pytest
from swh.journal.serializers import kafka_to_value
from swh.model import swhids
from swh.model.tests import swh_model_data
from swh.scrubber.check_storage import StorageChecker
from swh.scrubber.storage_checker import StorageChecker
from swh.storage.backfill import byte_ranges
# decorator to make swh.storage.backfill use less ranges, so tests run faster
......@@ -36,7 +36,7 @@ def test_no_corruption(scrubber_db, swh_storage):
object_type=object_type,
start_object="00" * 20,
end_object="ff" * 20,
).check_storage()
).run()
assert list(scrubber_db.corrupt_object_iter()) == []
......@@ -56,7 +56,7 @@ def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx):
object_type=object_type,
start_object="00" * 20,
end_object="ff" * 20,
).check_storage()
).run()
after_date = datetime.datetime.now(tz=datetime.timezone.utc)
corrupt_objects = list(scrubber_db.corrupt_object_iter())
......@@ -92,7 +92,7 @@ def test_corrupt_snapshots(scrubber_db, swh_storage):
object_type="snapshot",
start_object="00" * 20,
end_object="ff" * 20,
).check_storage()
).run()
corrupt_objects = list(scrubber_db.corrupt_object_iter())
assert len(corrupt_objects) == 2
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment