Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • anlambert/swh-model
  • lunar/swh-model
  • franckbret/swh-model
  • douardda/swh-model
  • olasd/swh-model
  • swh/devel/swh-model
  • Alphare/swh-model
  • samplet/swh-model
  • marmoute/swh-model
  • rboyer/swh-model
10 results
Show changes
Showing
with 3852 additions and 949 deletions
[flake8]
# E203: whitespaces before ':' <https://github.com/psf/black/issues/315>
# E231: missing whitespace after ','
# W503: line break before binary operator <https://github.com/psf/black/issues/52>
ignore = E203,E231,W503
max-line-length = 88
#!/usr/bin/env python3
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from io import open
from os import path
from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, "README.md"), encoding="utf-8") as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = "requirements-%s.txt" % name
else:
reqf = "requirements.txt"
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith("#"):
continue
requirements.append(line)
return requirements
blake2_requirements = ['pyblake2;python_version<"3.6"']
setup(
name="swh.model",
description="Software Heritage data model",
long_description=long_description,
long_description_content_type="text/markdown",
python_requires=">=3.7",
author="Software Heritage developers",
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DMOD/",
packages=find_packages(),
setup_requires=["setuptools-scm"],
use_scm_version=True,
install_requires=(
parse_requirements() + parse_requirements("swh") + blake2_requirements
),
extras_require={
"cli": parse_requirements("cli"),
"testing": parse_requirements("test") + parse_requirements("cli"),
},
include_package_data=True,
entry_points="""
[console_scripts]
swh-identify=swh.model.cli:identify
[swh.cli.subcommands]
identify=swh.model.cli
""",
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 5 - Production/Stable",
],
project_urls={
"Bug Reports": "https://forge.softwareheritage.org/maniphest",
"Funding": "https://www.softwareheritage.org/donate",
"Source": "https://forge.softwareheritage.org/source/swh-model",
"Documentation": "https://docs.softwareheritage.org/devel/swh-model/",
},
)
from pkgutil import extend_path
from typing import Iterable
__path__ = extend_path(__path__, __name__) # type: Iterable[str]
......@@ -5,14 +5,30 @@
import os
import sys
from typing import Dict, List, Optional
from typing import Callable, Dict, Iterable, Optional
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
try:
import click
except ImportError:
print(
"Cannot run swh-identify; the Click package is not installed."
"Please install 'swh.model[cli]' for full functionality.",
file=sys.stderr,
)
exit(1)
try:
import swh.core.cli
from swh.core.cli import swh as swh_cli_group
from swh.model.identifiers import CoreSWHID, ObjectType
cli_command = swh.core.cli.swh.command
except ImportError:
# stub so that swh-identify can be used when swh-core isn't installed
cli_command = click.command
from swh.model.from_disk import Directory
from swh.model.swhids import CoreSWHID
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
......@@ -28,7 +44,7 @@ _DULWICH_TYPES = {
class CoreSWHIDParamType(click.ParamType):
"""Click argument that accepts a core SWHID and returns them as
:class:`swh.model.identifiers.CoreSWHID` instances """
:class:`swh.model.swhids.CoreSWHID` instances"""
name = "SWHID"
......@@ -43,63 +59,60 @@ class CoreSWHIDParamType(click.ParamType):
def swhid_of_file(path) -> CoreSWHID:
from swh.model.from_disk import Content
from swh.model.hashutil import hash_to_bytes
object = Content.from_file(path=path).get_data()
return CoreSWHID(
object_type=ObjectType.CONTENT, object_id=hash_to_bytes(object["sha1_git"])
)
object = Content.from_file(path=path)
return object.swhid()
def swhid_of_file_content(data) -> CoreSWHID:
from swh.model.from_disk import Content
from swh.model.hashutil import hash_to_bytes
object = Content.from_bytes(mode=644, data=data).get_data()
return CoreSWHID(
object_type=ObjectType.CONTENT, object_id=hash_to_bytes(object["sha1_git"])
)
object = Content.from_bytes(mode=644, data=data)
return object.swhid()
def swhid_of_dir(path: bytes, exclude_patterns: List[bytes] = None) -> CoreSWHID:
from swh.model.from_disk import (
Directory,
accept_all_directories,
ignore_directories_patterns,
)
from swh.model.hashutil import hash_to_bytes
def model_of_dir(
path: bytes,
exclude_patterns: Optional[Iterable[bytes]] = None,
update_info: Optional[Callable[[int], None]] = None,
) -> Directory:
from swh.model.from_disk import accept_all_paths, ignore_directories_patterns
dir_filter = (
path_filter = (
ignore_directories_patterns(path, exclude_patterns)
if exclude_patterns
else accept_all_directories
else accept_all_paths
)
object = Directory.from_disk(path=path, dir_filter=dir_filter).get_data()
return CoreSWHID(
object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(object["id"])
return Directory.from_disk(
path=path, path_filter=path_filter, progress_callback=update_info
)
def swhid_of_dir(
path: bytes, exclude_patterns: Optional[Iterable[bytes]] = None
) -> CoreSWHID:
obj = model_of_dir(path, exclude_patterns)
return obj.swhid()
def swhid_of_origin(url):
from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import (
ExtendedObjectType,
ExtendedSWHID,
origin_identifier,
)
from swh.model.model import Origin
return ExtendedSWHID(
object_type=ExtendedObjectType.ORIGIN,
object_id=hash_to_bytes(origin_identifier({"url": url})),
)
return Origin(url).swhid()
def swhid_of_git_repo(path) -> CoreSWHID:
import dulwich.repo
try:
import dulwich.repo
except ImportError:
raise click.ClickException(
"Cannot compute snapshot identifier; the Dulwich package is not installed. "
"Please install 'swh.model[cli]' for full functionality.",
)
from swh.model import hashutil
from swh.model.identifiers import snapshot_identifier
from swh.model.model import Snapshot
repo = dulwich.repo.Repo(path)
......@@ -122,13 +135,12 @@ def swhid_of_git_repo(path) -> CoreSWHID:
snapshot = {"branches": branches}
return CoreSWHID(
object_type=ObjectType.SNAPSHOT,
object_id=hashutil.hash_to_bytes(snapshot_identifier(snapshot)),
)
return Snapshot.from_dict(snapshot).swhid()
def identify_object(obj_type, follow_symlinks, exclude_patterns, obj) -> str:
def identify_object(
obj_type: str, follow_symlinks: bool, exclude_patterns: Iterable[bytes], obj
) -> str:
from urllib.parse import urlparse
if obj_type == "auto":
......@@ -155,9 +167,7 @@ def identify_object(obj_type, follow_symlinks, exclude_patterns, obj) -> str:
if obj_type == "content":
swhid = str(swhid_of_file(path))
elif obj_type == "directory":
swhid = str(
swhid_of_dir(path, [pattern.encode() for pattern in exclude_patterns])
)
swhid = str(swhid_of_dir(path, exclude_patterns))
elif obj_type == "origin":
swhid = str(swhid_of_origin(obj))
elif obj_type == "snapshot":
......@@ -170,7 +180,7 @@ def identify_object(obj_type, follow_symlinks, exclude_patterns, obj) -> str:
return swhid
@swh_cli_group.command(context_settings=CONTEXT_SETTINGS)
@cli_command(context_settings=CONTEXT_SETTINGS)
@click.option(
"--dereference/--no-dereference",
"follow_symlinks",
......@@ -199,7 +209,7 @@ def identify_object(obj_type, follow_symlinks, exclude_patterns, obj) -> str:
metavar="PATTERN",
multiple=True,
help="Exclude directories using glob patterns \
(e.g., '*.git' to exclude all .git directories)",
(e.g., ``*.git`` to exclude all .git directories)",
)
@click.option(
"--verify",
......@@ -208,66 +218,102 @@ def identify_object(obj_type, follow_symlinks, exclude_patterns, obj) -> str:
type=CoreSWHIDParamType(),
help="reference identifier to be compared with computed one",
)
@click.option(
"-r",
"--recursive",
is_flag=True,
help="compute SWHID recursively",
)
@click.argument("objects", nargs=-1, required=True)
def identify(
obj_type, verify, show_filename, follow_symlinks, objects, exclude_patterns,
obj_type,
verify,
show_filename,
follow_symlinks,
objects,
exclude_patterns,
recursive,
):
"""Compute the Software Heritage persistent identifier (SWHID) for the given
source code object(s).
For more details about SWHIDs see:
\b
https://docs.softwareheritage.org/devel/swh-model/persistent-identifiers.html
Tip: you can pass "-" to identify the content of standard input.
\b
Examples:
Examples::
\b
$ swh identify fork.c kmod.c sched/deadline.c
swh:1:cnt:2e391c754ae730bd2d8520c2ab497c403220c6e3 fork.c
swh:1:cnt:0277d1216f80ae1adeed84a686ed34c9b2931fc2 kmod.c
swh:1:cnt:57b939c81bce5d06fa587df8915f05affbe22b82 sched/deadline.c
\b
$ swh identify --no-filename /usr/src/linux/kernel/
swh:1:dir:f9f858a48d663b3809c9e2f336412717496202ab
\b
$ git clone --mirror https://forge.softwareheritage.org/source/helloworld.git
$ swh identify --type snapshot helloworld.git/
swh:1:snp:510aa88bdc517345d258c1fc2babcd0e1f905e93 helloworld.git
swh:1:snp:510aa88bdc517345d258c1fc2babcd0e1f905e93 helloworld.git
""" # NoQA # overlong lines in shell examples are fine
"""
from functools import partial
import logging
if exclude_patterns:
exclude_patterns = set(pattern.encode() for pattern in exclude_patterns)
if verify and len(objects) != 1:
raise click.BadParameter("verification requires a single object")
results = zip(
objects,
map(
partial(identify_object, obj_type, follow_symlinks, exclude_patterns),
objects,
),
)
if recursive and not os.path.isdir(objects[0]):
recursive = False
logging.warn("recursive option disabled, input is not a directory object")
if verify:
swhid = next(results)[1]
if str(verify) == swhid:
click.echo("SWHID match: %s" % swhid)
sys.exit(0)
else:
click.echo("SWHID mismatch: %s != %s" % (verify, swhid))
sys.exit(1)
else:
for (obj, swhid) in results:
msg = swhid
if show_filename:
msg = "%s\t%s" % (swhid, os.fsdecode(obj))
if recursive:
if verify:
raise click.BadParameter(
"verification of recursive object identification is not supported"
)
if not obj_type == ("auto" or "directory"):
raise click.BadParameter(
"recursive identification is supported only for directories"
)
path = os.fsencode(objects[0])
dir_obj = model_of_dir(path, exclude_patterns)
for sub_obj in dir_obj.iter_tree():
path_name = "path" if "path" in sub_obj.data.keys() else "data"
path = os.fsdecode(sub_obj.data[path_name])
swhid = str(sub_obj.swhid())
msg = f"{swhid}\t{path}" if show_filename else f"{swhid}"
click.echo(msg)
else:
results = zip(
objects,
map(
partial(identify_object, obj_type, follow_symlinks, exclude_patterns),
objects,
),
)
if verify:
swhid = next(results)[1]
if str(verify) == swhid:
click.echo("SWHID match: %s" % swhid)
sys.exit(0)
else:
click.echo("SWHID mismatch: %s != %s" % (verify, swhid))
sys.exit(1)
else:
for obj, swhid in results:
msg = swhid
if show_filename:
msg = "%s\t%s" % (swhid, os.fsdecode(obj))
click.echo(msg)
if __name__ == "__main__":
......
# Copyright (C) 2020 The Software Heritage developers
# Copyright (C) 2020-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
"""Utility data structures."""
from collections.abc import Mapping
import copy
from typing import Dict, Generic, Iterable, Optional, Tuple, TypeVar, Union
KT = TypeVar("KT")
......@@ -11,36 +16,40 @@ VT = TypeVar("VT")
class ImmutableDict(Mapping, Generic[KT, VT]):
data: Tuple[Tuple[KT, VT], ...]
"""A frozen dictionary.
This class behaves like a dictionary, but internally stores objects in a tuple,
so it is both immutable and hashable."""
_data: Dict[KT, VT]
def __init__(
self,
data: Union[
Iterable[Tuple[KT, VT]], "ImmutableDict[KT, VT]", Dict[KT, VT]
] = {},
data: Union[Iterable[Tuple[KT, VT]], ImmutableDict[KT, VT], Dict[KT, VT]] = {},
):
if isinstance(data, dict):
self.data = tuple(item for item in data.items())
self._data = data
elif isinstance(data, ImmutableDict):
self.data = data.data
self._data = data._data
else:
self.data = tuple(data)
self._data = {k: v for k, v in data}
@property
def data(self):
return tuple(self._data.items())
def __repr__(self):
return f"ImmutableDict({dict(self.data)!r})"
def __getitem__(self, key):
for (k, v) in self.data:
if k == key:
return v
raise KeyError(key)
return self._data[key]
def __iter__(self):
for (k, v) in self.data:
for k, v in self.data:
yield k
def __len__(self):
return len(self.data)
return len(self._data)
def items(self):
yield from self.data
......@@ -48,15 +57,9 @@ class ImmutableDict(Mapping, Generic[KT, VT]):
def __hash__(self):
return hash(tuple(sorted(self.data)))
def copy_pop(self, popped_key) -> Tuple[Optional[VT], "ImmutableDict[KT, VT]"]:
def copy_pop(self, popped_key) -> Tuple[Optional[VT], ImmutableDict[KT, VT]]:
"""Returns a copy of this ImmutableDict without the given key,
as well as the value associated to the key."""
popped_value = None
new_items = []
for (key, value) in self.data:
if key == popped_key:
popped_value = value
else:
new_items.append((key, value))
new_items = copy.deepcopy(self._data)
popped_value: Optional[VT] = new_items.pop(popped_key, None)
return (popped_value, ImmutableDict(new_items))
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Primitives for finding unknown content efficiently."""
from __future__ import annotations
from collections import namedtuple
import itertools
import logging
import random
from typing import (
Any,
Callable,
Iterable,
List,
Mapping,
NamedTuple,
Optional,
Set,
Union,
)
from typing_extensions import Protocol, runtime_checkable
from .from_disk import model
from .model import Sha1Git
logger = logging.getLogger(__name__)
# Maximum amount when sampling from the undecided set of directory entries
SAMPLE_SIZE = 1000
# Sets of sha1 of contents, skipped contents and directories respectively
Sample: NamedTuple = namedtuple(
"Sample", ["contents", "skipped_contents", "directories"]
)
@runtime_checkable
class ArchiveDiscoveryInterface(Protocol):
"""Interface used in discovery code to abstract over ways of connecting to
the SWH archive (direct storage, web API, etc.) for all methods needed by
discovery algorithms."""
contents: List[model.Content]
skipped_contents: List[model.SkippedContent]
directories: List[model.Directory]
def __init__(
self,
contents: List[model.Content],
skipped_contents: List[model.SkippedContent],
directories: List[model.Directory],
) -> None:
self.contents = contents
self.skipped_contents = skipped_contents
self.directories = directories
def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
"""List content missing from the archive by sha1"""
def skipped_content_missing(
self, skipped_contents: List[Sha1Git]
) -> Iterable[Sha1Git]:
"""List skipped content missing from the archive by sha1"""
def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
"""List directories missing from the archive by sha1"""
class BaseDiscoveryGraph:
"""Creates the base structures and methods needed for discovery algorithms.
Subclasses should override ``get_sample`` to affect how the discovery is made.
The `update_info_callback` is an optional argument that will get called for
each new piece of information we get. The callback arguments are `(content,
known)`.
- content: the relevant model.Content object,
- known: a boolean, True if the file is known to the archive False otherwise.
"""
def __init__(
self,
contents,
skipped_contents,
directories,
update_info_callback: Optional[Callable[[Any, bool], None]] = None,
):
self._all_contents: Mapping[
Sha1Git, Union[model.Content, model.SkippedContent]
] = {}
self._undecided_directories: Set[Sha1Git] = set()
self._children: Mapping[Sha1Git, Set[Sha1Git]] = {}
self._parents: Mapping[model.DirectoryEntry, Set[Any]] = {}
self.undecided: Set[Sha1Git] = set()
for content in itertools.chain(contents, skipped_contents):
self.undecided.add(content.sha1_git)
self._all_contents[content.sha1_git] = content
for directory in directories:
self.undecided.add(directory.id)
self._undecided_directories.add(directory.id)
self._children[directory.id] = {c.target for c in directory.entries}
for child in directory.entries:
self._parents.setdefault(child.target, set()).add(directory.id)
self.undecided |= self._undecided_directories
self.known: Set[Sha1Git] = set()
self.unknown: Set[Sha1Git] = set()
self._update_info_callback = update_info_callback
self._sha1_to_obj = {}
for content in itertools.chain(contents, skipped_contents):
self._sha1_to_obj[content.sha1_git] = content
for directory in directories:
self._sha1_to_obj[directory.id] = directory
def mark_known(self, entries: Iterable[Sha1Git]):
"""Mark ``entries`` and those they imply as known in the SWH archive"""
self._mark_entries(entries, self._children, self.known)
def mark_unknown(self, entries: Iterable[Sha1Git]):
"""Mark ``entries`` and those they imply as unknown in the SWH archive"""
self._mark_entries(entries, self._parents, self.unknown)
def _mark_entries(
self,
entries: Iterable[Sha1Git],
transitive_mapping: Mapping[Any, Any],
target_set: Set[Any],
):
"""Use Merkle graph properties to mark a directory entry as known or unknown.
If an entry is known, then all of its descendants are known. If it's
unknown, then all of its ancestors are unknown.
- ``entries``: directory entries to mark along with their ancestors/descendants
where applicable.
- ``transitive_mapping``: mapping from an entry to the next entries to mark
in the hierarchy, if any.
- ``target_set``: set where marked entries will be added.
"""
callback = self._update_info_callback
to_process = set(entries)
while to_process:
current = to_process.pop()
target_set.add(current)
new = current in self.undecided
self.undecided.discard(current)
self._undecided_directories.discard(current)
next_entries = transitive_mapping.get(current, set()) & self.undecided
to_process.update(next_entries)
if new and callback is not None:
obj = self._sha1_to_obj[current]
callback(obj, current in self.known)
def get_sample(
self,
) -> Sample:
"""Return a three-tuple of samples from the undecided sets of contents,
skipped contents and directories respectively.
These samples will be queried against the storage which will tell us
which are known."""
raise NotImplementedError()
def do_query(self, archive: ArchiveDiscoveryInterface, sample: Sample) -> None:
"""Given a three-tuple of samples, ask the archive which are known or
unknown and mark them as such."""
methods = (
archive.content_missing,
archive.skipped_content_missing,
archive.directory_missing,
)
for sample_per_type, method in zip(sample, methods):
if not sample_per_type:
continue
known = set(sample_per_type)
unknown = set(method(list(sample_per_type)))
known -= unknown
self.mark_known(known)
self.mark_unknown(unknown)
class RandomDirSamplingDiscoveryGraph(BaseDiscoveryGraph):
"""Use a random sampling using only directories.
This allows us to find a statistically good spread of entries in the graph
with a smaller population than using all types of entries. When there are
no more directories, only contents or skipped contents are undecided if any
are left: we send them directly to the storage since they should be few and
their structure flat."""
def get_sample(self) -> Sample:
if self._undecided_directories:
if len(self._undecided_directories) <= SAMPLE_SIZE:
return Sample(
contents=set(),
skipped_contents=set(),
directories=set(self._undecided_directories),
)
sample = random.sample(tuple(self._undecided_directories), SAMPLE_SIZE)
directories = {o for o in sample}
return Sample(
contents=set(), skipped_contents=set(), directories=directories
)
contents = set()
skipped_contents = set()
for sha1 in self.undecided:
obj = self._all_contents[sha1]
obj_type = obj.object_type
if obj_type == model.Content.object_type:
contents.add(sha1)
elif obj_type == model.SkippedContent.object_type:
skipped_contents.add(sha1)
else:
raise TypeError(f"Unexpected object type {obj_type}")
return Sample(
contents=contents, skipped_contents=skipped_contents, directories=set()
)
def filter_known_objects(
archive: ArchiveDiscoveryInterface,
update_info_callback: Optional[Callable[[Any, bool], None]] = None,
):
"""Filter ``archive``'s ``contents``, ``skipped_contents`` and ``directories``
to only return those that are unknown to the SWH archive using a discovery
algorithm.
The `update_info_callback` is an optional argument that will get called for
each new piece of information we get. The callback arguments are `(content,
known)`.
- content: the relevant model.Content object,
- known: a boolean, True if the file is known to the archive False otherwise.
"""
contents = archive.contents
skipped_contents = archive.skipped_contents
directories = archive.directories
contents_count = len(contents)
skipped_contents_count = len(skipped_contents)
directories_count = len(directories)
graph = RandomDirSamplingDiscoveryGraph(
contents,
skipped_contents,
directories,
update_info_callback=update_info_callback,
)
while graph.undecided:
sample = graph.get_sample()
graph.do_query(archive, sample)
contents = [c for c in contents if c.sha1_git in graph.unknown]
skipped_contents = [c for c in skipped_contents if c.sha1_git in graph.unknown]
directories = [c for c in directories if c.id in graph.unknown]
logger.debug(
"Filtered out %d contents, %d skipped contents and %d directories",
contents_count - len(contents),
skipped_contents_count - len(skipped_contents),
directories_count - len(directories),
)
return (contents, skipped_contents, directories)
......@@ -27,7 +27,10 @@ def validate_against_schema(model, schema, value):
if not isinstance(value, dict):
raise ValidationError(
"Unexpected type %(type)s for %(model)s, expected dict",
params={"model": model, "type": value.__class__.__name__,},
params={
"model": model,
"type": value.__class__.__name__,
},
code="model-unexpected-type",
)
......
......@@ -96,7 +96,9 @@ def validate_hash(value, hash_type):
raise ValidationError(
"Unexpected type %(type)s for hash, expected str or bytes",
params={"type": value.__class__.__name__,},
params={
"type": value.__class__.__name__,
},
code="unexpected-hash-value-type",
)
......
......@@ -18,7 +18,10 @@ def validate_type(value, type):
typestr = type.__name__
raise ValidationError(
"Unexpected type %(type)s, expected %(expected_type)s",
params={"type": value.__class__.__name__, "expected_type": typestr,},
params={
"type": value.__class__.__name__,
"expected_type": typestr,
},
code="unexpected-type",
)
......
This diff is collapsed.
This diff is collapsed.
......@@ -56,9 +56,11 @@ import functools
import hashlib
from io import BytesIO
import os
from typing import Callable, Dict
from typing import Callable, Dict, Optional, Union
ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512"])
ALGORITHMS = set(
["sha1", "sha256", "sha1_git", "blake2s256", "blake2b512", "md5", "sha512"]
)
"""Hashing algorithms supported by this module"""
DEFAULT_ALGORITHMS = set(["sha1", "sha256", "sha1_git", "blake2s256"])
......@@ -70,7 +72,7 @@ Subset of :const:`ALGORITHMS`.
HASH_BLOCK_SIZE = 32768
"""Block size for streaming hash computations made in this module"""
_blake2_hash_cache = {} # type: Dict[str, Callable]
_blake2_hash_cache: Dict[str, Callable] = {}
class MultiHash:
......@@ -160,9 +162,7 @@ class MultiHash:
def _new_blake2_hash(algo):
"""Return a function that initializes a blake2 hash.
"""
"""Return a function that initializes a blake2 hash."""
if algo in _blake2_hash_cache:
return _blake2_hash_cache[algo]()
......@@ -183,20 +183,8 @@ def _new_blake2_hash(algo):
"Digest size for algorithm %s must be a multiple of 8" % algo
)
if lalgo in hashlib.algorithms_available:
# Handle the case where OpenSSL ships the given algorithm
# (e.g. Python 3.5 on Debian 9 stretch)
_blake2_hash_cache[algo] = lambda: hashlib.new(lalgo)
else:
# Try using the built-in implementation for Python 3.6+
if blake_family in hashlib.algorithms_available:
blake2 = getattr(hashlib, blake_family)
else:
import pyblake2
blake2 = getattr(pyblake2, blake_family)
_blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)
blake2 = getattr(hashlib, blake_family)
_blake2_hash_cache[algo] = lambda: blake2(digest_size=digest_size)
return _blake2_hash_cache[algo]()
......@@ -212,12 +200,10 @@ def _new_hashlib_hash(algo):
return hashlib.new(algo)
def _new_git_hash(base_algo, git_type, length):
"""Initialize a digest object (as returned by python's hashlib) for the
requested algorithm, and feed it with the header for a git object of the
given type and length.
def git_object_header(git_type: str, length: int) -> bytes:
"""Returns the header for a git object of the given type and length.
The header for hashing a git object consists of:
The header of a git object consists of:
- The type of the object (encoded in ASCII)
- One ASCII space (\x20)
- The length of the object (decimal encoded in ASCII)
......@@ -232,15 +218,26 @@ def _new_git_hash(base_algo, git_type, length):
Returns:
a hashutil.hash object
"""
git_object_types = {
"blob",
"tree",
"commit",
"tag",
"snapshot",
"raw_extrinsic_metadata",
"extid",
}
h = _new_hashlib_hash(base_algo)
git_header = "%s %d\0" % (git_type, length)
h.update(git_header.encode("ascii"))
if git_type not in git_object_types:
raise ValueError(
"Unexpected git object type %s, expected one of %s"
% (git_type, ", ".join(sorted(git_object_types)))
)
return h
return ("%s %d\0" % (git_type, length)).encode("ascii")
def _new_hash(algo, length=None):
def _new_hash(algo: str, length: Optional[int] = None):
"""Initialize a digest object (as returned by python's hashlib) for
the requested algorithm. See the constant ALGORITHMS for the list
of supported algorithms. If a git-specific hashing algorithm is
......@@ -270,7 +267,9 @@ def _new_hash(algo, length=None):
if length is None:
raise ValueError("Missing length for git hashing algorithm")
base_algo = algo[:-4]
return _new_git_hash(base_algo, "blob", length)
h = _new_hashlib_hash(base_algo)
h.update(git_object_header("blob", length))
return h
return _new_hashlib_hash(algo)
......@@ -288,23 +287,15 @@ def hash_git_data(data, git_type, base_algo="sha1"):
Raises:
ValueError if the git_type is unexpected.
"""
git_object_types = {"blob", "tree", "commit", "tag", "snapshot"}
if git_type not in git_object_types:
raise ValueError(
"Unexpected git object type %s, expected one of %s"
% (git_type, ", ".join(sorted(git_object_types)))
)
h = _new_git_hash(base_algo, git_type, len(data))
h = _new_hashlib_hash(base_algo)
h.update(git_object_header(git_type, len(data)))
h.update(data)
return h.digest()
@functools.lru_cache()
def hash_to_hex(hash):
def hash_to_hex(hash: Union[str, bytes]) -> str:
"""Converts a hash (in hex or bytes form) to its hexadecimal ascii form
Args:
......@@ -320,7 +311,7 @@ def hash_to_hex(hash):
@functools.lru_cache()
def hash_to_bytehex(hash):
def hash_to_bytehex(hash: bytes) -> bytes:
"""Converts a hash to its hexadecimal bytes representation
Args:
......@@ -333,7 +324,7 @@ def hash_to_bytehex(hash):
@functools.lru_cache()
def hash_to_bytes(hash):
def hash_to_bytes(hash: Union[str, bytes]) -> bytes:
"""Converts a hash (in hex or bytes form) to its raw bytes form
Args:
......@@ -349,7 +340,7 @@ def hash_to_bytes(hash):
@functools.lru_cache()
def bytehex_to_hash(hex):
def bytehex_to_hash(hex: bytes) -> bytes:
"""Converts a hexadecimal bytes representation of a hash to that hash
Args:
......
This diff is collapsed.
# Copyright (C) 2017-2020 The Software Heritage developers
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Merkle tree data structure"""
import abc
from collections.abc import Mapping
from typing import Iterator, List, Set
def deep_update(left, right):
"""Recursively update the left mapping with deeply nested values from the right
mapping.
This function is useful to merge the results of several calls to
:func:`MerkleNode.collect`.
Arguments:
left: a mapping (modified by the update operation)
right: a mapping
Returns:
the left mapping, updated with nested values from the right mapping
Example:
>>> a = {
... 'key1': {
... 'key2': {
... 'key3': 'value1/2/3',
... },
... },
... }
>>> deep_update(a, {
... 'key1': {
... 'key2': {
... 'key4': 'value1/2/4',
... },
... },
... }) == {
... 'key1': {
... 'key2': {
... 'key3': 'value1/2/3',
... 'key4': 'value1/2/4',
... },
... },
... }
True
>>> deep_update(a, {
... 'key1': {
... 'key2': {
... 'key3': 'newvalue1/2/3',
... },
... },
... }) == {
... 'key1': {
... 'key2': {
... 'key3': 'newvalue1/2/3',
... 'key4': 'value1/2/4',
... },
... },
... }
True
from __future__ import annotations
"""
for key, rvalue in right.items():
if isinstance(rvalue, Mapping):
new_lvalue = deep_update(left.get(key, {}), rvalue)
left[key] = new_lvalue
else:
left[key] = rvalue
return left
import abc
from typing import Any, Dict, Iterator, List, Set
class MerkleNode(dict, metaclass=abc.ABCMeta):
......@@ -101,16 +39,18 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
The collection of updated data from the tree is implemented through the
:func:`collect` function and associated helpers.
Attributes:
data (dict): data associated to the current node
parents (list): known parents of the current node
collected (bool): whether the current node has been collected
"""
__slots__ = ["parents", "data", "__hash", "collected"]
"""Type of the current node (used as a classifier for :func:`collect`)"""
data: Dict
"""data associated to the current node"""
parents: List
"""known parents of the current node"""
collected: bool
"""whether the current node has been collected"""
def __init__(self, data=None):
super().__init__()
......@@ -139,7 +79,7 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
for parent in self.parents:
parent.invalidate_hash()
def update_hash(self, *, force=False):
def update_hash(self, *, force=False) -> Any:
"""Recursively compute the hash of the current node.
Args:
......@@ -159,14 +99,17 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
return self.__hash
@property
def hash(self):
def hash(self) -> Any:
"""The hash of the current node, as calculated by
:func:`compute_hash`.
"""
return self.update_hash()
def __hash__(self):
return hash(self.hash)
@abc.abstractmethod
def compute_hash(self):
def compute_hash(self) -> Any:
"""Compute the hash of the current node.
The hash should depend on the data of the node, as well as on hashes
......@@ -221,47 +164,24 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
"""
return self.data
def collect_node(self, **kwargs):
"""Collect the data for the current node, for use by :func:`collect`.
Arguments:
kwargs: passed as-is to :func:`get_data`.
Returns:
A :class:`dict` compatible with :func:`collect`.
"""
def collect_node(self) -> Set[MerkleNode]:
"""Collect the current node if it has not been yet, for use by :func:`collect`."""
if not self.collected:
self.collected = True
return {self.object_type: {self.hash: self.get_data(**kwargs)}}
return {self}
else:
return {}
def collect(self, **kwargs):
"""Collect the data for all nodes in the subtree rooted at `self`.
return set()
The data is deduplicated by type and by hash.
Arguments:
kwargs: passed as-is to :func:`get_data`.
def collect(self) -> Set[MerkleNode]:
"""Collect the added and modified nodes in the subtree rooted at `self`
since the last collect operation.
Returns:
A :class:`dict` with the following structure::
{
'typeA': {
node1.hash: node1.get_data(),
node2.hash: node2.get_data(),
},
'typeB': {
node3.hash: node3.get_data(),
...
},
...
}
A :class:`set` of collected nodes
"""
ret = self.collect_node(**kwargs)
ret = self.collect_node()
for child in self.values():
deep_update(ret, child.collect(**kwargs))
ret.update(child.collect())
return ret
......@@ -275,18 +195,20 @@ class MerkleNode(dict, metaclass=abc.ABCMeta):
for child in self.values():
child.reset_collect()
def iter_tree(self) -> Iterator["MerkleNode"]:
"""Yields all children nodes, recursively. Common nodes are
deduplicated.
def iter_tree(self, dedup=True) -> Iterator[MerkleNode]:
"""Yields all children nodes, recursively. Common nodes are deduplicated
by default (deduplication can be turned off setting the given argument
'dedup' to False).
"""
yield from self._iter_tree(set())
yield from self._iter_tree(seen=set(), dedup=dedup)
def _iter_tree(self, seen: Set[bytes]) -> Iterator["MerkleNode"]:
def _iter_tree(self, seen: Set[bytes], dedup) -> Iterator[MerkleNode]:
if self.hash not in seen:
seen.add(self.hash)
if dedup:
seen.add(self.hash)
yield self
for child in self.values():
yield from child._iter_tree(seen=seen)
yield from child._iter_tree(seen=seen, dedup=dedup)
class MerkleLeaf(MerkleNode):
......@@ -295,7 +217,7 @@ class MerkleLeaf(MerkleNode):
A Merkle leaf is simply a Merkle node with children disabled.
"""
__slots__ = [] # type: List[str]
__slots__: List[str] = []
def __setitem__(self, name, child):
raise ValueError("%s is a leaf" % self.__class__.__name__)
......
This diff is collapsed.
......@@ -157,7 +157,9 @@ class ValidateCompound(unittest.TestCase):
def test_validate_whole_schema_shortcut_previous_error(self):
with self.assertRaises(ValidationError) as cm:
compound.validate_against_schema(
self.test_model, self.test_schema_shortcut, self.test_value_missing,
self.test_model,
self.test_schema_shortcut,
self.test_value_missing,
)
exc = cm.exception
......@@ -167,7 +169,9 @@ class ValidateCompound(unittest.TestCase):
def test_validate_whole_schema(self):
with self.assertRaises(ValidationError) as cm:
compound.validate_against_schema(
self.test_model, self.test_schema_shortcut, self.test_value,
self.test_model,
self.test_schema_shortcut,
self.test_value,
)
# The exception should be of the form:
......
This diff is collapsed.
......@@ -4,19 +4,21 @@
# See top-level LICENSE file for more information
import os
import sys
import tarfile
import tempfile
import unittest
import unittest.mock
from click.testing import CliRunner
import pytest
from swh.model import cli
from swh.model.hashutil import hash_to_hex
from swh.model.tests.swh_model_data import SAMPLE_FOLDER_SWHIDS
from swh.model.tests.test_from_disk import DataMixin
@pytest.mark.fs
class TestIdentify(DataMixin, unittest.TestCase):
def setUp(self):
super().setUp()
......@@ -52,6 +54,7 @@ class TestIdentify(DataMixin, unittest.TestCase):
result = self.runner.invoke(cli.identify, ["--type", "directory", path])
self.assertSWHID(result, "swh:1:dir:e8b0f1466af8608c8a3fb9879db172b887e80759")
@pytest.mark.requires_optional_deps
def test_snapshot_id(self):
"""identify a snapshot"""
tarball = os.path.join(
......@@ -68,6 +71,20 @@ class TestIdentify(DataMixin, unittest.TestCase):
result, "swh:1:snp:abc888898124270905a0ef3c67e872ce08e7e0c1"
)
def test_snapshot_without_dulwich(self):
"""checks swh-identify returns a 'nice' message instead of a traceback
when dulwich is not installed"""
with unittest.mock.patch.dict(sys.modules, {"dulwich": None}):
with tempfile.TemporaryDirectory(prefix="swh.model.cli") as d:
result = self.runner.invoke(
cli.identify,
["--type", "snapshot", d],
catch_exceptions=False,
)
assert result.exit_code == 1
assert "'swh.model[cli]'" in result.output
def test_origin_id(self):
"""identify an origin URL"""
url = "https://github.com/torvalds/linux"
......@@ -78,7 +95,8 @@ class TestIdentify(DataMixin, unittest.TestCase):
"""identify symlink --- both itself and target"""
regular = os.path.join(self.tmpdir_name, b"foo.txt")
link = os.path.join(self.tmpdir_name, b"bar.txt")
open(regular, "w").write("foo\n")
with open(regular, "w") as f:
f.write("foo\n")
os.symlink(os.path.basename(regular), link)
result = self.runner.invoke(cli.identify, [link])
......@@ -162,3 +180,34 @@ class TestIdentify(DataMixin, unittest.TestCase):
)
self.assertSWHID(result, "swh:1:dir:e8b0f1466af8608c8a3fb9879db172b887e80759")
def test_recursive_directory(self):
self.make_from_tarball(self.tmpdir_name)
path = os.path.join(self.tmpdir_name, b"sample-folder")
result = self.runner.invoke(cli.identify, ["--recursive", path])
self.assertEqual(result.exit_code, 0, result.output)
result = result.output.split()
result_swhids = []
# get all SWHID from the result
for i in range(0, len(result)):
if i % 2 == 0:
result_swhids.append(result[i])
assert len(result_swhids) == len(SAMPLE_FOLDER_SWHIDS)
for swhid in SAMPLE_FOLDER_SWHIDS:
assert swhid in result_swhids
def test_recursive_directory_no_filename(self):
self.make_from_tarball(self.tmpdir_name)
path = os.path.join(self.tmpdir_name, b"sample-folder")
result = self.runner.invoke(
cli.identify, ["--recursive", "--no-filename", path]
)
self.assertEqual(result.exit_code, 0, result.output)
result_swhids = result.output.split()
assert len(result_swhids) == len(SAMPLE_FOLDER_SWHIDS)
for swhid in SAMPLE_FOLDER_SWHIDS:
assert swhid in result_swhids
# Copyright (C) 2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import dataclass
from typing import Iterable, List
from swh.model import discovery, model
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.model.tests.test_identifiers import directory_example
pytest_plugins = ["aiohttp.pytest_plugin"]
UNKNOWN_HASH = hash_to_bytes("17140cb6109f1e3296dc52e2b2cd29bcb40e86be")
KNOWN_CONTENT_HASH = hash_to_bytes("e8e4106de42e2d5d5efab6a9422b9a8677c993c8")
KNOWN_DIRECTORY_HASH = hash_to_bytes("d7ed3d2c31d608823be58b1cbe57605310615231")
KNOWN_DIRECTORY_HASH_2 = hash_to_bytes("c76724e9a0be4b60f4bf0cb48b261df8eda94b1d")
@dataclass
class FakeArchive:
contents: List[model.Content]
skipped_contents: List[model.SkippedContent]
directories: List[model.Directory]
def content_missing(self, contents: List[Sha1Git]) -> Iterable[Sha1Git]:
return []
def skipped_content_missing(
self, skipped_contents: List[Sha1Git]
) -> Iterable[Sha1Git]:
"""List skipped content missing from the archive by sha1"""
return []
def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]:
"""List directories missing from the archive by sha1"""
return []
def test_filter_known_objects(monkeypatch):
# Test with smaller sample sizes to actually trigger the random sampling
monkeypatch.setattr(discovery, "SAMPLE_SIZE", 1)
base_directory = model.Directory.from_dict(directory_example)
# Hardcoding another hash is enough since it's all that's being checked
directory_data = directory_example.copy()
directory_data["id"] = KNOWN_DIRECTORY_HASH_2
other_directory = model.Directory.from_dict(directory_data)
archive = FakeArchive(
contents=[model.Content.from_data(b"blabla")],
skipped_contents=[model.SkippedContent.from_data(b"blabla2", reason="reason")],
directories=[
base_directory,
other_directory,
],
)
assert archive.contents[0].sha1_git == KNOWN_CONTENT_HASH
assert archive.directories[0].id == KNOWN_DIRECTORY_HASH
assert archive.directories[1].id == KNOWN_DIRECTORY_HASH_2
(contents, skipped_contents, directories) = discovery.filter_known_objects(archive)
assert len(contents) == 0
assert len(skipped_contents) == 0
assert len(directories) == 0