Skip to content
Snippets Groups Projects
Commit 7fadf727 authored by Jenkins for Software Heritage's avatar Jenkins for Software Heritage
Browse files

New upstream version 6.7.0

parents c4e85c2a 48a46285
No related branches found
Tags debian/upstream/6.7.0
No related merge requests found
Metadata-Version: 2.1 Metadata-Version: 2.1
Name: swh.model Name: swh.model
Version: 6.6.3 Version: 6.7.0
Summary: Software Heritage data model Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers Author: Software Heritage developers
......
Metadata-Version: 2.1 Metadata-Version: 2.1
Name: swh.model Name: swh.model
Version: 6.6.3 Version: 6.7.0
Summary: Software Heritage data model Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers Author: Software Heritage developers
......
...@@ -858,6 +858,14 @@ class OriginVisitStatus(BaseModel): ...@@ -858,6 +858,14 @@ class OriginVisitStatus(BaseModel):
def unique_key(self) -> KeyType: def unique_key(self) -> KeyType:
return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)} return {"origin": self.origin, "visit": str(self.visit), "date": str(self.date)}
def origin_swhid(self) -> ExtendedSWHID:
return Origin(url=self.origin).swhid()
def snapshot_swhid(self) -> Optional[CoreSWHID]:
if self.snapshot is None:
return None
return CoreSWHID(object_type=SwhidObjectType.SNAPSHOT, object_id=self.snapshot)
class TargetType(Enum): class TargetType(Enum):
"""The type of content pointed to by a snapshot branch. Usually a """The type of content pointed to by a snapshot branch. Usually a
...@@ -910,6 +918,15 @@ class SnapshotBranch(BaseModel): ...@@ -910,6 +918,15 @@ class SnapshotBranch(BaseModel):
def from_dict(cls, d): def from_dict(cls, d):
return cls(target=d["target"], target_type=TargetType(d["target_type"])) return cls(target=d["target"], target_type=TargetType(d["target_type"]))
def swhid(self) -> Optional[CoreSWHID]:
"""Returns a SWHID for the current branch or None if the branch has no
target or is an alias."""
if self.target is None or self.target_type == TargetType.ALIAS:
return None
return CoreSWHID(
object_id=self.target, object_type=SwhidObjectType[self.target_type.name]
)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Snapshot(HashableObject, BaseModel): class Snapshot(HashableObject, BaseModel):
...@@ -1005,6 +1022,14 @@ class Release(HashableObjectWithManifest, BaseModel): ...@@ -1005,6 +1022,14 @@ class Release(HashableObjectWithManifest, BaseModel):
"""Returns a SWHID representing this object.""" """Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id) return CoreSWHID(object_type=SwhidObjectType.RELEASE, object_id=self.id)
def target_swhid(self) -> Optional[CoreSWHID]:
"""Returns the SWHID for the target of this release or None if unset."""
if self.target is None:
return None
return CoreSWHID(
object_id=self.target, object_type=SwhidObjectType[self.target_type.name]
)
def anonymize(self) -> "Release": def anonymize(self) -> "Release":
"""Returns an anonymized version of the Release object. """Returns an anonymized version of the Release object.
...@@ -1133,6 +1158,19 @@ class Revision(HashableObjectWithManifest, BaseModel): ...@@ -1133,6 +1158,19 @@ class Revision(HashableObjectWithManifest, BaseModel):
"""Returns a SWHID representing this object.""" """Returns a SWHID representing this object."""
return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id) return CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=self.id)
def directory_swhid(self) -> CoreSWHID:
"""Returns the SWHID for the directory referenced by the revision."""
return CoreSWHID(
object_type=SwhidObjectType.DIRECTORY, object_id=self.directory
)
def parent_swhids(self) -> List[CoreSWHID]:
"""Returns a list of SWHID for the parent revisions."""
return [
CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=parent)
for parent in self.parents
]
def anonymize(self) -> "Revision": def anonymize(self) -> "Revision":
"""Returns an anonymized version of the Revision object. """Returns an anonymized version of the Revision object.
...@@ -1159,6 +1197,12 @@ class DirectoryEntry(BaseModel): ...@@ -1159,6 +1197,12 @@ class DirectoryEntry(BaseModel):
perms = attr.ib(type=int, validator=generic_type_validator, converter=int, repr=oct) perms = attr.ib(type=int, validator=generic_type_validator, converter=int, repr=oct)
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" """Usually one of the values of `swh.model.from_disk.DentryPerms`."""
DIR_ENTRY_TYPE_TO_SWHID_OBJECT_TYPE = {
"file": SwhidObjectType.CONTENT,
"dir": SwhidObjectType.DIRECTORY,
"rev": SwhidObjectType.REVISION,
}
@name.validator @name.validator
def check_name(self, attribute, value): def check_name(self, attribute, value):
if value.__class__ is not bytes: if value.__class__ is not bytes:
...@@ -1166,6 +1210,13 @@ class DirectoryEntry(BaseModel): ...@@ -1166,6 +1210,13 @@ class DirectoryEntry(BaseModel):
if b"/" in value: if b"/" in value:
raise ValueError(f"{value!r} is not a valid directory entry name.") raise ValueError(f"{value!r} is not a valid directory entry name.")
def swhid(self) -> CoreSWHID:
"""Returns a SWHID for this directory entry"""
return CoreSWHID(
object_type=DirectoryEntry.DIR_ENTRY_TYPE_TO_SWHID_OBJECT_TYPE[self.type],
object_id=self.target,
)
@attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators) @attr.s(frozen=True, slots=True, field_transformer=optimize_all_validators)
class Directory(HashableObjectWithManifest, BaseModel): class Directory(HashableObjectWithManifest, BaseModel):
...@@ -1505,6 +1556,12 @@ class SkippedContent(BaseContent): ...@@ -1505,6 +1556,12 @@ class SkippedContent(BaseContent):
def unique_key(self) -> KeyType: def unique_key(self) -> KeyType:
return self.hashes() return self.hashes()
def swhid(self) -> Optional[CoreSWHID]:
"""Returns a SWHID representing this object or None if unset."""
if self.sha1_git is None:
return None
return CoreSWHID(object_type=SwhidObjectType.CONTENT, object_id=self.sha1_git)
class MetadataAuthorityType(Enum): class MetadataAuthorityType(Enum):
DEPOSIT_CLIENT = "deposit_client" DEPOSIT_CLIENT = "deposit_client"
......
...@@ -426,6 +426,30 @@ def test_origin_visit_status_naive_datetime(): ...@@ -426,6 +426,30 @@ def test_origin_visit_status_naive_datetime():
) )
@pytest.fixture
def origin_visit_status_example():
tz = datetime.timezone(datetime.timedelta(minutes=+60))
return OriginVisitStatus(
origin="http://foo/",
visit=42,
date=datetime.datetime.now(tz=tz),
status="full",
snapshot=hash_to_bytes("6e65b86363953b780d92b0a928f3e8fcdd10db36"),
)
def test_origin_visit_status_snapshot_swhid(origin_visit_status_example):
assert origin_visit_status_example.snapshot_swhid() == CoreSWHID.from_string(
"swh:1:snp:6e65b86363953b780d92b0a928f3e8fcdd10db36"
)
def test_origin_visit_status_origin_swhid(origin_visit_status_example):
assert origin_visit_status_example.origin_swhid() == ExtendedSWHID.from_string(
"swh:1:ori:e0cee4b024ab93b037a1c182865942f5430c6fa4"
)
# Timestamp # Timestamp
...@@ -895,6 +919,13 @@ def test_skipped_content_naive_datetime(): ...@@ -895,6 +919,13 @@ def test_skipped_content_naive_datetime():
) )
def test_skipped_content_swhid():
skipped_content = SkippedContent.from_data(b"foo", reason="reason")
assert skipped_content.swhid() == CoreSWHID.from_string(
"swh:1:cnt:19102815663d23f8b75a47e7a01965dcdc96468c"
)
# Directory # Directory
...@@ -1092,6 +1123,42 @@ def test_directory_from_possibly_duplicated_entries__preserve_manifest(): ...@@ -1092,6 +1123,42 @@ def test_directory_from_possibly_duplicated_entries__preserve_manifest():
assert dir_.raw_manifest == b"blah" assert dir_.raw_manifest == b"blah"
@pytest.fixture
def directory_with_every_possible_type():
return Directory.from_dict(
{
"entries": [
{
"type": "file",
"perms": 33188,
"name": b"README",
"target": hash_to_bytes("37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21"),
},
{
"type": "dir",
"perms": 16384,
"name": b"src",
"target": hash_to_bytes("61e6e867f5d7ba3b40540869bc050b0c4fed9e95"),
},
{
"type": "rev",
"perms": 57344,
"name": b"submodule",
"target": hash_to_bytes("3d531e169db92a16a9a8974f0ae6edf52e52659e"),
},
],
}
)
def test_directory_entry_swhids(directory_with_every_possible_type):
assert [entry.swhid() for entry in directory_with_every_possible_type.entries] == [
CoreSWHID.from_string("swh:1:cnt:37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21"),
CoreSWHID.from_string("swh:1:dir:61e6e867f5d7ba3b40540869bc050b0c4fed9e95"),
CoreSWHID.from_string("swh:1:rev:3d531e169db92a16a9a8974f0ae6edf52e52659e"),
]
# Release # Release
...@@ -1129,6 +1196,13 @@ def test_release_raw_manifest(release): ...@@ -1129,6 +1196,13 @@ def test_release_raw_manifest(release):
release2.check() release2.check()
def test_release_target_swhid():
release = Release.from_dict(release_example)
assert release.target_swhid() == CoreSWHID.from_string(
"swh:1:rev:741b2252a5e14d6c60a913c77a6099abe73a854a"
)
# Revision # Revision
...@@ -1379,6 +1453,55 @@ def test_revision_none_author_or_committer(): ...@@ -1379,6 +1453,55 @@ def test_revision_none_author_or_committer():
Revision.from_dict(rev_dict) Revision.from_dict(rev_dict)
def test_revision_directory_swhid():
revision = Revision.from_dict(revision_example)
assert revision.directory_swhid() == CoreSWHID.from_string(
"swh:1:dir:85a74718d377195e1efd0843ba4f3260bad4fe07"
)
def test_revision_parent_swhids():
revision_d = revision_example.copy()
revision_d["parents"].append(
hash_to_bytes("b2a7e1260492e344fab3cbf91bc13c91e05426fd")
)
revision = Revision.from_dict(revision_d)
assert revision.parent_swhids() == [
CoreSWHID.from_string("swh:1:rev:01e2d0627a9a6edb24c37db45db5ecb31e9de808"),
CoreSWHID.from_string("swh:1:rev:b2a7e1260492e344fab3cbf91bc13c91e05426fd"),
]
@pytest.fixture
def snapshot_with_all_types():
return Snapshot.from_dict(snapshot_example)
def test_snapshot_branch_swhids(snapshot_with_all_types):
assert {
name: branch and branch.swhid()
for (name, branch) in snapshot_with_all_types.branches.items()
} == {
b"directory": CoreSWHID.from_string(
"swh:1:dir:1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8"
),
b"content": CoreSWHID.from_string(
"swh:1:cnt:fe95a46679d128ff167b7c55df5d02356c5a1ae1"
),
b"alias": None,
b"revision": CoreSWHID.from_string(
"swh:1:rev:aafb16d69fd30ff58afdd69036a26047f3aebdc6"
),
b"release": CoreSWHID.from_string(
"swh:1:rel:7045404f3d1c54e6473c71bbb716529fbad4be24"
),
b"snapshot": CoreSWHID.from_string(
"swh:1:snp:1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"
),
b"dangling": None,
}
@given(strategies.objects(split_content=True)) @given(strategies.objects(split_content=True))
def test_object_type(objtype_and_obj): def test_object_type(objtype_and_obj):
obj_type, obj = objtype_and_obj obj_type, obj = objtype_and_obj
......
...@@ -43,7 +43,7 @@ commands = ...@@ -43,7 +43,7 @@ commands =
extras = extras =
testing testing
deps = deps =
mypy==1.0 mypy==1.0.1
commands = commands =
mypy swh mypy swh
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment