diff --git a/PKG-INFO b/PKG-INFO index f9303105f82ade6db51d464ca50906ca6fafb0ea..d6afa6e9ae14e5cf27e10ac25b9a506b8ea77373 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: swh.model -Version: 3.0.0 +Version: 3.1.0 Summary: Software Heritage data model Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Author: Software Heritage developers diff --git a/swh.model.egg-info/PKG-INFO b/swh.model.egg-info/PKG-INFO index f9303105f82ade6db51d464ca50906ca6fafb0ea..d6afa6e9ae14e5cf27e10ac25b9a506b8ea77373 100644 --- a/swh.model.egg-info/PKG-INFO +++ b/swh.model.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: swh.model -Version: 3.0.0 +Version: 3.1.0 Summary: Software Heritage data model Home-page: https://forge.softwareheritage.org/diffusion/DMOD/ Author: Software Heritage developers diff --git a/swh/model/hashutil.py b/swh/model/hashutil.py index eaacb2305eb53946736d6ad5fb1c18c4086d8879..86ecc6f0b64e67c0f9d070901e36e9f3b85db136 100644 --- a/swh/model/hashutil.py +++ b/swh/model/hashutil.py @@ -58,7 +58,7 @@ from io import BytesIO import os from typing import Callable, Dict, Optional -ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512"]) +ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512", "md5"]) """Hashing algorithms supported by this module""" DEFAULT_ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256"]) diff --git a/swh/model/model.py b/swh/model/model.py index 0167eae597af309533cee10c7f06433a97448424..735ce46866e496f51bca7a79c97da67b48bb7c4e 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -22,14 +22,14 @@ import hashlib from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union import attr -from attrs_strict import type_validator +from attrs_strict import AttributeTypeError import dateutil.parser import iso8601 from typing_extensions import Final from . 
import git_objects from .collections import ImmutableDict -from .hashutil import DEFAULT_ALGORITHMS, MultiHash +from .hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_hex from .swhids import CoreSWHID from .swhids import ExtendedObjectType as SwhidExtendedObjectType from .swhids import ExtendedSWHID @@ -58,6 +58,13 @@ KT = TypeVar("KT") VT = TypeVar("VT") +def hash_repr(h: bytes) -> str: + if h is None: + return "None" + else: + return f"hash_to_bytes('{hash_to_hex(h)}')" + + def freeze_optional_dict( d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]] # type: ignore ) -> Optional[ImmutableDict[KT, VT]]: @@ -83,6 +90,71 @@ def dictify(value): return value +def _check_type(type_, value): + if type_ is object or type_ is Any: + return True + + origin = getattr(type_, "__origin__", None) + + # Non-generic type, check it directly + if origin is None: + # This is functionally equivalent to using just this: + # return isinstance(value, type) + # but using type equality before isinstance allows very quick checks + # when the exact class is used (which is the overwhelming majority of cases) + # while still allowing subclasses to be used. + return type(value) == type_ or isinstance(value, type_) + + # Check the type of the value itself + # + # For the same reason as above, this condition is functionally equivalent to: + # if origin is not Union and not isinstance(value, origin): + if origin is not Union and type(value) != origin and not isinstance(value, origin): + return False + + # Then, if it's a container, check its items. 
+ if origin is tuple: + args = type_.__args__ + if len(args) == 2 and args[1] is Ellipsis: + # Infinite tuple + return all(_check_type(args[0], item) for item in value) + else: + # Finite tuple + if len(args) != len(value): + return False + + return all( + _check_type(item_type, item) for (item_type, item) in zip(args, value) + ) + elif origin is Union: + args = type_.__args__ + return any(_check_type(variant, value) for variant in args) + elif origin is ImmutableDict: + (key_type, value_type) = type_.__args__ + return all( + _check_type(key_type, key) and _check_type(value_type, value) + for (key, value) in value.items() + ) + else: + # No need to check dict or list. because they are converted to ImmutableDict + # and tuple respectively. + raise NotImplementedError(f"Type-checking {type_}") + + +def type_validator(): + """Like attrs_strict.type_validator(), but stricter. + + It is an attrs validator, which checks attributes have the specified type, + using type equality instead of ``isinstance()``, for improved performance + """ + + def validator(instance, attribute, value): + if not _check_type(attribute.type, value): + raise AttributeTypeError(value, attribute) + + return validator + + ModelType = TypeVar("ModelType", bound="BaseModel") @@ -426,7 +498,9 @@ class OriginVisitStatus(BaseModel): ["created", "ongoing", "full", "partial", "not_found", "failed"] ), ) - snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) + snapshot = attr.ib( + type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr + ) # Type is optional be to able to use it before adding it to the database model type = attr.ib(type=Optional[str], validator=type_validator(), default=None) metadata = attr.ib( @@ -457,6 +531,9 @@ class TargetType(Enum): SNAPSHOT = "snapshot" ALIAS = "alias" + def __repr__(self): + return f"TargetType.{self.name}" + class ObjectType(Enum): """The type of content pointed to by a release. 
Usually a revision""" @@ -467,6 +544,9 @@ class ObjectType(Enum): RELEASE = "release" SNAPSHOT = "snapshot" + def __repr__(self): + return f"ObjectType.{self.name}" + @attr.s(frozen=True, slots=True) class SnapshotBranch(BaseModel): @@ -474,7 +554,7 @@ class SnapshotBranch(BaseModel): object_type: Final = "snapshot_branch" - target = attr.ib(type=bytes, validator=type_validator()) + target = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) target_type = attr.ib(type=TargetType, validator=type_validator()) @target.validator @@ -501,7 +581,7 @@ class Snapshot(HashableObject, BaseModel): validator=type_validator(), converter=freeze_optional_dict, ) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def compute_hash(self) -> bytes: git_object = git_objects.snapshot_git_object(self) @@ -529,7 +609,7 @@ class Release(HashableObject, BaseModel): name = attr.ib(type=bytes, validator=type_validator()) message = attr.ib(type=Optional[bytes], validator=type_validator()) - target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) + target = attr.ib(type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr) target_type = attr.ib(type=ObjectType, validator=type_validator()) synthetic = attr.ib(type=bool, validator=type_validator()) author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) @@ -542,7 +622,7 @@ class Release(HashableObject, BaseModel): converter=freeze_optional_dict, default=None, ) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def compute_hash(self) -> bytes: git_object = git_objects.release_git_object(self) @@ -591,6 +671,9 @@ class RevisionType(Enum): CVS = "cvs" BAZAAR = "bzr" + def __repr__(self): + return f"RevisionType.{self.name}" + def tuplify_extra_headers(value: Iterable): return 
tuple((k, v) for k, v in value) @@ -608,7 +691,7 @@ class Revision(HashableObject, BaseModel): type=Optional[TimestampWithTimezone], validator=type_validator() ) type = attr.ib(type=RevisionType, validator=type_validator()) - directory = attr.ib(type=Sha1Git, validator=type_validator()) + directory = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) synthetic = attr.ib(type=bool, validator=type_validator()) metadata = attr.ib( type=Optional[ImmutableDict[str, object]], @@ -617,7 +700,7 @@ class Revision(HashableObject, BaseModel): default=None, ) parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) extra_headers = attr.ib( type=Tuple[Tuple[bytes, bytes], ...], validator=type_validator(), @@ -685,22 +768,37 @@ class DirectoryEntry(BaseModel): name = attr.ib(type=bytes, validator=type_validator()) type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) - target = attr.ib(type=Sha1Git, validator=type_validator()) - perms = attr.ib(type=int, validator=type_validator()) + target = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) + perms = attr.ib(type=int, validator=type_validator(), converter=int, repr=oct) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" + @name.validator + def check_name(self, attribute, value): + if b"/" in value: + raise ValueError(f"{value!r} is not a valid directory entry name.") + @attr.s(frozen=True, slots=True) class Directory(HashableObject, BaseModel): object_type: Final = "directory" entries = attr.ib(type=Tuple[DirectoryEntry, ...], validator=type_validator()) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def compute_hash(self) -> bytes: git_object = 
git_objects.directory_git_object(self) return hashlib.new("sha1", git_object).digest() + @entries.validator + def check_entries(self, attribute, value): + seen = set() + for entry in value: + if entry.name in seen: + raise ValueError( + f"{self.id.hex()} has duplicated entry name: {entry.name!r}" + ) + seen.add(entry.name) + @classmethod def from_dict(cls, d): d = d.copy() @@ -756,10 +854,10 @@ class BaseContent(BaseModel): class Content(BaseContent): object_type: Final = "content" - sha1 = attr.ib(type=bytes, validator=type_validator()) - sha1_git = attr.ib(type=Sha1Git, validator=type_validator()) - sha256 = attr.ib(type=bytes, validator=type_validator()) - blake2s256 = attr.ib(type=bytes, validator=type_validator()) + sha1 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) + sha1_git = attr.ib(type=Sha1Git, validator=type_validator(), repr=hash_repr) + sha256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) + blake2s256 = attr.ib(type=bytes, validator=type_validator(), repr=hash_repr) length = attr.ib(type=int, validator=type_validator()) @@ -839,10 +937,14 @@ class Content(BaseContent): class SkippedContent(BaseContent): object_type: Final = "skipped_content" - sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) - sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) - sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) - blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator()) + sha1 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) + sha1_git = attr.ib( + type=Optional[Sha1Git], validator=type_validator(), repr=hash_repr + ) + sha256 = attr.ib(type=Optional[bytes], validator=type_validator(), repr=hash_repr) + blake2s256 = attr.ib( + type=Optional[bytes], validator=type_validator(), repr=hash_repr + ) length = attr.ib(type=Optional[int], validator=type_validator()) @@ -920,6 +1022,9 @@ class MetadataAuthorityType(Enum): FORGE = "forge" REGISTRY = 
"registry" + def __repr__(self): + return f"MetadataAuthorityType.{self.name}" + @attr.s(frozen=True, slots=True) class MetadataAuthority(BaseModel): @@ -1025,7 +1130,7 @@ class RawExtrinsicMetadata(HashableObject, BaseModel): type=Optional[CoreSWHID], default=None, validator=type_validator() ) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) def compute_hash(self) -> bytes: git_object = git_objects.raw_extrinsic_metadata_git_object(self) @@ -1217,7 +1322,7 @@ class ExtID(HashableObject, BaseModel): target = attr.ib(type=CoreSWHID, validator=type_validator()) extid_version = attr.ib(type=int, validator=type_validator(), default=0) - id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") + id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"", repr=hash_repr) @classmethod def from_dict(cls, d): diff --git a/swh/model/swhids.py b/swh/model/swhids.py index ee1be2004376e318f907e5f8c4623653be5f4716..b1283c132cc0518fd9dae1c07263fc194c21c1d5 100644 --- a/swh/model/swhids.py +++ b/swh/model/swhids.py @@ -79,7 +79,7 @@ _TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType) _TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID") -@attr.s(frozen=True, kw_only=True) +@attr.s(frozen=True, kw_only=True, repr=False) class _BaseSWHID(Generic[_TObjectType]): """Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID. 
@@ -132,6 +132,9 @@ class _BaseSWHID(Generic[_TObjectType]): ] ) + def __repr__(self) -> str: + return f"{self.__class__.__name__}.from_string('{self}')" + @classmethod def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID: parts = _parse_swhid(s) @@ -145,7 +148,7 @@ class _BaseSWHID(Generic[_TObjectType]): ) from None -@attr.s(frozen=True, kw_only=True) +@attr.s(frozen=True, kw_only=True, repr=False) class CoreSWHID(_BaseSWHID[ObjectType]): """ Dataclass holding the relevant info associated to a SoftWare Heritage @@ -223,7 +226,7 @@ def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]: return urllib.parse.unquote_to_bytes(path) -@attr.s(frozen=True, kw_only=True) +@attr.s(frozen=True, kw_only=True, repr=False) class QualifiedSWHID(_BaseSWHID[ObjectType]): """ Dataclass holding the relevant info associated to a SoftWare Heritage @@ -361,6 +364,9 @@ class QualifiedSWHID(_BaseSWHID[ObjectType]): swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) return swhid + def __repr__(self) -> str: + return super().__repr__() + @classmethod def from_string(cls, s: str) -> QualifiedSWHID: parts = _parse_swhid(s) @@ -379,7 +385,7 @@ class QualifiedSWHID(_BaseSWHID[ObjectType]): ) from None -@attr.s(frozen=True, kw_only=True) +@attr.s(frozen=True, kw_only=True, repr=False) class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]): """ Dataclass holding the relevant info associated to a SoftWare Heritage diff --git a/swh/model/tests/test_hashutil.py b/swh/model/tests/test_hashutil.py index 59787a2289439fe5813b6f5ea3a42730d303ed98..c864bd8f9efbca0714e6e12a39b79c3512d8aed8 100644 --- a/swh/model/tests/test_hashutil.py +++ b/swh/model/tests/test_hashutil.py @@ -8,11 +8,12 @@ import hashlib import io import os import tempfile -import unittest from unittest.mock import patch +import pytest + from swh.model import hashutil -from swh.model.hashutil import MultiHash +from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytehex @contextlib.contextmanager @@ -26,13 
+27,18 @@ def patch_blake2(function_name): hashutil._blake2_hash_cache.clear() -class BaseHashutil(unittest.TestCase): - def setUp(self): - # Reset function cache - hashutil._blake2_hash_cache = {} +@pytest.fixture(autouse=True) +def blake2_hash_cache_reset(): + # Reset function cache + hashutil._blake2_hash_cache = {} + - self.data = b"1984\n" - self.hex_checksums = { +@pytest.fixture +def hash_test_data(): + class HashTestData: + + data = b"1984\n" + hex_checksums = { "sha1": "62be35bf00ff0c624f4a621e2ea5595a049e0731", "sha1_git": "568aaf43d83b2c3df8067f3bedbb97d83260be6d", "sha256": "26602113b4b9afd9d55466b08580d3c2" @@ -41,238 +47,282 @@ class BaseHashutil(unittest.TestCase): "c9422f9f2dc8906", } - self.checksums = { - type: bytes.fromhex(cksum) for type, cksum in self.hex_checksums.items() + checksums = { + type: bytes.fromhex(cksum) for type, cksum in hex_checksums.items() } - self.bytehex_checksums = { - type: hashutil.hash_to_bytehex(cksum) - for type, cksum in self.checksums.items() + bytehex_checksums = { + type: hashutil.hash_to_bytehex(cksum) for type, cksum in checksums.items() } - self.git_hex_checksums = { - "blob": self.hex_checksums["sha1_git"], + git_hex_checksums = { + "blob": hex_checksums["sha1_git"], "tree": "5b2e883aa33d2efab98442693ea4dd5f1b8871b0", "commit": "79e4093542e72f0fcb7cbd75cb7d270f9254aa8f", "tag": "d6bf62466f287b4d986c545890716ce058bddf67", } - self.git_checksums = { - type: bytes.fromhex(cksum) for type, cksum in self.git_hex_checksums.items() + git_checksums = { + type: bytes.fromhex(cksum) for type, cksum in git_hex_checksums.items() } + return HashTestData -class MultiHashTest(BaseHashutil): - def test_multi_hash_data(self): - checksums = MultiHash.from_data(self.data).digest() - self.assertEqual(checksums, self.checksums) - self.assertFalse("length" in checksums) - def test_multi_hash_data_with_length(self): - expected_checksums = self.checksums.copy() - expected_checksums["length"] = len(self.data) +def 
test_multi_hash_data(hash_test_data): + checksums = MultiHash.from_data(hash_test_data.data).digest() + assert checksums == hash_test_data.checksums + assert "length" not in checksums - algos = set(["length"]).union(hashutil.DEFAULT_ALGORITHMS) - checksums = MultiHash.from_data(self.data, hash_names=algos).digest() - self.assertEqual(checksums, expected_checksums) - self.assertTrue("length" in checksums) +def test_multi_hash_data_with_length(hash_test_data): + expected_checksums = hash_test_data.checksums.copy() + expected_checksums["length"] = len(hash_test_data.data) - def test_multi_hash_data_unknown_hash(self): - with self.assertRaises(ValueError) as cm: - MultiHash.from_data(self.data, ["unknown-hash"]) + algos = set(["length"]).union(hashutil.DEFAULT_ALGORITHMS) + checksums = MultiHash.from_data(hash_test_data.data, hash_names=algos).digest() - self.assertIn("Unexpected hashing algorithm", cm.exception.args[0]) - self.assertIn("unknown-hash", cm.exception.args[0]) + assert checksums == expected_checksums + assert "length" in checksums - def test_multi_hash_file(self): - fobj = io.BytesIO(self.data) - checksums = MultiHash.from_file(fobj, length=len(self.data)).digest() - self.assertEqual(checksums, self.checksums) +def test_multi_hash_data_unknown_hash(hash_test_data): + with pytest.raises(ValueError, match="Unexpected hashing algorithm.*unknown-hash"): + MultiHash.from_data(hash_test_data.data, ["unknown-hash"]) - def test_multi_hash_file_hexdigest(self): - fobj = io.BytesIO(self.data) - length = len(self.data) - checksums = MultiHash.from_file(fobj, length=length).hexdigest() - self.assertEqual(checksums, self.hex_checksums) - def test_multi_hash_file_bytehexdigest(self): - fobj = io.BytesIO(self.data) - length = len(self.data) - checksums = MultiHash.from_file(fobj, length=length).bytehexdigest() - self.assertEqual(checksums, self.bytehex_checksums) +def test_multi_hash_file(hash_test_data): + fobj = io.BytesIO(hash_test_data.data) - def 
test_multi_hash_file_missing_length(self): - fobj = io.BytesIO(self.data) - with self.assertRaises(ValueError) as cm: - MultiHash.from_file(fobj, hash_names=["sha1_git"]) + checksums = MultiHash.from_file(fobj, length=len(hash_test_data.data)).digest() + assert checksums == hash_test_data.checksums - self.assertIn("Missing length", cm.exception.args[0]) - def test_multi_hash_path(self): - with tempfile.NamedTemporaryFile(delete=False) as f: - f.write(self.data) +def test_multi_hash_file_hexdigest(hash_test_data): + fobj = io.BytesIO(hash_test_data.data) + length = len(hash_test_data.data) + checksums = MultiHash.from_file(fobj, length=length).hexdigest() + assert checksums == hash_test_data.hex_checksums - hashes = MultiHash.from_path(f.name).digest() - os.remove(f.name) - self.assertEqual(self.checksums, hashes) +def test_multi_hash_file_bytehexdigest(hash_test_data): + fobj = io.BytesIO(hash_test_data.data) + length = len(hash_test_data.data) + checksums = MultiHash.from_file(fobj, length=length).bytehexdigest() + assert checksums == hash_test_data.bytehex_checksums -class Hashutil(BaseHashutil): - def test_hash_git_data(self): - checksums = { - git_type: hashutil.hash_git_data(self.data, git_type) - for git_type in self.git_checksums - } +def test_multi_hash_file_with_md5(hash_test_data): + fobj = io.BytesIO(hash_test_data.data) - self.assertEqual(checksums, self.git_checksums) - - def test_hash_git_data_unknown_git_type(self): - with self.assertRaises(ValueError) as cm: - hashutil.hash_git_data(self.data, "unknown-git-type") - - self.assertIn("Unexpected git object type", cm.exception.args[0]) - self.assertIn("unknown-git-type", cm.exception.args[0]) - - def test_hash_to_hex(self): - for type in self.checksums: - hex = self.hex_checksums[type] - hash = self.checksums[type] - self.assertEqual(hashutil.hash_to_hex(hex), hex) - self.assertEqual(hashutil.hash_to_hex(hash), hex) - - def test_hash_to_bytes(self): - for type in self.checksums: - hex = 
self.hex_checksums[type] - hash = self.checksums[type] - self.assertEqual(hashutil.hash_to_bytes(hex), hash) - self.assertEqual(hashutil.hash_to_bytes(hash), hash) - - def test_hash_to_bytehex(self): - for algo in self.checksums: - self.assertEqual( - self.hex_checksums[algo].encode("ascii"), - hashutil.hash_to_bytehex(self.checksums[algo]), - ) - - def test_bytehex_to_hash(self): - for algo in self.checksums: - self.assertEqual( - self.checksums[algo], - hashutil.bytehex_to_hash(self.hex_checksums[algo].encode()), - ) - - def test_new_hash_unsupported_hashing_algorithm(self): - try: - hashutil._new_hash("blake2:10") - except ValueError as e: - self.assertEqual( - str(e), - "Unexpected hashing algorithm blake2:10, " - "expected one of blake2b512, blake2s256, " - "sha1, sha1_git, sha256", - ) - - @patch("hashlib.new") - def test_new_hash_blake2b_blake2b512_builtin(self, mock_hashlib_new): - if "blake2b512" not in hashlib.algorithms_available: - self.skipTest("blake2b512 not built-in") - mock_hashlib_new.return_value = sentinel = object() + checksums = MultiHash.from_file( + fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=len(hash_test_data.data) + ).digest() + md5sum = {"md5": hashlib.md5(hash_test_data.data).digest()} + assert checksums == {**hash_test_data.checksums, **md5sum} - h = hashutil._new_hash("blake2b512") - self.assertIs(h, sentinel) - mock_hashlib_new.assert_called_with("blake2b512") +def test_multi_hash_file_hexdigest_with_md5(hash_test_data): + fobj = io.BytesIO(hash_test_data.data) + length = len(hash_test_data.data) + checksums = MultiHash.from_file( + fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length + ).hexdigest() + md5sum = {"md5": hashlib.md5(hash_test_data.data).hexdigest()} + assert checksums == {**hash_test_data.hex_checksums, **md5sum} - @patch("hashlib.new") - def test_new_hash_blake2s_blake2s256_builtin(self, mock_hashlib_new): - if "blake2s256" not in hashlib.algorithms_available: - self.skipTest("blake2s256 not 
built-in") - mock_hashlib_new.return_value = sentinel = object() - h = hashutil._new_hash("blake2s256") +def test_multi_hash_file_bytehexdigest_with_md5(hash_test_data): + fobj = io.BytesIO(hash_test_data.data) + length = len(hash_test_data.data) + checksums = MultiHash.from_file( + fobj, hash_names=DEFAULT_ALGORITHMS | {"md5"}, length=length + ).bytehexdigest() + md5sum = {"md5": hash_to_bytehex(hashlib.md5(hash_test_data.data).digest())} + assert checksums == {**hash_test_data.bytehex_checksums, **md5sum} + + +def test_multi_hash_file_missing_length(hash_test_data): + fobj = io.BytesIO(hash_test_data.data) + with pytest.raises(ValueError, match="Missing length"): + MultiHash.from_file(fobj, hash_names=["sha1_git"]) + + +def test_multi_hash_path(hash_test_data): + with tempfile.NamedTemporaryFile(delete=False) as f: + f.write(hash_test_data.data) + + hashes = MultiHash.from_path(f.name).digest() + os.remove(f.name) + + assert hash_test_data.checksums == hashes + + +def test_hash_git_data(hash_test_data): + checksums = { + git_type: hashutil.hash_git_data(hash_test_data.data, git_type) + for git_type in hash_test_data.git_checksums + } + + assert checksums == hash_test_data.git_checksums + + +def test_hash_git_data_unknown_git_type(hash_test_data): + with pytest.raises( + ValueError, match="Unexpected git object type.*unknown-git-type" + ): + hashutil.hash_git_data(hash_test_data.data, "unknown-git-type") + + +def test_hash_to_hex(hash_test_data): + for type in hash_test_data.checksums: + hex = hash_test_data.hex_checksums[type] + hash = hash_test_data.checksums[type] + assert hashutil.hash_to_hex(hex) == hex + assert hashutil.hash_to_hex(hash) == hex + + +def test_hash_to_bytes(hash_test_data): + for type in hash_test_data.checksums: + hex = hash_test_data.hex_checksums[type] + hash = hash_test_data.checksums[type] + assert hashutil.hash_to_bytes(hex) == hash + assert hashutil.hash_to_bytes(hash) == hash + + +def test_hash_to_bytehex(hash_test_data): + for algo in 
hash_test_data.checksums: + hex_checksum = hash_test_data.hex_checksums[algo].encode("ascii") + assert hex_checksum == hashutil.hash_to_bytehex(hash_test_data.checksums[algo]) + + +def test_bytehex_to_hash(hash_test_data): + for algo in hash_test_data.checksums: + assert hash_test_data.checksums[algo] == hashutil.bytehex_to_hash( + hash_test_data.hex_checksums[algo].encode() + ) - self.assertIs(h, sentinel) - mock_hashlib_new.assert_called_with("blake2s256") - def test_new_hash_blake2b_builtin(self): - removed_hash = False +def test_new_hash_unsupported_hashing_algorithm(): + expected_message = ( + "Unexpected hashing algorithm blake2:10, " + "expected one of blake2b512, blake2s256, " + "md5, sha1, sha1_git, sha256" + ) + with pytest.raises(ValueError, match=expected_message): + hashutil._new_hash("blake2:10") - try: - if "blake2b512" in hashlib.algorithms_available: - removed_hash = True - hashlib.algorithms_available.remove("blake2b512") - if "blake2b" not in hashlib.algorithms_available: - self.skipTest("blake2b not built in") - with patch_blake2("hashlib.blake2b") as mock_blake2b: - mock_blake2b.return_value = sentinel = object() +@pytest.mark.skipif( + "blake2b512" not in hashlib.algorithms_available, reason="blake2b512 not built-in" +) +@patch("hashlib.new") +def test_new_hash_blake2b_blake2b512_builtin(mock_hashlib_new): + mock_hashlib_new.return_value = sentinel = object() - h = hashutil._new_hash("blake2b512") + h = hashutil._new_hash("blake2b512") - self.assertIs(h, sentinel) - mock_blake2b.assert_called_with(digest_size=512 // 8) - finally: - if removed_hash: - hashlib.algorithms_available.add("blake2b512") + assert h is sentinel + mock_hashlib_new.assert_called_with("blake2b512") - def test_new_hash_blake2s_builtin(self): - removed_hash = False - try: - if "blake2s256" in hashlib.algorithms_available: - removed_hash = True - hashlib.algorithms_available.remove("blake2s256") - if "blake2s" not in hashlib.algorithms_available: - self.skipTest("blake2s not 
built in") +@pytest.mark.skipif( + "blake2s256" not in hashlib.algorithms_available, reason="blake2s256 not built-in" +) +@patch("hashlib.new") +def test_new_hash_blake2s_blake2s256_builtin(mock_hashlib_new): + mock_hashlib_new.return_value = sentinel = object() - with patch_blake2("hashlib.blake2s") as mock_blake2s: - mock_blake2s.return_value = sentinel = object() + h = hashutil._new_hash("blake2s256") - h = hashutil._new_hash("blake2s256") + assert h is sentinel + mock_hashlib_new.assert_called_with("blake2s256") - self.assertIs(h, sentinel) - mock_blake2s.assert_called_with(digest_size=256 // 8) - finally: - if removed_hash: - hashlib.algorithms_available.add("blake2s256") - def test_new_hash_blake2b_pyblake2(self): +@pytest.mark.skipif( + "blake2b" not in hashlib.algorithms_available, reason="blake2b not built-in" +) +def test_new_hash_blake2b_builtin(): + removed_hash = False + + try: if "blake2b512" in hashlib.algorithms_available: - self.skipTest("blake2b512 built in") - if "blake2b" in hashlib.algorithms_available: - self.skipTest("blake2b built in") + removed_hash = True + hashlib.algorithms_available.remove("blake2b512") - with patch_blake2("pyblake2.blake2b") as mock_blake2b: + with patch_blake2("hashlib.blake2b") as mock_blake2b: mock_blake2b.return_value = sentinel = object() h = hashutil._new_hash("blake2b512") - self.assertIs(h, sentinel) + assert h is sentinel mock_blake2b.assert_called_with(digest_size=512 // 8) + finally: + if removed_hash: + hashlib.algorithms_available.add("blake2b512") + + +@pytest.mark.skipif( + "blake2s" not in hashlib.algorithms_available, reason="blake2s not built-in" +) +def test_new_hash_blake2s_builtin(): + removed_hash = False - def test_new_hash_blake2s_pyblake2(self): + try: if "blake2s256" in hashlib.algorithms_available: - self.skipTest("blake2s256 built in") - if "blake2s" in hashlib.algorithms_available: - self.skipTest("blake2s built in") + removed_hash = True + hashlib.algorithms_available.remove("blake2s256") 
- with patch_blake2("pyblake2.blake2s") as mock_blake2s: + with patch_blake2("hashlib.blake2s") as mock_blake2s: mock_blake2s.return_value = sentinel = object() h = hashutil._new_hash("blake2s256") - self.assertIs(h, sentinel) + assert h is sentinel mock_blake2s.assert_called_with(digest_size=256 // 8) + finally: + if removed_hash: + hashlib.algorithms_available.add("blake2s256") + + +@pytest.mark.skipif( + "blake2b512" in hashlib.algorithms_available, reason="blake2b512 built-in" +) +@pytest.mark.skipif( + "blake2b" in hashlib.algorithms_available, reason="blake2b built-in" +) +def test_new_hash_blake2b_pyblake2(): + with patch_blake2("pyblake2.blake2b") as mock_blake2b: + mock_blake2b.return_value = sentinel = object() + + h = hashutil._new_hash("blake2b512") + assert h is sentinel + mock_blake2b.assert_called_with(digest_size=512 // 8) -class HashlibGit(unittest.TestCase): - def setUp(self): - self.blob_data = b"42\n" - self.tree_data = b"".join( +@pytest.mark.skipif( + "blake2s256" in hashlib.algorithms_available, reason="blake2s256 built-in" +) +@pytest.mark.skipif( + "blake2s" in hashlib.algorithms_available, reason="blake2s built-in" +) +def test_new_hash_blake2s_pyblake2(): + with patch_blake2("pyblake2.blake2s") as mock_blake2s: + mock_blake2s.return_value = sentinel = object() + + h = hashutil._new_hash("blake2s256") + + assert h is sentinel + mock_blake2s.assert_called_with(digest_size=256 // 8) + + +@pytest.fixture +def hashgit_test_data(): + class HashGitTestData: + blob_data = b"42\n" + + tree_data = b"".join( [ b"40000 barfoo\0", bytes.fromhex("c3020f6bf135a38c6df" "3afeb5fb38232c5e07087"), @@ -283,14 +333,15 @@ class HashlibGit(unittest.TestCase): ] ) - self.commit_data = b"""\ + commit_data = b"""\ tree 1c61f7259dcb770f46b194d941df4f08ff0a3970 author Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444054085 +0200 committer Antoine R. 
Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444054085 +0200 initial """ # noqa - self.tag_data = """object 24d012aaec0bc5a4d2f62c56399053d6cc72a241 + + tag_data = """object 24d012aaec0bc5a4d2f62c56399053d6cc72a241 type commit tag 0.0.1 tagger Antoine R. Dumont (@ardumont) <antoine.romain.dumont@gmail.com> 1444225145 +0200 @@ -300,7 +351,7 @@ blah "utf-8" ) # NOQA - self.checksums = { + checksums = { "blob_sha1_git": bytes.fromhex( "d81cc0710eb6cf9efd5b920a8453e1" "e07157b6cd" ), @@ -315,36 +366,43 @@ blah ), } - def test_unknown_header_type(self): - with self.assertRaises(ValueError) as cm: - hashutil.hash_git_data(b"any-data", "some-unknown-type") + return HashGitTestData + + +def test_unknown_header_type(): + with pytest.raises(ValueError, match="Unexpected git object type"): + hashutil.hash_git_data(b"any-data", "some-unknown-type") + + +def test_hashdata_content(hashgit_test_data): + # when + actual_hash = hashutil.hash_git_data(hashgit_test_data.blob_data, git_type="blob") + + # then + assert actual_hash == hashgit_test_data.checksums["blob_sha1_git"] - self.assertIn("Unexpected git object type", cm.exception.args[0]) - def test_hashdata_content(self): - # when - actual_hash = hashutil.hash_git_data(self.blob_data, git_type="blob") +def test_hashdata_tree(hashgit_test_data): + # when + actual_hash = hashutil.hash_git_data(hashgit_test_data.tree_data, git_type="tree") - # then - self.assertEqual(actual_hash, self.checksums["blob_sha1_git"]) + # then + assert actual_hash == hashgit_test_data.checksums["tree_sha1_git"] - def test_hashdata_tree(self): - # when - actual_hash = hashutil.hash_git_data(self.tree_data, git_type="tree") - # then - self.assertEqual(actual_hash, self.checksums["tree_sha1_git"]) +def test_hashdata_revision(hashgit_test_data): + # when + actual_hash = hashutil.hash_git_data( + hashgit_test_data.commit_data, git_type="commit" + ) - def test_hashdata_revision(self): - # when - actual_hash = hashutil.hash_git_data(self.commit_data, 
git_type="commit") + # then + assert actual_hash == hashgit_test_data.checksums["commit_sha1_git"] - # then - self.assertEqual(actual_hash, self.checksums["commit_sha1_git"]) - def test_hashdata_tag(self): - # when - actual_hash = hashutil.hash_git_data(self.tag_data, git_type="tag") +def test_hashdata_tag(hashgit_test_data): + # when + actual_hash = hashutil.hash_git_data(hashgit_test_data.tag_data, git_type="tag") - # then - self.assertEqual(actual_hash, self.checksums["tag_sha1_git"]) + # then + assert actual_hash == hashgit_test_data.checksums["tag_sha1_git"] diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index 781cfa46608aa72b563f6c0b8f40dbb6e6879025..47f6d3c781de4ec09729509a68b083488e9dda81 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -3,21 +3,28 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import collections import copy import datetime +from typing import Any, List, Optional, Tuple, Union import attr from attrs_strict import AttributeTypeError +import dateutil from hypothesis import given from hypothesis.strategies import binary import pytest +from swh.model.collections import ImmutableDict +from swh.model.from_disk import DentryPerms from swh.model.hashutil import MultiHash, hash_to_bytes import swh.model.hypothesis_strategies as strategies +import swh.model.model from swh.model.model import ( BaseModel, Content, Directory, + DirectoryEntry, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, @@ -31,9 +38,12 @@ from swh.model.model import ( Revision, SkippedContent, Snapshot, + TargetType, Timestamp, TimestampWithTimezone, + type_validator, ) +import swh.model.swhids from swh.model.swhids import CoreSWHID, ExtendedSWHID, ObjectType from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.model.tests.test_identifiers import ( @@ -69,6 +79,199 @@ def test_todict_inverse_fromdict(objtype_and_obj): assert 
obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() +@given(strategies.objects()) +def test_repr(objtype_and_obj): + """Checks every model object has a working repr(), and that it can be eval()uated + (so that printed objects can be copy-pasted to write test cases.)""" + (obj_type, obj) = objtype_and_obj + + r = repr(obj) + env = { + "tzutc": lambda: datetime.timezone.utc, + "tzfile": dateutil.tz.tzfile, + "hash_to_bytes": hash_to_bytes, + **swh.model.swhids.__dict__, + **swh.model.model.__dict__, + } + assert eval(r, env) == obj + + +@attr.s +class Cls1: + pass + + +@attr.s +class Cls2(Cls1): + pass + + +_custom_namedtuple = collections.namedtuple("_custom_namedtuple", "a b") + + +class _custom_tuple(tuple): + pass + + +# List of (type, valid_values, invalid_values) +_TYPE_VALIDATOR_PARAMETERS: List[Tuple[Any, List[Any], List[Any]]] = [ + # base types: + ( + bool, + [True, False], + [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ("foo",), ImmutableDict()], + ), + ( + int, + [-1, 0, 1, 42, 1000, DentryPerms.directory, True, False], + [None, "123", 0.0, (), ImmutableDict()], + ), + ( + float, + [-1.0, 0.0, 1.0, float("infinity"), float("NaN")], + [True, False, None, 1, "1.2", (), ImmutableDict()], + ), + ( + bytes, + [b"", b"123"], + [None, bytearray(b"\x12\x34"), "123", 0, 123, (), (1, 2, 3), ImmutableDict()], + ), + (str, ["", "123"], [None, b"123", b"", 0, (), (1, 2, 3), ImmutableDict()]), + # unions: + ( + Optional[int], + [None, -1, 0, 1, 42, 1000, DentryPerms.directory], + ["123", 0.0, (), ImmutableDict()], + ), + ( + Optional[bytes], + [None, b"", b"123"], + ["123", "", 0, (), (1, 2, 3), ImmutableDict()], + ), + ( + Union[str, bytes], + ["", "123", b"123", b""], + [None, 0, (), (1, 2, 3), ImmutableDict()], + ), + ( + Union[str, bytes, None], + ["", "123", b"123", b"", None], + [0, (), (1, 2, 3), ImmutableDict()], + ), + # tuples + ( + Tuple[str, str], + [("foo", "bar"), ("", ""), _custom_namedtuple("", ""), _custom_tuple(("", ""))], + [("foo",), ("foo", 
"bar", "baz"), ("foo", 42), (42, "foo")], + ), + ( + Tuple[str, ...], + [ + ("foo",), + ("foo", "bar"), + ("", ""), + ("foo", "bar", "baz"), + _custom_namedtuple("", ""), + _custom_tuple(("", "")), + ], + [("foo", 42), (42, "foo")], + ), + # composite generic: + ( + Tuple[Union[str, int], Union[str, int]], + [("foo", "foo"), ("foo", 42), (42, "foo"), (42, 42)], + [("foo", b"bar"), (b"bar", "foo")], + ), + ( + Union[Tuple[str, str], Tuple[int, int]], + [("foo", "foo"), (42, 42)], + [("foo", b"bar"), (b"bar", "foo"), ("foo", 42), (42, "foo")], + ), + ( + Tuple[Tuple[bytes, bytes], ...], + [(), ((b"foo", b"bar"),), ((b"foo", b"bar"), (b"baz", b"qux"))], + [((b"foo", "bar"),), ((b"foo", b"bar"), ("baz", b"qux"))], + ), + # standard types: + ( + datetime.datetime, + [datetime.datetime.now(), datetime.datetime.now(tz=datetime.timezone.utc)], + [None, 123], + ), + # ImmutableDict + ( + ImmutableDict[str, int], + [ + ImmutableDict(), + ImmutableDict({"foo": 42}), + ImmutableDict({"foo": 42, "bar": 123}), + ], + [ImmutableDict({"foo": "bar"}), ImmutableDict({42: 123})], + ), + # Any: + (object, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],), + (Any, [-1, 0, 1, 42, 1000, None, "123", 0.0, (), ImmutableDict()], [],), + ( + ImmutableDict[Any, int], + [ + ImmutableDict(), + ImmutableDict({"foo": 42}), + ImmutableDict({"foo": 42, "bar": 123}), + ImmutableDict({42: 123}), + ], + [ImmutableDict({"foo": "bar"})], + ), + ( + ImmutableDict[str, Any], + [ + ImmutableDict(), + ImmutableDict({"foo": 42}), + ImmutableDict({"foo": "bar"}), + ImmutableDict({"foo": 42, "bar": 123}), + ], + [ImmutableDict({42: 123})], + ), + # attr objects: + ( + Timestamp, + [Timestamp(seconds=123, microseconds=0),], + [None, "2021-09-28T11:27:59", 123], + ), + (Cls1, [Cls1(), Cls2()], [None, b"abcd"],), + # enums: + ( + TargetType, + [TargetType.CONTENT, TargetType.ALIAS], + ["content", "alias", 123, None], + ), +] + + +@pytest.mark.parametrize( + "type_,value", + [ + pytest.param(type_, 
value, id=f"type={type_}, value={value}") + for (type_, values, _) in _TYPE_VALIDATOR_PARAMETERS + for value in values + ], +) +def test_type_validator_valid(type_, value): + type_validator()(None, attr.ib(type=type_), value) + + +@pytest.mark.parametrize( + "type_,value", + [ + pytest.param(type_, value, id=f"type={type_}, value={value}") + for (type_, _, values) in _TYPE_VALIDATOR_PARAMETERS + for value in values + ], +) +def test_type_validator_invalid(type_, value): + with pytest.raises(AttributeTypeError): + type_validator()(None, attr.ib(type=type_), value) + + @pytest.mark.parametrize("object_type, objects", TEST_OBJECTS.items()) def test_swh_model_todict_fromdict(object_type, objects): """checks model objects in swh_model_data are in correct shape""" @@ -535,6 +738,30 @@ def test_skipped_content_naive_datetime(): ) +# Directory + + +def test_directory_entry_name_validation(): + with pytest.raises(ValueError, match="valid directory entry name."): + DirectoryEntry(name=b"foo/", type="dir", target=b"\x00" * 20, perms=0), + + +def test_directory_duplicate_entry_name(): + entries = ( + DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), + DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1), + ) + with pytest.raises(ValueError, match="duplicated entry name"): + Directory(entries=entries) + + entries = ( + DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), + DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0), + ) + with pytest.raises(ValueError, match="duplicated entry name"): + Directory(entries=entries) + + # Revision