diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2cb96446f4eb357a45e8916b4e38671b2901983a..26657676b844d7e6227d557e3d095557feb059ab 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,6 +15,7 @@ repos: rev: v1.16.0 hooks: - id: codespell + args: [-L mor] - repo: local hooks: diff --git a/PKG-INFO b/PKG-INFO index 6a80e88d72262d568770807fe18052ad53412921..33a2bcfe222a719ab0d83fce53441959af045359 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: swh.scrubber -Version: 0.1.0 +Version: 0.1.1 Summary: Software Heritage Datastore Scrubber Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber Author: Software Heritage developers @@ -42,6 +42,10 @@ the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). + Recovery -------- diff --git a/README.rst b/README.rst index bc3ab6204b6ea161c5fe6b4b74176148478eed71..01a5b56b44adbc6858e4bd6e82a0e35e956bbc76 100644 --- a/README.rst +++ b/README.rst @@ -20,6 +20,10 @@ the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). + Recovery -------- diff --git a/docs/README.rst b/docs/README.rst index bc3ab6204b6ea161c5fe6b4b74176148478eed71..01a5b56b44adbc6858e4bd6e82a0e35e956bbc76 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -20,6 +20,10 @@ the corrupt object. 
There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). + Recovery -------- diff --git a/swh.scrubber.egg-info/PKG-INFO b/swh.scrubber.egg-info/PKG-INFO index 6a80e88d72262d568770807fe18052ad53412921..33a2bcfe222a719ab0d83fce53441959af045359 100644 --- a/swh.scrubber.egg-info/PKG-INFO +++ b/swh.scrubber.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: swh.scrubber -Version: 0.1.0 +Version: 0.1.1 Summary: Software Heritage Datastore Scrubber Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber Author: Software Heritage developers @@ -42,6 +42,10 @@ the corrupt object. There is one "checker" for each datastore package: storage (postgresql and cassandra), journal (kafka), and objstorage. +The journal is "crawled" using its native streaming; others are crawled by range, +reusing swh-storage's backfiller utilities, and checkpointed from time to time +to the scrubber's database (in the ``checked_range`` table). 
+ Recovery -------- diff --git a/swh.scrubber.egg-info/SOURCES.txt b/swh.scrubber.egg-info/SOURCES.txt index 959e1d610e4af5cec471f6f7827f6889b368c24d..d88ef3d1a720fe9dce075bdf972755a0dc9594d3 100644 --- a/swh.scrubber.egg-info/SOURCES.txt +++ b/swh.scrubber.egg-info/SOURCES.txt @@ -46,9 +46,11 @@ swh/scrubber/sql/30-schema.sql swh/scrubber/sql/60-indexes.sql swh/scrubber/sql/upgrades/2.sql swh/scrubber/sql/upgrades/3.sql +swh/scrubber/sql/upgrades/4.sql swh/scrubber/tests/__init__.py swh/scrubber/tests/conftest.py swh/scrubber/tests/test_cli.py +swh/scrubber/tests/test_db.py swh/scrubber/tests/test_fixer.py swh/scrubber/tests/test_init.py swh/scrubber/tests/test_journal_kafka.py diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py index 6b2d7672e9ff9b0f13f41e1808b476d15b6bfdec..8178d03171da4ba5a1802e8cecb7b603e2a5eaee 100644 --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -7,7 +7,7 @@ import dataclasses import datetime import functools -from typing import Iterable, Iterator, List, Optional +from typing import Iterable, Iterator, List, Optional, Tuple import psycopg2 @@ -60,7 +60,7 @@ class FixedObject: class ScrubberDb(BaseDb): - current_version = 3 + current_version = 4 #################################### # Shared tables @@ -98,6 +98,81 @@ class ScrubberDb(BaseDb): (id_,) = res return id_ + #################################### + # Checkpointing/progress tracking + #################################### + + def checked_range_upsert( + self, + datastore: Datastore, + range_start: CoreSWHID, + range_end: CoreSWHID, + date: datetime.datetime, + ) -> None: + """ + Records in the database the given range was last checked at the given date. 
+ """ + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + INSERT INTO checked_range(datastore, range_start, range_end, last_date) + VALUES (%s, %s, %s, %s) + ON CONFLICT (datastore, range_start, range_end) DO UPDATE + SET last_date = GREATEST(checked_range.last_date, EXCLUDED.last_date) + """, + (datastore_id, str(range_start), str(range_end), date), + ) + + def checked_range_get_last_date( + self, datastore: Datastore, range_start: CoreSWHID, range_end: CoreSWHID + ) -> Optional[datetime.datetime]: + """ + Returns the last date the given range was checked in the given datastore, + or :const:`None` if it was never checked. + + Currently, this matches range boundaries exactly, with no regard for + ranges that contain or are contained by it. + """ + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + SELECT last_date + FROM checked_range + WHERE datastore=%s AND range_start=%s AND range_end=%s + """, + (datastore_id, str(range_start), str(range_end)), + ) + + res = cur.fetchone() + if res is None: + return None + else: + (date,) = res + return date + + def checked_range_iter( + self, datastore: Datastore + ) -> Iterator[Tuple[CoreSWHID, CoreSWHID, datetime.datetime]]: + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + SELECT range_start, range_end, last_date + FROM checked_range + WHERE datastore=%s + """, + (datastore_id,), + ) + + for (range_start, range_end, last_date) in cur: + yield ( + CoreSWHID.from_string(range_start), + CoreSWHID.from_string(range_end), + last_date, + ) + #################################### # Inventory of objects with issues #################################### diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql index a67eb67dbf2e538bc2c215cd522ab8c8a48611a4..b28ea3c7410ddc9bd126c0187b2edfab92eec827 100644 --- a/swh/scrubber/sql/30-schema.sql 
+++ b/swh/scrubber/sql/30-schema.sql @@ -19,6 +19,23 @@ comment on column datastore.class is 'For datastores with multiple backends, nam comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.'; +------------------------------------- +-- Checkpointing/progress tracking +------------------------------------- + +create table checked_range +( + datastore int not null, + range_start swhid not null, + range_end swhid not null, + last_date timestamptz not null +); + +comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.'; +comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.last_date is 'Date the last scrub of that range *started*.'; + ------------------------------------- -- Inventory of objects with issues ------------------------------------- diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql index 88b5e615896f5c6da58e90235a6952903b5b46c0..98694ccc5709b8f77ecd6fe36d85101fe7079f19 100644 --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -9,6 +9,12 @@ alter table datastore add primary key using index datastore_pkey; create unique index concurrently datastore_package_class_instance on datastore(package, class, instance); +------------------------------------- +-- Checkpointing/progress tracking +------------------------------------- + +create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end); +alter table checked_range add primary key using index checked_range_pkey; ------------------------------------- -- Inventory of objects with issues diff --git 
a/swh/scrubber/sql/upgrades/4.sql b/swh/scrubber/sql/upgrades/4.sql new file mode 100644 index 0000000000000000000000000000000000000000..9dc7e2f9b1cd0077fb958a20a5575de70e7e58f2 --- /dev/null +++ b/swh/scrubber/sql/upgrades/4.sql @@ -0,0 +1,21 @@ +-- SWH Scrubber DB schema upgrade +-- from_version: 3 +-- to_version: 4 +-- description: Add checked_range + + +create table checked_range +( + datastore int not null, + range_start swhid not null, + range_end swhid not null, + last_date timestamptz not null +); + +comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.'; +comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.last_date is 'Date the last scrub of that range *started*.'; + +create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end); +alter table checked_range add primary key using index checked_range_pkey; diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py index 2305fa7d222f009ba40d5745e38e7d0f28da0a1e..c29a903a18bd29d007cd2929b6c6924501cd4df9 100644 --- a/swh/scrubber/storage_checker.py +++ b/swh/scrubber/storage_checker.py @@ -8,8 +8,9 @@ import collections import contextlib import dataclasses +import datetime import logging -from typing import Iterable, Union +from typing import Iterable, Optional, Tuple, Union from swh.core.statsd import Statsd from swh.journal.serializers import value_to_kafka @@ -43,6 +44,43 @@ def storage_db(storage): storage.put_db(db) +def _get_inclusive_range_swhids( + inclusive_range_start: Optional[bytes], + exclusive_range_end: Optional[bytes], + object_type: swhids.ObjectType, +) -> Tuple[swhids.CoreSWHID, swhids.CoreSWHID]: + 
r""" + Given a ``[range_start, range_end)`` right-open interval of id prefixes + and an object type (as returned by :const:`swh.storage.backfill.RANGE_GENERATORS`), + returns a ``[range_start_swhid, range_end_swhid]`` closed interval of SWHIDs + suitable for the scrubber database. + + >>> _get_inclusive_range_swhids(b"\x42", None, swhids.ObjectType.SNAPSHOT) + (CoreSWHID.from_string('swh:1:snp:4200000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:snp:ffffffffffffffffffffffffffffffffffffffff')) + + >>> _get_inclusive_range_swhids(b"\x00", b"\x12\x34", swhids.ObjectType.REVISION) + (CoreSWHID.from_string('swh:1:rev:0000000000000000000000000000000000000000'), CoreSWHID.from_string('swh:1:rev:1233ffffffffffffffffffffffffffffffffffff')) + + """ # noqa + range_start_swhid = swhids.CoreSWHID( + object_type=object_type, + object_id=(inclusive_range_start or b"").ljust(20, b"\00"), + ) + if exclusive_range_end is None: + inclusive_range_end = b"\xff" * 20 + else: + # convert "1230000000..." to "122fffffff..." + inclusive_range_end = ( + int.from_bytes(exclusive_range_end.ljust(20, b"\x00"), "big") - 1 + ).to_bytes(20, "big") + range_end_swhid = swhids.CoreSWHID( + object_type=object_type, + object_id=inclusive_range_end, + ) + + return (range_start_swhid, range_end_swhid) + + @dataclasses.dataclass class StorageChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and @@ -98,9 +136,35 @@ class StorageChecker: ) def _check_postgresql(self, db): + object_type = getattr(swhids.ObjectType, self.object_type.upper()) for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( self.start_object, self.end_object ): + (range_start_swhid, range_end_swhid) = _get_inclusive_range_swhids( + range_start, range_end, object_type + ) + + start_time = datetime.datetime.now(tz=datetime.timezone.utc) + + # Currently, this matches range boundaries exactly, with no regard for + # ranges that contain or are contained by it. 
+ last_check_time = self.db.checked_range_get_last_date( + self.datastore_info(), + range_start_swhid, + range_end_swhid, + ) + + if last_check_time is not None: + # TODO: re-check if 'last_check_time' was a long ago. + logger.debug( + "Skipping processing of %s range %s to %s: already done at %s", + self.object_type, + backfill._format_range_bound(range_start), + backfill._format_range_bound(range_end), + last_check_time, + ) + continue + logger.debug( "Processing %s range %s to %s", self.object_type, @@ -122,6 +186,13 @@ class StorageChecker: ): self.check_object_references(objects) + self.db.checked_range_upsert( + self.datastore_info(), + range_start_swhid, + range_end_swhid, + start_time, + ) + def check_object_hashes(self, objects: Iterable[ScrubbableObject]): """Recomputes hashes, and reports mismatches.""" count = 0 diff --git a/swh/scrubber/tests/test_db.py b/swh/scrubber/tests/test_db.py new file mode 100644 index 0000000000000000000000000000000000000000..2406575089462ad4b5504552516c497809b9ed1c --- /dev/null +++ b/swh/scrubber/tests/test_db.py @@ -0,0 +1,58 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime + +from swh.model import swhids +from swh.scrubber.db import Datastore, ScrubberDb + +DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh-test") +SNP_SWHID1 = swhids.CoreSWHID.from_string( + "swh:1:snp:5000000000000000000000000000000000000000" +) +SNP_SWHID2 = swhids.CoreSWHID.from_string( + "swh:1:snp:e000000000000000000000000000000000000000" +) +DATE = datetime.datetime(2022, 10, 4, 12, 1, 23, tzinfo=datetime.timezone.utc) + + +def test_checked_range_insert(scrubber_db: ScrubberDb): + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + assert 
list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, DATE) + ] + + +def test_checked_range_update(scrubber_db: ScrubberDb): + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + date2 = DATE + datetime.timedelta(days=1) + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date2) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, date2) + ] + + date3 = DATE + datetime.timedelta(days=-1) + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date3) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, date2) # newest date wins + ] + + +def test_checked_range_get(scrubber_db: ScrubberDb): + assert ( + scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2) + is None + ) + + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + assert ( + scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2) + == DATE + ) diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py index efd38d746d41aa364e7100d2656e9222f6066a53..8e75e94d8af856c83c29546e6f822ad99d0b35cc 100644 --- a/swh/scrubber/tests/test_storage_postgresql.py +++ b/swh/scrubber/tests/test_storage_postgresql.py @@ -12,7 +12,8 @@ import pytest from swh.journal.serializers import kafka_to_value from swh.model import model, swhids from swh.model.tests import swh_model_data -from swh.scrubber.storage_checker import StorageChecker +from swh.scrubber.db import Datastore +from swh.scrubber.storage_checker import StorageChecker, storage_db from swh.storage.backfill import byte_ranges CONTENT1 = model.Content.from_data(b"foo") @@ -58,6 +59,16 @@ SNAPSHOT1 = model.Snapshot( ) +@pytest.fixture +def datastore(swh_storage): + with storage_db(swh_storage) as db: + return Datastore( + package="storage", + cls="postgresql", + instance=db.conn.dsn, + ) + + # decorator 
to make swh.storage.backfill use fewer ranges, so tests run faster patch_byte_ranges = unittest.mock.patch( "swh.storage.backfill.byte_ranges", @@ -65,13 +76,98 @@ patch_byte_ranges = unittest.mock.patch( ) +def _short_ranges(type_): + return [ + ( + f"swh:1:{type_}:0000000000000000000000000000000000000000", + f"swh:1:{type_}:1fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:2000000000000000000000000000000000000000", + f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:4000000000000000000000000000000000000000", + f"swh:1:{type_}:5fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:6000000000000000000000000000000000000000", + f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:8000000000000000000000000000000000000000", + f"swh:1:{type_}:9fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:a000000000000000000000000000000000000000", + f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:c000000000000000000000000000000000000000", + f"swh:1:{type_}:dfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:e000000000000000000000000000000000000000", + f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff", + ), + ] + + +def _long_ranges(type_): + return [ + ( + f"swh:1:{type_}:0000000000000000000000000000000000000000", + f"swh:1:{type_}:3fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:4000000000000000000000000000000000000000", + f"swh:1:{type_}:7fffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:8000000000000000000000000000000000000000", + f"swh:1:{type_}:bfffffffffffffffffffffffffffffffffffffff", + ), + ( + f"swh:1:{type_}:c000000000000000000000000000000000000000", + f"swh:1:{type_}:ffffffffffffffffffffffffffffffffffffffff", + ), + ] + + +EXPECTED_RANGES = [ + *_short_ranges("dir"), + *_long_ranges("rel"), + *_short_ranges("rev"), + *_long_ranges("snp"), +] + + 
+def assert_checked_ranges( + scrubber_db, datastore, expected_ranges, before_date=None, after_date=None +): + if before_date is not None: + assert all( + before_date < date < after_date + for (_, _, date) in scrubber_db.checked_range_iter(datastore) + ) + + checked_ranges = [ + (str(start), str(end)) + for (start, end, date) in scrubber_db.checked_range_iter(datastore) + ] + checked_ranges.sort(key=str) + + assert checked_ranges == expected_ranges + + @patch_byte_ranges -def test_no_corruption(scrubber_db, swh_storage): +def test_no_corruption(scrubber_db, datastore, swh_storage): swh_storage.directory_add(swh_model_data.DIRECTORIES) swh_storage.revision_add(swh_model_data.REVISIONS) swh_storage.release_add(swh_model_data.RELEASES) swh_storage.snapshot_add(swh_model_data.SNAPSHOTS) + before_date = datetime.datetime.now(tz=datetime.timezone.utc) for object_type in ("snapshot", "release", "revision", "directory"): StorageChecker( db=scrubber_db, @@ -80,13 +176,18 @@ def test_no_corruption(scrubber_db, swh_storage): start_object="00" * 20, end_object="ff" * 20, ).run() + after_date = datetime.datetime.now(tz=datetime.timezone.utc) assert list(scrubber_db.corrupt_object_iter()) == [] + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @pytest.mark.parametrize("corrupt_idx", range(len(swh_model_data.SNAPSHOTS))) @patch_byte_ranges -def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx): +def test_corrupt_snapshot(scrubber_db, datastore, swh_storage, corrupt_idx): storage_dsn = swh_storage.get_db().conn.dsn snapshots = list(swh_model_data.SNAPSHOTS) snapshots[corrupt_idx] = attr.evolve(snapshots[corrupt_idx], id=b"\x00" * 20) @@ -120,9 +221,13 @@ def test_corrupt_snapshot(scrubber_db, swh_storage, corrupt_idx): kafka_to_value(corrupt_objects[0].object_) == snapshots[corrupt_idx].to_dict() ) + assert_checked_ranges( + scrubber_db, datastore, EXPECTED_RANGES, before_date, after_date + ) + @patch_byte_ranges -def 
test_corrupt_snapshots_same_batch(scrubber_db, swh_storage): +def test_corrupt_snapshots_same_batch(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) @@ -146,9 +251,11 @@ def test_corrupt_snapshots_same_batch(scrubber_db, swh_storage): ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_corrupt_snapshots_different_batches(scrubber_db, swh_storage): +def test_corrupt_snapshots_different_batches(scrubber_db, datastore, swh_storage): snapshots = list(swh_model_data.SNAPSHOTS) for i in (0, 1): snapshots[i] = attr.evolve(snapshots[i], id=bytes([i * 255]) * 20) @@ -186,9 +293,52 @@ def test_corrupt_snapshots_different_batches(scrubber_db, swh_storage): ] } + assert_checked_ranges(scrubber_db, datastore, _long_ranges("snp")) + @patch_byte_ranges -def test_no_hole(scrubber_db, swh_storage): +def test_no_recheck(scrubber_db, datastore, swh_storage): + """ + Tests that objects that were already checked are not checked again on + the next run. + """ + # Corrupt two snapshots + snapshots = list(swh_model_data.SNAPSHOTS) + for i in (0, 1): + snapshots[i] = attr.evolve(snapshots[i], id=bytes([i]) * 20) + swh_storage.snapshot_add(snapshots) + + # Mark ranges as already checked + now = datetime.datetime.now(tz=datetime.timezone.utc) + for (range_start, range_end) in EXPECTED_RANGES: + scrubber_db.checked_range_upsert(datastore, range_start, range_end, now) + + StorageChecker( + db=scrubber_db, + storage=swh_storage, + object_type="snapshot", + start_object="00" * 20, + end_object="ff" * 20, + ).run() + + corrupt_objects = list(scrubber_db.corrupt_object_iter()) + assert ( + corrupt_objects == [] + ), "Detected corrupt objects in ranges that should have been skipped." 
+ + # Make sure the DB was not changed (in particular, that timestamps were not bumped) + ranges = [ + (str(range_start), str(range_end), date) + for (range_start, range_end, date) in scrubber_db.checked_range_iter(datastore) + ] + ranges.sort(key=str) + assert ranges == [ + (range_start, range_end, now) for (range_start, range_end) in EXPECTED_RANGES + ] + + +@patch_byte_ranges +def test_no_hole(scrubber_db, datastore, swh_storage): swh_storage.content_add([CONTENT1]) swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) swh_storage.revision_add([REVISION1]) @@ -206,13 +356,15 @@ def test_no_hole(scrubber_db, swh_storage): assert list(scrubber_db.missing_object_iter()) == [] + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES) + @pytest.mark.parametrize( "missing_object", ["content1", "directory1", "directory2", "revision1", "release1"], ) @patch_byte_ranges -def test_one_hole(scrubber_db, swh_storage, missing_object): +def test_one_hole(scrubber_db, datastore, swh_storage, missing_object): if missing_object == "content1": missing_swhid = CONTENT1.swhid() reference_swhids = [DIRECTORY1.swhid(), DIRECTORY2.swhid()] @@ -260,9 +412,11 @@ def test_one_hole(scrubber_db, swh_storage, missing_object): for mor in scrubber_db.missing_object_reference_iter(missing_swhid) } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids} + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES) + @patch_byte_ranges -def test_two_holes(scrubber_db, swh_storage): +def test_two_holes(scrubber_db, datastore, swh_storage): # missing content and revision swh_storage.directory_add([DIRECTORY1, DIRECTORY2]) swh_storage.release_add([RELEASE1]) @@ -289,3 +443,5 @@ def test_two_holes(scrubber_db, swh_storage): mor.reference_id for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid()) } == {RELEASE1.swhid()} + + assert_checked_ranges(scrubber_db, datastore, EXPECTED_RANGES)