From 798f749e66c883fa3a7c962681690166586c730f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Gom=C3=A8s?= <rgomes@octobus.net> Date: Thu, 22 Sep 2022 12:09:23 +0200 Subject: [PATCH] Use a Merkle discovery algorithm with archives "Discovery" is the term used to find out the differences between two Merkle graphs. Using such an algorithm is useful in that it drastically reduces the amount of data that needs to be transferred. This commit introduces an efficient but simple algorithm that is a good starting point for improved performance: random sampling of directories, the details of which are explained in the docstrings. Mercurial uses a more sophisticated algorithm for its discovery, but it is quite a bit more involved and would introduce too much complexity at once. Also, the constraints for speed that Mercurial has (in the order of milliseconds) don't apply as obviously to this context without further investigation. Benchmarks ========== Setup ----- - With a local postgresql storage (so no network overhead), a local tmpfs obstorage on a fast NVME SSD, all of which should make this improvement look less good than it will be in production - With a tarball of the linux kernel at commit d96d875ef5dd372f533059a44f98e92de9cf0d42 already loaded - Loading a tarball of 20 commits earlier (bf3f401db6cbe010095fe3d1e233a5fde54e8b78) - Only taking into account the loading (not the downloading of the tarball, or its decompression) Result ------ before: ~30s after: ~17s Reproduced 4 times. --- swh/loader/core/discovery.py | 207 ++++++++++++++++++ .../package/archive/tests/test_archive.py | 123 +++++++++++ swh/loader/package/loader.py | 7 + 3 files changed, 337 insertions(+) create mode 100644 swh/loader/core/discovery.py diff --git a/swh/loader/core/discovery.py b/swh/loader/core/discovery.py new file mode 100644 index 00000000..794e3566 --- /dev/null +++ b/swh/loader/core/discovery.py @@ -0,0 +1,207 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Primitives for finding the unknown parts of disk contents efficiently.""" + +from collections import namedtuple +import itertools +import logging +import random +from typing import Any, Iterable, List, Mapping, NamedTuple, Set, Union + +from swh.model.from_disk import model +from swh.model.model import Sha1Git +from swh.storage.interface import StorageInterface + +logger = logging.getLogger(__name__) + +# Maximum amount when sampling from the undecided set of directory entries +SAMPLE_SIZE = 1000 + +# Sets of sha1 of contents, skipped contents and directories respectively +Sample: NamedTuple = namedtuple( + "Sample", ["contents", "skipped_contents", "directories"] +) + + +class BaseDiscoveryGraph: + """Creates the base structures and methods needed for discovery algorithms. + Subclasses should override ``get_sample`` to affect how the discovery is made.""" + + def __init__(self, contents, skipped_contents, directories): + self._all_contents: Mapping[ + Sha1Git, Union[model.Content, model.SkippedContent] + ] = {} + self._undecided_directories: Set[Sha1Git] = set() + self._children: Mapping[Sha1Git, model.DirectoryEntry] = {} + self._parents: Mapping[model.DirectoryEntry, Sha1Git] = {} + self.undecided: Set[Sha1Git] = set() + + for content in itertools.chain(contents, skipped_contents): + self.undecided.add(content.sha1_git) + self._all_contents[content.sha1_git] = content + + for directory in directories: + self.undecided.add(directory.id) + self._undecided_directories.add(directory.id) + self._children[directory.id] = {c.target for c in directory.entries} + for child in directory.entries: + self._parents.setdefault(child.target, set()).add(directory.id) + + self.undecided |= self._undecided_directories + self.known: Set[Sha1Git] = set() + self.unknown: Set[Sha1Git] = set() + + def mark_known(self, entries: Iterable[Sha1Git]): + """Mark ``entries`` and those they imply as known in the SWH archive""" + self._mark_entries(entries, self._children, self.known) + + def mark_unknown(self, entries: Iterable[Sha1Git]): + """Mark ``entries`` and those they imply as unknown in the SWH archive""" + self._mark_entries(entries, self._parents, self.unknown) + + def _mark_entries( + self, + entries: Iterable[Sha1Git], + transitive_mapping: Mapping[Any, Any], + target_set: Set[Any], + ): + """Use Merkle graph properties to mark a directory entry as known or unknown. + + If an entry is known, then all of its descendants are known. If it's + unknown, then all of its ancestors are unknown. + + - ``entries``: directory entries to mark along with their ancestors/descendants + where applicable. + - ``transitive_mapping``: mapping from an entry to the next entries to mark + in the hierarchy, if any. + - ``target_set``: set where marked entries will be added. + + """ + to_process = set(entries) + while to_process: + current = to_process.pop() + target_set.add(current) + self.undecided.discard(current) + self._undecided_directories.discard(current) + next_entries = transitive_mapping.get(current, set()) & self.undecided + to_process.update(next_entries) + + def get_sample( + self, + ) -> Sample: + """Return a three-tuple of samples from the undecided sets of contents, + skipped contents and directories respectively. + These samples will be queried against the storage which will tell us + which are known.""" + raise NotImplementedError() + + def do_query(self, swh_storage: StorageInterface, sample: Sample) -> None: + """Given a three-tuple of samples, ask the storage which are known or + unknown and mark them as such.""" + contents_sample, skipped_contents_sample, directories_sample = sample + + # TODO unify those APIs, and make it so only have to manipulate hashes + if contents_sample: + known = set(contents_sample) + unknown = set(swh_storage.content_missing_per_sha1_git(contents_sample)) + known -= unknown + + self.mark_known(known) + self.mark_unknown(unknown) + + if skipped_contents_sample: + known = set(skipped_contents_sample) + as_dicts = [ + self._all_contents[s].to_dict() for s in skipped_contents_sample + ] + unknown = { + d["sha1_git"] for d in swh_storage.skipped_content_missing(as_dicts) + } + known -= unknown + + self.mark_known(known) + self.mark_unknown(unknown) + + if directories_sample: + known = set(directories_sample) + unknown = set(swh_storage.directory_missing(directories_sample)) + known -= unknown + + self.mark_known(known) + self.mark_unknown(unknown) + + +class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph): + """Use a random sampling using only directories. + + This allows us to find a statistically good spread of entries in the graph + with a smaller population than using all types of entries. When there are + no more directories, only contents or skipped contents are undecided if any + are left: we send them directly to the storage since they should be few and + their structure flat.""" + + def get_sample(self) -> Sample: + if self._undecided_directories: + if len(self._undecided_directories) <= SAMPLE_SIZE: + return Sample( + contents=set(), + skipped_contents=set(), + directories=set(self._undecided_directories), + ) + sample = random.sample(self._undecided_directories, SAMPLE_SIZE) + directories = {o for o in sample} + return Sample( + contents=set(), skipped_contents=set(), directories=directories + ) + + contents = set() + skipped_contents = set() + + for sha1 in self.undecided: + obj = self._all_contents[sha1] + obj_type = obj.object_type + if obj_type == model.Content.object_type: + contents.add(sha1) + elif obj_type == model.SkippedContent.object_type: + skipped_contents.add(sha1) + else: + raise TypeError(f"Unexpected object type {obj_type}") + + return Sample( + contents=contents, skipped_contents=skipped_contents, directories=set() + ) + + +def filter_known_objects( + swh_storage: StorageInterface, + contents: List[model.Content], + skipped_contents: List[model.SkippedContent], + directories: List[model.Directory], +): + """Filter ``contents``, ``skipped_contents`` and ``directories`` to only return + those that are unknown to the SWH archive using a discovery algorithm.""" + contents_count = len(contents) + skipped_contents_count = len(skipped_contents) + directories_count = len(directories) + + graph = RandomDirSamplingDiscoveryGraph(contents, skipped_contents, directories) + + while graph.undecided: + sample = graph.get_sample() + graph.do_query(swh_storage, sample) + + contents = [c for c in contents if c.sha1_git in graph.unknown] + skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown] + directories = [c for c in directories if c.id in graph.unknown] + + logger.debug( + "Filtered out %d contents, %d skipped contents and %d directories", + contents_count - len(contents), + skipped_contents_count - len(skipped_contents), + directories_count - len(directories), + ) + + return (contents, skipped_contents, directories) diff --git a/swh/loader/package/archive/tests/test_archive.py b/swh/loader/package/archive/tests/test_archive.py index 7a32b2c3..a000d3dd 100644 --- a/swh/loader/package/archive/tests/test_archive.py +++ b/swh/loader/package/archive/tests/test_archive.py @@ -93,6 +93,13 @@ _expected_new_releases_first_visit = { } +@pytest.fixture(autouse=True, scope="function") +def lower_sample_rate(mocker): + """Lower the number of entries per discovery sample so the minimum threshold + for discovery is hit in tests without creating huge test data""" + mocker.patch("swh.loader.package.loader.discovery.SAMPLE_SIZE", 1) + + def test_archive_visit_with_no_artifact_found(swh_storage, requests_mock_datadir): url = URL unknown_artifact_url = "https://ftp.g.o/unknown/8sync-0.1.0.tar.gz" @@ -129,6 +136,122 @@ def test_archive_visit_with_no_artifact_found(swh_storage, requests_mock_datadir assert_last_visit_matches(swh_storage, url, status="partial", type="tar") +def test_archive_visit_with_skipped_content(swh_storage, requests_mock_datadir): + """With no prior visit, load a gnu project and set the max content size + to something low to check that the loader skips "big" content.""" + loader = ArchiveLoader( + swh_storage, URL, artifacts=GNU_ARTIFACTS[:1], max_content_size=10 * 1024 + ) + + actual_load_status = loader.load() + assert actual_load_status["status"] == "eventful" + + expected_snapshot_first_visit_id = hash_to_bytes( + "9efecc835e8f99254934f256b5301b94f348fd17" + ) + + assert actual_load_status["snapshot_id"] == hash_to_hex( + expected_snapshot_first_visit_id + ) + + assert_last_visit_matches(swh_storage, URL, status="full", type="tar") + + _expected_new_non_skipped_contents_first_visit = [ + "ae9be03bd2a06ed8f4f118d3fe76330bb1d77f62", + "809788434b433eb2e3cfabd5d591c9a659d5e3d8", + "1572607d456d7f633bc6065a2b3048496d679a31", + "27de3b3bc6545d2a797aeeb4657c0e215a0c2e55", + "fbd27c3f41f2668624ffc80b7ba5db9b92ff27ac", + "4f9709e64a9134fe8aefb36fd827b84d8b617ab5", + "84fb589b554fcb7f32b806951dcf19518d67b08f", + "3046e5d1f70297e2a507b98224b6222c9688d610", + "e08441aeab02704cfbd435d6445f7c072f8f524e", + "49d4c0ce1a16601f1e265d446b6c5ea6b512f27c", + "7d149b28eaa228b3871c91f0d5a95a2fa7cb0c68", + "f0c97052e567948adf03e641301e9983c478ccff", + "2e6db43f5cd764e677f416ff0d0c78c7a82ef19b", + "e9258d81faf5881a2f96a77ba609396f82cb97ad", + "7350628ccf194c2c3afba4ac588c33e3f3ac778d", + "0057bec9b5422aff9256af240b177ac0e3ac2608", + "6b5cc594ac466351450f7f64a0b79fdaf4435ad3", + ] + + _expected_new_skipped_contents_first_visit = [ + "1170cf105b04b7e2822a0e09d2acf71da7b9a130", + "2b8d0d0b43a1078fc708930c8ddc2956a86c566e", + "edeb33282b2bffa0e608e9d2fd960fd08093c0ea", + "d64e64d4c73679323f8d4cde2643331ba6c20af9", + "7a756602914be889c0a2d3952c710144b3e64cb0", + "8624bcdae55baeef00cd11d5dfcfa60f68710a02", + "f67935bc3a83a67259cda4b2d43373bd56703844", + "7d7c6c8c5ebaeff879f61f37083a3854184f6c41", + "b99fec102eb24bffd53ab61fc30d59e810f116a2", + "7fb724242e2b62b85ca64190c31dcae5303e19b3", + "0bb892d9391aa706dc2c3b1906567df43cbe06a2", + ] + + # Check that the union of both sets make up the original set (without skipping) + union = set(_expected_new_non_skipped_contents_first_visit) | set( + _expected_new_skipped_contents_first_visit + ) + assert union == set(_expected_new_contents_first_visit) + + stats = get_stats(swh_storage) + assert { + "content": len(_expected_new_non_skipped_contents_first_visit), + "directory": len(_expected_new_directories_first_visit), + "origin": 1, + "origin_visit": 1, + "release": len(_expected_new_releases_first_visit), + "revision": 0, + "skipped_content": len(_expected_new_skipped_contents_first_visit), + "snapshot": 1, + } == stats + + release_id = hash_to_bytes(list(_expected_new_releases_first_visit)[0]) + expected_snapshot = Snapshot( + id=expected_snapshot_first_visit_id, + branches={ + b"HEAD": SnapshotBranch( + target_type=TargetType.ALIAS, + target=b"releases/0.1.0", + ), + b"releases/0.1.0": SnapshotBranch( + target_type=TargetType.RELEASE, + target=release_id, + ), + }, + ) + check_snapshot(expected_snapshot, swh_storage) + + assert swh_storage.release_get([release_id])[0] == Release( + id=release_id, + name=b"0.1.0", + message=( + b"Synthetic release for archive at " + b"https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz\n" + ), + target=hash_to_bytes("3aebc29ed1fccc4a6f2f2010fb8e57882406b528"), + target_type=ObjectType.DIRECTORY, + synthetic=True, + author=Person.from_fullname(b""), + date=TimestampWithTimezone.from_datetime( + datetime.datetime(1999, 12, 9, 8, 53, 30, tzinfo=datetime.timezone.utc) + ), + ) + + expected_contents = map( + hash_to_bytes, _expected_new_non_skipped_contents_first_visit + ) + assert list(swh_storage.content_missing_per_sha1(expected_contents)) == [] + + expected_dirs = map(hash_to_bytes, _expected_new_directories_first_visit) + assert list(swh_storage.directory_missing(expected_dirs)) == [] + + expected_rels = map(hash_to_bytes, _expected_new_releases_first_visit) + assert list(swh_storage.release_missing(expected_rels)) == [] + + def test_archive_visit_with_release_artifact_no_prior_visit( swh_storage, requests_mock_datadir ): diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py index 4cc64302..52a083d7 100644 --- a/swh/loader/package/loader.py +++ b/swh/loader/package/loader.py @@ -31,6 +31,7 @@ from requests.exceptions import ContentDecodingError import sentry_sdk from swh.core.tarball import uncompress +from swh.loader.core import discovery from swh.loader.core.loader import BaseLoader from swh.loader.exception import NotFound from swh.loader.package.utils import download @@ -817,6 +818,12 @@ class PackageLoader(BaseLoader, Generic[TPackageInfo]): contents, skipped_contents, directories = from_disk.iter_directory(directory) + # Instead of sending everything from the bottom up to the storage, + # use a Merkle graph discovery algorithm to filter out known objects. + contents, skipped_contents, directories = discovery.filter_known_objects( + self.storage, contents, skipped_contents, directories + ) + logger.debug("Number of skipped contents: %s", len(skipped_contents)) self.storage.skipped_content_add(skipped_contents) logger.debug("Number of contents: %s", len(contents)) -- GitLab