Skip to content
Snippets Groups Projects
Commit e4fd96f2 authored by Jenkins for Software Heritage's avatar Jenkins for Software Heritage
Browse files

New upstream version 0.13.0

parents 4e3bc4e1 24b653e4
No related branches found
Tags debian/upstream/0.13.0
No related merge requests found
Metadata-Version: 2.1
Name: swh.model
Version: 0.12.0
Version: 0.13.0
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
......
......@@ -299,7 +299,10 @@ it can be *computed from the object itself*, without having to rely on any
third party. An implementation of SWHID that allows to do so locally is the
`swh identify <https://docs.softwareheritage.org/devel/swh-model/cli.html>`_
tool, available from the `swh.model <https://pypi.org/project/swh.model/>`_
Python package under the GPL license.
Python package under the GPL license. This package can be installed via the ``pip``
package manager with the one liner ``pip3 install swh.model`` on any machine with
Python (at least version 3.7) and ``pip`` installed (on a Debian or Ubuntu system a simple ``apt install python3 python3-pip``
will suffice, see `the general instructions <https://packaging.python.org/tutorials/installing-packages/>`_ for other platforms).
SWHIDs are also automatically computed by Software Heritage for all archived
objects as part of its archival activity, and can be looked up via the project
......
Metadata-Version: 2.1
Name: swh.model
Version: 0.12.0
Version: 0.13.0
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
......
# Copyright (C) 2015-2020 The Software Heritage developers
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import binascii
import datetime
import enum
from functools import lru_cache
import hashlib
import re
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from typing import (
Any,
Dict,
Generic,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
)
import urllib.parse
import warnings
import attr
from attrs_strict import type_validator
from .collections import ImmutableDict
from .exceptions import ValidationError
from .fields.hashes import validate_sha1
from .hashutil import MultiHash, hash_git_data, hash_to_hex
from .hashutil import MultiHash, hash_git_data, hash_to_bytes, hash_to_hex
class ObjectType(enum.Enum):
"""Possible object types of a QualifiedSWHID or CoreSWHID.
The values of each variant is what is used in the SWHID's string representation."""
SNAPSHOT = "snp"
REVISION = "rev"
RELEASE = "rel"
DIRECTORY = "dir"
CONTENT = "cnt"
class ExtendedObjectType(enum.Enum):
"""Possible object types of an ExtendedSWHID.
The variants are a superset of :cls:`ObjectType`'s"""
SNAPSHOT = "snp"
REVISION = "rev"
RELEASE = "rel"
DIRECTORY = "dir"
CONTENT = "cnt"
ORIGIN = "ori"
RAW_EXTRINSIC_METADATA = "emd"
# The following are deprecated aliases of the variants defined in ObjectType
# while transitioning from SWHID to QualifiedSWHID
ORIGIN = "origin"
SNAPSHOT = "snapshot"
REVISION = "revision"
RELEASE = "release"
DIRECTORY = "directory"
CONTENT = "content"
RAW_EXTRINSIC_METADATA = "raw_extrinsic_metadata"
SWHID_NAMESPACE = "swh"
SWHID_VERSION = 1
SWHID_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"]
SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"]
EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"]
SWHID_SEP = ":"
SWHID_CTXT_SEP = ";"
SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}
SWHID_RE_RAW = (
f"(?P<scheme>{SWHID_NAMESPACE})"
f"{SWHID_SEP}(?P<version>{SWHID_VERSION})"
f"{SWHID_SEP}(?P<object_type>{'|'.join(SWHID_TYPES)})"
f"(?P<namespace>{SWHID_NAMESPACE})"
f"{SWHID_SEP}(?P<scheme_version>{SWHID_VERSION})"
f"{SWHID_SEP}(?P<object_type>{'|'.join(EXTENDED_SWHID_TYPES)})"
f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})"
f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?"
)
......@@ -685,6 +733,7 @@ _object_type_map = {
REVISION: {"short_name": "rev", "key_id": "id"},
DIRECTORY: {"short_name": "dir", "key_id": "id"},
CONTENT: {"short_name": "cnt", "key_id": "sha1_git"},
RAW_EXTRINSIC_METADATA: {"short_name": "emd", "key_id": "id"},
}
_swhid_type_map = {
......@@ -694,14 +743,350 @@ _swhid_type_map = {
"rev": REVISION,
"dir": DIRECTORY,
"cnt": CONTENT,
"emd": RAW_EXTRINSIC_METADATA,
}
# type of the "object_type" attribute of the SWHID class; either
# ObjectType or ExtendedObjectType
_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType)
# the SWHID class itself (this is used so that X.from_string() can return X
# for all X subclass of _BaseSWHID)
_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID")
@attr.s(frozen=True, kw_only=True)
class _BaseSWHID(Generic[_TObjectType]):
"""Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID.
This is an "abstract" class and should not be instantiated directly;
it only exists to deduplicate code between these three SWHID classes."""
namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
"""the namespace of the identifier, defaults to ``swh``"""
scheme_version = attr.ib(type=int, default=SWHID_VERSION)
"""the scheme version of the identifier, defaults to 1"""
# overridden by subclasses
object_type: _TObjectType
"""the type of object the identifier points to"""
object_id = attr.ib(type=bytes, validator=type_validator())
"""object's identifier"""
@namespace.validator
def check_namespace(self, attribute, value):
if value != SWHID_NAMESPACE:
raise ValidationError(
"Invalid SWHID: invalid namespace: %(namespace)s",
params={"namespace": value},
)
@scheme_version.validator
def check_scheme_version(self, attribute, value):
if value != SWHID_VERSION:
raise ValidationError(
"Invalid SWHID: invalid version: %(version)s", params={"version": value}
)
@object_id.validator
def check_object_id(self, attribute, value):
if len(value) != 20:
raise ValidationError(
"Invalid SWHID: invalid checksum: %(object_id)s",
params={"object_id": hash_to_hex(value)},
)
def __str__(self) -> str:
return SWHID_SEP.join(
[
self.namespace,
str(self.scheme_version),
self.object_type.value,
hash_to_hex(self.object_id),
]
)
@classmethod
def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID:
parts = _parse_swhid(s)
if parts.pop("qualifiers"):
raise ValidationError(f"{cls.__name__} does not support qualifiers.")
try:
return cls(**parts)
except ValueError as e:
raise ValidationError(
"ValueError: %(args)", params={"args": e.args}
) from None
@attr.s(frozen=True, kw_only=True)
class CoreSWHID(_BaseSWHID[ObjectType]):
"""
Dataclass holding the relevant info associated to a SoftWare Heritage
persistent IDentifier (SWHID).
Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, ie. SWHIDs
with no qualifiers.
Raises:
swh.model.exceptions.ValidationError: In case of invalid object type or id
To get the raw SWHID string from an instance of this class,
use the :func:`str` function:
>>> swhid = CoreSWHID(
... object_type=ObjectType.CONTENT,
... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
... )
>>> str(swhid)
'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
And vice-versa with :meth:`CoreSWHID.from_string`:
>>> swhid == CoreSWHID.from_string(
... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
... )
True
"""
object_type = attr.ib(
type=ObjectType, validator=type_validator(), converter=ObjectType
)
"""the type of object the identifier points to"""
def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]:
if swhid is None or isinstance(swhid, CoreSWHID):
return swhid
else:
return CoreSWHID.from_string(swhid)
def _parse_lines_qualifier(
lines: Union[str, Tuple[int, Optional[int]], None]
) -> Optional[Tuple[int, Optional[int]]]:
try:
if lines is None or isinstance(lines, tuple):
return lines
elif "-" in lines:
(from_, to) = lines.split("-", 2)
return (int(from_), int(to))
else:
return (int(lines), None)
except ValueError:
raise ValidationError(
"Invalid format for the lines qualifier: %(lines)", params={"lines": lines}
)
def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]:
if path is None or isinstance(path, bytes):
return path
else:
return urllib.parse.unquote_to_bytes(path)
@attr.s(frozen=True, kw_only=True)
class QualifiedSWHID(_BaseSWHID[ObjectType]):
"""
Dataclass holding the relevant info associated to a SoftWare Heritage
persistent IDentifier (SWHID)
Raises:
swh.model.exceptions.ValidationError: In case of invalid object type or id
To get the raw SWHID string from an instance of this class,
use the :func:`str` function:
>>> swhid = QualifiedSWHID(
... object_type=ObjectType.CONTENT,
... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
... lines=(5, 10),
... )
>>> str(swhid)
'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10'
And vice-versa with :meth:`QualifiedSWHID.from_string`:
>>> swhid == QualifiedSWHID.from_string(
... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10"
... )
True
"""
object_type = attr.ib(
type=ObjectType, validator=type_validator(), converter=ObjectType
)
"""the type of object the identifier points to"""
# qualifiers:
origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
"""the software origin where an object has been found or observed in the wild,
as an URI"""
visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid)
"""the core identifier of a snapshot corresponding to a specific visit
of a repository containing the designated object"""
anchor = attr.ib(
type=Optional[CoreSWHID],
default=None,
validator=type_validator(),
converter=_parse_core_swhid,
)
"""a designated node in the Merkle DAG relative to which a path to the object
is specified, as the core identifier of a directory, a revision, a release,
or a snapshot"""
path = attr.ib(
type=Optional[bytes],
default=None,
validator=type_validator(),
converter=_parse_path_qualifier,
)
"""the absolute file path, from the root directory associated to the anchor node,
to the object; when the anchor denotes a directory or a revision, and almost always
when it’s a release, the root directory is uniquely determined;
when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD
(possibly indirectly), and undefined if such a reference is missing"""
lines = attr.ib(
type=Optional[Tuple[int, Optional[int]]],
default=None,
validator=type_validator(),
converter=_parse_lines_qualifier,
)
"""lines: line number(s) of interest, usually within a content object"""
@visit.validator
def check_visit(self, attribute, value):
if value and value.object_type != ObjectType.SNAPSHOT:
raise ValidationError(
"The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'",
params={"type": value.object_type.value},
)
@anchor.validator
def check_anchor(self, attribute, value):
if value and value.object_type not in (
ObjectType.DIRECTORY,
ObjectType.REVISION,
ObjectType.RELEASE,
ObjectType.SNAPSHOT,
):
raise ValidationError(
"The 'visit' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, "
"not '%s(type)s'",
params={"type": value.object_type.value},
)
def qualifiers(self) -> Dict[str, str]:
origin = self.origin
if origin:
unescaped_origin = origin
origin = origin.replace(";", "%3B")
assert urllib.parse.unquote_to_bytes(
origin
) == urllib.parse.unquote_to_bytes(
unescaped_origin
), "Escaping ';' in the origin qualifier corrupted the origin URL."
d: Dict[str, Optional[str]] = {
"origin": origin,
"visit": str(self.visit) if self.visit else None,
"anchor": str(self.anchor) if self.anchor else None,
"path": (
urllib.parse.quote_from_bytes(self.path)
if self.path is not None
else None
),
"lines": (
"-".join(str(line) for line in self.lines if line is not None)
if self.lines
else None
),
}
return {k: v for (k, v) in d.items() if v is not None}
def __str__(self) -> str:
swhid = SWHID_SEP.join(
[
self.namespace,
str(self.scheme_version),
self.object_type.value,
hash_to_hex(self.object_id),
]
)
qualifiers = self.qualifiers()
if qualifiers:
for k, v in qualifiers.items():
swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v)
return swhid
@classmethod
def from_string(cls, s: str) -> QualifiedSWHID:
parts = _parse_swhid(s)
qualifiers = parts.pop("qualifiers")
invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS
if invalid_qualifiers:
raise ValidationError(
"Invalid qualifier(s): %(qualifiers)",
params={"qualifiers": ", ".join(invalid_qualifiers)},
)
try:
return QualifiedSWHID(**parts, **qualifiers)
except ValueError as e:
raise ValidationError(
"ValueError: %(args)s", params={"args": e.args}
) from None
@attr.s(frozen=True, kw_only=True)
class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]):
"""
Dataclass holding the relevant info associated to a SoftWare Heritage
persistent IDentifier (SWHID).
It extends `CoreSWHID`, by allowing non-standard object types; and should
only be used internally to Software Heritage.
Raises:
swh.model.exceptions.ValidationError: In case of invalid object type or id
To get the raw SWHID string from an instance of this class,
use the :func:`str` function:
>>> swhid = ExtendedSWHID(
... object_type=ExtendedObjectType.CONTENT,
... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'),
... )
>>> str(swhid)
'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
And vice-versa with :meth:`CoreSWHID.from_string`:
>>> swhid == ExtendedSWHID.from_string(
... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0"
... )
True
"""
object_type = attr.ib(
type=ExtendedObjectType,
validator=type_validator(),
converter=ExtendedObjectType,
)
"""the type of object the identifier points to"""
@attr.s(frozen=True)
class SWHID:
"""
Named tuple holding the relevant info associated to a SoftWare Heritage
persistent IDentifier (SWHID)
Deprecated alternative to QualifiedSWHID.
Args:
namespace (str): the namespace of the identifier, defaults to ``swh``
......@@ -744,6 +1129,13 @@ class SWHID:
type=ImmutableDict[str, Any], converter=ImmutableDict, default=ImmutableDict()
)
def __attrs_post_init__(self):
warnings.warn(
"swh.model.identifiers.SWHID is deprecated; "
"use swh.model.identifiers.QualifiedSWHID instead.",
DeprecationWarning,
)
@namespace.validator
def check_namespace(self, attribute, value):
if value != SWHID_NAMESPACE:
......@@ -835,16 +1227,17 @@ def swhid(
return str(swhid)
def parse_swhid(swhid: str) -> SWHID:
def _parse_swhid(swhid: str) -> Dict[str, Any]:
"""Parse a Software Heritage identifier (SWHID) from string (see:
:ref:`persistent-identifiers`.)
This is for internal use; use :meth:`CoreSWHID.from_string`,
:meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead,
as they perform validation and build a dataclass.
Args:
swhid (str): A persistent identifier
Returns:
a named tuple holding the parsing result
Raises:
swh.model.exceptions.ValidationError: if passed string is not a valid SWHID
......@@ -854,10 +1247,10 @@ def parse_swhid(swhid: str) -> SWHID:
raise ValidationError(
"Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid}
)
parts = m.groupdict()
parts: Dict[str, Any] = m.groupdict()
_qualifiers = {}
qualifiers_raw = parts["qualifiers"]
parts["qualifiers"] = {}
if qualifiers_raw:
for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP):
try:
......@@ -867,12 +1260,29 @@ def parse_swhid(swhid: str) -> SWHID:
"Invalid SWHID: invalid qualifier: %(qualifier)s",
params={"qualifier": qualifier},
)
_qualifiers[k] = v
parts["qualifiers"][k] = v
parts["scheme_version"] = int(parts["scheme_version"])
parts["object_id"] = hash_to_bytes(parts["object_id"])
return parts
def parse_swhid(swhid: str) -> SWHID:
"""Parse a Software Heritage identifier (SWHID) from string (see:
:ref:`persistent-identifiers`.)
Args:
swhid (str): A persistent identifier
Raises:
swh.model.exceptions.ValidationError: if passed string is not a valid SWHID
"""
parts = _parse_swhid(swhid)
return SWHID(
parts["scheme"],
int(parts["version"]),
parts["namespace"],
parts["scheme_version"],
_swhid_type_map[parts["object_type"]],
parts["object_id"],
_qualifiers, # type: ignore # mypy can't properly unify types
hash_to_hex(parts["object_id"]),
parts["qualifiers"],
)
# Copyright (C) 2015-2018 The Software Heritage developers
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import contextlib
import hashlib
import io
import os
......@@ -14,6 +15,17 @@ from swh.model import hashutil
from swh.model.hashutil import MultiHash
@contextlib.contextmanager
def patch_blake2(function_name):
try:
with patch(function_name) as mock:
yield mock
finally:
# mocking blake2 inserts mock objects in the cache; we need
# to clean it before the next test runs
hashutil._blake2_hash_cache.clear()
class BaseHashutil(unittest.TestCase):
def setUp(self):
# Reset function cache
......@@ -195,7 +207,7 @@ class Hashutil(BaseHashutil):
if "blake2b" not in hashlib.algorithms_available:
self.skipTest("blake2b not built in")
with patch("hashlib.blake2b") as mock_blake2b:
with patch_blake2("hashlib.blake2b") as mock_blake2b:
mock_blake2b.return_value = sentinel = object()
h = hashutil._new_hash("blake2b512")
......@@ -216,7 +228,7 @@ class Hashutil(BaseHashutil):
if "blake2s" not in hashlib.algorithms_available:
self.skipTest("blake2s not built in")
with patch("hashlib.blake2s") as mock_blake2s:
with patch_blake2("hashlib.blake2s") as mock_blake2s:
mock_blake2s.return_value = sentinel = object()
h = hashutil._new_hash("blake2s256")
......@@ -233,7 +245,7 @@ class Hashutil(BaseHashutil):
if "blake2b" in hashlib.algorithms_available:
self.skipTest("blake2b built in")
with patch("pyblake2.blake2b") as mock_blake2b:
with patch_blake2("pyblake2.blake2b") as mock_blake2b:
mock_blake2b.return_value = sentinel = object()
h = hashutil._new_hash("blake2b512")
......@@ -247,7 +259,7 @@ class Hashutil(BaseHashutil):
if "blake2s" in hashlib.algorithms_available:
self.skipTest("blake2s built in")
with patch("pyblake2.blake2s") as mock_blake2s:
with patch_blake2("pyblake2.blake2s") as mock_blake2s:
mock_blake2s.return_value = sentinel = object()
h = hashutil._new_hash("blake2s256")
......
This diff is collapsed.
......@@ -8,6 +8,7 @@ deps =
pytest-cov
commands =
pytest --cov={envsitepackagesdir}/swh/model \
--doctest-modules \
{envsitepackagesdir}/swh/model \
--cov-branch {posargs}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment