From 9ec683264c415731286005dff823e1099ef358c3 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz <vlorentz@softwareheritage.org> Date: Wed, 22 Sep 2021 16:36:30 +0200 Subject: [PATCH] Move SWHID classes and functions from identifiers.py to swhids.py identifiers.py initially worked only on bare sha1_git. I chose to add the SWHID classes in that module because of the name, but the SWHID code didn't actually interact with the other functions in the module, so it now feels out of place to me. --- swh/model/identifiers.py | 455 +-------------------------------------- swh/model/model.py | 8 +- swh/model/swhids.py | 448 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 456 insertions(+), 455 deletions(-) create mode 100644 swh/model/swhids.py diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py index 421d2e77..fb39d798 100644 --- a/swh/model/identifiers.py +++ b/swh/model/identifiers.py @@ -7,56 +7,14 @@ from __future__ import annotations import binascii import datetime -import enum from functools import lru_cache import hashlib -import re -from typing import ( - Any, - Dict, - Generic, - Iterable, - List, - Optional, - Tuple, - Type, - TypeVar, - Union, -) -import urllib.parse +from typing import Any, Dict, Iterable, List, Optional, Tuple -import attr -from attrs_strict import type_validator - -from .exceptions import ValidationError -from .hashutil import MultiHash, git_object_header, hash_to_bytes, hash_to_hex - - -class ObjectType(enum.Enum): - """Possible object types of a QualifiedSWHID or CoreSWHID. - - The values of each variant is what is used in the SWHID's string representation.""" - - SNAPSHOT = "snp" - REVISION = "rev" - RELEASE = "rel" - DIRECTORY = "dir" - CONTENT = "cnt" - - -class ExtendedObjectType(enum.Enum): - """Possible object types of an ExtendedSWHID. 
- - The variants are a superset of :class:`ObjectType`'s""" - - SNAPSHOT = "snp" - REVISION = "rev" - RELEASE = "rel" - DIRECTORY = "dir" - CONTENT = "cnt" - ORIGIN = "ori" - RAW_EXTRINSIC_METADATA = "emd" +from .hashutil import MultiHash, git_object_header +# Reexport for backward compatibility +from .swhids import * # noqa # The following are deprecated aliases of the variants defined in ObjectType # while transitioning from SWHID to QualifiedSWHID @@ -69,24 +27,6 @@ CONTENT = "content" RAW_EXTRINSIC_METADATA = "raw_extrinsic_metadata" -SWHID_NAMESPACE = "swh" -SWHID_VERSION = 1 -SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"] -EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"] -SWHID_SEP = ":" -SWHID_CTXT_SEP = ";" -SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} - -SWHID_RE_RAW = ( - f"(?P<namespace>{SWHID_NAMESPACE})" - f"{SWHID_SEP}(?P<scheme_version>{SWHID_VERSION})" - f"{SWHID_SEP}(?P<object_type>{'|'.join(EXTENDED_SWHID_TYPES)})" - f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})" - f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?" -) -SWHID_RE = re.compile(SWHID_RE_RAW) - - @lru_cache() def identifier_to_bytes(identifier): """Convert a text identifier to bytes. @@ -887,390 +827,3 @@ def extid_identifier(extid: Dict[str, Any]) -> str: git_object = format_git_object_from_headers("extid", headers) return hashlib.new("sha1", git_object).hexdigest() - - -# type of the "object_type" attribute of the SWHID class; either -# ObjectType or ExtendedObjectType -_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType) - -# the SWHID class itself (this is used so that X.from_string() can return X -# for all X subclass of _BaseSWHID) -_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID") - - -@attr.s(frozen=True, kw_only=True) -class _BaseSWHID(Generic[_TObjectType]): - """Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID. 
- - This is an "abstract" class and should not be instantiated directly; - it only exists to deduplicate code between these three SWHID classes.""" - - namespace = attr.ib(type=str, default=SWHID_NAMESPACE) - """the namespace of the identifier, defaults to ``swh``""" - - scheme_version = attr.ib(type=int, default=SWHID_VERSION) - """the scheme version of the identifier, defaults to 1""" - - # overridden by subclasses - object_type: _TObjectType - """the type of object the identifier points to""" - - object_id = attr.ib(type=bytes, validator=type_validator()) - """object's identifier""" - - @namespace.validator - def check_namespace(self, attribute, value): - if value != SWHID_NAMESPACE: - raise ValidationError( - "Invalid SWHID: invalid namespace: %(namespace)s", - params={"namespace": value}, - ) - - @scheme_version.validator - def check_scheme_version(self, attribute, value): - if value != SWHID_VERSION: - raise ValidationError( - "Invalid SWHID: invalid version: %(version)s", params={"version": value} - ) - - @object_id.validator - def check_object_id(self, attribute, value): - if len(value) != 20: - raise ValidationError( - "Invalid SWHID: invalid checksum: %(object_id)s", - params={"object_id": hash_to_hex(value)}, - ) - - def __str__(self) -> str: - return SWHID_SEP.join( - [ - self.namespace, - str(self.scheme_version), - self.object_type.value, - hash_to_hex(self.object_id), - ] - ) - - @classmethod - def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID: - parts = _parse_swhid(s) - if parts.pop("qualifiers"): - raise ValidationError(f"{cls.__name__} does not support qualifiers.") - try: - return cls(**parts) - except ValueError as e: - raise ValidationError( - "ValueError: %(args)s", params={"args": e.args} - ) from None - - -@attr.s(frozen=True, kw_only=True) -class CoreSWHID(_BaseSWHID[ObjectType]): - """ - Dataclass holding the relevant info associated to a SoftWare Heritage - persistent IDentifier (SWHID). 
- - Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, ie. SWHIDs - with no qualifiers. - - Raises: - swh.model.exceptions.ValidationError: In case of invalid object type or id - - To get the raw SWHID string from an instance of this class, - use the :func:`str` function: - - >>> swhid = CoreSWHID( - ... object_type=ObjectType.CONTENT, - ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), - ... ) - >>> str(swhid) - 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' - - And vice-versa with :meth:`CoreSWHID.from_string`: - - >>> swhid == CoreSWHID.from_string( - ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0" - ... ) - True - """ - - object_type = attr.ib( - type=ObjectType, validator=type_validator(), converter=ObjectType - ) - """the type of object the identifier points to""" - - def to_extended(self) -> ExtendedSWHID: - """Converts this CoreSWHID into an ExtendedSWHID. - - As ExtendedSWHID is a superset of CoreSWHID, this is lossless.""" - return ExtendedSWHID( - namespace=self.namespace, - scheme_version=self.scheme_version, - object_type=ExtendedObjectType(self.object_type.value), - object_id=self.object_id, - ) - - -def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]: - if swhid is None or isinstance(swhid, CoreSWHID): - return swhid - else: - return CoreSWHID.from_string(swhid) - - -def _parse_lines_qualifier( - lines: Union[str, Tuple[int, Optional[int]], None] -) -> Optional[Tuple[int, Optional[int]]]: - try: - if lines is None or isinstance(lines, tuple): - return lines - elif "-" in lines: - (from_, to) = lines.split("-", 2) - return (int(from_), int(to)) - else: - return (int(lines), None) - except ValueError: - raise ValidationError( - "Invalid format for the lines qualifier: %(lines)s", params={"lines": lines} - ) - - -def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]: - if path is None or isinstance(path, bytes): - return path - else: - return 
urllib.parse.unquote_to_bytes(path) - - -@attr.s(frozen=True, kw_only=True) -class QualifiedSWHID(_BaseSWHID[ObjectType]): - """ - Dataclass holding the relevant info associated to a SoftWare Heritage - persistent IDentifier (SWHID) - - Raises: - swh.model.exceptions.ValidationError: In case of invalid object type or id - - To get the raw SWHID string from an instance of this class, - use the :func:`str` function: - - >>> swhid = QualifiedSWHID( - ... object_type=ObjectType.CONTENT, - ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), - ... lines=(5, 10), - ... ) - >>> str(swhid) - 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10' - - And vice-versa with :meth:`QualifiedSWHID.from_string`: - - >>> swhid == QualifiedSWHID.from_string( - ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10" - ... ) - True - """ - - object_type = attr.ib( - type=ObjectType, validator=type_validator(), converter=ObjectType - ) - """the type of object the identifier points to""" - - # qualifiers: - - origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) - """the software origin where an object has been found or observed in the wild, - as an URI""" - - visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid) - """the core identifier of a snapshot corresponding to a specific visit - of a repository containing the designated object""" - - anchor = attr.ib( - type=Optional[CoreSWHID], - default=None, - validator=type_validator(), - converter=_parse_core_swhid, - ) - """a designated node in the Merkle DAG relative to which a path to the object - is specified, as the core identifier of a directory, a revision, a release, - or a snapshot""" - - path = attr.ib( - type=Optional[bytes], - default=None, - validator=type_validator(), - converter=_parse_path_qualifier, - ) - """the absolute file path, from the root directory associated to the anchor node, - to the object; when the anchor denotes a 
directory or a revision, and almost always - when it’s a release, the root directory is uniquely determined; - when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD - (possibly indirectly), and undefined if such a reference is missing""" - - lines = attr.ib( - type=Optional[Tuple[int, Optional[int]]], - default=None, - validator=type_validator(), - converter=_parse_lines_qualifier, - ) - """lines: line number(s) of interest, usually within a content object""" - - @visit.validator - def check_visit(self, attribute, value): - if value and value.object_type != ObjectType.SNAPSHOT: - raise ValidationError( - "The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'", - params={"type": value.object_type.value}, - ) - - @anchor.validator - def check_anchor(self, attribute, value): - if value and value.object_type not in ( - ObjectType.DIRECTORY, - ObjectType.REVISION, - ObjectType.RELEASE, - ObjectType.SNAPSHOT, - ): - raise ValidationError( - "The 'visit' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, " - "not '%s(type)s'", - params={"type": value.object_type.value}, - ) - - def qualifiers(self) -> Dict[str, str]: - origin = self.origin - if origin: - unescaped_origin = origin - origin = origin.replace(";", "%3B") - assert urllib.parse.unquote_to_bytes( - origin - ) == urllib.parse.unquote_to_bytes( - unescaped_origin - ), "Escaping ';' in the origin qualifier corrupted the origin URL." 
- - d: Dict[str, Optional[str]] = { - "origin": origin, - "visit": str(self.visit) if self.visit else None, - "anchor": str(self.anchor) if self.anchor else None, - "path": ( - urllib.parse.quote_from_bytes(self.path) - if self.path is not None - else None - ), - "lines": ( - "-".join(str(line) for line in self.lines if line is not None) - if self.lines - else None - ), - } - return {k: v for (k, v) in d.items() if v is not None} - - def __str__(self) -> str: - swhid = SWHID_SEP.join( - [ - self.namespace, - str(self.scheme_version), - self.object_type.value, - hash_to_hex(self.object_id), - ] - ) - qualifiers = self.qualifiers() - if qualifiers: - for k, v in qualifiers.items(): - swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) - return swhid - - @classmethod - def from_string(cls, s: str) -> QualifiedSWHID: - parts = _parse_swhid(s) - qualifiers = parts.pop("qualifiers") - invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS - if invalid_qualifiers: - raise ValidationError( - "Invalid qualifier(s): %(qualifiers)s", - params={"qualifiers": ", ".join(invalid_qualifiers)}, - ) - try: - return QualifiedSWHID(**parts, **qualifiers) - except ValueError as e: - raise ValidationError( - "ValueError: %(args)s", params={"args": e.args} - ) from None - - -@attr.s(frozen=True, kw_only=True) -class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]): - """ - Dataclass holding the relevant info associated to a SoftWare Heritage - persistent IDentifier (SWHID). - - It extends `CoreSWHID`, by allowing non-standard object types; and should - only be used internally to Software Heritage. - - Raises: - swh.model.exceptions.ValidationError: In case of invalid object type or id - - To get the raw SWHID string from an instance of this class, - use the :func:`str` function: - - >>> swhid = ExtendedSWHID( - ... object_type=ExtendedObjectType.CONTENT, - ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), - ... 
) - >>> str(swhid) - 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' - - And vice-versa with :meth:`CoreSWHID.from_string`: - - >>> swhid == ExtendedSWHID.from_string( - ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0" - ... ) - True - """ - - object_type = attr.ib( - type=ExtendedObjectType, - validator=type_validator(), - converter=ExtendedObjectType, - ) - """the type of object the identifier points to""" - - -def _parse_swhid(swhid: str) -> Dict[str, Any]: - """Parse a Software Heritage identifier (SWHID) from string (see: - :ref:`persistent-identifiers`.) - - This is for internal use; use :meth:`CoreSWHID.from_string`, - :meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead, - as they perform validation and build a dataclass. - - Args: - swhid (str): A persistent identifier - - Raises: - swh.model.exceptions.ValidationError: if passed string is not a valid SWHID - - """ - m = SWHID_RE.fullmatch(swhid) - if not m: - raise ValidationError( - "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid} - ) - parts: Dict[str, Any] = m.groupdict() - - qualifiers_raw = parts["qualifiers"] - parts["qualifiers"] = {} - if qualifiers_raw: - for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP): - try: - k, v = qualifier.split("=", maxsplit=1) - parts["qualifiers"][k] = v - except ValueError: - raise ValidationError( - "Invalid SWHID: invalid qualifier: %(qualifier)s", - params={"qualifier": qualifier}, - ) - - parts["scheme_version"] = int(parts["scheme_version"]) - parts["object_id"] = hash_to_bytes(parts["object_id"]) - return parts diff --git a/swh/model/model.py b/swh/model/model.py index 5f8e0521..7228b55f 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -27,10 +27,10 @@ from .identifiers import ( revision_identifier, snapshot_identifier, ) -from .identifiers import CoreSWHID -from .identifiers import ExtendedObjectType as SwhidExtendedObjectType -from .identifiers import ExtendedSWHID -from .identifiers 
import ObjectType as SwhidObjectType +from .swhids import CoreSWHID +from .swhids import ExtendedObjectType as SwhidExtendedObjectType +from .swhids import ExtendedSWHID +from .swhids import ObjectType as SwhidObjectType class MissingData(Exception): diff --git a/swh/model/swhids.py b/swh/model/swhids.py new file mode 100644 index 00000000..93917855 --- /dev/null +++ b/swh/model/swhids.py @@ -0,0 +1,448 @@ +# Copyright (C) 2015-2021 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from __future__ import annotations + +import enum +import re +from typing import Any, Dict, Generic, Optional, Tuple, Type, TypeVar, Union +import urllib.parse + +import attr +from attrs_strict import type_validator + +from .exceptions import ValidationError +from .hashutil import hash_to_bytes, hash_to_hex + + +class ObjectType(enum.Enum): + """Possible object types of a QualifiedSWHID or CoreSWHID. + + The values of each variant is what is used in the SWHID's string representation.""" + + SNAPSHOT = "snp" + REVISION = "rev" + RELEASE = "rel" + DIRECTORY = "dir" + CONTENT = "cnt" + + +class ExtendedObjectType(enum.Enum): + """Possible object types of an ExtendedSWHID. 
+ + The variants are a superset of :class:`ObjectType`'s""" + + SNAPSHOT = "snp" + REVISION = "rev" + RELEASE = "rel" + DIRECTORY = "dir" + CONTENT = "cnt" + ORIGIN = "ori" + RAW_EXTRINSIC_METADATA = "emd" + + +SWHID_NAMESPACE = "swh" +SWHID_VERSION = 1 +SWHID_TYPES = ["snp", "rel", "rev", "dir", "cnt"] +EXTENDED_SWHID_TYPES = SWHID_TYPES + ["ori", "emd"] +SWHID_SEP = ":" +SWHID_CTXT_SEP = ";" +SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"} + +SWHID_RE_RAW = ( + f"(?P<namespace>{SWHID_NAMESPACE})" + f"{SWHID_SEP}(?P<scheme_version>{SWHID_VERSION})" + f"{SWHID_SEP}(?P<object_type>{'|'.join(EXTENDED_SWHID_TYPES)})" + f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})" + f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?" +) +SWHID_RE = re.compile(SWHID_RE_RAW) + + +# type of the "object_type" attribute of the SWHID class; either +# ObjectType or ExtendedObjectType +_TObjectType = TypeVar("_TObjectType", ObjectType, ExtendedObjectType) + +# the SWHID class itself (this is used so that X.from_string() can return X +# for all X subclass of _BaseSWHID) +_TSWHID = TypeVar("_TSWHID", bound="_BaseSWHID") + + +@attr.s(frozen=True, kw_only=True) +class _BaseSWHID(Generic[_TObjectType]): + """Common base class for CoreSWHID, QualifiedSWHID, and ExtendedSWHID. 
+ + This is an "abstract" class and should not be instantiated directly; + it only exists to deduplicate code between these three SWHID classes.""" + + namespace = attr.ib(type=str, default=SWHID_NAMESPACE) + """the namespace of the identifier, defaults to ``swh``""" + + scheme_version = attr.ib(type=int, default=SWHID_VERSION) + """the scheme version of the identifier, defaults to 1""" + + # overridden by subclasses + object_type: _TObjectType + """the type of object the identifier points to""" + + object_id = attr.ib(type=bytes, validator=type_validator()) + """object's identifier""" + + @namespace.validator + def check_namespace(self, attribute, value): + if value != SWHID_NAMESPACE: + raise ValidationError( + "Invalid SWHID: invalid namespace: %(namespace)s", + params={"namespace": value}, + ) + + @scheme_version.validator + def check_scheme_version(self, attribute, value): + if value != SWHID_VERSION: + raise ValidationError( + "Invalid SWHID: invalid version: %(version)s", params={"version": value} + ) + + @object_id.validator + def check_object_id(self, attribute, value): + if len(value) != 20: + raise ValidationError( + "Invalid SWHID: invalid checksum: %(object_id)s", + params={"object_id": hash_to_hex(value)}, + ) + + def __str__(self) -> str: + return SWHID_SEP.join( + [ + self.namespace, + str(self.scheme_version), + self.object_type.value, + hash_to_hex(self.object_id), + ] + ) + + @classmethod + def from_string(cls: Type[_TSWHID], s: str) -> _TSWHID: + parts = _parse_swhid(s) + if parts.pop("qualifiers"): + raise ValidationError(f"{cls.__name__} does not support qualifiers.") + try: + return cls(**parts) + except ValueError as e: + raise ValidationError( + "ValueError: %(args)s", params={"args": e.args} + ) from None + + +@attr.s(frozen=True, kw_only=True) +class CoreSWHID(_BaseSWHID[ObjectType]): + """ + Dataclass holding the relevant info associated to a SoftWare Heritage + persistent IDentifier (SWHID). 
+ + Unlike `QualifiedSWHID`, it is restricted to core SWHIDs, ie. SWHIDs + with no qualifiers. + + Raises: + swh.model.exceptions.ValidationError: In case of invalid object type or id + + To get the raw SWHID string from an instance of this class, + use the :func:`str` function: + + >>> swhid = CoreSWHID( + ... object_type=ObjectType.CONTENT, + ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), + ... ) + >>> str(swhid) + 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' + + And vice-versa with :meth:`CoreSWHID.from_string`: + + >>> swhid == CoreSWHID.from_string( + ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0" + ... ) + True + """ + + object_type = attr.ib( + type=ObjectType, validator=type_validator(), converter=ObjectType + ) + """the type of object the identifier points to""" + + def to_extended(self) -> ExtendedSWHID: + """Converts this CoreSWHID into an ExtendedSWHID. + + As ExtendedSWHID is a superset of CoreSWHID, this is lossless.""" + return ExtendedSWHID( + namespace=self.namespace, + scheme_version=self.scheme_version, + object_type=ExtendedObjectType(self.object_type.value), + object_id=self.object_id, + ) + + +def _parse_core_swhid(swhid: Union[str, CoreSWHID, None]) -> Optional[CoreSWHID]: + if swhid is None or isinstance(swhid, CoreSWHID): + return swhid + else: + return CoreSWHID.from_string(swhid) + + +def _parse_lines_qualifier( + lines: Union[str, Tuple[int, Optional[int]], None] +) -> Optional[Tuple[int, Optional[int]]]: + try: + if lines is None or isinstance(lines, tuple): + return lines + elif "-" in lines: + (from_, to) = lines.split("-", 2) + return (int(from_), int(to)) + else: + return (int(lines), None) + except ValueError: + raise ValidationError( + "Invalid format for the lines qualifier: %(lines)s", params={"lines": lines} + ) + + +def _parse_path_qualifier(path: Union[str, bytes, None]) -> Optional[bytes]: + if path is None or isinstance(path, bytes): + return path + else: + return 
urllib.parse.unquote_to_bytes(path) + + +@attr.s(frozen=True, kw_only=True) +class QualifiedSWHID(_BaseSWHID[ObjectType]): + """ + Dataclass holding the relevant info associated to a SoftWare Heritage + persistent IDentifier (SWHID) + + Raises: + swh.model.exceptions.ValidationError: In case of invalid object type or id + + To get the raw SWHID string from an instance of this class, + use the :func:`str` function: + + >>> swhid = QualifiedSWHID( + ... object_type=ObjectType.CONTENT, + ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), + ... lines=(5, 10), + ... ) + >>> str(swhid) + 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10' + + And vice-versa with :meth:`QualifiedSWHID.from_string`: + + >>> swhid == QualifiedSWHID.from_string( + ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0;lines=5-10" + ... ) + True + """ + + object_type = attr.ib( + type=ObjectType, validator=type_validator(), converter=ObjectType + ) + """the type of object the identifier points to""" + + # qualifiers: + + origin = attr.ib(type=Optional[str], default=None, validator=type_validator()) + """the software origin where an object has been found or observed in the wild, + as an URI""" + + visit = attr.ib(type=Optional[CoreSWHID], default=None, converter=_parse_core_swhid) + """the core identifier of a snapshot corresponding to a specific visit + of a repository containing the designated object""" + + anchor = attr.ib( + type=Optional[CoreSWHID], + default=None, + validator=type_validator(), + converter=_parse_core_swhid, + ) + """a designated node in the Merkle DAG relative to which a path to the object + is specified, as the core identifier of a directory, a revision, a release, + or a snapshot""" + + path = attr.ib( + type=Optional[bytes], + default=None, + validator=type_validator(), + converter=_parse_path_qualifier, + ) + """the absolute file path, from the root directory associated to the anchor node, + to the object; when the anchor denotes a 
directory or a revision, and almost always + when it’s a release, the root directory is uniquely determined; + when the anchor denotes a snapshot, the root directory is the one pointed to by HEAD + (possibly indirectly), and undefined if such a reference is missing""" + + lines = attr.ib( + type=Optional[Tuple[int, Optional[int]]], + default=None, + validator=type_validator(), + converter=_parse_lines_qualifier, + ) + """lines: line number(s) of interest, usually within a content object""" + + @visit.validator + def check_visit(self, attribute, value): + if value and value.object_type != ObjectType.SNAPSHOT: + raise ValidationError( + "The 'visit' qualifier must be a 'snp' SWHID, not '%(type)s'", + params={"type": value.object_type.value}, + ) + + @anchor.validator + def check_anchor(self, attribute, value): + if value and value.object_type not in ( + ObjectType.DIRECTORY, + ObjectType.REVISION, + ObjectType.RELEASE, + ObjectType.SNAPSHOT, + ): + raise ValidationError( + "The 'anchor' qualifier must be a 'dir', 'rev', 'rel', or 'snp' SWHID, " + "not '%(type)s'", + params={"type": value.object_type.value}, + ) + + def qualifiers(self) -> Dict[str, str]: + origin = self.origin + if origin: + unescaped_origin = origin + origin = origin.replace(";", "%3B") + assert urllib.parse.unquote_to_bytes( + origin + ) == urllib.parse.unquote_to_bytes( + unescaped_origin + ), "Escaping ';' in the origin qualifier corrupted the origin URL." 
+ + d: Dict[str, Optional[str]] = { + "origin": origin, + "visit": str(self.visit) if self.visit else None, + "anchor": str(self.anchor) if self.anchor else None, + "path": ( + urllib.parse.quote_from_bytes(self.path) + if self.path is not None + else None + ), + "lines": ( + "-".join(str(line) for line in self.lines if line is not None) + if self.lines + else None + ), + } + return {k: v for (k, v) in d.items() if v is not None} + + def __str__(self) -> str: + swhid = SWHID_SEP.join( + [ + self.namespace, + str(self.scheme_version), + self.object_type.value, + hash_to_hex(self.object_id), + ] + ) + qualifiers = self.qualifiers() + if qualifiers: + for k, v in qualifiers.items(): + swhid += "%s%s=%s" % (SWHID_CTXT_SEP, k, v) + return swhid + + @classmethod + def from_string(cls, s: str) -> QualifiedSWHID: + parts = _parse_swhid(s) + qualifiers = parts.pop("qualifiers") + invalid_qualifiers = set(qualifiers) - SWHID_QUALIFIERS + if invalid_qualifiers: + raise ValidationError( + "Invalid qualifier(s): %(qualifiers)s", + params={"qualifiers": ", ".join(invalid_qualifiers)}, + ) + try: + return QualifiedSWHID(**parts, **qualifiers) + except ValueError as e: + raise ValidationError( + "ValueError: %(args)s", params={"args": e.args} + ) from None + + +@attr.s(frozen=True, kw_only=True) +class ExtendedSWHID(_BaseSWHID[ExtendedObjectType]): + """ + Dataclass holding the relevant info associated to a SoftWare Heritage + persistent IDentifier (SWHID). + + It extends `CoreSWHID`, by allowing non-standard object types; and should + only be used internally to Software Heritage. + + Raises: + swh.model.exceptions.ValidationError: In case of invalid object type or id + + To get the raw SWHID string from an instance of this class, + use the :func:`str` function: + + >>> swhid = ExtendedSWHID( + ... object_type=ExtendedObjectType.CONTENT, + ... object_id=bytes.fromhex('8ff44f081d43176474b267de5451f2c2e88089d0'), + ... 
) + >>> str(swhid) + 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0' + + And vice-versa with :meth:`ExtendedSWHID.from_string`: + + >>> swhid == ExtendedSWHID.from_string( + ... "swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0" + ... ) + True + """ + + object_type = attr.ib( + type=ExtendedObjectType, + validator=type_validator(), + converter=ExtendedObjectType, + ) + """the type of object the identifier points to""" + + +def _parse_swhid(swhid: str) -> Dict[str, Any]: + """Parse a Software Heritage identifier (SWHID) from string (see: + :ref:`persistent-identifiers`.) + + This is for internal use; use :meth:`CoreSWHID.from_string`, + :meth:`QualifiedSWHID.from_string`, or :meth:`ExtendedSWHID.from_string` instead, + as they perform validation and build a dataclass. + + Args: + swhid (str): A persistent identifier + + Raises: + swh.model.exceptions.ValidationError: if passed string is not a valid SWHID + + """ + m = SWHID_RE.fullmatch(swhid) + if not m: + raise ValidationError( + "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid} + ) + parts: Dict[str, Any] = m.groupdict() + + qualifiers_raw = parts["qualifiers"] + parts["qualifiers"] = {} + if qualifiers_raw: + for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP): + try: + k, v = qualifier.split("=", maxsplit=1) + parts["qualifiers"][k] = v + except ValueError: + raise ValidationError( + "Invalid SWHID: invalid qualifier: %(qualifier)s", + params={"qualifier": qualifier}, + ) + + parts["scheme_version"] = int(parts["scheme_version"]) + parts["object_id"] = hash_to_bytes(parts["object_id"]) + return parts -- GitLab