Skip to content
Snippets Groups Projects

model.hashutil: Open new endpoint to allow to hash stream

2 files
+ 245
35
Compare changes
  • Side-by-side
  • Inline
Files
2
# Copyright (C) 2015-2017 The Software Heritage developers
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -13,9 +13,10 @@ from nose.tools import istest
from unittest.mock import patch
from swh.model import hashutil
from swh.model.hashutil import MultiHash
class Hashutil(unittest.TestCase):
class BaseHashutil(unittest.TestCase):
def setUp(self):
# Reset function cache
hashutil._blake2_hash_cache = {}
@@ -35,6 +36,11 @@ class Hashutil(unittest.TestCase):
for type, cksum in self.hex_checksums.items()
}
self.bytehex_checksums = {
type: hashutil.hash_to_bytehex(cksum)
for type, cksum in self.checksums.items()
}
self.git_hex_checksums = {
'blob': self.hex_checksums['sha1_git'],
'tree': '5b2e883aa33d2efab98442693ea4dd5f1b8871b0',
@@ -47,6 +53,75 @@ class Hashutil(unittest.TestCase):
for type, cksum in self.git_hex_checksums.items()
}
class MultiHashTest(BaseHashutil):
@istest
def multi_hash_data(self):
checksums = MultiHash.from_data(self.data).digest()
self.assertEqual(checksums, self.checksums)
self.assertFalse('length' in checksums)
@istest
def multi_hash_data_with_length(self):
expected_checksums = self.checksums.copy()
expected_checksums['length'] = len(self.data)
algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS)
checksums = MultiHash.from_data(self.data, hash_names=algos).digest()
self.assertEqual(checksums, expected_checksums)
self.assertTrue('length' in checksums)
@istest
def multi_hash_data_unknown_hash(self):
with self.assertRaises(ValueError) as cm:
MultiHash.from_data(self.data, ['unknown-hash'])
self.assertIn('Unexpected hashing algorithm', cm.exception.args[0])
self.assertIn('unknown-hash', cm.exception.args[0])
@istest
def multi_hash_file(self):
fobj = io.BytesIO(self.data)
checksums = MultiHash.from_file(fobj, length=len(self.data)).digest()
self.assertEqual(checksums, self.checksums)
@istest
def multi_hash_file_hexdigest(self):
fobj = io.BytesIO(self.data)
length = len(self.data)
checksums = MultiHash.from_file(fobj, length=length).hexdigest()
self.assertEqual(checksums, self.hex_checksums)
@istest
def multi_hash_file_bytehexdigest(self):
fobj = io.BytesIO(self.data)
length = len(self.data)
checksums = MultiHash.from_file(fobj, length=length).bytehexdigest()
self.assertEqual(checksums, self.bytehex_checksums)
@istest
def multi_hash_file_missing_length(self):
fobj = io.BytesIO(self.data)
with self.assertRaises(ValueError) as cm:
MultiHash.from_file(fobj, hash_names=['sha1_git'])
self.assertIn('Missing length', cm.exception.args[0])
@istest
def multi_hash_path(self):
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(self.data)
hashes = MultiHash.from_path(f.name).digest()
os.remove(f.name)
self.checksums['length'] = len(self.data)
self.assertEquals(self.checksums, hashes)
class Hashutil(BaseHashutil):
@istest
def hash_data(self):
checksums = hashutil.hash_data(self.data)
@@ -58,7 +133,8 @@ class Hashutil(unittest.TestCase):
expected_checksums = self.checksums.copy()
expected_checksums['length'] = len(self.data)
checksums = hashutil.hash_data(self.data, with_length=True)
algos = set(['length']).union(hashutil.DEFAULT_ALGORITHMS)
checksums = hashutil.hash_data(self.data, algorithms=algos)
self.assertEqual(checksums, expected_checksums)
self.assertTrue('length' in checksums)
Loading