Commit 8b16c4fc authored by Jenkins for Software Heritage

Update upstream source from tag 'debian/upstream/0.0.2'

Update to upstream version '0.0.2'
with Debian dir a2bdca1ea9acd3b2181bba54e116252db2bb6751
parents 208cad6c 8f651d9d
Showing 606 additions and 40 deletions
# python: Reformat code with black 22.3.0
73eee4e307c59c4930a97cab9f787ae8e284b163
@@ -43,6 +43,6 @@ repos:
- id: isort
- repo: https://github.com/python/black
rev: 19.10b0
rev: 22.3.0
hooks:
- id: black
Metadata-Version: 2.1
Name: swh.scrubber
Version: 0.0.1
Version: 0.0.2
Summary: Software Heritage Datastore Scrubber
Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
@@ -61,5 +59,3 @@ Reinjection
Finally, when an original object is recovered, it is reinjected in the original
data store, replacing the corrupt one.
@@ -5,6 +5,10 @@ warn_unused_ignores = True
# 3rd party libraries without stubs (yet)
[mypy-dulwich.*]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True
......
[pytest]
norecursedirs = build docs .*
asyncio_mode = strict
# Add here internal Software Heritage dependencies, one per line.
swh.core[http] >= 0.3 # [http] is required by swh.core.pytest_plugin
swh.loader.git >= 1.4.0
swh.model >= 5.0.0
swh.storage >= 1.1.0
swh.journal >= 0.9.0
pytest < 7.0.0 # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use
pytest
pytest-mock
pyyaml
swh.graph
......
[flake8]
ignore = E203,E231,W503
select = C,E,F,W,B950
ignore = E203,E231,E501,W503
max-line-length = 88
[egg_info]
......
Metadata-Version: 2.1
Name: swh.scrubber
Version: 0.0.2
Summary: Software Heritage Datastore Scrubber
Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 3 - Alpha
Requires-Python: >=3.7
Description-Content-Type: text/x-rst
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
Software Heritage - Datastore Scrubber
======================================
Tools to periodically check data integrity in swh-storage and swh-objstorage,
report errors, and (try to) fix them.
This is a work in progress; some of the components described below do not
exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection).
The Scrubber package is made of the following parts:
Checking
--------
Highly parallel processes continuously read objects from a data store,
compute checksums, and write any failure in a database, along with the data of
the corrupt object.
There is one "checker" for each datastore package: storage (postgresql and cassandra),
journal (kafka), and objstorage.
Recovery
--------
Then, from time to time, jobs go through the list of known corrupt objects,
and try to recover the original objects, through various means:
* Brute-forcing variations until they match their checksum
* Recovering from another data store
* As a last resort, recovering from known origins, if any
Reinjection
-----------
Finally, when an original object is recovered, it is reinjected in the original
data store, replacing the corrupt one.
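In code terms, the checking step amounts to recomputing each object's intrinsic identifier and recording any mismatch together with the serialized object. A minimal sketch of that idea (the actual storage and journal checkers are not part of this diff; compute_hash(), swhid() and to_dict() are assumed from swh.model.model, and value_to_kafka from swh.journal, as used elsewhere in this package):

from swh.journal.serializers import value_to_kafka

def check_object(db, datastore, obj) -> None:
    # Recompute the hash from the object's own content and compare it to the
    # identifier it was stored under; on mismatch, keep a copy of the corrupt
    # object so it can later be recovered and reinjected.
    if obj.compute_hash() != obj.id:
        db.corrupt_object_add(obj.swhid(), datastore, value_to_kafka(obj.to_dict()))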
.git-blame-ignore-revs
.gitignore
.pre-commit-config.yaml
AUTHORS
CODE_OF_CONDUCT.md
CONTRIBUTORS
LICENSE
MANIFEST.in
Makefile
README.rst
conftest.py
mypy.ini
pyproject.toml
pytest.ini
requirements-swh.txt
requirements-test.txt
requirements.txt
setup.cfg
setup.py
tox.ini
docs/.gitignore
docs/Makefile
docs/README.rst
docs/conf.py
docs/index.rst
docs/_static/.placeholder
docs/_templates/.placeholder
swh/__init__.py
swh.scrubber.egg-info/PKG-INFO
swh.scrubber.egg-info/SOURCES.txt
swh.scrubber.egg-info/dependency_links.txt
swh.scrubber.egg-info/entry_points.txt
swh.scrubber.egg-info/requires.txt
swh.scrubber.egg-info/top_level.txt
swh/scrubber/__init__.py
swh/scrubber/cli.py
swh/scrubber/db.py
swh/scrubber/fixer.py
swh/scrubber/journal_checker.py
swh/scrubber/origin_locator.py
swh/scrubber/py.typed
swh/scrubber/storage_checker.py
swh/scrubber/utils.py
swh/scrubber/sql/20-enums.sql
swh/scrubber/sql/30-schema.sql
swh/scrubber/sql/60-indexes.sql
swh/scrubber/tests/__init__.py
swh/scrubber/tests/conftest.py
swh/scrubber/tests/test_cli.py
swh/scrubber/tests/test_fixer.py
swh/scrubber/tests/test_journal_kafka.py
swh/scrubber/tests/test_origin_locator.py
swh/scrubber/tests/test_storage_postgresql.py
\ No newline at end of file
[swh.cli.subcommands]
scrubber = swh.scrubber.cli
swh.core[http]>=0.3
swh.loader.git>=1.4.0
swh.model>=5.0.0
swh.storage>=1.1.0
swh.journal>=0.9.0
[testing]
pytest
pytest-mock
pyyaml
swh.graph
types-pyyaml
swh
@@ -17,7 +17,10 @@ from swh.core.cli import swh as swh_cli_group
"--config-file",
"-C",
default=None,
type=click.Path(exists=True, dir_okay=False,),
type=click.Path(
exists=True,
dir_okay=False,
),
help="Configuration file.",
)
@click.pass_context
@@ -83,8 +86,7 @@ def scrubber_cli_group(ctx, config_file: Optional[str]) -> None:
@scrubber_cli_group.group(name="check")
@click.pass_context
def scrubber_check_cli_group(ctx):
"""group of commands which read from data stores and report errors.
"""
"""group of commands which read from data stores and report errors."""
pass
@@ -140,7 +142,10 @@ def scrubber_check_journal(ctx) -> None:
from .journal_checker import JournalChecker
checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
checker = JournalChecker(
db=ctx.obj["db"],
journal_client=conf["journal_client"],
)
checker.run()
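For completeness, the command above reads its journal settings from a journal_client section of the configuration file. The exact layout is not shown in this diff; a plausible shape, with illustrative values only, would be:

config = {
    # journal_client is the key read by scrubber_check_journal above; the
    # broker/group/prefix values are illustrative, not taken from this diff.
    "journal_client": {
        "cls": "kafka",
        "brokers": ["kafka1.example.org:9092"],
        "group_id": "swh-scrubber-journal-checker",
        "prefix": "swh.journal.objects",
    },
}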
@@ -156,6 +161,8 @@ def scrubber_locate_origins(ctx, start_object: str, end_object: str):
conf = ctx.obj["config"]
if "storage" not in conf:
ctx.fail("You must have a storage configured in your config file.")
if "graph" not in conf:
ctx.fail("You must have a graph configured in your config file.")
from swh.graph.client import RemoteGraphClient
from swh.model.model import CoreSWHID
@@ -172,3 +179,24 @@ def scrubber_locate_origins(ctx, start_object: str, end_object: str):
)
locator.run()
@scrubber_cli_group.command(name="fix")
@click.option("--start-object", default="swh:1:cnt:" + "00" * 20)
@click.option("--end-object", default="swh:1:snp:" + "ff" * 20)
@click.pass_context
def scrubber_fix_objects(ctx, start_object: str, end_object: str):
"""For each known corrupt object reported in the scrubber DB, looks up origins
that may contain this object, and records them; so they can be used later
for recovery."""
from swh.model.model import CoreSWHID
from .fixer import Fixer
fixer = Fixer(
db=ctx.obj["db"],
start_object=CoreSWHID.from_string(start_object),
end_object=CoreSWHID.from_string(end_object),
)
fixer.run()
@@ -7,7 +7,7 @@
import dataclasses
import datetime
import functools
from typing import Iterator, List
from typing import Iterator, List, Optional
import psycopg2
@@ -36,6 +36,14 @@ class CorruptObject:
object_: bytes
@dataclasses.dataclass(frozen=True)
class FixedObject:
id: CoreSWHID
object_: bytes
method: str
recovery_date: Optional[datetime.datetime] = None
class ScrubberDb(BaseDb):
current_version = 1
@@ -45,18 +53,35 @@ class ScrubberDb(BaseDb):
cur = self.cursor()
cur.execute(
"""
INSERT INTO datastore (package, class, instance)
VALUES (%s, %s, %s)
ON CONFLICT DO NOTHING
RETURNING id
WITH inserted AS (
INSERT INTO datastore (package, class, instance)
VALUES (%(package)s, %(cls)s, %(instance)s)
ON CONFLICT DO NOTHING
RETURNING id
)
SELECT id
FROM inserted
UNION (
-- If the datastore already exists, we need to fetch its id
SELECT id
FROM datastore
WHERE
package=%(package)s
AND class=%(cls)s
AND instance=%(instance)s
)
LIMIT 1
""",
(datastore.package, datastore.cls, datastore.instance),
(dataclasses.asdict(datastore)),
)
(id_,) = cur.fetchone()
return id_
def corrupt_object_add(
self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes,
self,
id: CoreSWHID,
datastore: Datastore,
serialized_object: bytes,
) -> None:
datastore_id = self.datastore_get_or_add(datastore)
cur = self.cursor()
@@ -93,14 +118,40 @@ class ScrubberDb(BaseDb):
),
)
def corrupt_object_grab(
def _corrupt_object_list_from_cursor(
self, cur: psycopg2.extensions.cursor
) -> List[CorruptObject]:
results = []
for row in cur:
(id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
results.append(
CorruptObject(
id=CoreSWHID.from_string(id),
first_occurrence=first_occurrence,
object_=object_,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
)
)
return results
def corrupt_object_get(
self,
cur,
start_id: CoreSWHID = None,
end_id: CoreSWHID = None,
start_id: CoreSWHID,
end_id: CoreSWHID,
limit: int = 100,
) -> List[CorruptObject]:
"""Yields a page of records in the 'corrupt_object' table."""
"""Yields a page of records in the 'corrupt_object' table, ordered by id.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
in_origin: An origin URL. If provided, only returns objects that may be
found in the given origin
"""
cur = self.cursor()
cur.execute(
"""
SELECT
@@ -116,24 +167,96 @@ class ScrubberDb(BaseDb):
""",
(str(start_id), str(end_id), limit),
)
return self._corrupt_object_list_from_cursor(cur)
results = []
for row in cur:
(id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
results.append(
CorruptObject(
id=CoreSWHID.from_string(id),
first_occurrence=first_occurrence,
object_=object_,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
)
)
def corrupt_object_grab_by_id(
self,
cur: psycopg2.extensions.cursor,
start_id: CoreSWHID,
end_id: CoreSWHID,
limit: int = 100,
) -> List[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id.
return results
These records are not already fixed (i.e. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN datastore AS ds ON (ds.id=co.datastore)
WHERE
co.id >= %(start_id)s
AND co.id <= %(end_id)s
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(
start_id=str(start_id),
end_id=str(end_id),
limit=limit,
),
)
return self._corrupt_object_list_from_cursor(cur)
def corrupt_object_grab_by_origin(
self,
cur: psycopg2.extensions.cursor,
origin_url: str,
start_id: Optional[CoreSWHID] = None,
end_id: Optional[CoreSWHID] = None,
limit: int = 100,
) -> List[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id.
These records are not already fixed (i.e. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
origin_url: only returns objects that may be found in the given origin
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN datastore AS ds ON (ds.id=co.datastore)
INNER JOIN object_origin AS oo ON (oo.object_id=co.id)
WHERE
(co.id >= %(start_id)s OR %(start_id)s IS NULL)
AND (co.id <= %(end_id)s OR %(end_id)s IS NULL)
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
AND oo.origin_url=%(origin_url)s
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(
start_id=None if start_id is None else str(start_id),
end_id=None if end_id is None else str(end_id),
origin_url=origin_url,
limit=limit,
),
)
return self._corrupt_object_list_from_cursor(cur)
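Both grab methods rely on FOR UPDATE SKIP LOCKED, so several fixer processes can page through the table concurrently without being handed the same rows; the row locks only last for the enclosing transaction. A rough paging loop over one origin might look like the sketch below (the package's own iter_corrupt_objects helper in utils.py, imported by the fixer further down, plays this role; treating db.conn as the underlying psycopg2 connection is an assumption about BaseDb):

def process_origin(db, origin_url, start_id, end_id, handle):
    while True:
        # One transaction per page: rows stay locked while they are handled,
        # and already-fixed objects are excluded by the query itself.
        with db.conn, db.cursor() as cur:
            page = db.corrupt_object_grab_by_origin(cur, origin_url, start_id, end_id)
            if not page:
                break
            for corrupt_object in page:
                handle(corrupt_object, cur)
            # Advance past the last object seen, even if it could not be fixed.
            start_id = page[-1].id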
def object_origin_add(self, cur, swhid: CoreSWHID, origins: List[str]) -> None:
def object_origin_add(
self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
@@ -143,3 +266,55 @@ class ScrubberDb(BaseDb):
""",
[(str(swhid), origin_url) for origin_url in origins],
)
def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]:
"""Returns origins with non-fixed corrupt objects, ordered by URL.
Arguments:
after: if given, only returns origins with a URL after this value
"""
cur = self.cursor()
cur.execute(
"""
SELECT DISTINCT origin_url
FROM object_origin
WHERE
origin_url > %(after)s
AND object_id IN (
(SELECT id FROM corrupt_object)
EXCEPT (SELECT id FROM fixed_object)
)
ORDER BY origin_url
LIMIT %(limit)s
""",
dict(after=after, limit=limit),
)
return [origin_url for (origin_url,) in cur]
def fixed_object_add(
self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO fixed_object (id, object, method)
VALUES %s
ON CONFLICT DO NOTHING
""",
[
(str(fixed_object.id), fixed_object.object_, fixed_object.method)
for fixed_object in fixed_objects
],
)
def fixed_object_iter(self) -> Iterator[FixedObject]:
cur = self.cursor()
cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
for (id, object_, method, recovery_date) in cur:
yield FixedObject(
id=CoreSWHID.from_string(id),
object_=object_,
method=method,
recovery_date=recovery_date,
)
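The fixed_object table populated here is what the reinjection step described in the README (not implemented yet) would consume. A rough sketch of such a consumer, assuming the usual swh.storage add endpoints and reusing kafka_to_value from swh.journal as the fixer does; actually replacing an already-stored corrupt row may need more than a plain add:

from swh.journal.serializers import kafka_to_value
from swh.model.model import Directory, Release, Revision
from swh.model.swhids import ObjectType

def reinject(db, storage) -> None:
    # Replay every recovered object back into the original data store.
    adders = {
        ObjectType.REVISION: (Revision, storage.revision_add),
        ObjectType.DIRECTORY: (Directory, storage.directory_add),
        ObjectType.RELEASE: (Release, storage.release_add),
    }
    for fixed in db.fixed_object_iter():
        cls, add = adders[fixed.id.object_type]
        add([cls.from_dict(kafka_to_value(fixed.object_))])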
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Reads all known corrupts objects from the swh-scrubber database,
and tries to recover them.
Currently, only recovery from Git origins is implemented"""
import dataclasses
import functools
import logging
import os
from pathlib import Path
import subprocess
import tempfile
from typing import Dict, Optional, Type, Union
import dulwich
import dulwich.objects
import dulwich.repo
import psycopg2
from swh.journal.serializers import kafka_to_value, value_to_kafka
from swh.loader.git import converters
from swh.model.hashutil import hash_to_bytehex, hash_to_hex
from swh.model.model import BaseModel, Directory, Release, Revision, Snapshot
from swh.model.swhids import CoreSWHID, ObjectType
from .db import CorruptObject, FixedObject, ScrubberDb
from .utils import iter_corrupt_objects
logger = logging.getLogger(__name__)
ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
def get_object_from_clone(
clone_path: Path, swhid: CoreSWHID
) -> Union[None, bytes, dulwich.objects.ShaFile]:
"""Reads the original object matching the ``corrupt_object`` from the given clone
if it exists, and returns a Dulwich object if possible, or a the raw manifest."""
try:
repo = dulwich.repo.Repo(str(clone_path))
except dulwich.errors.NotGitRepository:
return None
with repo: # needed to avoid packfile fd leaks
try:
return repo[hash_to_bytehex(swhid.object_id)]
except KeyError:
return None
except dulwich.errors.ObjectFormatException:
# fallback to git if dulwich can't parse it.
# Unfortunately, Dulwich does not allow fetching an object without
# parsing it into a ShaFile subclass, so we have to manually get it
# by shelling out to git.
object_type = (
subprocess.check_output(
[
"git",
"-C",
clone_path,
"cat-file",
"-t",
hash_to_hex(swhid.object_id),
]
)
.decode()
.strip()
)
manifest = subprocess.check_output(
[
"git",
"-C",
clone_path,
"cat-file",
object_type,
hash_to_hex(swhid.object_id),
]
)
manifest = f"{object_type} {len(manifest)}\x00".encode() + manifest
logger.info("Dulwich failed to parse %r", manifest)
return manifest
def get_fixed_object_from_clone(
clone_path: Path, corrupt_object: CorruptObject
) -> Optional[FixedObject]:
"""Reads the original object matching the ``corrupt_object`` from the given clone
if it exists, and returns a :class:`FixedObject` instance ready to be inserted
in the database."""
cloned_dulwich_obj_or_manifest = get_object_from_clone(
clone_path, corrupt_object.id
)
if cloned_dulwich_obj_or_manifest is None:
# Origin still exists, but object disappeared
logger.info("%s not found in origin", corrupt_object.id)
return None
elif isinstance(cloned_dulwich_obj_or_manifest, bytes):
# Dulwich could not parse it. Add as raw manifest to the existing object
d = kafka_to_value(corrupt_object.object_)
assert d.get("raw_manifest") is None, "Corrupt object has a raw_manifest"
d["raw_manifest"] = cloned_dulwich_obj_or_manifest
# Rebuild the object from the stored corrupt object + the raw manifest
# just recovered; then checksum it.
classes: Dict[ObjectType, Type[BaseModel]] = {
ObjectType.REVISION: Revision,
ObjectType.DIRECTORY: Directory,
ObjectType.RELEASE: Release,
}
cls = classes[corrupt_object.id.object_type]
recovered_obj = cls.from_dict(d)
recovered_obj.check()
return FixedObject(
id=corrupt_object.id,
object_=value_to_kafka(d),
method="manifest_from_origin",
)
else:
converter = {
ObjectType.REVISION: converters.dulwich_commit_to_revision,
ObjectType.DIRECTORY: converters.dulwich_tree_to_directory,
ObjectType.RELEASE: converters.dulwich_tag_to_release,
}[corrupt_object.id.object_type]
cloned_obj = converter(cloned_dulwich_obj_or_manifest)
# Check checksum, among others
cloned_obj.check()
return FixedObject(
id=corrupt_object.id,
object_=value_to_kafka(cloned_obj.to_dict()),
method="from_origin",
)
@dataclasses.dataclass
class Fixer:
"""Reads a chunk of corrupt objects in the swh-scrubber database, tries to recover
them through various means (brute-forcing fields and re-downloading from the origin),
recomputes checksums, and writes them back to the swh-scrubber database
if successful.
"""
db: ScrubberDb
"""Database to read from and write to."""
start_object: CoreSWHID = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
"""Minimum SWHID to check (in alphabetical order)"""
end_object: CoreSWHID = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20)
"""Maximum SWHID to check (in alphabetical order)"""
def run(self):
# TODO: currently only support re-downloading from the origin:
# we should try brute-forcing for objects with no known origin (or when
# all origins fail)
after = ""
while True:
new_origins = self.db.object_origin_get(after=after)
if not new_origins:
break
for origin_url in new_origins:
self.recover_objects_from_origin(origin_url)
after = new_origins[-1]
def recover_objects_from_origin(self, origin_url):
"""Clones an origin, and cherry-picks original objects that are known to be
corrupt in the database."""
with tempfile.TemporaryDirectory(prefix=__name__ + ".") as tempdir:
clone_path = Path(tempdir) / "repository.git"
try:
subprocess.run(
["git", "clone", "--bare", origin_url, clone_path],
env={"PATH": os.environ["PATH"], "GIT_TERMINAL_PROMPT": "0"},
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
)
except Exception:
logger.exception("Failed to clone %s", origin_url)
return
iter_corrupt_objects(
self.db,
self.start_object,
self.end_object,
origin_url,
functools.partial(self.recover_corrupt_object, clone_path=clone_path),
)
def recover_corrupt_object(
self,
corrupt_object: CorruptObject,
cur: psycopg2.extensions.cursor,
clone_path: Path,
) -> None:
fixed_object = get_fixed_object_from_clone(clone_path, corrupt_object)
if fixed_object is not None:
self.db.fixed_object_add(cur, [fixed_object])
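Outside of the swh scrubber fix CLI command shown above, the fixer can be wired up directly; a minimal sketch (ScrubberDb.connect and the DSN are assumptions about the swh.core.db.BaseDb connection helper, not part of this diff):

from swh.model.swhids import CoreSWHID
from swh.scrubber.db import ScrubberDb
from swh.scrubber.fixer import Fixer

db = ScrubberDb.connect("dbname=swh-scrubber")  # assumed BaseDb connection helper
fixer = Fixer(
    db=db,
    start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
    end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
)
fixer.run()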
@@ -71,7 +71,11 @@ class OriginLocator:
def run(self):
iter_corrupt_objects(
self.db, self.start_object, self.end_object, self.handle_corrupt_object
self.db,
self.start_object,
self.end_object,
None,
self.handle_corrupt_object,
)
def handle_corrupt_object(
......
@@ -35,3 +35,16 @@ create table object_origin
);
comment on table object_origin is 'Maps objects to origins they might be found in.';
create table fixed_object
(
id swhid not null,
object bytea not null,
method text,
recovery_date timestamptz not null default now()
);
comment on table fixed_object is 'Each row identifies an object that was found to be corrupt, along with the original version of the object';
comment on column fixed_object.object is 'The recovered object itself, as a msgpack-encoded dict';
comment on column fixed_object.recovery_date is 'Moment the object was recovered.';
comment on column fixed_object.method is 'How the object was recovered. For example: "from_origin", "negative_utc", "capitalized_revision_parent".';
@@ -22,3 +22,8 @@ create index concurrently object_origin_by_origin on object_origin (origin_url,
-- FIXME: not valid, because corrupt_object(id) is not unique
-- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid;
-- alter table object_origin validate constraint object_origin_object_fkey;
-- fixed_object
create unique index concurrently fixed_object_pkey on fixed_object(id);
alter table fixed_object add primary key using index fixed_object_pkey;