Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • anlambert/swh-model
  • lunar/swh-model
  • franckbret/swh-model
  • douardda/swh-model
  • olasd/swh-model
  • swh/devel/swh-model
  • Alphare/swh-model
  • samplet/swh-model
  • marmoute/swh-model
  • rboyer/swh-model
10 results
Show changes
Showing
with 6189 additions and 963 deletions
# Copyright (C) 2015-2017 The Software Heritage developers
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -10,31 +10,60 @@ Only a subset of hashing algorithms is supported as defined in the
ALGORITHMS set. Any provided algorithms not in that list will result
in a ValueError explaining the error.
This modules defines the following hashing functions:
This module defines a MultiHash class to ease the computation of Software
Heritage hashing algorithms. It allows computing hashes from a file object,
a path, or raw data, using an interface similar to the one provided by the
standard hashlib module.
- hash_file: Hash the contents of the given file object with the given
algorithms (defaulting to DEFAULT_ALGORITHMS if none provided).
Basic usage examples:
- hash_data: Hash the given binary blob with the given algorithms
(defaulting to DEFAULT_ALGORITHMS if none provided).
- file object: MultiHash.from_file(
file_object, hash_names=DEFAULT_ALGORITHMS).digest()
- path (filepath): MultiHash.from_path(b'foo').hexdigest()
- data (bytes): MultiHash.from_data(b'foo').bytehexdigest()
"Complex" usage, defining a swh hashlib instance first:
- To also compute the length, add the special name 'length' to the set of
  algorithms to compute, for example:
.. code-block:: python
h = MultiHash(hash_names=set({'length'}).union(DEFAULT_ALGORITHMS))
with open(filepath, 'rb') as f:
h.update(f.read(HASH_BLOCK_SIZE))
hashes = h.digest() # returns a dict of {hash_algo_name: hash_in_bytes}
- Write alongside computing hashing algorithms (from a stream), example:
.. code-block:: python
h = MultiHash(length=length)
with open(filepath, 'wb') as f:
for chunk in r.iter_content(): # r a stream of sort
h.update(chunk)
f.write(chunk)
hashes = h.hexdigest() # returns a dict of {hash_algo_name: hash_in_hex}
- hash_path: Hash the contents of the file at the given path with the
given algorithms (defaulting to DEFAULT_ALGORITHMS if none
provided).
"""
import binascii
import functools
import hashlib
import os
from io import BytesIO
import os
from typing import Callable, Dict, Optional, Union
ALGORITHMS = set(['sha1', 'sha256', 'sha1_git', 'blake2s256', 'blake2b512'])
ALGORITHMS = set(
["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512", "md5", "sha512"]
)
"""Hashing algorithms supported by this module"""
DEFAULT_ALGORITHMS = set(['sha1', 'sha256', 'sha1_git', 'blake2s256'])
DEFAULT_ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256"])
"""Algorithms computed by default when calling the functions from this module.
Subset of :const:`ALGORITHMS`.
......@@ -43,19 +72,103 @@ Subset of :const:`ALGORITHMS`.
HASH_BLOCK_SIZE = 32768
"""Block size for streaming hash computations made in this module"""
_blake2_hash_cache = {}
_blake2_hash_cache: Dict[str, Callable] = {}
def _new_blake2_hash(algo):
"""Return a function that initializes a blake2 hash.
class MultiHash:
    """Hashutil class to support multiple hashes computation.

    Mirrors the hashlib interface (``update``/``digest``/``hexdigest``/
    ``copy``) while feeding every requested hash algorithm in a single pass
    over the data.

    Args:
        hash_names (set): Set of hash algorithms (+ optionally "length")
            to compute hashes (cf. DEFAULT_ALGORITHMS)
        length (int): Length of the total sum of chunks to read

    If the length is provided as algorithm, the length is also
    computed and returned.
    """

    def __init__(self, hash_names=DEFAULT_ALGORITHMS, length=None):
        self.state = {}
        self.track_length = False
        for name in hash_names:
            if name == "length":
                # "length" is not a real hash algorithm: track a byte count.
                self.state["length"] = 0
                self.track_length = True
            else:
                self.state[name] = _new_hash(name, length)

    @classmethod
    def from_state(cls, state, track_length):
        """Rebuild a MultiHash from an already-initialized hashers state."""
        ret = cls([])
        ret.state = state
        ret.track_length = track_length
        return ret  # bugfix: the rebuilt instance was never returned

    @classmethod
    def from_file(cls, fobj, hash_names=DEFAULT_ALGORITHMS, length=None):
        """Compute the hashes of a file-like object, read by chunks."""
        ret = cls(length=length, hash_names=hash_names)
        while True:
            chunk = fobj.read(HASH_BLOCK_SIZE)
            if not chunk:
                break
            ret.update(chunk)
        return ret

    @classmethod
    def from_path(cls, path, hash_names=DEFAULT_ALGORITHMS):
        """Compute the hashes of the file at the given path."""
        # The file size is needed up front by the git-specific algorithms.
        length = os.path.getsize(path)
        with open(path, "rb") as f:
            ret = cls.from_file(f, hash_names=hash_names, length=length)
        return ret

    @classmethod
    def from_data(cls, data, hash_names=DEFAULT_ALGORITHMS):
        """Compute the hashes of the given bytes."""
        length = len(data)
        fobj = BytesIO(data)
        return cls.from_file(fobj, hash_names=hash_names, length=length)

    def update(self, chunk):
        """Feed the given chunk of bytes to every tracked hasher."""
        for name, h in self.state.items():
            if name == "length":
                continue
            h.update(chunk)
        if self.track_length:
            self.state["length"] += len(chunk)

    def digest(self):
        """Return a dict of {hash_algo_name: digest_in_bytes}."""
        return {
            name: h.digest() if name != "length" else h
            for name, h in self.state.items()
        }

    def hexdigest(self):
        """Return a dict of {hash_algo_name: digest_as_hex_str}."""
        return {
            name: h.hexdigest() if name != "length" else h
            for name, h in self.state.items()
        }

    def bytehexdigest(self):
        """Return a dict of {hash_algo_name: digest_as_hex_bytes}."""
        return {
            name: hash_to_bytehex(h.digest()) if name != "length" else h
            for name, h in self.state.items()
        }

    def copy(self):
        """Return an independent copy of this MultiHash."""
        copied_state = {
            name: h.copy() if name != "length" else h
            for name, h in self.state.items()
        }
        return self.from_state(copied_state, self.track_length)
def _new_blake2_hash(algo):
"""Return a function that initializes a blake2 hash."""
if algo in _blake2_hash_cache:
return _blake2_hash_cache[algo]()
lalgo = algo.lower()
if not lalgo.startswith('blake2'):
raise ValueError('Algorithm %s is not a blake2 hash' % algo)
if not lalgo.startswith("blake2"):
raise ValueError("Algorithm %s is not a blake2 hash" % algo)
blake_family = lalgo[:7]
......@@ -64,27 +177,14 @@ def _new_blake2_hash(algo):
try:
digest_size, remainder = divmod(int(lalgo[7:]), 8)
except ValueError:
raise ValueError(
'Unknown digest size for algo %s' % algo
) from None
raise ValueError("Unknown digest size for algo %s" % algo) from None
if remainder:
raise ValueError(
'Digest size for algorithm %s must be a multiple of 8' % algo
"Digest size for algorithm %s must be a multiple of 8" % algo
)
if lalgo in hashlib.algorithms_available:
# Handle the case where OpenSSL ships the given algorithm
# (e.g. Python 3.5 on Debian 9 stretch)
_blake2_hash_cache[algo] = lambda: hashlib.new(lalgo)
else:
# Try using the built-in implementation for Python 3.6+
if blake_family in hashlib.algorithms_available:
blake2 = getattr(hashlib, blake_family)
else:
import pyblake2
blake2 = getattr(pyblake2, blake_family)
_blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)
blake2 = getattr(hashlib, blake_family)
_blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)
return _blake2_hash_cache[algo]()
......@@ -94,18 +194,16 @@ def _new_hashlib_hash(algo):
Handle the swh-specific names for the blake2-related algorithms
"""
if algo.startswith('blake2'):
if algo.startswith("blake2"):
return _new_blake2_hash(algo)
else:
return hashlib.new(algo)
def _new_git_hash(base_algo, git_type, length):
"""Initialize a digest object (as returned by python's hashlib) for the
requested algorithm, and feed it with the header for a git object of the
given type and length.
def git_object_header(git_type: str, length: int) -> bytes:
"""Returns the header for a git object of the given type and length.
The header for hashing a git object consists of:
The header of a git object consists of:
- The type of the object (encoded in ASCII)
- One ASCII space (\x20)
- The length of the object (decimal encoded in ASCII)
......@@ -120,15 +218,26 @@ def _new_git_hash(base_algo, git_type, length):
Returns:
a hashutil.hash object
"""
git_object_types = {
"blob",
"tree",
"commit",
"tag",
"snapshot",
"raw_extrinsic_metadata",
"extid",
}
h = _new_hashlib_hash(base_algo)
git_header = '%s %d\0' % (git_type, length)
h.update(git_header.encode('ascii'))
if git_type not in git_object_types:
raise ValueError(
"Unexpected git object type %s, expected one of %s"
% (git_type, ", ".join(sorted(git_object_types)))
)
return h
return ("%s %d\0" % (git_type, length)).encode("ascii")
def _new_hash(algo, length=None):
def _new_hash(algo: str, length: Optional[int] = None):
"""Initialize a digest object (as returned by python's hashlib) for
the requested algorithm. See the constant ALGORITHMS for the list
of supported algorithms. If a git-specific hashing algorithm is
......@@ -150,93 +259,22 @@ def _new_hash(algo, length=None):
"""
if algo not in ALGORITHMS:
raise ValueError(
'Unexpected hashing algorithm %s, expected one of %s' %
(algo, ', '.join(sorted(ALGORITHMS))))
"Unexpected hashing algorithm %s, expected one of %s"
% (algo, ", ".join(sorted(ALGORITHMS)))
)
if algo.endswith('_git'):
if algo.endswith("_git"):
if length is None:
raise ValueError('Missing length for git hashing algorithm')
raise ValueError("Missing length for git hashing algorithm")
base_algo = algo[:-4]
return _new_git_hash(base_algo, 'blob', length)
h = _new_hashlib_hash(base_algo)
h.update(git_object_header("blob", length))
return h
return _new_hashlib_hash(algo)
def hash_file(fobj, length=None, algorithms=DEFAULT_ALGORITHMS, chunk_cb=None):
    """Hash the contents of the given file object with the given algorithms.

    Args:
        fobj: a file-like object
        length: the length of the contents of the file-like object (for the
            git-specific algorithms)
        algorithms: the hashing algorithms to be used, as an iterable over
            strings
        chunk_cb: an optional callback invoked with each chunk read from fobj

    Returns:
        a dict mapping each algorithm to a bytes digest.

    Raises:
        ValueError if algorithms contains an unknown hash algorithm.
    """
    # NB: renamed the loop variable so it no longer shadows the builtin `hash`.
    hashers = {algo: _new_hash(algo, length) for algo in algorithms}
    while True:
        chunk = fobj.read(HASH_BLOCK_SIZE)
        if not chunk:
            break
        for hasher in hashers.values():
            hasher.update(chunk)
        if chunk_cb:
            chunk_cb(chunk)
    return {algo: hasher.digest() for algo, hasher in hashers.items()}
def hash_path(path, algorithms=DEFAULT_ALGORITHMS, chunk_cb=None):
    """Hash the contents of the file at the given path with the given
    algorithms.

    Args:
        path: the path of the file to hash
        algorithms: the hashing algorithms used
        chunk_cb: a callback

    Returns:
        a dict mapping each algorithm to a bytes digest, plus a 'length'
        key holding the file size in bytes.

    Raises:
        ValueError if algorithms contains an unknown hash algorithm.
        OSError on file access error
    """
    # The size is needed up front by the git-specific algorithms.
    length = os.path.getsize(path)
    with open(path, 'rb') as fobj:
        digests = hash_file(fobj, length, algorithms, chunk_cb)
        digests['length'] = length
    return digests
def hash_data(data, algorithms=DEFAULT_ALGORITHMS, with_length=False):
    """Hash the given binary blob with the given algorithms.

    Args:
        data (bytes): raw content to hash
        algorithms (list): the hashing algorithms used
        with_length (bool): add the length key in the resulting dict

    Returns:
        a dict mapping each algorithm to a bytes digest

    Raises:
        TypeError if data does not support the buffer interface.
        ValueError if algorithms contains an unknown hash algorithm.
    """
    length = len(data)
    # Wrap the blob in an in-memory file object and reuse the streaming path.
    digests = hash_file(BytesIO(data), length, algorithms)
    if with_length:
        digests['length'] = length
    return digests
def hash_git_data(data, git_type, base_algo='sha1'):
def hash_git_data(data, git_type, base_algo="sha1"):
"""Hash the given data as a git object of type git_type.
Args:
......@@ -249,21 +287,15 @@ def hash_git_data(data, git_type, base_algo='sha1'):
Raises:
ValueError if the git_type is unexpected.
"""
git_object_types = {'blob', 'tree', 'commit', 'tag', 'snapshot'}
if git_type not in git_object_types:
raise ValueError('Unexpected git object type %s, expected one of %s' %
(git_type, ', '.join(sorted(git_object_types))))
h = _new_git_hash(base_algo, git_type, len(data))
h = _new_hashlib_hash(base_algo)
h.update(git_object_header(git_type, len(data)))
h.update(data)
return h.digest()
@functools.lru_cache()
def hash_to_hex(hash):
def hash_to_hex(hash: Union[str, bytes]) -> str:
"""Converts a hash (in hex or bytes form) to its hexadecimal ascii form
Args:
......@@ -275,11 +307,11 @@ def hash_to_hex(hash):
"""
if isinstance(hash, str):
return hash
return binascii.hexlify(hash).decode('ascii')
return binascii.hexlify(hash).decode("ascii")
@functools.lru_cache()
def hash_to_bytehex(hash):
def hash_to_bytehex(hash: bytes) -> bytes:
"""Converts a hash to its hexadecimal bytes representation
Args:
......@@ -292,7 +324,7 @@ def hash_to_bytehex(hash):
@functools.lru_cache()
def hash_to_bytes(hash):
def hash_to_bytes(hash: Union[str, bytes]) -> bytes:
"""Converts a hash (in hex or bytes form) to its raw bytes form
Args:
......@@ -308,7 +340,7 @@ def hash_to_bytes(hash):
@functools.lru_cache()
def bytehex_to_hash(hex):
def bytehex_to_hash(hex: bytes) -> bytes:
"""Converts a hexadecimal bytes representation of a hash to that hash
Args:
......
This diff is collapsed.
# Copyright (C) 2017 The Software Heritage developers
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Merkle tree data structure"""
import abc
import collections
def deep_update(left, right):
    """Recursively update the left mapping with deeply nested values from the right
    mapping.

    This function is useful to merge the results of several calls to
    :func:`MerkleNode.collect`.

    Arguments:
        left: a mapping (modified by the update operation)
        right: a mapping

    Returns:
        the left mapping, updated with nested values from the right mapping

    Example:
        >>> a = {
        ...     'key1': {
        ...         'key2': {
        ...             'key3': 'value1/2/3',
        ...         },
        ...     },
        ... }
        >>> deep_update(a, {
        ...     'key1': {
        ...         'key2': {
        ...             'key4': 'value1/2/4',
        ...         },
        ...     },
        ... }) == {
        ...     'key1': {
        ...         'key2': {
        ...             'key3': 'value1/2/3',
        ...             'key4': 'value1/2/4',
        ...         },
        ...     },
        ... }
        True
        >>> deep_update(a, {
        ...     'key1': {
        ...         'key2': {
        ...             'key3': 'newvalue1/2/3',
        ...         },
        ...     },
        ... }) == {
        ...     'key1': {
        ...         'key2': {
        ...             'key3': 'newvalue1/2/3',
        ...             'key4': 'value1/2/4',
        ...         },
        ...     },
        ... }
        True
    """
    # bugfix: collections.Mapping was removed in Python 3.10; the ABC lives
    # in collections.abc. Imported locally to keep this edit self-contained.
    from collections.abc import Mapping

    for key, rvalue in right.items():
        if isinstance(rvalue, Mapping):
            # Merge nested mappings instead of overwriting them wholesale.
            left[key] = deep_update(left.get(key, {}), rvalue)
        else:
            left[key] = rvalue
    return left
import abc
from typing import Any, Dict, Iterator, List, Set
class MerkleNode(dict, metaclass=abc.ABCMeta):
......@@ -100,16 +39,18 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
The collection of updated data from the tree is implemented through the
:func:`collect` function and associated helpers.
Attributes:
data (dict): data associated to the current node
parents (list): known parents of the current node
collected (bool): whether the current node has been collected
"""
__slots__ = ['parents', 'data', '__hash', 'collected']
type = None
"""Type of the current node (used as a classifier for :func:`collect`)"""
__slots__ = ["parents", "data", "__hash", "collected"]
data: Dict
"""data associated to the current node"""
parents: List
"""known parents of the current node"""
collected: bool
"""whether the current node has been collected"""
def __init__(self, data=None):
super().__init__()
......@@ -118,6 +59,16 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
self.__hash = None
self.collected = False
def __eq__(self, other):
return (
isinstance(other, MerkleNode)
and super().__eq__(other)
and self.data == other.data
)
def __ne__(self, other):
return not self.__eq__(other)
def invalidate_hash(self):
"""Invalidate the cached hash of the current node."""
if not self.__hash:
......@@ -128,7 +79,7 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
for parent in self.parents:
parent.invalidate_hash()
def update_hash(self, *, force=False):
def update_hash(self, *, force=False) -> Any:
"""Recursively compute the hash of the current node.
Args:
......@@ -148,20 +99,23 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
return self.__hash
@property
def hash(self):
def hash(self) -> Any:
"""The hash of the current node, as calculated by
:func:`compute_hash`.
"""
return self.update_hash()
def __hash__(self):
return hash(self.hash)
@abc.abstractmethod
def compute_hash(self):
def compute_hash(self) -> Any:
"""Compute the hash of the current node.
The hash should depend on the data of the node, as well as on hashes
of the children nodes.
"""
raise NotImplementedError('Must implement compute_hash method')
raise NotImplementedError("Must implement compute_hash method")
def __setitem__(self, name, new_child):
"""Add a child, invalidating the current hash"""
......@@ -210,47 +164,24 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
"""
return self.data
def collect_node(self, **kwargs):
"""Collect the data for the current node, for use by :func:`collect`.
Arguments:
kwargs: passed as-is to :func:`get_data`.
Returns:
A :class:`dict` compatible with :func:`collect`.
"""
def collect_node(self) -> Set[MerkleNode]:
"""Collect the current node if it has not been yet, for use by :func:`collect`."""
if not self.collected:
self.collected = True
return {self.type: {self.hash: self.get_data(**kwargs)}}
return {self}
else:
return {}
def collect(self, **kwargs):
"""Collect the data for all nodes in the subtree rooted at `self`.
return set()
The data is deduplicated by type and by hash.
Arguments:
kwargs: passed as-is to :func:`get_data`.
def collect(self) -> Set[MerkleNode]:
"""Collect the added and modified nodes in the subtree rooted at `self`
since the last collect operation.
Returns:
A :class:`dict` with the following structure::
{
'typeA': {
node1.hash: node1.get_data(),
node2.hash: node2.get_data(),
},
'typeB': {
node3.hash: node3.get_data(),
...
},
...
}
A :class:`set` of collected nodes
"""
ret = self.collect_node(**kwargs)
ret = self.collect_node()
for child in self.values():
deep_update(ret, child.collect(**kwargs))
ret.update(child.collect())
return ret
......@@ -264,23 +195,39 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
for child in self.values():
child.reset_collect()
def iter_tree(self, dedup=True) -> Iterator[MerkleNode]:
"""Yields all children nodes, recursively. Common nodes are deduplicated
by default (deduplication can be turned off setting the given argument
'dedup' to False).
"""
yield from self._iter_tree(seen=set(), dedup=dedup)
def _iter_tree(self, seen: Set[bytes], dedup) -> Iterator[MerkleNode]:
if self.hash not in seen:
if dedup:
seen.add(self.hash)
yield self
for child in self.values():
yield from child._iter_tree(seen=seen, dedup=dedup)
class MerkleLeaf(MerkleNode):
    """A leaf to a Merkle tree.

    A Merkle leaf is simply a Merkle node with children disabled.
    """

    # bugfix: the block contained duplicated (old-revision) raise statements
    # and a duplicated __slots__ assignment left over from a diff; only the
    # current-revision statements are kept.
    __slots__: List[str] = []

    def __setitem__(self, name, child):
        raise ValueError("%s is a leaf" % self.__class__.__name__)

    def __getitem__(self, name):
        raise ValueError("%s is a leaf" % self.__class__.__name__)

    def __delitem__(self, name):
        raise ValueError("%s is a leaf" % self.__class__.__name__)

    def update(self, new_children):
        """Children update operation. Disabled for leaves."""
        raise ValueError("%s is a leaf" % self.__class__.__name__)
This diff is collapsed.
# Marker file for PEP 561.
This diff is collapsed.
File added
File added
This diff is collapsed.
......@@ -5,8 +5,6 @@
import unittest
from nose.tools import istest
from swh.model.exceptions import ValidationError
from swh.model.fields import hashes
......@@ -14,149 +12,135 @@ from swh.model.fields import hashes
class ValidateHashes(unittest.TestCase):
def setUp(self):
self.valid_byte_hashes = {
'sha1': b'\xf1\xd2\xd2\xf9\x24\xe9\x86\xac\x86\xfd\xf7\xb3\x6c\x94'
b'\xbc\xdf\x32\xbe\xec\x15',
'sha1_git': b'\x25\x7c\xc5\x64\x2c\xb1\xa0\x54\xf0\x8c\xc8\x3f\x2d'
b'\x94\x3e\x56\xfd\x3e\xbe\x99',
'sha256': b'\xb5\xbb\x9d\x80\x14\xa0\xf9\xb1\xd6\x1e\x21\xe7\x96'
b'\xd7\x8d\xcc\xdf\x13\x52\xf2\x3c\xd3\x28\x12\xf4\x85'
b'\x0b\x87\x8a\xe4\x94\x4c',
"sha1": b"\xf1\xd2\xd2\xf9\x24\xe9\x86\xac\x86\xfd\xf7\xb3\x6c\x94"
b"\xbc\xdf\x32\xbe\xec\x15",
"sha1_git": b"\x25\x7c\xc5\x64\x2c\xb1\xa0\x54\xf0\x8c\xc8\x3f\x2d"
b"\x94\x3e\x56\xfd\x3e\xbe\x99",
"sha256": b"\xb5\xbb\x9d\x80\x14\xa0\xf9\xb1\xd6\x1e\x21\xe7\x96"
b"\xd7\x8d\xcc\xdf\x13\x52\xf2\x3c\xd3\x28\x12\xf4\x85"
b"\x0b\x87\x8a\xe4\x94\x4c",
}
self.valid_str_hashes = {
'sha1': 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15',
'sha1_git': '257cc5642cb1a054f08cc83f2d943e56fd3ebe99',
'sha256': 'b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f485'
'0b878ae4944c',
"sha1": "f1d2d2f924e986ac86fdf7b36c94bcdf32beec15",
"sha1_git": "257cc5642cb1a054f08cc83f2d943e56fd3ebe99",
"sha256": "b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f485"
"0b878ae4944c",
}
self.bad_hash = object()
@istest
def valid_bytes_hash(self):
def test_valid_bytes_hash(self):
for hash_type, value in self.valid_byte_hashes.items():
self.assertTrue(hashes.validate_hash(value, hash_type))
@istest
def valid_str_hash(self):
def test_valid_str_hash(self):
for hash_type, value in self.valid_str_hashes.items():
self.assertTrue(hashes.validate_hash(value, hash_type))
@istest
def invalid_hash_type(self):
hash_type = 'unknown_hash_type'
def test_invalid_hash_type(self):
hash_type = "unknown_hash_type"
with self.assertRaises(ValidationError) as cm:
hashes.validate_hash(self.valid_str_hashes['sha1'], hash_type)
hashes.validate_hash(self.valid_str_hashes["sha1"], hash_type)
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-type')
self.assertEqual(exc.params['hash_type'], hash_type)
self.assertEqual(exc.code, "unexpected-hash-type")
self.assertEqual(exc.params["hash_type"], hash_type)
self.assertIn('Unexpected hash type', str(exc))
self.assertIn("Unexpected hash type", str(exc))
self.assertIn(hash_type, str(exc))
@istest
def invalid_bytes_len(self):
def test_invalid_bytes_len(self):
for hash_type, value in self.valid_byte_hashes.items():
value = value + b'\x00\x01'
value = value + b"\x00\x01"
with self.assertRaises(ValidationError) as cm:
hashes.validate_hash(value, hash_type)
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-length')
self.assertEqual(exc.params['hash_type'], hash_type)
self.assertEqual(exc.params['length'], len(value))
self.assertEqual(exc.code, "unexpected-hash-length")
self.assertEqual(exc.params["hash_type"], hash_type)
self.assertEqual(exc.params["length"], len(value))
self.assertIn('Unexpected length', str(exc))
self.assertIn("Unexpected length", str(exc))
self.assertIn(str(len(value)), str(exc))
@istest
def invalid_str_len(self):
def test_invalid_str_len(self):
for hash_type, value in self.valid_str_hashes.items():
value = value + '0001'
value = value + "0001"
with self.assertRaises(ValidationError) as cm:
hashes.validate_hash(value, hash_type)
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-length')
self.assertEqual(exc.params['hash_type'], hash_type)
self.assertEqual(exc.params['length'], len(value))
self.assertEqual(exc.code, "unexpected-hash-length")
self.assertEqual(exc.params["hash_type"], hash_type)
self.assertEqual(exc.params["length"], len(value))
self.assertIn('Unexpected length', str(exc))
self.assertIn("Unexpected length", str(exc))
self.assertIn(str(len(value)), str(exc))
@istest
def invalid_str_contents(self):
def test_invalid_str_contents(self):
for hash_type, value in self.valid_str_hashes.items():
value = '\xa2' + value[1:-1] + '\xc3'
value = "\xa2" + value[1:-1] + "\xc3"
with self.assertRaises(ValidationError) as cm:
hashes.validate_hash(value, hash_type)
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-contents')
self.assertEqual(exc.params['hash_type'], hash_type)
self.assertEqual(exc.params['unexpected_chars'], '\xa2, \xc3')
self.assertEqual(exc.code, "unexpected-hash-contents")
self.assertEqual(exc.params["hash_type"], hash_type)
self.assertEqual(exc.params["unexpected_chars"], "\xa2, \xc3")
self.assertIn('Unexpected characters', str(exc))
self.assertIn('\xc3', str(exc))
self.assertIn('\xa2', str(exc))
self.assertIn("Unexpected characters", str(exc))
self.assertIn("\xc3", str(exc))
self.assertIn("\xa2", str(exc))
@istest
def invalid_value_type(self):
def test_invalid_value_type(self):
with self.assertRaises(ValidationError) as cm:
hashes.validate_hash(self.bad_hash, 'sha1')
hashes.validate_hash(self.bad_hash, "sha1")
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-value-type')
self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__)
self.assertEqual(exc.code, "unexpected-hash-value-type")
self.assertEqual(exc.params["type"], self.bad_hash.__class__.__name__)
self.assertIn('Unexpected type', str(exc))
self.assertIn("Unexpected type", str(exc))
self.assertIn(self.bad_hash.__class__.__name__, str(exc))
@istest
def validate_sha1(self):
self.assertTrue(hashes.validate_sha1(self.valid_byte_hashes['sha1']))
self.assertTrue(hashes.validate_sha1(self.valid_str_hashes['sha1']))
def test_validate_sha1(self):
self.assertTrue(hashes.validate_sha1(self.valid_byte_hashes["sha1"]))
self.assertTrue(hashes.validate_sha1(self.valid_str_hashes["sha1"]))
with self.assertRaises(ValidationError) as cm:
hashes.validate_sha1(self.bad_hash)
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-value-type')
self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__)
self.assertEqual(exc.code, "unexpected-hash-value-type")
self.assertEqual(exc.params["type"], self.bad_hash.__class__.__name__)
@istest
def validate_sha1_git(self):
self.assertTrue(
hashes.validate_sha1_git(self.valid_byte_hashes['sha1_git']))
self.assertTrue(
hashes.validate_sha1_git(self.valid_str_hashes['sha1_git']))
def test_validate_sha1_git(self):
self.assertTrue(hashes.validate_sha1_git(self.valid_byte_hashes["sha1_git"]))
self.assertTrue(hashes.validate_sha1_git(self.valid_str_hashes["sha1_git"]))
with self.assertRaises(ValidationError) as cm:
hashes.validate_sha1_git(self.bad_hash)
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-value-type')
self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__)
self.assertEqual(exc.code, "unexpected-hash-value-type")
self.assertEqual(exc.params["type"], self.bad_hash.__class__.__name__)
@istest
def validate_sha256(self):
self.assertTrue(
hashes.validate_sha256(self.valid_byte_hashes['sha256']))
self.assertTrue(
hashes.validate_sha256(self.valid_str_hashes['sha256']))
def test_validate_sha256(self):
self.assertTrue(hashes.validate_sha256(self.valid_byte_hashes["sha256"]))
self.assertTrue(hashes.validate_sha256(self.valid_str_hashes["sha256"]))
with self.assertRaises(ValidationError) as cm:
hashes.validate_sha256(self.bad_hash)
exc = cm.exception
self.assertIsInstance(str(exc), str)
self.assertEqual(exc.code, 'unexpected-hash-value-type')
self.assertEqual(exc.params['type'], self.bad_hash.__class__.__name__)
self.assertEqual(exc.code, "unexpected-hash-value-type")
self.assertEqual(exc.params["type"], self.bad_hash.__class__.__name__)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.