Commit 88e953ab authored by vlorentz

Add Fixer class, which re-loads corrupt objects from origins

This is heavily inspired by https://archive.softwareheritage.org/swh:1:cnt:c6dda7698c6aecf71f744e4e4c01bc3e115db880;origin=https://forge.softwareheritage.org/source/snippets.git;visit=swh:1:snp:c2e3170b9927f3d356e120546b00f5a9d25a224c;anchor=swh:1:rev:fa9e387ba5e3470b93a5c5a8773ed598ef03d211;path=/vlorentz/analyze_consistency_failures.py
but reorganized to use the database instead of ad-hoc text and pickle files.

Currently, this only implements recovering from Git origins.
parent 2ab211eb
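
As a rough illustration, the new class is driven the same way the `fix` CLI command added below wires it up. A minimal sketch, assuming a local scrubber database reachable through a placeholder DSN:

    from swh.model.swhids import CoreSWHID
    from swh.scrubber import get_scrubber_db
    from swh.scrubber.fixer import Fixer

    # "service=swh-scrubber" is a placeholder DSN; use your own connection string.
    db = get_scrubber_db(cls="local", db="service=swh-scrubber")
    fixer = Fixer(
        db=db,
        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
    )
    fixer.run()  # clones origins and records recovered objects in 'fixed_object'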
@@ -5,6 +5,10 @@ warn_unused_ignores = True
# 3rd party libraries without stubs (yet)
[mypy-dulwich.*]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True
......
# Add here internal Software Heritage dependencies, one per line.
swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin
swh.loader.git >= 1.4.0
swh.model >= 5.0.0
swh.storage >= 1.1.0
swh.journal >= 0.9.0
@@ -156,6 +156,8 @@ def scrubber_locate_origins(ctx, start_object: str, end_object: str):
conf = ctx.obj["config"]
if "storage" not in conf:
ctx.fail("You must have a storage configured in your config file.")
if "graph" not in conf:
ctx.fail("You must have a graph configured in your config file.")
from swh.graph.client import RemoteGraphClient
from swh.model.model import CoreSWHID
@@ -172,3 +174,24 @@ def scrubber_locate_origins(ctx, start_object: str, end_object: str):
)
locator.run()
@scrubber_cli_group.command(name="fix")
@click.option("--start-object", default="swh:1:cnt:" + "00" * 20)
@click.option("--end-object", default="swh:1:snp:" + "ff" * 20)
@click.pass_context
def scrubber_fix_objects(ctx, start_object: str, end_object: str):
"""For each known corrupt object reported in the scrubber DB, looks up origins
that may contain this object, and records them; so they can be used later
for recovery."""
from swh.model.model import CoreSWHID
from .fixer import Fixer
fixer = Fixer(
db=ctx.obj["db"],
start_object=CoreSWHID.from_string(start_object),
end_object=CoreSWHID.from_string(end_object),
)
fixer.run()
@@ -7,7 +7,7 @@
import dataclasses
import datetime
import functools
from typing import Iterator, List
from typing import Iterator, List, Optional
import psycopg2
@@ -36,6 +36,14 @@ class CorruptObject:
object_: bytes
@dataclasses.dataclass(frozen=True)
class FixedObject:
id: CoreSWHID
object_: bytes
method: str
recovery_date: Optional[datetime.datetime] = None
class ScrubberDb(BaseDb):
current_version = 1
@@ -93,14 +101,37 @@ class ScrubberDb(BaseDb):
),
)
def corrupt_object_grab(
self,
cur,
start_id: CoreSWHID = None,
end_id: CoreSWHID = None,
limit: int = 100,
def _corrupt_object_list_from_cursor(
self, cur: psycopg2.extensions.cursor
) -> List[CorruptObject]:
"""Yields a page of records in the 'corrupt_object' table."""
results = []
for row in cur:
(id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
results.append(
CorruptObject(
id=CoreSWHID.from_string(id),
first_occurrence=first_occurrence,
object_=object_,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
)
)
return results
def corrupt_object_get(
self, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100,
) -> List[CorruptObject]:
"""Yields a page of records in the 'corrupt_object' table, ordered by id.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
in_origin: An origin URL. If provided, only returns objects that may be
found in the given origin
"""
cur = self.cursor()
cur.execute(
"""
SELECT
@@ -116,24 +147,92 @@ class ScrubberDb(BaseDb):
""",
(str(start_id), str(end_id), limit),
)
return self._corrupt_object_list_from_cursor(cur)
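
A hypothetical caller paging through this accessor could look like the following sketch; it assumes the same inclusive id bounds as the grab methods below, hence the duplicate-skip (the same workaround `iter_corrupt_objects` uses):

    # Hypothetical reporting loop; 'db' is a connected ScrubberDb.
    start = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
    end = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20)
    while True:
        page = db.corrupt_object_get(start, end, limit=100)
        if page and page[0].id == start:
            del page[0]  # bounds are inclusive: drop the object already seen
        if not page:
            break
        for corrupt_object in page:
            print(corrupt_object.id, corrupt_object.datastore)
        start = page[-1].id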
results = []
for row in cur:
(id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
results.append(
CorruptObject(
id=CoreSWHID.from_string(id),
first_occurrence=first_occurrence,
object_=object_,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
)
)
return results
def corrupt_object_grab_by_id(
self,
cur: psycopg2.extensions.cursor,
start_id: CoreSWHID,
end_id: CoreSWHID,
limit: int = 100,
) -> List[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id.
These records are not already fixed (i.e. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN datastore AS ds ON (ds.id=co.datastore)
WHERE
co.id >= %(start_id)s
AND co.id <= %(end_id)s
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(start_id=str(start_id), end_id=str(end_id), limit=limit,),
)
return self._corrupt_object_list_from_cursor(cur)
def corrupt_object_grab_by_origin(
self,
cur: psycopg2.extensions.cursor,
origin_url: str,
start_id: Optional[CoreSWHID] = None,
end_id: Optional[CoreSWHID] = None,
limit: int = 100,
) -> List[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id
These records are not already fixed (ie. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
origin_url: only returns objects that may be found in the given origin
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN datastore AS ds ON (ds.id=co.datastore)
INNER JOIN object_origin AS oo ON (oo.object_id=co.id)
WHERE
(co.id >= %(start_id)s OR %(start_id)s IS NULL)
AND (co.id <= %(end_id)s OR %(end_id)s IS NULL)
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
AND oo.origin_url=%(origin_url)s
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(
start_id=None if start_id is None else str(start_id),
end_id=None if end_id is None else str(end_id),
origin_url=origin_url,
limit=limit,
),
)
return self._corrupt_object_list_from_cursor(cur)
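
Both grab methods rely on `FOR UPDATE SKIP LOCKED`: each fixer locks the rows of its page for the duration of its transaction, and concurrent fixers skip those rows instead of blocking on them. A minimal sketch of the calling pattern (this mirrors what `iter_corrupt_objects` in `utils.py` does; the origin URL is a placeholder):

    # One transaction per page, so the row locks are held while the objects
    # are processed and released on commit.
    with db.conn, db.cursor() as cur:
        page = db.corrupt_object_grab_by_origin(
            cur, "https://example.org/repo.git", limit=100
        )
        for corrupt_object in page:
            ...  # attempt recovery; see fixer.py below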
def object_origin_add(
self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
@@ -143,3 +242,55 @@ class ScrubberDb(BaseDb):
""",
[(str(swhid), origin_url) for origin_url in origins],
)
def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]:
"""Returns origins with non-fixed corrupt objects, ordered by URL.
Arguments:
after: if given, only returns origins with a URL after this value
"""
cur = self.cursor()
cur.execute(
"""
SELECT DISTINCT origin_url
FROM object_origin
WHERE
origin_url > %(after)s
AND object_id IN (
(SELECT id FROM corrupt_object)
EXCEPT (SELECT id FROM fixed_object)
)
ORDER BY origin_url
LIMIT %(limit)s
""",
dict(after=after, limit=limit),
)
return [origin_url for (origin_url,) in cur]
def fixed_object_add(
self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO fixed_object (id, object, method)
VALUES %s
ON CONFLICT DO NOTHING
""",
[
(str(fixed_object.id), fixed_object.object_, fixed_object.method)
for fixed_object in fixed_objects
],
)
def fixed_object_iter(self) -> Iterator[FixedObject]:
cur = self.cursor()
cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
for (id, object_, method, recovery_date) in cur:
yield FixedObject(
id=CoreSWHID.from_string(id),
object_=object_,
method=method,
recovery_date=recovery_date,
)
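
A sketch of the intended round-trip through these two accessors, following the same pattern as the tests further down (the SWHID and payload are placeholders):

    from swh.journal.serializers import value_to_kafka
    from swh.model.swhids import CoreSWHID
    from swh.scrubber.db import FixedObject

    fixed = FixedObject(
        id=CoreSWHID.from_string("swh:1:dir:" + "42" * 20),
        object_=value_to_kafka({"entries": []}),  # placeholder msgpack payload
        method="from_origin",
    )
    with db.cursor() as cur:
        db.fixed_object_add(cur, [fixed])  # idempotent: ON CONFLICT DO NOTHING
    for fixed_object in db.fixed_object_iter():
        print(fixed_object.id, fixed_object.method, fixed_object.recovery_date)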
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Reads all known corrupts objects from the swh-scrubber database,
and tries to recover them.
Currently, only recovery from Git origins is implemented"""
import dataclasses
import functools
import logging
import os
from pathlib import Path
import subprocess
import tempfile
from typing import Dict, Optional, Type, Union
import dulwich
import dulwich.objects
import dulwich.repo
import psycopg2
from swh.journal.serializers import kafka_to_value, value_to_kafka
from swh.loader.git import converters
from swh.model.hashutil import hash_to_bytehex, hash_to_hex
from swh.model.model import BaseModel, Directory, Release, Revision, Snapshot
from swh.model.swhids import CoreSWHID, ObjectType
from .db import CorruptObject, FixedObject, ScrubberDb
from .utils import iter_corrupt_objects
logger = logging.getLogger(__name__)
ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
def get_object_from_clone(
clone_path: Path, swhid: CoreSWHID
) -> Union[None, bytes, dulwich.objects.ShaFile]:
"""Reads the original object matching the ``corrupt_object`` from the given clone
if it exists, and returns a Dulwich object if possible, or a the raw manifest."""
try:
repo = dulwich.repo.Repo(str(clone_path))
except dulwich.errors.NotGitRepository:
return None
with repo: # needed to avoid packfile fd leaks
try:
return repo[hash_to_bytehex(swhid.object_id)]
except KeyError:
return None
except dulwich.errors.ObjectFormatException:
# fallback to git if dulwich can't parse it.
# Unfortunately, Dulwich does not allow fetching an object without
# parsing it into a ShaFile subclass, so we have to manually get it
# by shelling out to git.
object_type = (
subprocess.check_output(
[
"git",
"-C",
clone_path,
"cat-file",
"-t",
hash_to_hex(swhid.object_id),
]
)
.decode()
.strip()
)
manifest = subprocess.check_output(
[
"git",
"-C",
clone_path,
"cat-file",
object_type,
hash_to_hex(swhid.object_id),
]
)
manifest = f"{object_type} {len(manifest)}\x00".encode() + manifest
logger.info("Dulwich failed to parse %r", manifest)
return manifest
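
The header prepended to the payload above reproduces Git's canonical loose-object framing, "<type> <length>\x00<payload>"; the tests below build the same framing by hand. For example:

    payload = b"this is not a parseable manifest"
    manifest = f"tree {len(payload)}\x00".encode() + payload
    assert manifest == b"tree 32\x00this is not a parseable manifest"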
def get_fixed_object_from_clone(
clone_path: Path, corrupt_object: CorruptObject
) -> Optional[FixedObject]:
"""Reads the original object matching the ``corrupt_object`` from the given clone
if it exists, and returns a :class:`FixedObject` instance ready to be inserted
in the database."""
cloned_dulwich_obj_or_manifest = get_object_from_clone(
clone_path, corrupt_object.id
)
if cloned_dulwich_obj_or_manifest is None:
# Origin still exists, but object disappeared
logger.info("%s not found in origin", corrupt_object.id)
return None
elif isinstance(cloned_dulwich_obj_or_manifest, bytes):
# Dulwich could not parse it. Add as raw manifest to the existing object
d = kafka_to_value(corrupt_object.object_)
assert d.get("raw_manifest") is None, "Corrupt object has a raw_manifest"
d["raw_manifest"] = cloned_dulwich_obj_or_manifest
# Rebuild the object from the stored corrupt object + the raw manifest
# just recovered; then checksum it.
classes: Dict[ObjectType, Type[BaseModel]] = {
ObjectType.REVISION: Revision,
ObjectType.DIRECTORY: Directory,
ObjectType.RELEASE: Release,
}
cls = classes[corrupt_object.id.object_type]
recovered_obj = cls.from_dict(d)
recovered_obj.check()
return FixedObject(
id=corrupt_object.id,
object_=value_to_kafka(d),
method="manifest_from_origin",
)
else:
converter = {
ObjectType.REVISION: converters.dulwich_commit_to_revision,
ObjectType.DIRECTORY: converters.dulwich_tree_to_directory,
ObjectType.RELEASE: converters.dulwich_tag_to_release,
}[corrupt_object.id.object_type]
cloned_obj = converter(cloned_dulwich_obj_or_manifest)
# Check checksum, among others
cloned_obj.check()
return FixedObject(
id=corrupt_object.id,
object_=value_to_kafka(cloned_obj.to_dict()),
method="from_origin",
)
@dataclasses.dataclass
class Fixer:
"""Reads a chunk of corrupt objects in the swh-scrubber database, tries to recover
them through various means (brute-forcing fields and re-downloading from the origin)
recomputes checksums, and writes them back to the swh-scrubber database
if successful.
"""
db: ScrubberDb
"""Database to read from and write to."""
start_object: CoreSWHID = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
"""Minimum SWHID to check (in alphabetical order)"""
end_object: CoreSWHID = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20)
"""Maximum SWHID to check (in alphabetical order)"""
def run(self):
# TODO: currently only support re-downloading from the origin:
# we should try brute-forcing for objects with no known origin (or when
# all origins fail)
after = ""
while True:
new_origins = self.db.object_origin_get(after=after)
if not new_origins:
break
for origin_url in new_origins:
self.recover_objects_from_origin(origin_url)
after = new_origins[-1]
def recover_objects_from_origin(self, origin_url):
"""Clones an origin, and cherry-picks original objects that are known to be
corrupt in the database."""
with tempfile.TemporaryDirectory(prefix=__name__ + ".") as tempdir:
clone_path = Path(tempdir) / "repository.git"
try:
subprocess.run(
["git", "clone", "--bare", origin_url, clone_path],
env={"PATH": os.environ["PATH"], "GIT_TERMINAL_PROMPT": "0"},
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
)
except Exception:
logger.exception("Failed to clone %s", origin_url)
return
iter_corrupt_objects(
self.db,
self.start_object,
self.end_object,
origin_url,
functools.partial(self.recover_corrupt_object, clone_path=clone_path),
)
def recover_corrupt_object(
self,
corrupt_object: CorruptObject,
cur: psycopg2.extensions.cursor,
clone_path: Path,
) -> None:
fixed_object = get_fixed_object_from_clone(clone_path, corrupt_object)
if fixed_object is not None:
self.db.fixed_object_add(cur, [fixed_object])
@@ -71,7 +71,11 @@ class OriginLocator:
def run(self):
iter_corrupt_objects(
self.db, self.start_object, self.end_object, self.handle_corrupt_object
self.db,
self.start_object,
self.end_object,
None,
self.handle_corrupt_object,
)
def handle_corrupt_object(
......
@@ -35,3 +35,16 @@ create table object_origin
);
comment on table object_origin is 'Maps objects to origins they might be found in.';
create table fixed_object
(
id swhid not null,
object bytea not null,
method text,
recovery_date timestamptz not null default now()
);
comment on table fixed_object is 'Each row identifies an object that was found to be corrupt, along with the original version of the object';
comment on column fixed_object.object is 'The recovered object itself, as a msgpack-encoded dict';
comment on column fixed_object.recovery_date is 'Moment the object was recovered.';
comment on column fixed_object.method is 'How the object was recovered. For example: "from_origin", "negative_utc", "capitalized_revision_parent".';
@@ -22,3 +22,8 @@ create index concurrently object_origin_by_origin on object_origin (origin_url,
-- FIXME: not valid, because corrupt_object(id) is not unique
-- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid;
-- alter table object_origin validate constraint object_origin_object_fkey;
-- fixed_object
create unique index concurrently fixed_object_pkey on fixed_object(id);
alter table fixed_object add primary key using index fixed_object_pkey;
@@ -138,3 +138,22 @@ def test_locate_origins(mocker, scrubber_db, swh_storage):
end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
)
assert origin_locator.method_calls == [call.run()]
def test_fix_objects(mocker, scrubber_db):
fixer = MagicMock()
Fixer = mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer)
get_scrubber_db = mocker.patch(
"swh.scrubber.get_scrubber_db", return_value=scrubber_db
)
result = invoke(scrubber_db, ["fix"])
assert result.exit_code == 0, result.output
assert result.output == ""
get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
Fixer.assert_called_once_with(
db=scrubber_db,
start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
)
assert fixer.method_calls == [call.run()]
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
from pathlib import Path
import subprocess
from unittest.mock import MagicMock
import zlib
import attr
from swh.journal.serializers import kafka_to_value, value_to_kafka
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Directory, DirectoryEntry
from swh.model.tests.swh_model_data import DIRECTORIES
from swh.scrubber.db import CorruptObject, Datastore, FixedObject, ScrubberDb
from swh.scrubber.fixer import Fixer
(DIRECTORY,) = [dir_ for dir_ in DIRECTORIES if len(dir_.entries) > 1]
# ORIGINAL_DIRECTORY represents a directory with entries in non-canonical order,
and a consistent hash. Its entries were canonically reordered, but the original
# order is still present in the raw manifest.
_DIR = Directory(entries=tuple(reversed(DIRECTORY.entries)))
ORIGINAL_DIRECTORY = Directory(
entries=(
DirectoryEntry(
name=b"dir1",
type="dir",
target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
perms=0o040755,
),
DirectoryEntry(
name=b"file1.ext",
type="file",
target=hash_to_bytes("86bc6b377e9d25f9d26777a4a28d08e63e7c5779"),
perms=0o644,
),
DirectoryEntry(
name=b"subprepo1",
type="rev",
target=hash_to_bytes("c7f96242d73c267adc77c2908e64e0c1cb6a4431"),
perms=0o160000,
),
),
raw_manifest=(
b"tree 102\x00"
b"160000 subprepo1\x00\xc7\xf9bB\xd7<&z\xdcw\xc2\x90\x8ed\xe0\xc1\xcbjD1"
b"644 file1.ext\x00\x86\xbck7~\x9d%\xf9\xd2gw\xa4\xa2\x8d\x08\xe6>|Wy"
b"40755 dir1\x00K\x82]\xc6B\xcbn\xb9\xa0`\xe5K\xf8\xd6\x92\x88\xfb\xeeI\x04"
),
)
# A directory with its entries in canonical order, but a hash computed as if
# the entries were in the reverse order.
# This happens when entries get normalized (either by the loader or accidentally
# in swh-storage).
CORRUPT_DIRECTORY = attr.evolve(ORIGINAL_DIRECTORY, raw_manifest=None)
assert ORIGINAL_DIRECTORY != CORRUPT_DIRECTORY
assert (
hash_to_bytes("61992617462fff81509bda4a24b54c96ea74a007")
== ORIGINAL_DIRECTORY.id
== CORRUPT_DIRECTORY.id
)
assert (
hash_to_bytes("81fda5b242e65fc81201e590d0f0ce5f582fbcdd")
== CORRUPT_DIRECTORY.compute_hash()
!= CORRUPT_DIRECTORY.id
)
assert ORIGINAL_DIRECTORY.entries == CORRUPT_DIRECTORY.entries
DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh")
CORRUPT_OBJECT = CorruptObject(
id=ORIGINAL_DIRECTORY.swhid(),
datastore=DATASTORE,
first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc),
object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()),
)
def test_no_object(scrubber_db: ScrubberDb, mocker) -> None:
"""There is no object to recover -> nothing happens"""
fixer = Fixer(db=scrubber_db)
fixer.run()
with scrubber_db.conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM fixed_object")
assert cur.fetchone() == (0,)
def test_no_origin(scrubber_db: ScrubberDb, mocker) -> None:
"""There is no origin to recover objects from -> nothing happens"""
scrubber_db.corrupt_object_add(
CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
)
fixer = Fixer(db=scrubber_db)
fixer.run()
with scrubber_db.conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM fixed_object")
assert cur.fetchone() == (0,)
def test_already_fixed(scrubber_db: ScrubberDb, mocker) -> None:
"""All corrupt objects are already fixed -> nothing happens"""
fixed_object = FixedObject(
id=CORRUPT_OBJECT.id,
object_=value_to_kafka(ORIGINAL_DIRECTORY.to_dict()),
method="whatever means necessary",
)
scrubber_db.corrupt_object_add(
CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
)
with scrubber_db.cursor() as cur:
scrubber_db.object_origin_add(cur, CORRUPT_OBJECT.id, ["http://example.org/"])
scrubber_db.fixed_object_add(cur, [fixed_object])
subprocess_run = mocker.patch("subprocess.run")
scrubber_db = MagicMock(wraps=scrubber_db)
fixer = Fixer(db=scrubber_db)
fixer.run()
# Check the Fixer did not try to fix the object again
scrubber_db.fixed_object_add.assert_not_called()
subprocess_run.assert_not_called()
with scrubber_db.conn.cursor() as cur:
cur.execute("SELECT id, method FROM fixed_object")
assert list(cur) == [(str(fixed_object.id), fixed_object.method)]
def _run_fixer_with_clone(
scrubber_db: ScrubberDb,
mocker,
caplog,
corrupt_object: CorruptObject,
subprocess_run_side_effect,
) -> None:
"""Helper for all tests that involve running the fixer with a clone:
adds a corrupt object and an origin to the DB, mocks subprocess.run with the
given function, and runs the fixer with caplog"""
scrubber_db.corrupt_object_add(
corrupt_object.id, corrupt_object.datastore, corrupt_object.object_
)
with scrubber_db.cursor() as cur:
scrubber_db.object_origin_add(cur, corrupt_object.id, ["http://example.org/"])
subprocess_run = mocker.patch(
"subprocess.run", side_effect=subprocess_run_side_effect
)
fixer = Fixer(db=scrubber_db)
with caplog.at_level(logging.CRITICAL):
with caplog.at_level(logging.INFO, logger="swh.scrubber.fixer"):
fixer.run()
subprocess_run.assert_called()
def test_failed_clone(scrubber_db: ScrubberDb, mocker, caplog) -> None:
"""Corrupt object found with an origin, but the origin's clone is broken somehow"""
scrubber_db = MagicMock(wraps=scrubber_db)
_run_fixer_with_clone(
scrubber_db,
mocker,
caplog,
corrupt_object=CORRUPT_OBJECT,
subprocess_run_side_effect=subprocess.CalledProcessError(1, "foo"),
)
scrubber_db.fixed_object_add.assert_not_called()
with scrubber_db.conn.cursor() as cur:
cur.execute("SELECT id, method FROM fixed_object")
assert list(cur) == []
assert (
"swh.scrubber.fixer",
logging.ERROR,
"Failed to clone http://example.org/",
) in caplog.record_tuples
def test_empty_origin(scrubber_db: ScrubberDb, mocker, caplog) -> None:
"""Corrupt object found with an origin, but the origin's clone is missing
the object"""
scrubber_db = MagicMock(wraps=scrubber_db)
real_subprocess_run = subprocess.run
def subprocess_run(args, **kwargs):
(*head, path) = args
assert head == ["git", "clone", "--bare", "http://example.org/"]
real_subprocess_run(["git", "init", "--bare", path])
_run_fixer_with_clone(
scrubber_db,
mocker,
caplog,
corrupt_object=CORRUPT_OBJECT,
subprocess_run_side_effect=subprocess_run,
)
scrubber_db.fixed_object_add.assert_not_called()
with scrubber_db.conn.cursor() as cur:
cur.execute("SELECT id, method FROM fixed_object")
assert list(cur) == []
assert (
"swh.scrubber.fixer",
logging.INFO,
"swh:1:dir:61992617462fff81509bda4a24b54c96ea74a007 not found in origin",
) in caplog.record_tuples
def test_parseable_directory_from_origin(
scrubber_db: ScrubberDb, mocker, caplog
) -> None:
"""Corrupt object found with an origin, and the object is found in the origin's
clone as expected."""
scrubber_db = MagicMock(wraps=scrubber_db)
real_subprocess_run = subprocess.run
def subprocess_run(args, **kwargs):
(*head, path) = args
assert head == ["git", "clone", "--bare", "http://example.org/"]
real_subprocess_run(["git", "init", "--bare", path])
object_dir_path = Path(path) / "objects/61"
object_path = object_dir_path / "992617462fff81509bda4a24b54c96ea74a007"
object_dir_path.mkdir()
with open(object_path, "wb") as fd:
fd.write(zlib.compress(ORIGINAL_DIRECTORY.raw_manifest))
_run_fixer_with_clone(
scrubber_db,
mocker,
caplog,
corrupt_object=CORRUPT_OBJECT,
subprocess_run_side_effect=subprocess_run,
)
scrubber_db.fixed_object_add.assert_called_once()
fixed_objects = list(scrubber_db.fixed_object_iter())
assert len(fixed_objects) == 1
assert fixed_objects[0].id == ORIGINAL_DIRECTORY.swhid()
assert fixed_objects[0].method == "from_origin"
assert (
Directory.from_dict(kafka_to_value(fixed_objects[0].object_))
== ORIGINAL_DIRECTORY
)
assert caplog.record_tuples == []
def test_unparseable_directory(scrubber_db: ScrubberDb, mocker, caplog) -> None:
"""Corrupt object found with an origin, and the object is found in the origin's
clone as expected; but Dulwich cannot parse it.
It was probably loaded by an old version of the loader that was more permissive,
by using libgit2."""
scrubber_db = MagicMock(wraps=scrubber_db)
real_subprocess_run = subprocess.run
raw_manifest = b"this is not a parseable manifest"
raw_manifest = f"tree {len(raw_manifest)}\x00".encode() + raw_manifest
original_directory = Directory(
entries=(
DirectoryEntry(
name=b"dir1",
type="dir",
target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"),
perms=0o040755,
),
),
raw_manifest=raw_manifest,
)
assert original_directory.id.hex() == "a518fa6b46bad74e95588d2bfdf4455398a2216a"
corrupt_directory = attr.evolve(original_directory, raw_manifest=None)
corrupt_object = CorruptObject(
id=original_directory.swhid(),
datastore=DATASTORE,
object_=value_to_kafka(corrupt_directory.to_dict()),
first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc),
)
def subprocess_run(args, **kwargs):
(*head, path) = args
if head[0:2] != ["git", "clone"]:
return real_subprocess_run(args, **kwargs)
assert head == ["git", "clone", "--bare", "http://example.org/"]
real_subprocess_run(["git", "init", "--bare", path])
object_dir_path = Path(path) / "objects/a5"
object_path = object_dir_path / "18fa6b46bad74e95588d2bfdf4455398a2216a"
object_dir_path.mkdir()
with open(object_path, "wb") as fd:
fd.write(zlib.compress(raw_manifest))
_run_fixer_with_clone(
scrubber_db,
mocker,
caplog,
corrupt_object=corrupt_object,
subprocess_run_side_effect=subprocess_run,
)
scrubber_db.fixed_object_add.assert_called_once()
fixed_objects = list(scrubber_db.fixed_object_iter())
assert len(fixed_objects) == 1
assert fixed_objects[0].id == original_directory.swhid()
assert fixed_objects[0].method == "manifest_from_origin"
assert (
Directory.from_dict(kafka_to_value(fixed_objects[0].object_))
== original_directory
)
assert caplog.record_tuples == [
(
"swh.scrubber.fixer",
logging.INFO,
r"Dulwich failed to parse b'tree 32\x00this is not a parseable manifest'",
)
]
@@ -3,7 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Callable
from typing import Callable, Optional
import psycopg2
@@ -16,11 +16,23 @@ def iter_corrupt_objects(
db: ScrubberDb,
start_object: CoreSWHID,
end_object: CoreSWHID,
origin_url: Optional[str],
cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None],
) -> None:
"""Fetches objects and calls ``cb`` on each of them.
Objects are fetched with an update lock, in the same transaction as ``cb``,
which is automatically committed after ``cb`` runs."""
while True:
with db.conn, db.cursor() as cur:
corrupt_objects = db.corrupt_object_grab(cur, start_object, end_object,)
if origin_url:
corrupt_objects = db.corrupt_object_grab_by_origin(
cur, origin_url, start_object, end_object
)
else:
corrupt_objects = db.corrupt_object_grab_by_id(
cur, start_object, end_object
)
if corrupt_objects and corrupt_objects[0].id == start_object:
# TODO: don't needlessly fetch duplicate objects
del corrupt_objects[0]
......