From 48a46285dde83233c92f6caca67ede8f1e802095 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Bobbio=20=28Lunar=29?= <lunar@softwareheritage.org> Date: Tue, 14 Mar 2023 15:06:25 +0100 Subject: [PATCH] Add several helper methods returning SWHIDs This adds several helper methods returning SWHIDs to model objects, namely: - SkippedContent.swhid() - DirectoryEntry.swhid() - SnapshotBranch.swhid() - Release.target_swhid() - Revision.directory_swhid() and Revision.parent_swhids() - OriginVisitStatus.origin_swhid() and OriginVisitStatus.snapshot_swhid() --- swh/model/model.py | 57 ++++++++++++++++ swh/model/tests/test_model.py | 123 ++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+) diff --git a/swh/model/model.py b/swh/model/model.py index 3572284d..03b35d6c 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -858,6 +858,14 @@ class OriginVisitStatus(BaseModel): def unique_key(self) -> KeyType: return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} + def origin_swhid(self) -> ExtendedSWHID: + return Origin(url=self.origin).swhid() + + def snapshot_swhid(self) -> Optional[CoreSWHID]: + if self.snapshot is None: + return None + return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.snapshot) + class TargetType(Enum): """The type of content pointed to by a snapshot branch. 
Usually a @@ -910,6 +918,15 @@ class SnapshotBranch(BaseModel): def from_dict(cls, d): return cls(target=d["target"], target_type=TargetType(d["target_type"])) + def swhid(self) -> Optional[CoreSWHID]: + """Returns a SWHID for the current branch or None if the branch has no + target or is an alias.""" + if self.target is None or self.target_type == TargetType.ALIAS: + return None + return CoreSWHID( + object_id=self.target, object_type=SwhidObjectType[self.target_type.name] + ) + @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) class Snapshot(HashableObject, BaseModel): @@ -1005,6 +1022,14 @@ class Release(HashableObjectWithManifest, BaseModel): """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id) + def target_swhid(self) -> Optional[CoreSWHID]: + """Returns the SWHID for the target of this release or None if unset.""" + if self.target is None: + return None + return CoreSWHID( + object_id=self.target, object_type=SwhidObjectType[self.target_type.name] + ) + def anonymize(self) -> "Release": """Returns an anonymized version of the Release object. @@ -1133,6 +1158,19 @@ class Revision(HashableObjectWithManifest, BaseModel): """Returns a SWHID representing this object.""" return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id) + def directory_swhid(self) -> CoreSWHID: + """Returns the SWHID for the directory referenced by the revision.""" + return CoreSWHID( + object_type=SwhidObjectType.DIRECTORY, object_id=self.directory + ) + + def parent_swhids(self) -> List[CoreSWHID]: + """Returns a list of SWHID for the parent revisions.""" + return [ + CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=parent) + for parent in self.parents + ] + def anonymize(self) -> "Revision": """Returns an anonymized version of the Revision object. 
@@ -1159,6 +1197,12 @@ class DirectoryEntry(BaseModel): perms = attr.ib(type=int, validator=generic_type_validator, converter=int, repr=oct) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" + DIR_ENTRY_TYPE_TO_SWHID_OBJECT_TYPE = { + "file": SwhidObjectType.CONTENT, + "dir": SwhidObjectType.DIRECTORY, + "rev": SwhidObjectType.REVISION, + } + @name.validator def check_name(self, attribute, value): if value.__class__ is not bytes: @@ -1166,6 +1210,13 @@ class DirectoryEntry(BaseModel): if b"/" in value: raise ValueError(f"{value!r} is not a valid directory entry name.") + def swhid(self) -> CoreSWHID: + """Returns a SWHID for this directory entry""" + return CoreSWHID( + object_type=DirectoryEntry.DIR_ENTRY_TYPE_TO_SWHID_OBJECT_TYPE[self.type], + object_id=self.target, + ) + @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) class Directory(HashableObjectWithManifest, BaseModel): @@ -1505,6 +1556,12 @@ class SkippedContent(BaseContent): def unique_key(self) -> KeyType: return self.hashes() + def swhid(self) -> Optional[CoreSWHID]: + """Returns a SWHID representing this object or None if unset.""" + if self.sha1_git is None: + return None + return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git) + class MetadataAuthorityType(Enum): DEPOSIT_CLIENT = "deposit_client" diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index 409cc8bb..920c8dd5 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -426,6 +426,30 @@ def test_origin_visit_status_naive_datetime(): ) +@pytest.fixture +def origin_visit_status_example(): + tz = datetime.timezone(datetime.timedelta(minutes=+60)) + return OriginVisitStatus( + origin="http://foo/", + visit=42, + date=datetime.datetime.now(tz=tz), + status="full", + snapshot=hash_to_bytes("6e65b86363953b780d92b0a928f3e8fcdd10db36"), + ) + + +def test_origin_visit_status_snapshot_swhid(origin_visit_status_example): + assert 
origin_visit_status_example.snapshot_swhid() == CoreSWHID.from_string( + "swh:1:snp:6e65b86363953b780d92b0a928f3e8fcdd10db36" + ) + + +def test_origin_visit_status_origin_swhid(origin_visit_status_example): + assert origin_visit_status_example.origin_swhid() == ExtendedSWHID.from_string( + "swh:1:ori:e0cee4b024ab93b037a1c182865942f5430c6fa4" + ) + + # Timestamp @@ -895,6 +919,13 @@ def test_skipped_content_naive_datetime(): ) +def test_skipped_content_swhid(): + skipped_content = SkippedContent.from_data(b"foo", reason="reason") + assert skipped_content.swhid() == CoreSWHID.from_string( + "swh:1:cnt:19102815663d23f8b75a47e7a01965dcdc96468c" + ) + + # Directory @@ -1092,6 +1123,42 @@ def test_directory_from_possibly_duplicated_entries__preserve_manifest(): assert dir_.raw_manifest == b"blah" +@pytest.fixture +def directory_with_every_possible_type(): + return Directory.from_dict( + { + "entries": [ + { + "type": "file", + "perms": 33188, + "name": b"README", + "target": hash_to_bytes("37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21"), + }, + { + "type": "dir", + "perms": 16384, + "name": b"src", + "target": hash_to_bytes("61e6e867f5d7ba3b40540869bc050b0c4fed9e95"), + }, + { + "type": "rev", + "perms": 57344, + "name": b"submodule", + "target": hash_to_bytes("3d531e169db92a16a9a8974f0ae6edf52e52659e"), + }, + ], + } + ) + + +def test_directory_entry_swhids(directory_with_every_possible_type): + assert [entry.swhid() for entry in directory_with_every_possible_type.entries] == [ + CoreSWHID.from_string("swh:1:cnt:37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21"), + CoreSWHID.from_string("swh:1:dir:61e6e867f5d7ba3b40540869bc050b0c4fed9e95"), + CoreSWHID.from_string("swh:1:rev:3d531e169db92a16a9a8974f0ae6edf52e52659e"), + ] + + # Release @@ -1129,6 +1196,13 @@ def test_release_raw_manifest(release): release2.check() +def test_release_target_swhid(): + release = Release.from_dict(release_example) + assert release.target_swhid() == CoreSWHID.from_string( + 
"swh:1:rev:741b2252a5e14d6c60a913c77a6099abe73a854a" + ) + + # Revision @@ -1379,6 +1453,55 @@ def test_revision_none_author_or_committer(): Revision.from_dict(rev_dict) +def test_revision_directory_swhid(): + revision = Revision.from_dict(revision_example) + assert revision.directory_swhid() == CoreSWHID.from_string( + "swh:1:dir:85a74718d377195e1efd0843ba4f3260bad4fe07" + ) + + +def test_revision_parent_swhids(): + revision_d = revision_example.copy() + revision_d["parents"].append( + hash_to_bytes("b2a7e1260492e344fab3cbf91bc13c91e05426fd") + ) + revision = Revision.from_dict(revision_d) + assert revision.parent_swhids() == [ + CoreSWHID.from_string("swh:1:rev:01e2d0627a9a6edb24c37db45db5ecb31e9de808"), + CoreSWHID.from_string("swh:1:rev:b2a7e1260492e344fab3cbf91bc13c91e05426fd"), + ] + + +@pytest.fixture +def snapshot_with_all_types(): + return Snapshot.from_dict(snapshot_example) + + +def test_snapshot_branch_swhids(snapshot_with_all_types): + assert { + name: branch and branch.swhid() + for (name, branch) in snapshot_with_all_types.branches.items() + } == { + b"directory": CoreSWHID.from_string( + "swh:1:dir:1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8" + ), + b"content": CoreSWHID.from_string( + "swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1" + ), + b"alias": None, + b"revision": CoreSWHID.from_string( + "swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6" + ), + b"release": CoreSWHID.from_string( + "swh:1:rel:7045404f3d1c54e6473c71bbb716529fbad4be24" + ), + b"snapshot": CoreSWHID.from_string( + "swh:1:snp:1a8893e6a86f444e8be8e7bda6cb34fb1735a00e" + ), + b"dangling": None, + } + + @given(strategies.objects(split_content=True)) def test_object_type(objtype_and_obj): obj_type, obj = objtype_and_obj -- GitLab