Skip to content
Snippets Groups Projects
Commit 48a46285 authored by Jérémy Bobbio (Lunar)'s avatar Jérémy Bobbio (Lunar)
Browse files

Add several helper methods returning SWHIDs

This adds several helper methods returning SWHIDs to model objects,
namely:

- SkippedContent.swhid()
- DirectoryEntry.swhid()
- SnapshotBranch.swhid()
- Release.target_swhid()
- Revision.directory_swhid() and Release.parent_swhids()
- OriginVisitStatus.origin_swhid() and
  OriginVisitStatus.snapshot_swhid()
parent 85ec820b
No related branches found
Tags v6.7.0
No related merge requests found
...@@ -858,6 +858,14 @@ class OriginVisitStatus(BaseModel): ...@@ -858,6 +858,14 @@ class OriginVisitStatus(BaseModel):
def unique_key(self) -> KeyType: def unique_key(self) -> KeyType:
return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)}
def origin_swhid(self) -> ExtendedSWHID:
return Origin(url=self.origin).swhid()
def snapshot_swhid(self) -> Optional[CoreSWHID]:
if self.snapshot is None:
return None
return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.snapshot)
class TargetType(Enum): class TargetType(Enum):
"""The type of content pointed to by a snapshot branch. Usually a """The type of content pointed to by a snapshot branch. Usually a
...@@ -910,6 +918,15 @@ class SnapshotBranch(BaseModel): ...@@ -910,6 +918,15 @@ class SnapshotBranch(BaseModel):
def from_dict(cls, d): def from_dict(cls, d):
return cls(target=d["target"], target_type=TargetType(d["target_type"])) return cls(target=d["target"], target_type=TargetType(d["target_type"]))
def swhid(self) -> Optional[CoreSWHID]:
"""Returns a SWHID for the current branch or None if the branch has no
target or is an alias."""
if self.target is None or self.target_type == TargetType.ALIAS:
return None
return CoreSWHID(
object_id=self.target, object_type=SwhidObjectType[self.target_type.name]
)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Snapshot(HashableObject, BaseModel): class Snapshot(HashableObject, BaseModel):
...@@ -1005,6 +1022,14 @@ class Release(HashableObjectWithManifest, BaseModel): ...@@ -1005,6 +1022,14 @@ class Release(HashableObjectWithManifest, BaseModel):
"""Returns a SWHID representing this object.""" """Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id) return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id)
def target_swhid(self) -> Optional[CoreSWHID]:
"""Returns the SWHID for the target of this release or None if unset."""
if self.target is None:
return None
return CoreSWHID(
object_id=self.target, object_type=SwhidObjectType[self.target_type.name]
)
def anonymize(self) -> "Release": def anonymize(self) -> "Release":
"""Returns an anonymized version of the Release object. """Returns an anonymized version of the Release object.
...@@ -1133,6 +1158,19 @@ class Revision(HashableObjectWithManifest, BaseModel): ...@@ -1133,6 +1158,19 @@ class Revision(HashableObjectWithManifest, BaseModel):
"""Returns a SWHID representing this object.""" """Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id) return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id)
def directory_swhid(self) -> CoreSWHID:
"""Returns the SWHID for the directory referenced by the revision."""
return CoreSWHID(
object_type=SwhidObjectType.DIRECTORY, object_id=self.directory
)
def parent_swhids(self) -> List[CoreSWHID]:
"""Returns a list of SWHID for the parent revisions."""
return [
CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=parent)
for parent in self.parents
]
def anonymize(self) -> "Revision": def anonymize(self) -> "Revision":
"""Returns an anonymized version of the Revision object. """Returns an anonymized version of the Revision object.
...@@ -1159,6 +1197,12 @@ class DirectoryEntry(BaseModel): ...@@ -1159,6 +1197,12 @@ class DirectoryEntry(BaseModel):
perms = attr.ib(type=int, validator=generic_type_validator, converter=int, repr=oct) perms = attr.ib(type=int, validator=generic_type_validator, converter=int, repr=oct)
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" """Usually one of the values of `swh.model.from_disk.DentryPerms`."""
DIR_ENTRY_TYPE_TO_SWHID_OBJECT_TYPE = {
"file": SwhidObjectType.CONTENT,
"dir": SwhidObjectType.DIRECTORY,
"rev": SwhidObjectType.REVISION,
}
@name.validator @name.validator
def check_name(self, attribute, value): def check_name(self, attribute, value):
if value.__class__ is not bytes: if value.__class__ is not bytes:
...@@ -1166,6 +1210,13 @@ class DirectoryEntry(BaseModel): ...@@ -1166,6 +1210,13 @@ class DirectoryEntry(BaseModel):
if b"/" in value: if b"/" in value:
raise ValueError(f"{value!r} is not a valid directory entry name.") raise ValueError(f"{value!r} is not a valid directory entry name.")
def swhid(self) -> CoreSWHID:
"""Returns a SWHID for this directory entry"""
return CoreSWHID(
object_type=DirectoryEntry.DIR_ENTRY_TYPE_TO_SWHID_OBJECT_TYPE[self.type],
object_id=self.target,
)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Directory(HashableObjectWithManifest, BaseModel): class Directory(HashableObjectWithManifest, BaseModel):
...@@ -1505,6 +1556,12 @@ class SkippedContent(BaseContent): ...@@ -1505,6 +1556,12 @@ class SkippedContent(BaseContent):
def unique_key(self) -> KeyType: def unique_key(self) -> KeyType:
return self.hashes() return self.hashes()
def swhid(self) -> Optional[CoreSWHID]:
"""Returns a SWHID representing this object or None if unset."""
if self.sha1_git is None:
return None
return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git)
class MetadataAuthorityType(Enum): class MetadataAuthorityType(Enum):
DEPOSIT_CLIENT = "deposit_client" DEPOSIT_CLIENT = "deposit_client"
......
...@@ -426,6 +426,30 @@ def test_origin_visit_status_naive_datetime(): ...@@ -426,6 +426,30 @@ def test_origin_visit_status_naive_datetime():
) )
@pytest.fixture
def origin_visit_status_example():
tz = datetime.timezone(datetime.timedelta(minutes=+60))
return OriginVisitStatus(
origin="http://foo/",
visit=42,
date=datetime.datetime.now(tz=tz),
status="full",
snapshot=hash_to_bytes("6e65b86363953b780d92b0a928f3e8fcdd10db36"),
)
def test_origin_visit_status_snapshot_swhid(origin_visit_status_example):
assert origin_visit_status_example.snapshot_swhid() == CoreSWHID.from_string(
"swh:1:snp:6e65b86363953b780d92b0a928f3e8fcdd10db36"
)
def test_origin_visit_status_origin_swhid(origin_visit_status_example):
assert origin_visit_status_example.origin_swhid() == ExtendedSWHID.from_string(
"swh:1:ori:e0cee4b024ab93b037a1c182865942f5430c6fa4"
)
# Timestamp # Timestamp
...@@ -895,6 +919,13 @@ def test_skipped_content_naive_datetime(): ...@@ -895,6 +919,13 @@ def test_skipped_content_naive_datetime():
) )
def test_skipped_content_swhid():
skipped_content = SkippedContent.from_data(b"foo", reason="reason")
assert skipped_content.swhid() == CoreSWHID.from_string(
"swh:1:cnt:19102815663d23f8b75a47e7a01965dcdc96468c"
)
# Directory # Directory
...@@ -1092,6 +1123,42 @@ def test_directory_from_possibly_duplicated_entries__preserve_manifest(): ...@@ -1092,6 +1123,42 @@ def test_directory_from_possibly_duplicated_entries__preserve_manifest():
assert dir_.raw_manifest == b"blah" assert dir_.raw_manifest == b"blah"
@pytest.fixture
def directory_with_every_possible_type():
return Directory.from_dict(
{
"entries": [
{
"type": "file",
"perms": 33188,
"name": b"README",
"target": hash_to_bytes("37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21"),
},
{
"type": "dir",
"perms": 16384,
"name": b"src",
"target": hash_to_bytes("61e6e867f5d7ba3b40540869bc050b0c4fed9e95"),
},
{
"type": "rev",
"perms": 57344,
"name": b"submodule",
"target": hash_to_bytes("3d531e169db92a16a9a8974f0ae6edf52e52659e"),
},
],
}
)
def test_directory_entry_swhids(directory_with_every_possible_type):
assert [entry.swhid() for entry in directory_with_every_possible_type.entries] == [
CoreSWHID.from_string("swh:1:cnt:37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21"),
CoreSWHID.from_string("swh:1:dir:61e6e867f5d7ba3b40540869bc050b0c4fed9e95"),
CoreSWHID.from_string("swh:1:rev:3d531e169db92a16a9a8974f0ae6edf52e52659e"),
]
# Release # Release
...@@ -1129,6 +1196,13 @@ def test_release_raw_manifest(release): ...@@ -1129,6 +1196,13 @@ def test_release_raw_manifest(release):
release2.check() release2.check()
def test_release_target_swhid():
release = Release.from_dict(release_example)
assert release.target_swhid() == CoreSWHID.from_string(
"swh:1:rev:741b2252a5e14d6c60a913c77a6099abe73a854a"
)
# Revision # Revision
...@@ -1379,6 +1453,55 @@ def test_revision_none_author_or_committer(): ...@@ -1379,6 +1453,55 @@ def test_revision_none_author_or_committer():
Revision.from_dict(rev_dict) Revision.from_dict(rev_dict)
def test_revision_directory_swhid():
revision = Revision.from_dict(revision_example)
assert revision.directory_swhid() == CoreSWHID.from_string(
"swh:1:dir:85a74718d377195e1efd0843ba4f3260bad4fe07"
)
def test_revision_parent_swhids():
revision_d = revision_example.copy()
revision_d["parents"].append(
hash_to_bytes("b2a7e1260492e344fab3cbf91bc13c91e05426fd")
)
revision = Revision.from_dict(revision_d)
assert revision.parent_swhids() == [
CoreSWHID.from_string("swh:1:rev:01e2d0627a9a6edb24c37db45db5ecb31e9de808"),
CoreSWHID.from_string("swh:1:rev:b2a7e1260492e344fab3cbf91bc13c91e05426fd"),
]
@pytest.fixture
def snapshot_with_all_types():
return Snapshot.from_dict(snapshot_example)
def test_snapshot_branch_swhids(snapshot_with_all_types):
assert {
name: branch and branch.swhid()
for (name, branch) in snapshot_with_all_types.branches.items()
} == {
b"directory": CoreSWHID.from_string(
"swh:1:dir:1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8"
),
b"content": CoreSWHID.from_string(
"swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1"
),
b"alias": None,
b"revision": CoreSWHID.from_string(
"swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6"
),
b"release": CoreSWHID.from_string(
"swh:1:rel:7045404f3d1c54e6473c71bbb716529fbad4be24"
),
b"snapshot": CoreSWHID.from_string(
"swh:1:snp:1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"
),
b"dangling": None,
}
@given(strategies.objects(split_content=True)) @given(strategies.objects(split_content=True))
def test_object_type(objtype_and_obj): def test_object_type(objtype_and_obj):
obj_type, obj = objtype_and_obj obj_type, obj = objtype_and_obj
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment