Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • anlambert/swh-model
  • lunar/swh-model
  • franckbret/swh-model
  • douardda/swh-model
  • olasd/swh-model
  • swh/devel/swh-model
  • Alphare/swh-model
  • samplet/swh-model
  • marmoute/swh-model
  • rboyer/swh-model
10 results
Show changes
Showing
with 5638 additions and 1698 deletions
......@@ -13,16 +13,16 @@ def validate_type(value, type):
"""Validate that value is an integer"""
if not isinstance(value, type):
if isinstance(type, tuple):
typestr = 'one of %s' % ', '.join(typ.__name__ for typ in type)
typestr = "one of %s" % ", ".join(typ.__name__ for typ in type)
else:
typestr = type.__name__
raise ValidationError(
'Unexpected type %(type)s, expected %(expected_type)s',
"Unexpected type %(type)s, expected %(expected_type)s",
params={
'type': value.__class__.__name__,
'expected_type': typestr,
"type": value.__class__.__name__,
"expected_type": typestr,
},
code='unexpected-type'
code="unexpected-type",
)
return True
......@@ -54,10 +54,12 @@ def validate_datetime(value):
errors.append(e)
if isinstance(value, datetime.datetime) and value.tzinfo is None:
errors.append(ValidationError(
'Datetimes must be timezone-aware in swh',
code='datetime-without-tzinfo',
))
errors.append(
ValidationError(
"Datetimes must be timezone-aware in swh",
code="datetime-without-tzinfo",
)
)
if errors:
raise ValidationError(errors)
......@@ -69,12 +71,12 @@ def validate_enum(value, expected_values):
"""Validate that value is contained in expected_values"""
if value not in expected_values:
raise ValidationError(
'Unexpected value %(value)s, expected one of %(expected_values)s',
"Unexpected value %(value)s, expected one of %(expected_values)s",
params={
'value': value,
'expected_values': ', '.join(sorted(expected_values)),
"value": value,
"expected_values": ", ".join(sorted(expected_values)),
},
code='unexpected-value',
code="unexpected-value",
)
return True
This diff is collapsed.
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import stat
from enum import Enum, IntEnum
from swh.model import hashutil, identifiers
ROOT_TREE_KEY = b''
class GitType(Enum):
BLOB = b'blob'
TREE = b'tree'
EXEC = b'exec'
LINK = b'link'
COMM = b'commit'
RELE = b'release'
REFS = b'ref'
class GitPerm(IntEnum):
BLOB = 0o100644
TREE = 0o040000
EXEC = 0o100755
LINK = 0o120000
def _compute_directory_git_sha1(hashes):
"""Compute a directory git sha1 from hashes.
Args:
hashes (list): list of tree entries with the following keys:
- sha1_git: the tree entry's sha1
- name: file or subdir's name
- perms: the tree entry's sha1 permissions
Returns:
the binary sha1 of the dictionary's identifier
Assumes:
Every path exists in hashes.
"""
directory = {
'entries':
[
{
'name': entry['name'],
'perms': entry['perms'],
'target': entry['sha1_git'],
'type': 'dir' if entry['perms'] == GitPerm.TREE else 'file',
}
for entry in hashes
]
}
return hashutil.hash_to_bytes(identifiers.directory_identifier(directory))
def compute_directory_git_sha1(dirpath, hashes):
"""Compute a directory git sha1 for a dirpath.
Args:
dirpath: the directory's absolute path
hashes (list): list of tree entries with keys:
- sha1_git: the tree entry's sha1
- name: file or subdir's name
- perms: the tree entry's sha1 permissions
Returns:
the binary sha1 of the dictionary's identifier
Assumes:
Every path exists in hashes.
"""
return _compute_directory_git_sha1(hashes[dirpath])
def compute_revision_sha1_git(revision):
"""Compute a revision sha1 git from its dict representation.
Args:
revision: Additional dictionary information needed to compute a
synthetic revision. The following keys are expected:
- author
- date
- committer
- committer_date
- message
- type
- directory: binary form of the tree hash
Returns:
revision sha1 in bytes
# FIXME: beware, bytes output from storage api
"""
return hashutil.hash_to_bytes(identifiers.revision_identifier(revision))
def compute_release_sha1_git(release):
"""Compute a release sha1 git from its dict representation.
Args:
release: Additional dictionary information needed to compute a
synthetic release. Following keys are expected:
- name
- message
- date
- author
- revision: binary form of the sha1_git revision targeted by this
Returns:
release sha1 in bytes
"""
return hashutil.hash_to_bytes(identifiers.release_identifier(release))
def compute_link_metadata(linkpath):
"""Given a linkpath, compute the git metadata.
Args:
linkpath: absolute pathname of the link
Returns:
dict: Dictionary of values with the following keys:
- data: link's content
- length: link's content length
- name: basename of the link
- perms: git permission for link
- type: git type for link
- path: absolute path to the link on filesystem
"""
data = os.readlink(linkpath)
link_metadata = hashutil.hash_data(data)
link_metadata.update({
'data': data,
'length': len(data),
'name': os.path.basename(linkpath),
'perms': GitPerm.LINK,
'type': GitType.BLOB,
'path': linkpath
})
return link_metadata
def compute_blob_metadata(filepath):
"""Given a filepath resolving to a regular file, compute the metadata.
Other file types (fifo, character or block device, symlink) will
be considered empty regular file. To deal properly with symlinks,
use swh.model.git.compute_link_metadata.
Args:
filepath: absolute pathname of the regular file.
Returns:
dict: Dictionary of values with the following keys:
- name: basename of the file
- length: data length
- perms: git permission for file
- type: git type for file
- path: absolute filepath on filesystem
"""
mode = os.lstat(filepath).st_mode
if not stat.S_ISREG(mode): # special (block or character device, fifo)
perms = GitPerm.BLOB
blob_metadata = hashutil.hash_data(b'')
blob_metadata['length'] = 0
else:
perms = GitPerm.EXEC if os.access(filepath, os.X_OK) else GitPerm.BLOB
blob_metadata = hashutil.hash_path(filepath)
blob_metadata.update({
'name': os.path.basename(filepath),
'perms': perms,
'type': GitType.BLOB,
'path': filepath
})
return blob_metadata
def _compute_tree_metadata(dirname, hashes):
"""Given a dirname, compute the git metadata.
Args:
dirname: absolute pathname of the directory.
hashes (list): list of tree dirname's entries with keys:
- sha1_git: the tree entry's sha1
- name: file or subdir's name
- perms: the tree entry's sha1 permissions
Returns:
dict: Dictionary of values with the following keys:
- sha1_git: tree's sha1 git
- name: basename of the directory
- perms: git permission for directory
- type: git type for directory
- path: absolute path to directory on filesystem
"""
return {
'sha1_git': _compute_directory_git_sha1(hashes),
'name': os.path.basename(dirname),
'perms': GitPerm.TREE,
'type': GitType.TREE,
'path': dirname
}
def compute_tree_metadata(dirname, ls_hashes):
"""Given a dirname, compute the git metadata.
Args:
dirname: absolute pathname of the directory.
ls_hashes: dictionary of path, hashes
Returns:
dict: Dictionary of values with the following keys:
- sha1_git: tree's sha1 git
- name: basename of the directory
- perms: git permission for directory
- type: git type for directory
- path: absolute path to directory on filesystem
"""
return _compute_tree_metadata(dirname, ls_hashes[dirname])
def default_validation_dir(dirpath):
"""Default validation function.
This is the equivalent of the identity function.
Args:
dirpath: Path to validate
Returns: True
"""
return True
def _walk(rootdir,
dir_ok_fn=default_validation_dir,
remove_empty_folder=False):
"""Walk the filesystem and yields a 3 tuples (dirpath, dirnames as set
of absolute paths, filenames as set of abslute paths)
Ignore files which won't pass the dir_ok_fn validation.
If remove_empty_folder is True, remove and ignore any
encountered empty folder.
Args:
- rootdir: starting walk root directory path
- dir_ok_fn: validation function. if folder encountered are not ok,
they are ignored. Default to default_validation_dir which does
nothing.
- remove_empty_folder: Flag to remove and ignore any encountered empty
folders.
Yields:
3 tuples dirpath, set of absolute children dirname paths, set
of absolute filename paths.
"""
def basic_gen_dir(rootdir):
for dp, dns, fns in os.walk(rootdir, topdown=False):
yield (dp,
set((os.path.join(dp, dn) for dn in dns)),
set((os.path.join(dp, fn) for fn in fns)))
if dir_ok_fn == default_validation_dir:
if not remove_empty_folder: # os.walk
yield from basic_gen_dir(rootdir)
else: # os.walk + empty dir cleanup
empty_folders = set()
for dp, dns, fns in basic_gen_dir(rootdir):
if not dns and not fns:
empty_folders.add(dp)
# need to remove it because folder of empty folder
# is an empty folder!!!
if os.path.islink(dp):
os.remove(dp)
else:
os.rmdir(dp)
parent = os.path.dirname(dp)
# edge case about parent containing one empty
# folder which become an empty one
while not os.listdir(parent):
empty_folders.add(parent)
if os.path.islink(parent):
os.remove(parent)
else:
os.rmdir(parent)
parent = os.path.dirname(parent)
continue
yield (dp, dns - empty_folders, fns)
else:
def filtfn(dirnames):
return set(filter(dir_ok_fn, dirnames))
gen_dir = ((dp, dns, fns) for dp, dns, fns
in basic_gen_dir(rootdir) if dir_ok_fn(dp))
if not remove_empty_folder: # os.walk + filtering
for dp, dns, fns in gen_dir:
yield (dp, filtfn(dns), fns)
else: # os.walk + filtering + empty dir cleanup
empty_folders = set()
for dp, dns, fns in gen_dir:
dps = filtfn(dns)
if not dps and not fns:
empty_folders.add(dp)
# need to remove it because folder of empty folder
# is an empty folder!!!
if os.path.islink(dp):
os.remove(dp)
else:
os.rmdir(dp)
parent = os.path.dirname(dp)
# edge case about parent containing one empty
# folder which become an empty one
while not os.listdir(parent):
empty_folders.add(parent)
if os.path.islink(parent):
os.remove(parent)
else:
os.rmdir(parent)
parent = os.path.dirname(parent)
continue
yield dp, dps - empty_folders, fns
def walk_and_compute_sha1_from_directory(rootdir,
dir_ok_fn=default_validation_dir,
with_root_tree=True,
remove_empty_folder=False):
"""(Deprecated) TODO migrate the code to
compute_hashes_from_directory.
Compute git sha1 from directory rootdir.
Args:
rootdir: Root directory from which beginning the git hash computation
dir_ok_fn: Filter function to filter directory according to rules
defined in the function. By default, all folders are ok. Example
override: ``dir_ok_fn = lambda dirpath: b'svn' not in dirpath``
with_root_tree: Determine if we compute the upper root tree's
checksums. As a default, we want it. One possible use case where
this is not useful is the update (cf. `update_checksums_from`)
Returns:
dict: Dictionary of entries with keys <path-name> and as values a list
of directory entries. Those are list of dictionary with keys:
- perms
- type
- name
- sha1_git
- and specifically for content: sha1, sha256, etc.
Note:
One special key is ROOT_TREE_KEY to indicate the upper root of the
directory (this is the revision's directory).
"""
ls_hashes = {}
all_links = set()
if rootdir.endswith(b'/'):
rootdir = rootdir.rstrip(b'/')
for dirpath, dirnames, filenames in _walk(
rootdir, dir_ok_fn, remove_empty_folder):
hashes = []
links = (file
for file in filenames.union(dirnames)
if os.path.islink(file))
for linkpath in links:
all_links.add(linkpath)
m_hashes = compute_link_metadata(linkpath)
hashes.append(m_hashes)
for filepath in (file for file in filenames if file not in all_links):
m_hashes = compute_blob_metadata(filepath)
hashes.append(m_hashes)
ls_hashes[dirpath] = hashes
dir_hashes = []
for fulldirname in (dir for dir in dirnames if dir not in all_links):
tree_hash = _compute_tree_metadata(fulldirname,
ls_hashes[fulldirname])
dir_hashes.append(tree_hash)
ls_hashes[dirpath].extend(dir_hashes)
if with_root_tree:
# compute the current directory hashes
root_hash = {
'sha1_git': _compute_directory_git_sha1(ls_hashes[rootdir]),
'path': rootdir,
'name': os.path.basename(rootdir),
'perms': GitPerm.TREE,
'type': GitType.TREE
}
ls_hashes[ROOT_TREE_KEY] = [root_hash]
return ls_hashes
def compute_hashes_from_directory(rootdir,
dir_ok_fn=default_validation_dir,
remove_empty_folder=False):
"""Compute git sha1 from directory rootdir.
Args:
rootdir: Root directory from which beginning the git hash
computation
dir_ok_fn: Filter function to filter directory according to rules
defined in the function. By default, all folders are ok. Example
override: ``dir_ok_fn = lambda dirpath: b'svn' not in dirpath``
Returns:
dict: Dictionary of entries with keys absolute path name.
Path-name can be a file/link or directory.
The associated value is a dictionary with keys:
- checksums: the dictionary with the hashes for the link/file/dir
Those are list of dictionary with keys:
- 'perms'
- 'type'
- 'name'
- 'sha1_git'
- and specifically for content: sha1, sha256, etc.
- children: Only for a directory, the set of children paths
Note:
One special key is the / which indicates the upper root of the
directory (this is the revision's directory).
"""
def _get_dict_from_dirpath(_dict, path):
"""Retrieve the default associated value for key path.
"""
return _dict.get(path, dict(children=set(), checksums=None))
def _get_dict_from_filepath(_dict, path):
"""Retrieve the default associated value for key path.
"""
return _dict.get(path, dict(checksums=None))
ls_hashes = {}
all_links = set()
if rootdir.endswith(b'/'):
rootdir = rootdir.rstrip(b'/')
for dirpath, dirnames, filenames in _walk(
rootdir, dir_ok_fn, remove_empty_folder):
dir_entry = _get_dict_from_dirpath(ls_hashes, dirpath)
children = dir_entry['children']
links = (file
for file in filenames.union(dirnames)
if os.path.islink(file))
for linkpath in links:
all_links.add(linkpath)
m_hashes = compute_link_metadata(linkpath)
d = _get_dict_from_filepath(ls_hashes, linkpath)
d['checksums'] = m_hashes
ls_hashes[linkpath] = d
children.add(linkpath)
for filepath in (file for file in filenames if file not in all_links):
m_hashes = compute_blob_metadata(filepath)
d = _get_dict_from_filepath(ls_hashes, filepath)
d['checksums'] = m_hashes
ls_hashes[filepath] = d
children.add(filepath)
for fulldirname in (dir for dir in dirnames if dir not in all_links):
d_hashes = _get_dict_from_dirpath(ls_hashes, fulldirname)
tree_hash = _compute_tree_metadata(
fulldirname,
(ls_hashes[p]['checksums'] for p in d_hashes['children'])
)
d = _get_dict_from_dirpath(ls_hashes, fulldirname)
d['checksums'] = tree_hash
ls_hashes[fulldirname] = d
children.add(fulldirname)
dir_entry['children'] = children
ls_hashes[dirpath] = dir_entry
# compute the current directory hashes
d_hashes = _get_dict_from_dirpath(ls_hashes, rootdir)
root_hash = {
'sha1_git': _compute_directory_git_sha1(
(ls_hashes[p]['checksums'] for p in d_hashes['children'])
),
'path': rootdir,
'name': os.path.basename(rootdir),
'perms': GitPerm.TREE,
'type': GitType.TREE
}
d_hashes['checksums'] = root_hash
ls_hashes[rootdir] = d_hashes
return ls_hashes
def children_hashes(children, objects):
"""Given a collection of children path, yield the corresponding
hashes.
Args:
objects: objects hash as returned by git.compute_hashes_from_directory
children: collection of bytes path
Yields:
Dictionary hashes
"""
for p in children:
c = objects.get(p)
if c:
h = c.get('checksums')
if h:
yield h
def objects_per_type(filter_type, objects_per_path):
"""Given an object dictionary returned by
:func:`compute_hashes_from_directory`, yields corresponding element
type's hashes
Args:
filter_type: one of GitType enum
objects_per_path:
Yields:
Elements of type filter_type's hashes
"""
for path, obj in objects_per_path.items():
o = obj['checksums']
if o['type'] == filter_type:
if 'children' in obj: # for trees
if obj['children']:
o['children'] = children_hashes(obj['children'],
objects_per_path)
else:
o['children'] = []
yield o
This diff is collapsed.
# Copyright (C) 2015-2017 The Software Heritage developers
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -10,31 +10,60 @@ Only a subset of hashing algorithms is supported as defined in the
ALGORITHMS set. Any provided algorithms not in that list will result
in a ValueError explaining the error.
This modules defines the following hashing functions:
This module defines a MultiHash class to ease the softwareheritage
hashing algorithms computation. This allows to compute hashes from
file object, path, data using a similar interface as what the standard
hashlib module provides.
- hash_file: Hash the contents of the given file object with the given
algorithms (defaulting to DEFAULT_ALGORITHMS if none provided).
Basic usage examples:
- hash_data: Hash the given binary blob with the given algorithms
(defaulting to DEFAULT_ALGORITHMS if none provided).
- file object: MultiHash.from_file(
file_object, hash_names=DEFAULT_ALGORITHMS).digest()
- path (filepath): MultiHash.from_path(b'foo').hexdigest()
- data (bytes): MultiHash.from_data(b'foo').bytehexdigest()
"Complex" usage, defining a swh hashlib instance first:
- To compute length, integrate the length to the set of algorithms to
compute, for example:
.. code-block:: python
h = MultiHash(hash_names=set({'length'}).union(DEFAULT_ALGORITHMS))
with open(filepath, 'rb') as f:
h.update(f.read(HASH_BLOCK_SIZE))
hashes = h.digest() # returns a dict of {hash_algo_name: hash_in_bytes}
- Write alongside computing hashing algorithms (from a stream), example:
.. code-block:: python
h = MultiHash(length=length)
with open(filepath, 'wb') as f:
for chunk in r.iter_content(): # r a stream of sort
h.update(chunk)
f.write(chunk)
hashes = h.hexdigest() # returns a dict of {hash_algo_name: hash_in_hex}
- hash_path: Hash the contents of the file at the given path with the
given algorithms (defaulting to DEFAULT_ALGORITHMS if none
provided).
"""
import binascii
import functools
import hashlib
import os
from io import BytesIO
import os
from typing import Callable, Dict, Optional, Union
ALGORITHMS = set(['sha1', 'sha256', 'sha1_git', 'blake2s256', 'blake2b512'])
ALGORITHMS = set(
["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512", "md5", "sha512"]
)
"""Hashing algorithms supported by this module"""
DEFAULT_ALGORITHMS = set(['sha1', 'sha256', 'sha1_git', 'blake2s256'])
DEFAULT_ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256"])
"""Algorithms computed by default when calling the functions from this module.
Subset of :const:`ALGORITHMS`.
......@@ -43,23 +72,138 @@ Subset of :const:`ALGORITHMS`.
HASH_BLOCK_SIZE = 32768
"""Block size for streaming hash computations made in this module"""
# Load blake2 hashes from pyblake2 if they are not available in the builtin
# hashlib
__pyblake2_hashes = {'blake2s256': 'blake2s',
'blake2b512': 'blake2b'}
__cache = hashlib.__builtin_constructor_cache
for __hash, __pyblake2_fn in __pyblake2_hashes.items():
if __hash not in hashlib.algorithms_available:
import pyblake2
__cache[__hash] = getattr(pyblake2, __pyblake2_fn)
_blake2_hash_cache: Dict[str, Callable] = {}
class MultiHash:
"""Hashutil class to support multiple hashes computation.
def _new_git_hash(base_algo, git_type, length):
"""Initialize a digest object (as returned by python's hashlib) for the
requested algorithm, and feed it with the header for a git object of the
given type and length.
Args:
The header for hashing a git object consists of:
hash_names (set): Set of hash algorithms (+ optionally length)
to compute hashes (cf. DEFAULT_ALGORITHMS)
length (int): Length of the total sum of chunks to read
If the length is provided as algorithm, the length is also
computed and returned.
"""
def __init__(self, hash_names=DEFAULT_ALGORITHMS, length=None):
self.state = {}
self.track_length = False
for name in hash_names:
if name == "length":
self.state["length"] = 0
self.track_length = True
else:
self.state[name] = _new_hash(name, length)
@classmethod
def from_state(cls, state, track_length):
ret = cls([])
ret.state = state
ret.track_length = track_length
@classmethod
def from_file(cls, fobj, hash_names=DEFAULT_ALGORITHMS, length=None):
ret = cls(length=length, hash_names=hash_names)
while True:
chunk = fobj.read(HASH_BLOCK_SIZE)
if not chunk:
break
ret.update(chunk)
return ret
@classmethod
def from_path(cls, path, hash_names=DEFAULT_ALGORITHMS):
length = os.path.getsize(path)
with open(path, "rb") as f:
ret = cls.from_file(f, hash_names=hash_names, length=length)
return ret
@classmethod
def from_data(cls, data, hash_names=DEFAULT_ALGORITHMS):
length = len(data)
fobj = BytesIO(data)
return cls.from_file(fobj, hash_names=hash_names, length=length)
def update(self, chunk):
for name, h in self.state.items():
if name == "length":
continue
h.update(chunk)
if self.track_length:
self.state["length"] += len(chunk)
def digest(self):
return {
name: h.digest() if name != "length" else h
for name, h in self.state.items()
}
def hexdigest(self):
return {
name: h.hexdigest() if name != "length" else h
for name, h in self.state.items()
}
def bytehexdigest(self):
return {
name: hash_to_bytehex(h.digest()) if name != "length" else h
for name, h in self.state.items()
}
def copy(self):
copied_state = {
name: h.copy() if name != "length" else h for name, h in self.state.items()
}
return self.from_state(copied_state, self.track_length)
def _new_blake2_hash(algo):
"""Return a function that initializes a blake2 hash."""
if algo in _blake2_hash_cache:
return _blake2_hash_cache[algo]()
lalgo = algo.lower()
if not lalgo.startswith("blake2"):
raise ValueError("Algorithm %s is not a blake2 hash" % algo)
blake_family = lalgo[:7]
digest_size = None
if lalgo[7:]:
try:
digest_size, remainder = divmod(int(lalgo[7:]), 8)
except ValueError:
raise ValueError("Unknown digest size for algo %s" % algo) from None
if remainder:
raise ValueError(
"Digest size for algorithm %s must be a multiple of 8" % algo
)
blake2 = getattr(hashlib, blake_family)
_blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)
return _blake2_hash_cache[algo]()
def _new_hashlib_hash(algo):
"""Initialize a digest object from hashlib.
Handle the swh-specific names for the blake2-related algorithms
"""
if algo.startswith("blake2"):
return _new_blake2_hash(algo)
else:
return hashlib.new(algo)
def git_object_header(git_type: str, length: int) -> bytes:
"""Returns the header for a git object of the given type and length.
The header of a git object consists of:
- The type of the object (encoded in ASCII)
- One ASCII space (\x20)
- The length of the object (decimal encoded in ASCII)
......@@ -74,15 +218,26 @@ def _new_git_hash(base_algo, git_type, length):
Returns:
a hashutil.hash object
"""
git_object_types = {
"blob",
"tree",
"commit",
"tag",
"snapshot",
"raw_extrinsic_metadata",
"extid",
}
h = hashlib.new(base_algo)
git_header = '%s %d\0' % (git_type, length)
h.update(git_header.encode('ascii'))
if git_type not in git_object_types:
raise ValueError(
"Unexpected git object type %s, expected one of %s"
% (git_type, ", ".join(sorted(git_object_types)))
)
return h
return ("%s %d\0" % (git_type, length)).encode("ascii")
def _new_hash(algo, length=None):
def _new_hash(algo: str, length: Optional[int] = None):
"""Initialize a digest object (as returned by python's hashlib) for
the requested algorithm. See the constant ALGORITHMS for the list
of supported algorithms. If a git-specific hashing algorithm is
......@@ -104,87 +259,22 @@ def _new_hash(algo, length=None):
"""
if algo not in ALGORITHMS:
raise ValueError(
'Unexpected hashing algorithm %s, expected one of %s' %
(algo, ', '.join(sorted(ALGORITHMS))))
"Unexpected hashing algorithm %s, expected one of %s"
% (algo, ", ".join(sorted(ALGORITHMS)))
)
if algo.endswith('_git'):
if algo.endswith("_git"):
if length is None:
raise ValueError('Missing length for git hashing algorithm')
raise ValueError("Missing length for git hashing algorithm")
base_algo = algo[:-4]
return _new_git_hash(base_algo, 'blob', length)
return hashlib.new(algo)
def hash_file(fobj, length=None, algorithms=DEFAULT_ALGORITHMS, chunk_cb=None):
"""Hash the contents of the given file object with the given algorithms.
Args:
fobj: a file-like object
length: the length of the contents of the file-like object (for the
git-specific algorithms)
algorithms: the hashing algorithms used
Returns: a dict mapping each algorithm to a bytes digest.
Raises:
ValueError if algorithms contains an unknown hash algorithm.
"""
hashes = {algo: _new_hash(algo, length) for algo in algorithms}
while True:
chunk = fobj.read(HASH_BLOCK_SIZE)
if not chunk:
break
for hash in hashes.values():
hash.update(chunk)
if chunk_cb:
chunk_cb(chunk)
return {algo: hash.digest() for algo, hash in hashes.items()}
h = _new_hashlib_hash(base_algo)
h.update(git_object_header("blob", length))
return h
def hash_path(path, algorithms=DEFAULT_ALGORITHMS, chunk_cb=None):
"""Hash the contents of the file at the given path with the given
algorithms.
Args:
path: the path of the file to hash
algorithms: the hashing algorithms used
chunk_cb: a callback
Returns: a dict mapping each algorithm to a bytes digest.
Raises:
ValueError if algorithms contains an unknown hash algorithm.
OSError on file access error
"""
length = os.path.getsize(path)
with open(path, 'rb') as fobj:
hash = hash_file(fobj, length, algorithms, chunk_cb)
hash['length'] = length
return hash
return _new_hashlib_hash(algo)
def hash_data(data, algorithms=DEFAULT_ALGORITHMS):
"""Hash the given binary blob with the given algorithms.
Args:
data: a bytes object
algorithms: the hashing algorithms used
Returns: a dict mapping each algorithm to a bytes digest
Raises:
TypeError if data does not support the buffer interface.
ValueError if algorithms contains an unknown hash algorithm.
"""
fobj = BytesIO(data)
return hash_file(fobj, len(data), algorithms)
def hash_git_data(data, git_type, base_algo='sha1'):
def hash_git_data(data, git_type, base_algo="sha1"):
"""Hash the given data as a git object of type git_type.
Args:
......@@ -197,21 +287,15 @@ def hash_git_data(data, git_type, base_algo='sha1'):
Raises:
ValueError if the git_type is unexpected.
"""
git_object_types = {'blob', 'tree', 'commit', 'tag'}
if git_type not in git_object_types:
raise ValueError('Unexpected git object type %s, expected one of %s' %
(git_type, ', '.join(sorted(git_object_types))))
h = _new_git_hash(base_algo, git_type, len(data))
h = _new_hashlib_hash(base_algo)
h.update(git_object_header(git_type, len(data)))
h.update(data)
return h.digest()
@functools.lru_cache()
def hash_to_hex(hash):
def hash_to_hex(hash: Union[str, bytes]) -> str:
"""Converts a hash (in hex or bytes form) to its hexadecimal ascii form
Args:
......@@ -223,11 +307,11 @@ def hash_to_hex(hash):
"""
if isinstance(hash, str):
return hash
return binascii.hexlify(hash).decode('ascii')
return binascii.hexlify(hash).decode("ascii")
@functools.lru_cache()
def hash_to_bytehex(hash):
def hash_to_bytehex(hash: bytes) -> bytes:
"""Converts a hash to its hexadecimal bytes representation
Args:
......@@ -240,7 +324,7 @@ def hash_to_bytehex(hash):
@functools.lru_cache()
def hash_to_bytes(hash):
def hash_to_bytes(hash: Union[str, bytes]) -> bytes:
"""Converts a hash (in hex or bytes form) to its raw bytes form
Args:
......@@ -256,7 +340,7 @@ def hash_to_bytes(hash):
@functools.lru_cache()
def bytehex_to_hash(hex):
def bytehex_to_hash(hex: bytes) -> bytes:
"""Converts a hexadecimal bytes representation of a hash to that hash
Args:
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# Marker file for PEP 561.
This diff is collapsed.
File added
File added
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
from random import choice, randint, random, shuffle
from typing import Dict, List
from pytz import all_timezones, timezone
from swh.model.hashutil import MultiHash
PROTOCOLS = ["git", "http", "https", "deb", "svn", "mock"]
DOMAINS = ["example.com", "some.long.host.name", "xn--n28h.tld"]
PATHS = [
"",
"/",
"/stuff",
"/stuff/",
"/path/to/resource",
"/path/with/anchor#id=42",
"/path/with/qargs?q=1&b",
]
CONTENT_STATUS = ["visible", "hidden", "absent"]
MAX_DATE = 3e9 # around 2065
def gen_all_origins():
for protocol in PROTOCOLS:
for domain in DOMAINS:
for urlpath in PATHS:
yield {"url": "%s://%s%s" % (protocol, domain, urlpath)}
ORIGINS = list(gen_all_origins())
def gen_origins(n: int = 100) -> List:
"""Returns a list of n randomly generated origins suitable for using as
Storage.add_origin() argument.
"""
origins = ORIGINS[:]
shuffle(origins)
return origins[:n]
def gen_content():
size = randint(1, 10 * 1024)
data = bytes(randint(0, 255) for i in range(size))
status = choice(CONTENT_STATUS)
h = MultiHash.from_data(data)
ctime = datetime.fromtimestamp(random() * MAX_DATE, timezone(choice(all_timezones)))
content = {
"data": data,
"status": status,
"length": size,
"ctime": ctime,
**h.digest(),
}
if status == "absent":
content["reason"] = "why not"
content["data"] = None
return content
def gen_contents(n=20) -> List[Dict]:
"""Returns a list of n randomly generated content objects (as dict) suitable
for using as Storage.content_add() argument.
"""
return [gen_content() for i in range(n)]
This diff is collapsed.
This diff is collapsed.