class TestValidators(unittest.TestCase):
    """Tests for ``validators.validate_content``."""

    def setUp(self):
        """Build the content dictionaries shared by the tests.

        * ``valid_visible_content``: a 'visible' content carrying its data
          and the hashes computed from that data by ``hashutil.hash_data``.
        * ``valid_absent_content``: an 'absent' content without data, with a
          reason, an origin and a single hash (``sha1_git``).
        * ``invalid_content_hash_mismatch``: a copy of the visible content
          whose hashes were replaced by those of some other data.
        """
        self.valid_visible_content = {
            'status': 'visible',
            'length': 5,
            'data': b'1984\n',
            'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56,
                                       tzinfo=datetime.timezone.utc),
        }

        self.valid_visible_content.update(
            hashutil.hash_data(self.valid_visible_content['data']))

        self.valid_absent_content = {
            'status': 'absent',
            'length': 5,
            'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56,
                                       tzinfo=datetime.timezone.utc),
            'reason': 'Content too large',
            'sha1_git': self.valid_visible_content['sha1_git'],
            'origin': 42,
        }

        # Shallow copy is enough: only top-level hash entries are replaced.
        self.invalid_content_hash_mismatch = self.valid_visible_content.copy()
        self.invalid_content_hash_mismatch.update(
            hashutil.hash_data(b"this is not the data you're looking for"))

    @istest
    def validate_content(self):
        """Both valid fixtures are accepted by ``validate_content``."""
        self.assertTrue(
            validators.validate_content(self.valid_visible_content))

        self.assertTrue(
            validators.validate_content(self.valid_absent_content))

    @istest
    def validate_content_hash_mismatch(self):
        """Mismatching hashes are all reported as non-field errors."""
        with self.assertRaises(exceptions.ValidationError) as cm:
            validators.validate_content(self.invalid_content_hash_mismatch)

        # All the hashes are wrong. The exception should be of the form:
        # ValidationError({
        #     NON_FIELD_ERRORS: [
        #         ValidationError('content-hash-mismatch', 'sha1'),
        #         ValidationError('content-hash-mismatch', 'sha1_git'),
        #         ValidationError('content-hash-mismatch', 'sha256'),
        #     ]
        # })

        exc = cm.exception
        # Rendering the nested exception as a string must not fail.
        self.assertIsInstance(str(exc), str)
        # assertEqual, not the deprecated assertEquals alias (gone in 3.12).
        self.assertEqual(set(exc.error_dict.keys()),
                         {exceptions.NON_FIELD_ERRORS})

        hash_mismatches = exc.error_dict[exceptions.NON_FIELD_ERRORS]
        self.assertIsInstance(hash_mismatches, list)
        self.assertEqual(len(hash_mismatches), 3)
        self.assertTrue(all(mismatch.code == 'content-hash-mismatch'
                            for mismatch in hash_mismatches))
        self.assertEqual(set(mismatch.params['hash']
                             for mismatch in hash_mismatches),
                         {'sha1', 'sha1_git', 'sha256'})
def validate_hashes(content):
    """Check the hashes recorded in ``content`` against its data.

    When ``content`` has a ``data`` entry, ``hashutil.hash_data`` is run on
    it and every resulting hash whose name is also a key of ``content`` is
    compared with the recorded value. Recorded values given as ``bytes``
    are hex-encoded before the comparison; hashes absent from ``content``
    are ignored.

    Args:
        content: the content dictionary being validated.

    Returns:
        True when there is no ``data`` entry or no recorded hash differs
        from the computed one.

    Raises:
        ValidationError: wrapping one ``ValidationError`` (code
            ``content-hash-mismatch``) per hash that does not match.
    """
    # Nothing to cross-check without the raw data.
    if 'data' not in content:
        return True

    def _as_text(value):
        # Recorded hashes may be raw bytes; hex-encode them for comparison.
        # NOTE(review): presumes hash_data yields hex strings -- confirm.
        if isinstance(value, bytes):
            return binascii.hexlify(value).decode()
        return value

    expected = hashutil.hash_data(content['data'])
    mismatches = [
        ValidationError(
            'hash mismatch in content for hash %(hash)s',
            params={'hash': algo},
            code='content-hash-mismatch',
        )
        for algo, digest in expected.items()
        if algo in content and _as_text(content[algo]) != digest
    ]

    if mismatches:
        raise ValidationError(mismatches)

    return True