diff --git a/PKG-INFO b/PKG-INFO index caf33326e64936b810d5b70475ed0a9fe100199b..0c672ccc3cfe47bd47e45366ab20078286bf907c 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: swh.model -Version: 0.9.0 +Version: 0.10.0 Summary: Software Heritage data model Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Author: Software Heritage developers diff --git a/swh.model.egg-info/PKG-INFO b/swh.model.egg-info/PKG-INFO index caf33326e64936b810d5b70475ed0a9fe100199b..0c672ccc3cfe47bd47e45366ab20078286bf907c 100644 --- a/swh.model.egg-info/PKG-INFO +++ b/swh.model.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: swh.model -Version: 0.9.0 +Version: 0.10.0 Summary: Software Heritage data model Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Author: Software Heritage developers diff --git a/swh/model/from_disk.py b/swh/model/from_disk.py index c46bcd6c873f66d6ee36fc30a75612cdc4bbe809..ce170798ced7af70dcc3e1423dafa98fd1c639e4 100644 --- a/swh/model/from_disk.py +++ b/swh/model/from_disk.py @@ -25,7 +25,7 @@ from .identifiers import identifier_to_str as id_to_str from .merkle import MerkleLeaf, MerkleNode -@attr.s +@attr.s(frozen=True, slots=True) class DiskBackedContent(model.BaseContent): """Content-like class, which allows lazy-loading data from the disk.""" diff --git a/swh/model/hypothesis_strategies.py b/swh/model/hypothesis_strategies.py index 0c54a994e7d6dbb0df372868faea5f8a2c81be58..da04769df07274b2c41fcb4642097a9cbe11e560 100644 --- a/swh/model/hypothesis_strategies.py +++ b/swh/model/hypothesis_strategies.py @@ -174,6 +174,7 @@ def origin_visit_statuses_d(): dict, visit=integers(1, 1000), origin=urls(), + type=optional(sampled_from(["git", "svn", "pypi", "debian"])), status=sampled_from(["created", "ongoing", "full", "partial"]), date=aware_datetimes(), snapshot=optional(sha1_git()), diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py index 
274cb3563b907cbcea74f661ea0bc17b128677fa..98843a5a92ed2abc10d48e6330d0b0f3919c86f3 100644 --- a/swh/model/identifiers.py +++ b/swh/model/identifiers.py @@ -29,6 +29,16 @@ SWHID_VERSION = 1 SWHID_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"] SWHID_SEP = ":" SWHID_CTXT_SEP = ";" +SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} + +SWHID_RE_RAW = ( + f"(?P<scheme>{SWHID_NAMESPACE})" + f"{SWHID_SEP}(?P<version>{SWHID_VERSION})" + f"{SWHID_SEP}(?P<object_type>{'|'.join(SWHID_TYPES)})" + f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})" + f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?" +) +SWHID_RE = re.compile(SWHID_RE_RAW) @lru_cache() @@ -677,6 +687,15 @@ _object_type_map = { CONTENT: {"short_name": "cnt", "key_id": "sha1_git"}, } +_swhid_type_map = { + "ori": ORIGIN, + "snp": SNAPSHOT, + "rel": RELEASE, + "rev": REVISION, + "dir": DIRECTORY, + "cnt": CONTENT, +} + @attr.s(frozen=True) class SWHID: @@ -717,8 +736,8 @@ class SWHID: # 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' """ - namespace = attr.ib(type=str, default="swh") - scheme_version = attr.ib(type=int, default=1) + namespace = attr.ib(type=str, default=SWHID_NAMESPACE) + scheme_version = attr.ib(type=int, default=SWHID_VERSION) object_type = attr.ib(type=str, default="") object_id = attr.ib(type=str, converter=hash_to_hex, default="") # type: ignore metadata = attr.ib( @@ -729,28 +748,43 @@ class SWHID: def check_namespace(self, attribute, value): if value != SWHID_NAMESPACE: raise ValidationError( - f"Invalid SWHID: namespace is '{value}' but must be '{SWHID_NAMESPACE}'" + "Invalid SWHID: invalid namespace: %(namespace)s", + params={"namespace": value}, ) @scheme_version.validator def check_scheme_version(self, attribute, value): if value != SWHID_VERSION: raise ValidationError( - f"Invalid SWHID: version is {value} but must be {SWHID_VERSION}" + "Invalid SWHID: invalid version: %(version)s", params={"version": value} ) @object_type.validator def check_object_type(self, attribute, 
value): if value not in _object_type_map: - supported_types = ", ".join(_object_type_map.keys()) raise ValidationError( - f"Invalid SWHID: object type is {value} but must be " - f"one of {supported_types}" + "Invalid SWHID: invalid type: %(object_type)s", + params={"object_type": value}, ) @object_id.validator def check_object_id(self, attribute, value): - validate_sha1(value) # can raise if invalid hash + try: + validate_sha1(value) # can raise if invalid hash + except ValidationError: + raise ValidationError( + "Invalid SWHID: invalid checksum: %(object_id)s", + params={"object_id": value}, + ) from None + + @metadata.validator + def check_qualifiers(self, attribute, value): + for k in value: + if k not in SWHID_QUALIFIERS: + raise ValidationError( + "Invalid SWHID: unknown qualifier: %(qualifier)s", + params={"qualifier": k}, + ) def to_dict(self) -> Dict[str, Any]: return attr.asdict(self) @@ -801,77 +835,44 @@ def swhid( return str(swhid) -CONTEXT_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} - - def parse_swhid(swhid: str) -> SWHID: - """Parse :ref:`persistent-identifiers`. + """Parse a Software Heritage identifier (SWHID) from string (see: + :ref:`persistent-identifiers`.) 
Args: swhid (str): A persistent identifier - Raises: - swh.model.exceptions.ValidationError: in case of: - - * missing mandatory values (4) - * invalid namespace supplied - * invalid version supplied - * invalid type supplied - * missing hash - * invalid hash identifier supplied - Returns: a named tuple holding the parsing result - """ - if re.search(r"[ \t\n\r\f\v]", swhid): - raise ValidationError("Invalid SwHID: SWHIDs cannot contain whitespaces") - - # <swhid>;<contextual-information> - swhid_parts = swhid.split(SWHID_CTXT_SEP) - swhid_data = swhid_parts.pop(0).split(":") - - if len(swhid_data) != 4: - raise ValidationError( - "Invalid SWHID, format must be 'swh:1:OBJECT_TYPE:OBJECT_ID'" - ) - - # Checking for parsing errors - _ns, _version, _type, _id = swhid_data - - for otype, data in _object_type_map.items(): - if _type == data["short_name"]: - _type = otype - break + Raises: + swh.model.exceptions.ValidationError: if passed string is not a valid SWHID - if not _id: + """ + m = SWHID_RE.fullmatch(swhid) + if not m: raise ValidationError( - "Invalid SWHID: missing OBJECT_ID (as a 40 hex digit string)" - ) - - _metadata = {} - for part in swhid_parts: - try: - qualifier, val = part.split("=") - _metadata[qualifier] = val - except Exception: - raise ValidationError( - "Invalid SWHID: contextual data must be a ;-separated list of " - "key=value pairs" - ) - - wrong_qualifiers = set(_metadata) - set(CONTEXT_QUALIFIERS) - if wrong_qualifiers: - error_msg = ( - f"Invalid SWHID: Wrong qualifiers {', '.join(wrong_qualifiers)}. 
" - f"The qualifiers must be one of {', '.join(CONTEXT_QUALIFIERS)}" + "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid} ) - raise ValidationError(error_msg) + parts = m.groupdict() + + _qualifiers = {} + qualifiers_raw = parts["qualifiers"] + if qualifiers_raw: + for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP): + try: + k, v = qualifier.split("=") + except ValueError: + raise ValidationError( + "Invalid SWHID: invalid qualifier: %(qualifier)s", + params={"qualifier": qualifier}, + ) + _qualifiers[k] = v return SWHID( - _ns, - int(_version), - _type, - _id, - _metadata, # type: ignore # mypy can't properly unify types + parts["scheme"], + int(parts["version"]), + _swhid_type_map[parts["object_type"]], + parts["object_id"], + _qualifiers, # type: ignore # mypy can't properly unify types ) diff --git a/swh/model/model.py b/swh/model/model.py index 6c49c5f968c9e8ab51df599b6bd659b38c6acf80..e373e2065be760b86168ef93e3fa34f65f90daef 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -84,6 +84,8 @@ class BaseModel: Provides serialization/deserialization to/from Python dictionaries, that are suitable for JSON/msgpack-like formats.""" + __slots__ = () + def to_dict(self): """Wrapper of `attr.asdict` that can be overridden by subclasses that have special handling of some of the fields.""" @@ -112,6 +114,8 @@ class HashableObject(metaclass=ABCMeta): """Mixin to automatically compute object identifier hash when the associated model is instantiated.""" + __slots__ = () + @abstractmethod def compute_hash(self) -> bytes: """Derived model classes must implement this to compute @@ -131,7 +135,7 @@ class HashableObject(metaclass=ABCMeta): return self.id # type: ignore -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Person(BaseModel): """Represents the author/committer of a revision or release.""" @@ -185,7 +189,7 @@ class Person(BaseModel): return Person(fullname=sha256(self.fullname).digest(), name=None, email=None,) 
-@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Timestamp(BaseModel): """Represents a naive timestamp from a VCS.""" @@ -207,7 +211,7 @@ class Timestamp(BaseModel): raise ValueError("Microseconds must be in [0, 1000000[.") -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class TimestampWithTimezone(BaseModel): """Represents a TZ-aware timestamp from a VCS.""" @@ -259,7 +263,7 @@ class TimestampWithTimezone(BaseModel): return tstz -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Origin(BaseModel): """Represents a software source: a VCS and an URL.""" @@ -271,7 +275,7 @@ class Origin(BaseModel): return {"url": self.url} -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class OriginVisit(BaseModel): """Represents an origin visit with a given type at a given point in time, by a SWH loader.""" @@ -302,7 +306,7 @@ class OriginVisit(BaseModel): return {"origin": self.origin, "date": str(self.date)} -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class OriginVisitStatus(BaseModel): """Represents a visit update of an origin at a given point in time. 
@@ -319,6 +323,8 @@ class OriginVisitStatus(BaseModel): validator=attr.validators.in_(["created", "ongoing", "full", "partial"]), ) snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) + # Type is optional to be able to use it before adding it to the database model + type = attr.ib(type=Optional[str], validator=type_validator(), default=None) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], validator=type_validator(), @@ -358,7 +364,7 @@ class ObjectType(Enum): SNAPSHOT = "snapshot" -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class SnapshotBranch(BaseModel): """Represents one of the branches of a snapshot.""" @@ -380,7 +386,7 @@ class SnapshotBranch(BaseModel): return cls(target=d["target"], target_type=TargetType(d["target_type"])) -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Snapshot(HashableObject, BaseModel): """Represents the full state of an origin at a given point in time.""" @@ -408,7 +414,7 @@ class Snapshot(HashableObject, BaseModel): ) -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Release(HashableObject, BaseModel): object_type: Final = "release" @@ -474,7 +480,7 @@ def tuplify_extra_headers(value: Iterable): return tuple((k, v) for k, v in value) -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Revision(HashableObject, BaseModel): object_type: Final = "revision" @@ -552,7 +558,7 @@ class Revision(HashableObject, BaseModel): ) -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class DirectoryEntry(BaseModel): object_type: Final = "directory_entry" @@ -563,7 +569,7 @@ class DirectoryEntry(BaseModel): """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Directory(HashableObject, BaseModel): object_type: Final = "directory" @@ -584,7 +590,7 @@ class Directory(HashableObject, BaseModel): ) -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class BaseContent(BaseModel): status = 
attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) @@ -620,7 +626,7 @@ class BaseContent(BaseModel): return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class Content(BaseContent): object_type: Final = "content" @@ -699,7 +705,7 @@ class Content(BaseContent): return self.sha1 # TODO: use a dict of hashes -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class SkippedContent(BaseContent): object_type: Final = "skipped_content" @@ -785,7 +791,7 @@ class MetadataAuthorityType(Enum): REGISTRY = "registry" -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class MetadataAuthority(BaseModel): """Represents an entity that provides metadata about an origin or software artifact.""" @@ -816,7 +822,7 @@ class MetadataAuthority(BaseModel): return {"type": self.type.value, "url": self.url} -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class MetadataFetcher(BaseModel): """Represents a software component used to fetch metadata from a metadata authority, and ingest them into the Software Heritage archive.""" @@ -853,7 +859,7 @@ class MetadataTargetType(Enum): ORIGIN = "origin" -@attr.s(frozen=True) +@attr.s(frozen=True, slots=True) class RawExtrinsicMetadata(BaseModel): object_type: Final = "raw_extrinsic_metadata" diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py index 73515c6589cf22a91e7828291a39414b74392aee..470f8ed9927ccdb1b09cf4694b5cd1a44202cb82 100644 --- a/swh/model/tests/test_identifiers.py +++ b/swh/model/tests/test_identifiers.py @@ -5,6 +5,7 @@ import binascii import datetime +from typing import Dict import unittest import pytest @@ -23,6 +24,14 @@ from swh.model.identifiers import ( ) +def remove_id(d: Dict) -> Dict: + """Returns a (shallow) copy of a dict with the 'id' key removed.""" + d = d.copy() + if "id" in d: + del d["id"] + return d + + class UtilityFunctionsIdentifier(unittest.TestCase): def 
setUp(self): self.str_id = "c2e41aae41ac17bd4a650770d6ee77f62e52235b" @@ -218,6 +227,8 @@ directory_example = { ], } +dummy_qualifiers = {"origin": "https://example.com", "lines": "42"} + class DirectoryIdentifier(unittest.TestCase): def setUp(self): @@ -232,17 +243,21 @@ class DirectoryIdentifier(unittest.TestCase): self.assertEqual( identifiers.directory_identifier(self.directory), self.directory["id"] ) + self.assertEqual( + identifiers.directory_identifier(remove_id(self.directory)), + self.directory["id"], + ) def test_dir_identifier_entry_order(self): # Reverse order of entries, check the id is still the same. directory = {"entries": reversed(self.directory["entries"])} self.assertEqual( - identifiers.directory_identifier(directory), self.directory["id"] + identifiers.directory_identifier(remove_id(directory)), self.directory["id"] ) def test_dir_identifier_empty_directory(self): self.assertEqual( - identifiers.directory_identifier(self.empty_directory), + identifiers.directory_identifier(remove_id(self.empty_directory)), self.empty_directory["id"], ) @@ -460,46 +475,52 @@ dg1KdHOa34shrKDaOVzW identifiers.revision_identifier(self.revision), identifiers.identifier_to_str(self.revision["id"]), ) + self.assertEqual( + identifiers.revision_identifier(remove_id(self.revision)), + identifiers.identifier_to_str(self.revision["id"]), + ) def test_revision_identifier_none_metadata(self): self.assertEqual( - identifiers.revision_identifier(self.revision_none_metadata), + identifiers.revision_identifier(remove_id(self.revision_none_metadata)), identifiers.identifier_to_str(self.revision_none_metadata["id"]), ) def test_revision_identifier_synthetic(self): self.assertEqual( - identifiers.revision_identifier(self.synthetic_revision), + identifiers.revision_identifier(remove_id(self.synthetic_revision)), identifiers.identifier_to_str(self.synthetic_revision["id"]), ) def test_revision_identifier_with_extra_headers(self): self.assertEqual( - 
identifiers.revision_identifier(self.revision_with_extra_headers), + identifiers.revision_identifier( + remove_id(self.revision_with_extra_headers) + ), identifiers.identifier_to_str(self.revision_with_extra_headers["id"]), ) def test_revision_identifier_with_gpgsig(self): self.assertEqual( - identifiers.revision_identifier(self.revision_with_gpgsig), + identifiers.revision_identifier(remove_id(self.revision_with_gpgsig)), identifiers.identifier_to_str(self.revision_with_gpgsig["id"]), ) def test_revision_identifier_no_message(self): self.assertEqual( - identifiers.revision_identifier(self.revision_no_message), + identifiers.revision_identifier(remove_id(self.revision_no_message)), identifiers.identifier_to_str(self.revision_no_message["id"]), ) def test_revision_identifier_empty_message(self): self.assertEqual( - identifiers.revision_identifier(self.revision_empty_message), + identifiers.revision_identifier(remove_id(self.revision_empty_message)), identifiers.identifier_to_str(self.revision_empty_message["id"]), ) def test_revision_identifier_only_fullname(self): self.assertEqual( - identifiers.revision_identifier(self.revision_only_fullname), + identifiers.revision_identifier(remove_id(self.revision_only_fullname)), identifiers.identifier_to_str(self.revision_only_fullname["id"]), ) @@ -620,34 +641,38 @@ o6X/3T+vm8K3bf3driRr34c= identifiers.release_identifier(self.release), identifiers.identifier_to_str(self.release["id"]), ) + self.assertEqual( + identifiers.release_identifier(remove_id(self.release)), + identifiers.identifier_to_str(self.release["id"]), + ) def test_release_identifier_no_author(self): self.assertEqual( - identifiers.release_identifier(self.release_no_author), + identifiers.release_identifier(remove_id(self.release_no_author)), identifiers.identifier_to_str(self.release_no_author["id"]), ) def test_release_identifier_no_message(self): self.assertEqual( - identifiers.release_identifier(self.release_no_message), + 
identifiers.release_identifier(remove_id(self.release_no_message)), identifiers.identifier_to_str(self.release_no_message["id"]), ) def test_release_identifier_empty_message(self): self.assertEqual( - identifiers.release_identifier(self.release_empty_message), + identifiers.release_identifier(remove_id(self.release_empty_message)), identifiers.identifier_to_str(self.release_empty_message["id"]), ) def test_release_identifier_negative_utc(self): self.assertEqual( - identifiers.release_identifier(self.release_negative_utc), + identifiers.release_identifier(remove_id(self.release_negative_utc)), identifiers.identifier_to_str(self.release_negative_utc["id"]), ) def test_release_identifier_newline_in_author(self): self.assertEqual( - identifiers.release_identifier(self.release_newline_in_author), + identifiers.release_identifier(remove_id(self.release_newline_in_author)), identifiers.identifier_to_str(self.release_newline_in_author["id"]), ) @@ -710,32 +735,154 @@ class SnapshotIdentifier(unittest.TestCase): def test_empty_snapshot(self): self.assertEqual( - identifiers.snapshot_identifier(self.empty), + identifiers.snapshot_identifier(remove_id(self.empty)), identifiers.identifier_to_str(self.empty["id"]), ) def test_dangling_branch(self): self.assertEqual( - identifiers.snapshot_identifier(self.dangling_branch), + identifiers.snapshot_identifier(remove_id(self.dangling_branch)), identifiers.identifier_to_str(self.dangling_branch["id"]), ) def test_unresolved(self): with self.assertRaisesRegex(ValueError, "b'foo' -> b'bar'"): - identifiers.snapshot_identifier(self.unresolved) + identifiers.snapshot_identifier(remove_id(self.unresolved)) def test_unresolved_force(self): self.assertEqual( - identifiers.snapshot_identifier(self.unresolved, ignore_unresolved=True,), + identifiers.snapshot_identifier( + remove_id(self.unresolved), ignore_unresolved=True, + ), identifiers.identifier_to_str(self.unresolved["id"]), ) def test_all_types(self): self.assertEqual( - 
identifiers.snapshot_identifier(self.all_types), + identifiers.snapshot_identifier(remove_id(self.all_types)), identifiers.identifier_to_str(self.all_types["id"]), ) + +class OriginIdentifier(unittest.TestCase): + def setUp(self): + self.origin = { + "url": "https://github.com/torvalds/linux", + } + + def test_content_identifier(self): + self.assertEqual( + identifiers.origin_identifier(self.origin), + "b63a575fe3faab7692c9f38fb09d4bb45651bb0f", + ) + + +TS_DICTS = [ + ( + {"timestamp": 12345, "offset": 0}, + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": False, + }, + ), + ( + {"timestamp": 12345, "offset": 0, "negative_utc": False}, + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": False, + }, + ), + ( + {"timestamp": 12345, "offset": 0, "negative_utc": False}, + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": False, + }, + ), + ( + {"timestamp": 12345, "offset": 0, "negative_utc": None}, + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": False, + }, + ), + ( + {"timestamp": {"seconds": 12345}, "offset": 0, "negative_utc": None}, + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": False, + }, + ), + ( + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": None, + }, + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": False, + }, + ), + ( + { + "timestamp": {"seconds": 12345, "microseconds": 100}, + "offset": 0, + "negative_utc": None, + }, + { + "timestamp": {"seconds": 12345, "microseconds": 100}, + "offset": 0, + "negative_utc": False, + }, + ), + ( + {"timestamp": 12345, "offset": 0, "negative_utc": True}, + { + "timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": True, + }, + ), + ( + {"timestamp": 12345, "offset": 0, "negative_utc": None}, + { + 
"timestamp": {"seconds": 12345, "microseconds": 0}, + "offset": 0, + "negative_utc": False, + }, + ), +] + + +@pytest.mark.parametrize("dict_input,expected", TS_DICTS) +def test_normalize_timestamp_dict(dict_input, expected): + assert normalize_timestamp(dict_input) == expected + + +TS_DICTS_INVALID_TIMESTAMP = [ + {"timestamp": 1.2, "offset": 0}, + {"timestamp": "1", "offset": 0}, + # these below should really also trigger a ValueError... + # {"timestamp": {"seconds": "1"}, "offset": 0}, + # {"timestamp": {"seconds": 1.2}, "offset": 0}, + # {"timestamp": {"seconds": 1.2}, "offset": 0}, +] + + +@pytest.mark.parametrize("dict_input", TS_DICTS_INVALID_TIMESTAMP) +def test_normalize_timestamp_dict_invalid_timestamp(dict_input): + with pytest.raises(ValueError, match="non-integer timestamp"): + normalize_timestamp(dict_input) + + +class TestSwhid(unittest.TestCase): def test_swhid(self): _snapshot_id = _x("c7c108084bc0bf3d81436bf980b46e98bd338453") _release_id = "22ece559cc7cc2364edc5e5593d63ae8bd229f9f" @@ -843,7 +990,7 @@ class SnapshotIdentifier(unittest.TestCase): for _type, _hash in [ (SNAPSHOT, _snapshot_id), (SNAPSHOT, _snapshot), - ("foo", ""), + ("lines", "42"), ]: with self.assertRaises(ValidationError): identifiers.swhid(_type, _hash) @@ -928,124 +1075,6 @@ class SnapshotIdentifier(unittest.TestCase): ) -class OriginIdentifier(unittest.TestCase): - def setUp(self): - self.origin = { - "url": "https://github.com/torvalds/linux", - } - - def test_content_identifier(self): - self.assertEqual( - identifiers.origin_identifier(self.origin), - "b63a575fe3faab7692c9f38fb09d4bb45651bb0f", - ) - - -TS_DICTS = [ - ( - {"timestamp": 12345, "offset": 0}, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": False, - }, - ), - ( - {"timestamp": 12345, "offset": 0, "negative_utc": False}, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": False, - }, - ), - ( - {"timestamp": 12345, "offset": 0, 
"negative_utc": False}, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": False, - }, - ), - ( - {"timestamp": 12345, "offset": 0, "negative_utc": None}, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": False, - }, - ), - ( - {"timestamp": {"seconds": 12345}, "offset": 0, "negative_utc": None}, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": False, - }, - ), - ( - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": None, - }, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": False, - }, - ), - ( - { - "timestamp": {"seconds": 12345, "microseconds": 100}, - "offset": 0, - "negative_utc": None, - }, - { - "timestamp": {"seconds": 12345, "microseconds": 100}, - "offset": 0, - "negative_utc": False, - }, - ), - ( - {"timestamp": 12345, "offset": 0, "negative_utc": True}, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": True, - }, - ), - ( - {"timestamp": 12345, "offset": 0, "negative_utc": None}, - { - "timestamp": {"seconds": 12345, "microseconds": 0}, - "offset": 0, - "negative_utc": False, - }, - ), -] - - -@pytest.mark.parametrize("dict_input,expected", TS_DICTS) -def test_normalize_timestamp_dict(dict_input, expected): - assert normalize_timestamp(dict_input) == expected - - -TS_DICTS_INVALID_TIMESTAMP = [ - {"timestamp": 1.2, "offset": 0}, - {"timestamp": "1", "offset": 0}, - # these below should really also trigger a ValueError... 
- # {"timestamp": {"seconds": "1"}, "offset": 0}, - # {"timestamp": {"seconds": 1.2}, "offset": 0}, - # {"timestamp": {"seconds": 1.2}, "offset": 0}, -] - - -@pytest.mark.parametrize("dict_input", TS_DICTS_INVALID_TIMESTAMP) -def test_normalize_timestamp_dict_invalid_timestamp(dict_input): - with pytest.raises(ValueError, match="non-integer timestamp"): - normalize_timestamp(dict_input) - - @pytest.mark.parametrize( "invalid_swhid", [ @@ -1117,17 +1146,9 @@ def test_swhid_hash(): ) assert hash( - SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) + SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) ) == hash( - SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) + SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) ) # Different order of the dictionary, so the underlying order of the tuple in @@ -1136,13 +1157,13 @@ def test_swhid_hash(): SWHID( object_type="directory", object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, + metadata={"origin": "https://example.com", "lines": "42"}, ) ) == hash( SWHID( object_type="directory", object_id=object_id, - metadata={"baz": "qux", "foo": "bar"}, + metadata={"lines": "42", "origin": "https://example.com"}, ) ) @@ -1155,21 +1176,9 @@ def test_swhid_eq(): ) assert SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) == SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) + object_type="directory", object_id=object_id, metadata=dummy_qualifiers, + ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) assert SWHID( - object_type="directory", - object_id=object_id, - metadata={"foo": "bar", "baz": "qux"}, - ) == SWHID( - object_type="directory", - object_id=object_id, - metadata={"baz": "qux", "foo": "bar"}, - ) + object_type="directory", 
object_id=object_id, metadata=dummy_qualifiers, + ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,) diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index 902f0df631db57782dfe123ce8c40668b89f2bd0..393dcfd8c8a4243f39c6989875c68f32092065bb 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -782,12 +782,13 @@ _metadata_authority = MetadataAuthority( _metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",) _content_swhid = parse_swhid("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2") _origin_url = "https://forge.softwareheritage.org/source/swh-model.git" +_dummy_qualifiers = {"origin": "https://example.com", "lines": "42"} _common_metadata_fields = dict( discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), authority=_metadata_authority, fetcher=_metadata_fetcher, format="json", - metadata=b'{"foo": "bar"}', + metadata=b'{"origin": "https://example.com", "lines": "42"}', ) @@ -815,7 +816,7 @@ def test_metadata_to_dict(): "fetcher": {"name": "test-fetcher", "version": "0.0.1",}, "discovery_date": _common_metadata_fields["discovery_date"], "format": "json", - "metadata": b'{"foo": "bar"}', + "metadata": b'{"origin": "https://example.com", "lines": "42"}', } m = RawExtrinsicMetadata( @@ -893,7 +894,7 @@ def test_metadata_invalid_target(): target=SWHID( object_type="content", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1018,7 +1019,7 @@ def test_metadata_validate_context_snapshot(): snapshot=SWHID( object_type="snapshot", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1073,7 +1074,7 @@ def test_metadata_validate_context_release(): release=SWHID( object_type="release", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + 
metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1128,7 +1129,7 @@ def test_metadata_validate_context_revision(): revision=SWHID( object_type="revision", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, ) @@ -1205,7 +1206,7 @@ def test_metadata_validate_context_directory(): directory=SWHID( object_type="directory", object_id="94a9ed024d3859793618152ea559a168bbcbb5e2", - metadata={"foo": "bar"}, + metadata=_dummy_qualifiers, ), **_common_metadata_fields, )