Newer
Older
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from abc import ABCMeta, abstractmethod
from enum import Enum
from typing import Dict, List, Optional, Union

import attr
from attrs_strict import type_validator
import dateutil.parser
import iso8601

from .identifiers import (
    normalize_timestamp,
    directory_identifier,
    revision_identifier,
    release_identifier,
    snapshot_identifier,
)
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash

vlorentz
committed
class MissingData(Exception):
    """Raised by `Content.with_data` when it has no way of fetching the
    data (but not when fetching the data fails)."""

    pass
# TODO: Limit this to 20 bytes
# Alias used as the annotated type of identifier/target fields in the model
# classes of this module. It is plain `bytes`, so the alias itself does not
# enforce any length.
Sha1Git = bytes
def dictify(value):
    """Helper function used by BaseModel.to_dict().

    Recursively converts `value` into plain Python data:

    * `BaseModel` instances are replaced by their `to_dict()` result;
    * `Enum` members are replaced by their `.value`;
    * dicts and lists are rebuilt with every contained value converted
      (dict keys are kept as they are);
    * anything else is returned unchanged.
    """
    if isinstance(value, BaseModel):
        return value.to_dict()
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, dict):
        return {key: dictify(item) for key, item in value.items()}
    if isinstance(value, list):
        return [dictify(item) for item in value]
    return value
class BaseModel:
    """Base class for SWH model classes.

    Provides serialization/deserialization to/from Python dictionaries,
    that are suitable for JSON/msgpack-like formats."""

    def to_dict(self):
        """Wrapper of `attr.asdict` that can be overridden by subclasses
        that have special handling of some of the fields.

        The attributes are collected without recursion by `attr.asdict`;
        nested values are then converted by `dictify`."""
        return dictify(attr.asdict(self, recurse=False))

    @classmethod
    def from_dict(cls, d):
        """Builds an instance of `cls` by passing the items of `d` as
        keyword arguments to the constructor.

        Subclasses holding nested model objects override this to convert
        the nested values before delegating here."""
        return cls(**d)
class HashableObject(metaclass=ABCMeta):
    """Mixin to automatically compute object identifier hash when
    the associated model is instantiated."""

    @staticmethod
    @abstractmethod
    def compute_hash(object_dict):
        """Derived model classes must implement this to compute
        the object hash from its dict representation."""
        pass

    def __attrs_post_init__(self):
        """Fills in `id` from the object's dict representation when the
        instance was created with an empty `id`."""
        if not self.id:
            obj_id = hash_to_bytes(self.compute_hash(self.to_dict()))
            # The computed identifier must actually be stored; go through
            # object.__setattr__ so this also works on classes declared
            # with attr.s(frozen=True), where plain assignment raises.
            object.__setattr__(self, "id", obj_id)
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class Person(BaseModel):
    """Represents the author/committer of a revision or release."""

    fullname = attr.ib(type=bytes, validator=type_validator())
    name = attr.ib(type=Optional[bytes], validator=type_validator())
    email = attr.ib(type=Optional[bytes], validator=type_validator())

    @classmethod
    def from_fullname(cls, fullname: bytes):
        """Returns a Person object, by guessing the name and email from the
        fullname, in the `name <email>` format.

        The fullname is left unchanged.

        Raises:
            TypeError: if `fullname` is None.
        """
        if fullname is None:
            raise TypeError("fullname is None.")

        name: Optional[bytes]
        email: Optional[bytes]

        try:
            open_bracket = fullname.index(b"<")
        except ValueError:
            # No '<' at all: the whole string is taken as the name.
            name = fullname
            email = None
        else:
            raw_name = fullname[:open_bracket]
            raw_email = fullname[open_bracket + 1 :]

            if not raw_name:
                name = None
            else:
                name = raw_name.strip()

            try:
                close_bracket = raw_email.rindex(b">")
            except ValueError:
                # Unterminated '<...': keep everything after the bracket.
                email = raw_email
            else:
                email = raw_email[:close_bracket]

        # Empty byte strings are normalized to None.
        return Person(name=name or None, email=email or None, fullname=fullname,)
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class Timestamp(BaseModel):
    """Represents a naive timestamp from a VCS."""

    seconds = attr.ib(type=int, validator=type_validator())
    microseconds = attr.ib(type=int, validator=type_validator())

    @seconds.validator
    def check_seconds(self, attribute, value):
        """Check that seconds fit in a 64-bits signed integer.

        Raises:
            ValueError: if `value` is outside [-2**63, 2**63[.
        """
        if not (-(2 ** 63) <= value < 2 ** 63):
            raise ValueError("Seconds must be a signed 64-bits integer.")

    @microseconds.validator
    def check_microseconds(self, attribute, value):
        """Checks that microseconds are positive and < 1000000.

        Raises:
            ValueError: if `value` is outside [0, 1000000[.
        """
        if not (0 <= value < 10 ** 6):
            raise ValueError("Microseconds must be in [0, 1000000[.")
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class TimestampWithTimezone(BaseModel):
    """Represents a TZ-aware timestamp from a VCS."""

    timestamp = attr.ib(type=Timestamp, validator=type_validator())
    offset = attr.ib(type=int, validator=type_validator())
    negative_utc = attr.ib(type=bool, validator=type_validator())

    @offset.validator
    def check_offset(self, attribute, value):
        """Checks the offset is a 16-bits signed integer (in theory, it
        should always be between -14 and +14 hours).

        Raises:
            ValueError: if `value` is outside [-2**15, 2**15[.
        """
        # max 14 hours offset in theory, but you never know what
        # you'll find in the wild...
        if not (-(2 ** 15) <= value < 2 ** 15):
            raise ValueError("Offset must be a signed 16-bits integer.")

    @negative_utc.validator
    def check_negative_utc(self, attribute, value):
        """Checks that `negative_utc` is only set together with a zero
        offset.

        Raises:
            ValueError: if `value` is true while `offset` is non-zero.
        """
        if self.offset and value:
            raise ValueError("negative_utc can only be True is offset=0")

    @classmethod
    def from_dict(cls, obj: Union[Dict, datetime.datetime, int]):
        """Builds a TimestampWithTimezone from any of the formats
        accepted by :func:`swh.model.normalize_timestamp`."""
        # TODO: this accept way more types than just dicts; find a better
        # name
        d = normalize_timestamp(obj)
        return cls(
            timestamp=Timestamp.from_dict(d["timestamp"]),
            offset=d["offset"],
            negative_utc=d["negative_utc"],
        )

    @classmethod
    def from_datetime(cls, dt: datetime.datetime):
        """Builds a TimestampWithTimezone from a datetime object, by
        delegating to `from_dict`."""
        return cls.from_dict(dt)

    @classmethod
    def from_iso8601(cls, s):
        """Builds a TimestampWithTimezone from an ISO8601-formatted string.
        """
        dt = iso8601.parse_date(s)
        tstz = cls.from_datetime(dt)
        # Only a "-00:00" timezone designator denotes a negative UTC offset.
        # Setting the flag unconditionally would mislabel every other
        # timestamp and make check_negative_utc reject any non-zero offset.
        # NOTE(review): relies on iso8601 reporting the designator through
        # tzname() -- confirm against the pinned iso8601 version.
        if dt.tzname() == "-00:00":
            tstz = attr.evolve(tstz, negative_utc=True)
        return tstz
class Origin(BaseModel):
    """Represents a software source: a VCS and an URL."""

    # NOTE(review): no attribute definitions are visible for this class in
    # this view, although the docstring mentions a URL. Confirm against the
    # upstream file whether field declarations were lost here.
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class OriginVisit(BaseModel):
    """Represents a visit of an origin at a given point in time, by a
    SWH loader."""

    origin = attr.ib(type=str, validator=type_validator())
    date = attr.ib(type=datetime.datetime, validator=type_validator())
    status = attr.ib(
        type=str, validator=attr.validators.in_(["ongoing", "full", "partial"])
    )
    type = attr.ib(type=str, validator=type_validator())
    snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator())
    metadata = attr.ib(
        type=Optional[Dict[str, object]], validator=type_validator(), default=None
    )
    # Should not be set before calling 'origin_visit_add()'.
    visit = attr.ib(type=Optional[int], validator=type_validator(), default=None)

    def to_dict(self):
        """Returns the dict representation, omitting the visit id if it is
        `None`.

        NOTE(review): the previous docstring also mentioned serializing the
        date as a string, but no such conversion is visible here; the date
        is left as returned by the parent `to_dict()` -- confirm intent.
        """
        ov = super().to_dict()
        if ov["visit"] is None:
            del ov["visit"]
        return ov

    @classmethod
    def from_dict(cls, d):
        """Parses the date from a string, and accepts missing visit ids.

        The input dict is copied, never modified. A missing `visit` key
        falls back to the attribute default (`None`)."""
        d = d.copy()
        if isinstance(d["date"], str):
            d["date"] = dateutil.parser.parse(d["date"])
        return super().from_dict(d)
@attr.s(frozen=True)
class OriginVisitUpdate(BaseModel):
    """Represents a visit update of an origin at a given point in time.

    Unlike `OriginVisit`, the `visit` id is a mandatory integer and there
    is no `type` attribute.
    """

    origin = attr.ib(type=str, validator=type_validator())
    visit = attr.ib(type=int, validator=type_validator())
    date = attr.ib(type=datetime.datetime, validator=type_validator())
    # Only these three status strings are accepted.
    status = attr.ib(
        type=str, validator=attr.validators.in_(["ongoing", "full", "partial"])
    )
    snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator())
    metadata = attr.ib(
        type=Optional[Dict[str, object]], validator=type_validator(), default=None
    )
class TargetType(Enum):
    """The type of content pointed to by a snapshot branch. Usually a
    revision or an alias."""

    CONTENT = "content"
    DIRECTORY = "directory"
    REVISION = "revision"
    RELEASE = "release"
    SNAPSHOT = "snapshot"
    ALIAS = "alias"
class ObjectType(Enum):
    """The type of content pointed to by a release. Usually a revision"""

    CONTENT = "content"
    DIRECTORY = "directory"
    REVISION = "revision"
    RELEASE = "release"
    SNAPSHOT = "snapshot"
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class SnapshotBranch(BaseModel):
    """Represents one of the branches of a snapshot."""

    target = attr.ib(type=bytes, validator=type_validator())
    target_type = attr.ib(type=TargetType, validator=type_validator())

    @target.validator
    def check_target(self, attribute, value):
        """Unless the branch is an alias, checks the target is exactly
        20 bytes long.

        Raises:
            ValueError: if a non-alias target does not have 20 bytes.
        """
        if self.target_type != TargetType.ALIAS and self.target is not None:
            if len(value) != 20:
                raise ValueError("Wrong length for bytes identifier: %d" % len(value))

    @classmethod
    def from_dict(cls, d):
        """Builds a branch from a dict with `target` and `target_type`
        keys, converting `target_type` to a `TargetType` member."""
        return cls(target=d["target"], target_type=TargetType(d["target_type"]))
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class Snapshot(BaseModel, HashableObject):
    """Represents the full state of an origin at a given point in time."""

    branches = attr.ib(
        type=Dict[bytes, Optional[SnapshotBranch]], validator=type_validator()
    )
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Returns `snapshot_identifier()` of the dict representation."""
        return snapshot_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Builds a Snapshot from a dict, converting every non-empty entry
        of `branches` to a `SnapshotBranch` (falsy entries become `None`).

        The input dict is copied, never modified."""
        d = d.copy()
        return cls(
            branches={
                name: SnapshotBranch.from_dict(branch) if branch else None
                for (name, branch) in d.pop("branches").items()
            },
            **d
        )
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class Release(BaseModel, HashableObject):
    """Represents a release: a named, optionally authored and dated,
    pointer to a target object of type `target_type`."""

    name = attr.ib(type=bytes, validator=type_validator())
    message = attr.ib(type=Optional[bytes], validator=type_validator())
    target = attr.ib(type=Optional[Sha1Git], validator=type_validator())
    target_type = attr.ib(type=ObjectType, validator=type_validator())
    synthetic = attr.ib(type=bool, validator=type_validator())
    author = attr.ib(type=Optional[Person], validator=type_validator(), default=None)
    date = attr.ib(
        type=Optional[TimestampWithTimezone], validator=type_validator(), default=None
    )
    metadata = attr.ib(
        type=Optional[Dict[str, object]], validator=type_validator(), default=None
    )
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Returns `release_identifier()` of the dict representation."""
        return release_identifier(object_dict)

    @author.validator
    def check_author(self, attribute, value):
        """If the author is `None`, checks the date is `None` too.

        Raises:
            ValueError: if a date is given without an author.
        """
        if self.author is None and self.date is not None:
            raise ValueError("release date must be None if author is None.")

    def to_dict(self):
        """Returns the dict representation of the release."""
        # `rel` was returned without ever being bound; bind it to the
        # parent's result, as the other to_dict() overrides here do.
        # NOTE(review): any further post-processing of `rel` is not visible
        # in this view -- confirm against upstream.
        rel = super().to_dict()
        return rel

    @classmethod
    def from_dict(cls, d):
        """Builds a Release from a dict, converting the `author`, `date`
        and `target_type` entries to their model types.

        The input dict is copied, never modified."""
        d = d.copy()
        if d.get("author"):
            d["author"] = Person.from_dict(d["author"])
        if d.get("date"):
            d["date"] = TimestampWithTimezone.from_dict(d["date"])
        return cls(target_type=ObjectType(d.pop("target_type")), **d)
class RevisionType(Enum):
    """The kinds of revision accepted by `Revision.type`."""

    GIT = "git"
    TAR = "tar"
    DSC = "dsc"
    SUBVERSION = "svn"
    MERCURIAL = "hg"
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class Revision(BaseModel, HashableObject):
    """Represents a revision: a message, its author and committer with
    their optional dates, a type, a directory and a list of parents."""

    message = attr.ib(type=bytes, validator=type_validator())
    author = attr.ib(type=Person, validator=type_validator())
    committer = attr.ib(type=Person, validator=type_validator())
    date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator())
    committer_date = attr.ib(
        type=Optional[TimestampWithTimezone], validator=type_validator()
    )
    type = attr.ib(type=RevisionType, validator=type_validator())
    directory = attr.ib(type=Sha1Git, validator=type_validator())
    synthetic = attr.ib(type=bool, validator=type_validator())
    metadata = attr.ib(
        type=Optional[Dict[str, object]], validator=type_validator(), default=None
    )
    parents = attr.ib(
        type=List[Sha1Git], validator=type_validator(), default=attr.Factory(list)
    )
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Returns `revision_identifier()` of the dict representation."""
        return revision_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Builds a Revision from a dict, converting the `author`,
        `committer`, `date`, `committer_date` and `type` entries to their
        model types. Falsy dates are passed through unchanged.

        The input dict is copied, never modified."""
        d = d.copy()

        date = d.pop("date")
        if date:
            date = TimestampWithTimezone.from_dict(date)

        committer_date = d.pop("committer_date")
        if committer_date:
            committer_date = TimestampWithTimezone.from_dict(committer_date)

        return cls(
            author=Person.from_dict(d.pop("author")),
            committer=Person.from_dict(d.pop("committer")),
            date=date,
            committer_date=committer_date,
            # The `type` attribute is validated as a RevisionType member.
            type=RevisionType(d.pop("type")),
            **d
        )
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class DirectoryEntry(BaseModel):
    """Represents one entry of a directory: a name, a type ("file", "dir"
    or "rev"), a target identifier and permissions."""

    name = attr.ib(type=bytes, validator=type_validator())
    type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"]))
    target = attr.ib(type=Sha1Git, validator=type_validator())
    # Usually one of the values of `swh.model.from_disk.DentryPerms`.
    perms = attr.ib(type=int, validator=type_validator())
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the other decorated model classes of this module.
@attr.s(frozen=True)
class Directory(BaseModel, HashableObject):
    """Represents a directory as a list of `DirectoryEntry` objects."""

    entries = attr.ib(type=List[DirectoryEntry], validator=type_validator())
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Returns `directory_identifier()` of the dict representation."""
        return directory_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Builds a Directory from a dict, converting each item of
        `entries` to a `DirectoryEntry`.

        The input dict is copied, never modified."""
        d = d.copy()
        return cls(
            entries=[DirectoryEntry.from_dict(entry) for entry in d.pop("entries")], **d
        )
# attr.ib() declarations only take effect once the class goes through attr.s;
# frozen=True mirrors the decorated subclasses below.
@attr.s(frozen=True)
class BaseContent(BaseModel):
    """Common base of `Content` and `SkippedContent`."""

    status = attr.ib(
        type=str, validator=attr.validators.in_(["visible", "hidden", "absent"])
    )

    @staticmethod
    def _hash_data(data: bytes):
        """Hash some data, returning most of the fields of a content object"""
        d = MultiHash.from_data(data).digest()
        # Callers rely on these keys: SkippedContent.from_data deletes
        # "data", and both from_data() constructors need "length".
        d["data"] = data
        d["length"] = len(data)
        return d

    @classmethod
    def from_dict(cls, d, use_subclass=True):
        """Builds a content object from a dict.

        With `use_subclass` set (the default), dispatches on the `status`
        entry: "absent" builds a `SkippedContent`, anything else (including
        a missing status) builds a `Content`. Subclasses pass
        `use_subclass=False` to instantiate themselves directly."""
        if use_subclass:
            # Chooses a subclass to instantiate instead.
            if d.get("status") == "absent":
                return SkippedContent.from_dict(d)
            else:
                return Content.from_dict(d)
        else:
            return super().from_dict(d)

    def get_hash(self, hash_name):
        """Returns the value of the hash named `hash_name`.

        Raises:
            ValueError: if `hash_name` is not in `DEFAULT_ALGORITHMS`.
        """
        if hash_name not in DEFAULT_ALGORITHMS:
            raise ValueError("{} is not a valid hash name.".format(hash_name))
        return getattr(self, hash_name)

    def hashes(self) -> Dict[str, bytes]:
        """Returns a dictionary {hash_name: hash_value}"""
        return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS}
@attr.s(frozen=True)
class Content(BaseContent):
    """Represents a content whose status is "visible" or "hidden", with
    all its hashes and length known and, optionally, its data."""

    sha1 = attr.ib(type=bytes, validator=type_validator())
    sha1_git = attr.ib(type=Sha1Git, validator=type_validator())
    sha256 = attr.ib(type=bytes, validator=type_validator())
    blake2s256 = attr.ib(type=bytes, validator=type_validator())
    length = attr.ib(type=int, validator=type_validator())
    status = attr.ib(
        type=str,
        validator=attr.validators.in_(["visible", "hidden"]),
        default="visible",
    )
    data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None)
    ctime = attr.ib(
        type=Optional[datetime.datetime], validator=type_validator(), default=None
    )

    @length.validator
    def check_length(self, attribute, value):
        """Checks the length is positive.

        Raises:
            ValueError: if `value` is negative.
        """
        if value < 0:
            raise ValueError("Length must be positive.")

    def to_dict(self):
        """Returns the dict representation of the content."""
        # NOTE(review): no post-processing of the dict is visible in this
        # view; confirm against upstream whether some was lost.
        content = super().to_dict()
        return content

    @classmethod
    def from_data(cls, data, status="visible", ctime=None) -> "Content":
        """Generate a Content from a given `data` byte string.

        This populates the Content with the hashes and length for the data
        passed as argument, as well as the data itself.
        """
        d = cls._hash_data(data)
        # Forward the caller-supplied values instead of silently dropping
        # them, as SkippedContent.from_data does for its own arguments.
        d["status"] = status
        d["ctime"] = ctime
        return cls(**d)

    @classmethod
    def from_dict(cls, d):
        """Builds a Content from a dict, parsing `ctime` when it is given
        as a string.

        The input dict is copied, never modified."""
        d = d.copy()
        if isinstance(d.get("ctime"), str):
            d["ctime"] = dateutil.parser.parse(d["ctime"])
        return super().from_dict(d, use_subclass=False)

    def with_data(self) -> "Content":
        """Loads the `data` attribute; meaning that it is guaranteed not to
        be None after this call.

        This call is almost a no-op, but subclasses may overload this method
        to lazy-load data (eg. from disk or objstorage).

        Raises:
            MissingData: if `data` is None.
        """
        if self.data is None:
            raise MissingData("Content data is None.")
        return self
@attr.s(frozen=True)
class SkippedContent(BaseContent):
    """Represents a content whose status is "absent": hashes and length
    may be unknown, and a reason for the absence must be given."""

    sha1 = attr.ib(type=Optional[bytes], validator=type_validator())
    sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator())
    sha256 = attr.ib(type=Optional[bytes], validator=type_validator())
    blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator())
    length = attr.ib(type=Optional[int], validator=type_validator())
    status = attr.ib(type=str, validator=attr.validators.in_(["absent"]))
    reason = attr.ib(type=Optional[str], validator=type_validator(), default=None)
    origin = attr.ib(type=Optional[str], validator=type_validator(), default=None)
    ctime = attr.ib(
        type=Optional[datetime.datetime], validator=type_validator(), default=None
    )

    @reason.validator
    def check_reason(self, attribute, value):
        """Checks a reason is provided (the status is always "absent").

        Raises:
            ValueError: if `value` is None.
        """
        assert self.reason == value
        if value is None:
            raise ValueError("Must provide a reason if content is absent.")

    @length.validator
    def check_length(self, attribute, value):
        """Checks the length is positive or -1.

        Raises:
            ValueError: if `value` is lower than -1.
        """
        # The attribute is Optional[int]: None must not reach the comparison.
        if value is not None and value < -1:
            raise ValueError("Length must be positive or -1.")

    def to_dict(self):
        """Returns the dict representation of the skipped content."""
        # NOTE(review): no post-processing of the dict is visible in this
        # view; confirm against upstream whether some was lost.
        content = super().to_dict()
        return content

    @classmethod
    def from_data(
        cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None
    ) -> "SkippedContent":
        """Generate a SkippedContent from a given `data` byte string.

        This populates the SkippedContent with the hashes and length for the
        data passed as argument.

        You can use `attr.evolve` on such a generated content to nullify some
        of its attributes, e.g. for tests.
        """
        d = cls._hash_data(data)
        # A skipped content never carries the data itself.
        del d["data"]
        d["status"] = "absent"
        d["reason"] = reason
        d["ctime"] = ctime
        return cls(**d)

    @classmethod
    def from_dict(cls, d):
        """Builds a SkippedContent from a dict.

        A `data` key is tolerated only when its value is None; it is
        dropped from a copy of the dict before instantiation.

        Raises:
            ValueError: if `d` carries a non-None `data` entry.
        """
        d2 = d.copy()
        if d2.pop("data", None) is not None:
            raise ValueError('SkippedContent has no "data" attribute %r' % d)
        return super().from_dict(d2, use_subclass=False)