From a32f687151025289568fd0f410cf93df6cb304e0 Mon Sep 17 00:00:00 2001
From: Valentin Lorentz <vlorentz@softwareheritage.org>
Date: Thu, 25 Mar 2021 19:17:17 +0100
Subject: [PATCH] package.loader: Lookup packages from the ExtID storage

To check which packages are already downloaded.

For now, this lookup is done in addition to checking the artifacts from
the last snapshot's revisions' metadata, because we have not started
writing ExtIDs yet. But the ExtID lookup will eventually replace the
artifact-based lookup.

This will finally allow us to drop the 'metadata' field of Revision
objects.
---
 .pre-commit-config.yaml                 |   1 +
 swh/loader/package/loader.py            | 122 +++++++++++++-
 swh/loader/package/tests/test_loader.py | 208 +++++++++++++++++++++++-
 3 files changed, 328 insertions(+), 3 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index c1c35cb0..7e4b2966 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -16,6 +16,7 @@ repos:
   hooks:
   - id: codespell
     exclude: ^(swh/loader/package/.*[/]+tests/data/.*)$
+    entry: codespell --ignore-words-list=iff
 
 - repo: local
   hooks:
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index 66702a65..603b8649 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -22,6 +22,7 @@ from typing import (
     Mapping,
     Optional,
     Sequence,
+    Set,
     Tuple,
     TypeVar,
 )
@@ -287,6 +288,93 @@ class PackageLoader(BaseLoader, Generic[TPackageInfo]):
 
         return None
 
+    def _get_known_extids(
+        self, packages_info: List[TPackageInfo]
+    ) -> Dict[PartialExtID, List[CoreSWHID]]:
+        """Computes the ExtIDs of the new PackageInfo objects, searches for
+        those already loaded in the archive, and returns them if any."""
+
+        # Compute the ExtIDs of all the new packages, grouped by extid type
+        new_extids: Dict[str, List[bytes]] = {}
+        for p_info in packages_info:
+            res = p_info.extid()
+            if res is not None:
+                (extid_type, extid_extid) = res
+                new_extids.setdefault(extid_type, []).append(extid_extid)
+
+        # For each extid type, call extid_get_from_extid() with all the extids
+        # of that type, and store them in the '(type, extid) -> target' map.
+        known_extids: Dict[PartialExtID, List[CoreSWHID]] = {}
+        for (extid_type, extids) in new_extids.items():
+            for extid in self.storage.extid_get_from_extid(extid_type, extids):
+                if extid is not None:
+                    key = (extid.extid_type, extid.extid)
+                    known_extids.setdefault(key, []).append(extid.target)
+
+        return known_extids
+
+    def resolve_revision_from_extids(
+        self,
+        known_extids: Dict[PartialExtID, List[CoreSWHID]],
+        p_info: TPackageInfo,
+        revision_whitelist: Set[Sha1Git],
+    ) -> Optional[Sha1Git]:
+        """Resolve the revision from known ExtIDs and a package info object.
+
+        If the artifact has already been downloaded, this will return the
+        existing revision targeting that uncompressed artifact directory.
+        Otherwise, this returns None.
+
+        Args:
+            known_extids: Dict built from a list of ExtID, with the target as value
+            p_info: Package information
+            revision_whitelist: Any ExtID whose target is not in this set is
+              filtered out
+
+        Returns:
+            None or revision identifier
+
+        """
+        new_extid = p_info.extid()
+        if new_extid is None:
+            return None
+
+        for extid_target in known_extids.get(new_extid, []):
+            if extid_target.object_id not in revision_whitelist:
+                # There is a known ExtID for this package, but its target is not
+                # in the snapshot.
+                # This can happen for three reasons:
+                #
+                # 1. a loader crashed after writing the ExtID, but before writing
+                #    the snapshot
+                # 2. some other loader loaded the same artifact, but produced
+                #    a different revision, causing an additional ExtID object
+                #    to be written. We will probably find this loader's ExtID
+                #    in a future iteration of this loop.
+                #    Note that for now, this is impossible, as each loader has
+                #    a completely different extid_type; but this is an
+                #    implementation detail of each loader.
+                # 3. we took a snapshot, then the package disappeared, then we
+                #    took another snapshot, and the package reappeared
+                #
+                # In case 1, we must actually load the package now, so let's
+                # do it.
+                # TODO: detect when we are in case 3, using revision_missing
+                # instead of the snapshot.
+                continue
+            elif extid_target.object_type != ObjectType.REVISION:
+                # We only support revisions for now.
+                # Note that this case should never be reached unless there is a
+                # collision between a revision hash and some non-revision
+                # object's hash, but better safe than sorry.
+                logger.warning(
+                    "%s is in the revision whitelist, but is not a revision.",
+                    hash_to_hex(extid_target.object_id),
+                )
+                continue
+            return extid_target.object_id
+
+        return None
+
     def download_package(
         self, p_info: TPackageInfo, tmpdir: str
     ) -> List[Tuple[str, Mapping]]:
@@ -441,6 +529,8 @@ class PackageLoader(BaseLoader, Generic[TPackageInfo]):
             sentry_sdk.capture_exception(e)
             return {"status": "failed"}
 
+        # Get the previous snapshot for this origin. It is then used to see
+        # which of the package's versions are already loaded in the archive.
         try:
             last_snapshot = self.last_snapshot()
             logger.debug("last snapshot: %s", last_snapshot)
@@ -459,6 +549,7 @@ class PackageLoader(BaseLoader, Generic[TPackageInfo]):
 
         load_exceptions: List[Exception] = []
 
+        # Get the list of all version names
         try:
             versions = self.get_versions()
         except NotFound:
@@ -478,19 +569,46 @@ class PackageLoader(BaseLoader, Generic[TPackageInfo]):
                 status_load="failed",
             )
 
-        packages_info = [
+        # Get the metadata of each version's package
+        packages_info: List[Tuple[str, str, TPackageInfo]] = [
             (version, branch_name, p_info)
             for version in versions
             for (branch_name, p_info) in self.get_package_info(version)
         ]
 
+        # Compute the ExtID of each of these packages
+        known_extids = self._get_known_extids(
+            [p_info for (_, _, p_info) in packages_info]
+        )
+
+        if last_snapshot is None:
+            last_snapshot_targets: Set[Sha1Git] = set()
+        else:
+            last_snapshot_targets = {
+                branch.target for branch in last_snapshot.branches.values()
+            }
+
         tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = {
             version: [] for version in versions
         }
 
         for (version, branch_name, p_info) in packages_info:
             logger.debug("package_info: %s", p_info)
-            revision_id = self.resolve_revision_from_artifacts(known_artifacts, p_info)
+
+            # Check if the package was already loaded, using its ExtID
+            revision_id = self.resolve_revision_from_extids(
+                known_extids, p_info, last_snapshot_targets
+            )
+
+            if revision_id is None:
+                # No existing revision found from an acceptable ExtID,
+                # search in the artifact data instead.
+                # TODO: remove this after we have finished migrating to ExtIDs.
+                revision_id = self.resolve_revision_from_artifacts(
+                    known_artifacts, p_info
+                )
+
+            if revision_id is None:
+                # No matching revision found in the last snapshot, load it.
                 try:
                     res = self._load_revision(p_info, origin)
                     if res:
diff --git a/swh/loader/package/tests/test_loader.py b/swh/loader/package/tests/test_loader.py
index 5d96ba4b..0e3c8164 100644
--- a/swh/loader/package/tests/test_loader.py
+++ b/swh/loader/package/tests/test_loader.py
@@ -3,14 +3,27 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 import hashlib
 import string
-from unittest.mock import Mock
+from unittest.mock import Mock, call, patch
 
 import attr
 import pytest
 
 from swh.loader.package.loader import BasePackageInfo, PackageLoader
+from swh.model.identifiers import CoreSWHID, ObjectType
+from swh.model.model import (
+    ExtID,
+    Origin,
+    OriginVisit,
+    OriginVisitStatus,
+    Snapshot,
+    SnapshotBranch,
+    TargetType,
+)
+from swh.storage import get_storage
+from swh.storage.algos.snapshot import snapshot_get_latest
 
 
 class FakeStorage:
@@ -29,6 +42,31 @@ class FakeStorage2(FakeStorage):
         raise ValueError("We refuse to add an origin visit")
 
 
+class StubPackageInfo(BasePackageInfo):
+    pass
+
+
+class StubPackageLoader(PackageLoader[StubPackageInfo]):
+    def get_versions(self):
+        return ["v1.0", "v2.0", "v3.0", "v4.0"]
+
+    def get_package_info(self, version):
+        p_info = StubPackageInfo("http://example.org", f"example-{version}.tar")
+        # v1.0 and v2.0 have an extid of one type; v3.0 and v4.0 have an
+        # extid of a different type
+        extid_type = "extid-type1" if version in ("v1.0", "v2.0") else "extid-type2"
+        patch.object(
+            p_info,
+            "extid",
+            return_value=(extid_type, f"extid-of-{version}".encode()),
+            autospec=True,
+        ).start()
+        yield (f"branch-{version}", p_info)
+
+    def _load_revision(self, p_info, origin):
+        return None
+
+
 def test_loader_origin_visit_failure(swh_storage):
     """Failure to add origin or origin visit should failed immediately
 
@@ -90,6 +128,174 @@ def test_resolve_revision_from_artifacts() -> None:
     loader.known_artifact_to_extid.assert_called_once_with({"key": "extid-of-aaaa"})
 
 
+def test_resolve_revision_from_extids() -> None:
+    loader = PackageLoader(None, None)  # type: ignore
+
+    p_info = Mock(wraps=BasePackageInfo(None, None))  # type: ignore
+
+    # The PackageInfo does not support extids
+    p_info.extid.return_value = None
+    known_extids = {
+        ("extid-type", b"extid-of-aaaa"): [
+            CoreSWHID(object_type=ObjectType.REVISION, object_id=b"a" * 20),
+        ]
+    }
+    revision_whitelist = {b"unused"}
+    assert (
+        loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist)
+        is None
+    )
+
+    # Some known extids, and the PackageInfo's extid is not one of them
+    # (i.e. cache miss)
+    p_info.extid.return_value = ("extid-type", b"extid-of-cccc")
+    assert (
+        loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist)
+        is None
+    )
+
+    # Some known extids, and the PackageInfo's extid is one of them
+    # (i.e. cache hit), but the target revision was not in the previous snapshot
+    p_info.extid.return_value = ("extid-type", b"extid-of-aaaa")
+    assert (
+        loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist)
+        is None
+    )
+
+    # Some known extids, and the PackageInfo's extid is one of them (i.e. cache hit),
+    # and the target revision was in the previous snapshot
+    revision_whitelist = {b"a" * 20}
+    assert (
+        loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist)
+        == b"a" * 20
+    )
+
+    # Same as before, but there is more than one extid, and only one of them
+    # is an allowed revision
+    revision_whitelist = {b"a" * 20}
+    known_extids = {
+        ("extid-type", b"extid-of-aaaa"): [
+            CoreSWHID(object_type=ObjectType.REVISION, object_id=b"b" * 20),
+            CoreSWHID(object_type=ObjectType.REVISION, object_id=b"a" * 20),
+        ]
+    }
+    assert (
+        loader.resolve_revision_from_extids(known_extids, p_info, revision_whitelist)
+        == b"a" * 20
+    )
+
+
+def test_load_get_known_extids() -> None:
+    """Checks that PackageLoader.load() fetches known extids efficiently"""
+    storage = Mock(wraps=get_storage("memory"))
+
+    loader = StubPackageLoader(storage, "http://example.org")
+
+    loader.load()
+
+    # Calls should be grouped by extid type
+    storage.extid_get_from_extid.assert_has_calls(
+        [
+            call("extid-type1", [b"extid-of-v1.0", b"extid-of-v2.0"]),
+            call("extid-type2", [b"extid-of-v3.0", b"extid-of-v4.0"]),
+        ],
+        any_order=True,
+    )
+
+
+def test_load_skip_extids() -> None:
+    """Checks that PackageLoader.load() skips iff it should."""
+    storage = get_storage("memory")
+
+    origin = "http://example.org"
+    rev1_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"a" * 20)
+    rev2_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"b" * 20)
+    rev3_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"c" * 20)
+    rev4_swhid = CoreSWHID(object_type=ObjectType.REVISION, object_id=b"d" * 20)
+    dir_swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=b"e" * 20)
+
+    loader = StubPackageLoader(storage, origin)
+    patch.object(
+        loader,
+        "_load_revision",
+        return_value=(rev4_swhid.object_id, dir_swhid.object_id),
+        autospec=True,
+    ).start()
+
+    # Results of a previous load
+    storage.extid_add(
+        [
+            ExtID("extid-type1", b"extid-of-v1.0", rev1_swhid),
+            ExtID("extid-type2", b"extid-of-v2.0", rev2_swhid),
+        ]
+    )
+    last_snapshot = Snapshot(
+        branches={
+            b"v1.0": SnapshotBranch(
+                target_type=TargetType.REVISION, target=rev1_swhid.object_id
+            ),
+            b"v2.0": SnapshotBranch(
+                target_type=TargetType.REVISION, target=rev2_swhid.object_id
+            ),
+            b"v3.0": SnapshotBranch(
+                target_type=TargetType.REVISION, target=rev3_swhid.object_id
+            ),
+        }
+    )
+    storage.snapshot_add([last_snapshot])
+    date = datetime.datetime.now(tz=datetime.timezone.utc)
+    storage.origin_add([Origin(url=origin)])
+    storage.origin_visit_add(
+        [OriginVisit(origin=origin, visit=1, date=date, type="tar")]
+    )
+    storage.origin_visit_status_add(
+        [
+            OriginVisitStatus(
+                origin=origin,
+                visit=1,
+                status="full",
+                date=date,
+                snapshot=last_snapshot.id,
+            )
+        ]
+    )
+
+    loader.load()
+
+    assert loader._load_revision.mock_calls == [  # type: ignore
+        # v1.0: not loaded, because its (extid_type, extid, rev) triple is
+        # already in the storage.
+        # v2.0: loaded, because there is already a similar extid, but with a
+        # different type
+        call(StubPackageInfo(origin, "example-v2.0.tar"), Origin(url=origin)),
+        # v3.0: loaded despite having an (extid_type, extid) in storage,
+        # because the target of the extid is not in the previous snapshot
+        call(StubPackageInfo(origin, "example-v3.0.tar"), Origin(url=origin)),
+        # v4.0: loaded, because its extid is not in the storage yet
+        call(StubPackageInfo(origin, "example-v4.0.tar"), Origin(url=origin)),
+    ]
+
+    # Then check that the snapshot has all the branches.
+    # Versions 2.0 to 4.0 all point to rev4_swhid (instead of the targets of
+    # the last snapshot), because they had to be loaded (mismatched extid),
+    # and the mocked _load_revision always returns rev4_swhid.
+    snapshot = Snapshot(
+        branches={
+            b"branch-v1.0": SnapshotBranch(
+                target_type=TargetType.REVISION, target=rev1_swhid.object_id
+            ),
+            b"branch-v2.0": SnapshotBranch(
+                target_type=TargetType.REVISION, target=rev4_swhid.object_id
+            ),
+            b"branch-v3.0": SnapshotBranch(
+                target_type=TargetType.REVISION, target=rev4_swhid.object_id
+            ),
+            b"branch-v4.0": SnapshotBranch(
+                target_type=TargetType.REVISION, target=rev4_swhid.object_id
+            ),
+        }
+    )
+    assert snapshot_get_latest(storage, origin) == snapshot
+
+
 def test_manifest_extid():
     """Compute primary key should return the right identity
-- 
GitLab
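
The lookup in this patch hinges on each loader's PackageInfo subclass
implementing extid(); no concrete implementation ships here (the tests stub
it out with mock.patch). As a rough sketch only, a loader whose artifacts
carry a sha256 checksum could implement it as below; TarballPackageInfo and
its sha256 field are hypothetical names, not part of this patch:

    import attr

    from swh.loader.package.loader import BasePackageInfo, PartialExtID


    @attr.s
    class TarballPackageInfo(BasePackageInfo):
        # Hypothetical loader-specific field. kw_only avoids attrs' rule
        # against a mandatory attribute following a defaulted one, since
        # BasePackageInfo.filename has a default.
        sha256 = attr.ib(type=bytes, kw_only=True)

        def extid(self) -> PartialExtID:
            # The extid_type string namespaces the identifier, so two loaders
            # hashing similar inputs never collide in the ExtID storage.
            return ("tarball-sha256", self.sha256)

The returned (extid_type, extid) pair is exactly what _get_known_extids()
groups by type and what resolve_revision_from_extids() uses as a key into the
known-ExtIDs map.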
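As the commit message notes, nothing writes ExtIDs yet, so the new lookup can
only ever miss for now. The writing side it anticipates would reduce to
something like this sketch (a hypothetical helper, not part of this patch; it
only uses the ExtID model and the extid_add endpoint that the tests above
already exercise):

    from swh.model.identifiers import CoreSWHID, ObjectType
    from swh.model.model import ExtID


    def record_extid(storage, extid_type: str, extid: bytes, revision_id: bytes) -> None:
        # After loading a package and minting its revision, persist the
        # (extid_type, extid) -> revision mapping so the next visit of the
        # origin can skip downloading this artifact.
        target = CoreSWHID(object_type=ObjectType.REVISION, object_id=revision_id)
        storage.extid_add([ExtID(extid_type, extid, target)])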
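Finally, to make the data flow concrete: _get_known_extids() issues one
extid_get_from_extid() call per extid type rather than one per package, and
returns a map keyed by (extid_type, extid) pairs. Assuming v1.0 from the stub
loader had previously been loaded into a revision identified by b"a" * 20,
the map would have this shape (values invented for illustration):

    from swh.model.identifiers import CoreSWHID, ObjectType

    # (extid_type, extid) -> targets already recorded for that artifact
    known_extids = {
        ("extid-type1", b"extid-of-v1.0"): [
            CoreSWHID(object_type=ObjectType.REVISION, object_id=b"a" * 20),
        ],
    }

resolve_revision_from_extids() then keeps only the targets that are also
branch targets of the previous snapshot; that filter is what lets case 1
described in the loader comments (ExtID written, snapshot not) fall through
to an actual reload.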