Skip to content
Snippets Groups Projects
Commit d1b21569 authored by vlorentz's avatar vlorentz
Browse files

Add a model based on 'attrs', and Hypothesis strategies to generate it.

parent 4d40f4d3
No related branches found
No related tags found
No related merge requests found
...@@ -3,3 +3,5 @@ ...@@ -3,3 +3,5 @@
# dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
vcversioner vcversioner
Click Click
attrs
hypothesis
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hypothesis.strategies import (
lists, one_of, composite, builds, integers, sampled_from, binary,
dictionaries, none, from_regex, just
)
from .from_disk import DentryPerms
from .model import (
Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit,
Snapshot, SnapshotBranch, TargetType, Release, Revision,
Directory, DirectoryEntry, Content
)
from .identifiers import snapshot_identifier, identifier_to_bytes
def sha1_git():
    """Return a strategy producing byte strings of exactly 20 bytes."""
    digest_size = 20
    return binary(min_size=digest_size, max_size=digest_size)
@composite
def urls(draw):
    """Strategy producing ``<protocol>://<domain>`` strings.

    The protocol is sampled from a fixed list and the domain is drawn from
    a regular expression (one to three dot-terminated labels followed by a
    final alphanumeric label).
    """
    scheme = draw(sampled_from(['git', 'http', 'https', 'deb']))
    host = draw(from_regex(r'\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z'))
    return '{}://{}'.format(scheme, host)
def persons():
    """Return a strategy building ``Person`` objects.

    No per-field strategy is passed; the fields are left for ``builds`` to
    fill in.
    """
    person_strategy = builds(Person)
    return person_strategy
def timestamps():
    """Return a strategy building ``Timestamp`` objects.

    Seconds span the signed 64-bit integer range. Microseconds span
    0..999999: the bounds of ``integers()`` are inclusive, and the
    ``Timestamp`` validator rejects any value that is not strictly below
    1000000, so the upper bound must be ``10**6 - 1``.
    """
    return builds(
        Timestamp,
        seconds=integers(-2**63, 2**63 - 1),
        microseconds=integers(0, 10**6 - 1))
def timestamps_with_timezone():
    """Return a strategy building ``TimestampWithTimezone`` objects.

    The timestamp comes from ``timestamps()`` and the offset is an integer
    in [-2**16, 2**16 - 1]; remaining fields are left for ``builds`` to
    fill in.
    """
    offsets = integers(-2**16, 2**16-1)
    return builds(
        TimestampWithTimezone,
        timestamp=timestamps(),
        offset=offsets,
    )
def origins():
    """Return a strategy building ``Origin`` objects.

    The type is sampled from a fixed list of origin types and the URL comes
    from ``urls()``.
    """
    origin_types = sampled_from(['git', 'hg', 'svn', 'pypi', 'deb'])
    return builds(Origin, type=origin_types, url=urls())
def origin_visits():
    """Return a strategy building ``OriginVisit`` objects.

    The visit number is an integer in [0, 1000] and the origin comes from
    ``origins()``; remaining fields are left for ``builds`` to fill in.
    """
    visit_ids = integers(0, 1000)
    return builds(OriginVisit, visit=visit_ids, origin=origins())
def releases():
    """Return a strategy building ``Release`` objects.

    The id is a 20-byte identifier; author and target may each be ``None``.
    Fields not listed here are left for ``builds`` to fill in.
    """
    optional_author = one_of(none(), persons())
    optional_target = one_of(none(), sha1_git())
    return builds(
        Release,
        id=sha1_git(),
        date=timestamps_with_timezone(),
        author=optional_author,
        target=optional_target,
    )
def revisions():
    """Return a strategy building ``Revision`` objects.

    ``parents`` and ``directory`` are ``Sha1Git`` fields on the model, so
    they are generated with ``sha1_git()`` (20-byte values) like every other
    identifier in this module, rather than arbitrary-length byte strings.
    Metadata is either ``None`` or a bytes-to-bytes dictionary. Fields not
    listed here are left for ``builds`` to fill in.
    """
    return builds(
        Revision,
        id=sha1_git(),
        date=timestamps_with_timezone(),
        committer_date=timestamps_with_timezone(),
        parents=lists(sha1_git()),
        directory=sha1_git(),
        metadata=one_of(none(), dictionaries(binary(), binary())))
def directory_entries():
    """Return a strategy building ``DirectoryEntry`` objects.

    The target is a 20-byte identifier and the permissions are sampled from
    the values of the ``DentryPerms`` members.
    """
    known_perms = [member.value for member in DentryPerms]
    return builds(
        DirectoryEntry,
        target=sha1_git(),
        perms=sampled_from(known_perms),
    )
def directories():
    """Return a strategy building ``Directory`` objects.

    Each directory gets a 20-byte id and a list of generated entries.
    """
    entry_lists = lists(directory_entries())
    return builds(Directory, id=sha1_git(), entries=entry_lists)
def contents():
    """Return a strategy building ``Content`` objects.

    Generated objects whose status is not ``'visible'`` have their ``data``
    attribute reset to ``None`` before being returned.
    """
    def drop_data_unless_visible(obj):
        # The object is modified in place and handed back to the strategy.
        if obj.status == 'visible':
            return obj
        obj.data = None
        return obj

    generated = builds(
        Content,
        length=integers(0),
        data=binary(),
        sha1_git=sha1_git(),
    )
    return generated.map(drop_data_unless_visible)
def branch_names():
    """Return a strategy producing branch names as arbitrary byte strings."""
    names = binary()
    return names
def branch_targets_object():
    """Return a strategy building non-alias ``SnapshotBranch`` objects.

    The target is a 20-byte identifier and the target type is any
    ``TargetType`` member other than ``ALIAS``.
    """
    object_types = [
        TargetType.CONTENT,
        TargetType.DIRECTORY,
        TargetType.REVISION,
        TargetType.RELEASE,
        TargetType.SNAPSHOT,
    ]
    return builds(
        SnapshotBranch,
        target=sha1_git(),
        target_type=sampled_from(object_types),
    )
def branch_targets_alias():
    """Return a strategy building alias ``SnapshotBranch`` objects.

    Only the target type is fixed (to ``TargetType.ALIAS``); the target is
    left for ``builds`` to fill in.
    """
    alias_type = just(TargetType.ALIAS)
    return builds(SnapshotBranch, target_type=alias_type)
def branch_targets(*, only_objects=False):
    """Return a strategy building ``SnapshotBranch`` objects.

    With ``only_objects`` set, only non-alias branches are produced;
    otherwise both alias and non-alias branches may be produced.
    """
    if not only_objects:
        return one_of(branch_targets_alias(), branch_targets_object())
    return branch_targets_object()
@composite
def snapshots(draw, *, min_size=0, max_size=100, only_objects=False):
    """Strategy producing ``Snapshot`` objects with a computed id.

    Args:
        min_size: minimum number of branches initially drawn.
        max_size: maximum number of branches initially drawn; branches added
            afterwards to resolve aliases are not counted against it.
        only_objects: when true, no alias branch is generated.

    Returns:
        A ``Snapshot`` whose id is computed by ``snapshot_identifier`` from
        the generated branches.
    """
    branches = draw(dictionaries(
        keys=branch_names(),
        values=branch_targets(only_objects=only_objects),
        min_size=min_size,
        max_size=max_size,
    ))

    if not only_objects:
        # Make sure aliases point to actual branches.
        # target_type is a TargetType member, so it has to be compared with
        # the enum member: comparing it with the string 'alias' is always
        # False and would leave every alias unresolved.
        unresolved_aliases = {
            target.target
            for target in branches.values()
            if (target
                and target.target_type == TargetType.ALIAS
                and target.target not in branches)
        }
        # Sorted so that values are drawn in an order that does not depend
        # on set iteration order.
        for alias in sorted(unresolved_aliases):
            branches[alias] = draw(branch_targets(only_objects=True))

    while True:
        try:
            id_ = snapshot_identifier({
                'branches': {
                    name: branch.to_dict()
                    for (name, branch) in branches.items()}})
        except ValueError as e:
            # NOTE(review): assumes the second exception argument is the
            # list of unresolved (branch name, target) pairs -- confirm
            # against snapshot_identifier.  Each offending branch is
            # replaced by a non-alias one and the computation is retried.
            for (source, _target) in e.args[1]:
                branches[source] = draw(branch_targets(only_objects=True))
        else:
            break

    return Snapshot(
        id=identifier_to_bytes(id_),
        branches=branches)
def objects():
    """Return a strategy producing ``(type name, model object)`` pairs.

    The object is drawn from one of the per-type strategies of this module
    and paired with the matching type name.
    """
    def tagged(label, strategy):
        # Pair every generated object with its type name.
        return strategy.map(lambda obj: (label, obj))

    return one_of(
        tagged('origin', origins()),
        tagged('origin_visit', origin_visits()),
        tagged('snapshot', snapshots()),
        tagged('release', releases()),
        tagged('revision', revisions()),
        tagged('directory', directories()),
        tagged('content', contents()),
    )
def object_dicts():
    """Return a strategy producing ``(type name, dict)`` pairs.

    Same as ``objects()``, with each object replaced by the result of its
    ``to_dict()`` method.
    """
    def as_dict_pair(pair):
        return (pair[0], pair[1].to_dict())

    return objects().map(as_dict_pair)
...@@ -581,6 +581,8 @@ def snapshot_identifier(snapshot, *, ignore_unresolved=False): ...@@ -581,6 +581,8 @@ def snapshot_identifier(snapshot, *, ignore_unresolved=False):
if target_id not in snapshot['branches'] or target_id == name: if target_id not in snapshot['branches'] or target_id == name:
unresolved.append((name, target_id)) unresolved.append((name, target_id))
else: else:
print(name)
print(target)
target_type = target['target_type'].encode() target_type = target['target_type'].encode()
target_id = identifier_to_bytes(target['target']) target_id = identifier_to_bytes(target['target'])
......
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from enum import Enum
from typing import List, Optional, Dict
import attr
# Alias used to annotate identifier fields of the model classes below.
# TODO: Limit this to 20 bytes
Sha1Git = bytes
@attr.s
class Person:
    """A person, described by three byte-string fields."""

    name = attr.ib(type=bytes)
    email = attr.ib(type=bytes)
    fullname = attr.ib(type=bytes)
@attr.s
class Timestamp:
    """A number of seconds plus a microseconds part, both range-checked."""

    seconds = attr.ib(type=int)
    microseconds = attr.ib(type=int)

    @seconds.validator
    def check_seconds(self, attribute, value):
        """Check that seconds fit in a 64-bits signed integer."""
        fits_int64 = -2**63 <= value < 2**63
        if fits_int64:
            return
        raise ValueError('Seconds must be a signed 64-bits integer.')

    @microseconds.validator
    def check_microseconds(self, attribute, value):
        """Check that microseconds are in the half-open range [0, 10**6)."""
        within_second = 0 <= value < 10**6
        if within_second:
            return
        raise ValueError('Microseconds must be in [0, 1000000[.')
@attr.s
class TimestampWithTimezone:
    """A ``Timestamp`` together with an integer offset and a UTC flag."""

    timestamp = attr.ib(type=Timestamp)
    offset = attr.ib(type=int)
    negative_utc = attr.ib(type=bool)

    def to_dict(self):
        """Return the attributes of this object as a dictionary."""
        as_dict = attr.asdict(self)
        return as_dict
@attr.s
class Origin:
    """An origin, described by a type and a URL."""

    type = attr.ib(type=str)
    url = attr.ib(type=str)

    def to_dict(self):
        """Return the attributes of this object as a dictionary."""
        as_dict = attr.asdict(self)
        return as_dict
@attr.s
class OriginVisit:
    """A visit of an origin at a given date."""

    origin = attr.ib(type=Origin)
    date = attr.ib(type=datetime.datetime)
    # Should not be set before calling 'origin_visit_add()'.
    visit = attr.ib(type=Optional[int])

    def to_dict(self):
        """Return a dictionary describing this visit.

        The origin is converted with its own ``to_dict()``, the date is
        converted to a string, and the ``visit`` key is omitted when the
        visit number is not set (``None``).  A visit number of 0 is a set
        value and is kept.
        """
        ov = attr.asdict(self)
        ov['origin'] = self.origin.to_dict()
        ov['date'] = str(self.date)
        # Test against None explicitly: a truthiness test would also drop
        # the valid visit number 0.
        if ov['visit'] is None:
            del ov['visit']
        return ov
class TargetType(Enum):
    """Kinds of target a branch or release can have.

    The value of each member is its lowercase string name.
    """

    # Object targets.
    CONTENT = 'content'
    DIRECTORY = 'directory'
    REVISION = 'revision'
    RELEASE = 'release'
    SNAPSHOT = 'snapshot'

    # Target that is not an object (see SnapshotBranch.check_target).
    ALIAS = 'alias'
@attr.s
class SnapshotBranch:
    """A branch of a snapshot: a target and the type of that target."""

    target = attr.ib(type=bytes)
    target_type = attr.ib(type=TargetType)

    @target.validator
    def check_target(self, attribute, value):
        """Check that non-alias targets are exactly 20 bytes long."""
        if self.target_type == TargetType.ALIAS:
            # Alias targets are not length-checked.
            return
        if len(value) != 20:
            raise ValueError('Wrong length for bytes identifier: %d' %
                             len(value))

    def to_dict(self):
        """Return a dictionary with the target type replaced by its value."""
        as_dict = attr.asdict(self)
        as_dict.update(target_type=as_dict['target_type'].value)
        return as_dict
@attr.s
class Snapshot:
    """A snapshot: an identifier and a mapping from branch names to branches.

    A branch value may be ``None``, as declared by the type of ``branches``.
    """

    id = attr.ib(type=Sha1Git)
    branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]])

    def to_dict(self):
        """Return a dictionary describing this snapshot.

        Each branch is converted with its own ``to_dict()``; branches whose
        value is ``None`` are kept as ``None`` instead of raising
        ``AttributeError``.
        """
        return {
            'id': self.id,
            'branches': {
                name: branch.to_dict() if branch else None
                for (name, branch) in self.branches.items()
            }
        }
@attr.s
class Release:
    """A release object; author and target are optional."""

    id = attr.ib(type=Sha1Git)
    name = attr.ib(type=bytes)
    message = attr.ib(type=bytes)
    date = attr.ib(type=TimestampWithTimezone)
    author = attr.ib(type=Optional[Person])
    target = attr.ib(type=Optional[Sha1Git])
    target_type = attr.ib(type=TargetType)
    synthetic = attr.ib(type=bool)

    def to_dict(self):
        """Return a dictionary describing this release.

        The date is converted with its own ``to_dict()`` and the target type
        is replaced by its enum value.
        """
        as_dict = attr.asdict(self)
        as_dict.update(
            date=self.date.to_dict(),
            target_type=as_dict['target_type'].value,
        )
        return as_dict
@attr.s
class Revision:
    """A revision object, with author/committer information and parents."""

    id = attr.ib(type=Sha1Git)
    message = attr.ib(type=bytes)
    author = attr.ib(type=Person)
    committer = attr.ib(type=Person)
    date = attr.ib(type=TimestampWithTimezone)
    committer_date = attr.ib(type=TimestampWithTimezone)
    parents = attr.ib(type=List[Sha1Git])
    type = attr.ib(type=str)
    directory = attr.ib(type=Sha1Git)
    metadata = attr.ib(type=Optional[dict])
    synthetic = attr.ib(type=bool)

    def to_dict(self):
        """Return a dictionary describing this revision.

        Both dates are converted with their own ``to_dict()``.
        """
        as_dict = attr.asdict(self)
        as_dict.update(
            date=self.date.to_dict(),
            committer_date=self.committer_date.to_dict(),
        )
        return as_dict
@attr.s
class DirectoryEntry:
    """An entry of a directory; its type is 'file', 'dir' or 'rev'."""

    name = attr.ib(type=bytes)
    type = attr.ib(
        type=str,
        validator=attr.validators.in_(['file', 'dir', 'rev']),
    )
    target = attr.ib(type=Sha1Git)
    # Usually one of the values of `swh.model.from_disk.DentryPerms`.
    perms = attr.ib(type=int)

    def to_dict(self):
        """Return the attributes of this object as a dictionary."""
        as_dict = attr.asdict(self)
        return as_dict
@attr.s
class Directory:
    """A directory: an identifier and a list of entries."""

    id = attr.ib(type=Sha1Git)
    entries = attr.ib(type=List[DirectoryEntry])

    def to_dict(self):
        """Return a dictionary describing this directory.

        Each entry is converted with its own ``to_dict()``.
        """
        as_dict = attr.asdict(self)
        as_dict.update(entries=[item.to_dict() for item in self.entries])
        return as_dict
@attr.s
class Content:
    """A content object: hashes, length, status and (optionally) data."""

    sha1 = attr.ib(type=bytes)
    sha1_git = attr.ib(type=Sha1Git)
    sha256 = attr.ib(type=bytes)
    blake2s256 = attr.ib(type=bytes)

    # May be None: to_dict() below drops the key in that case.
    data = attr.ib(type=Optional[bytes])

    length = attr.ib(type=int)
    status = attr.ib(
        type=str,
        validator=attr.validators.in_(['visible', 'absent', 'hidden']))

    @length.validator
    def check_length(self, attribute, value):
        """Check that the length is not negative (zero is accepted)."""
        if value < 0:
            raise ValueError('Length must be positive.')

    def to_dict(self):
        """Return a dictionary describing this content.

        The ``data`` key is omitted when the data is ``None``.
        """
        content = attr.asdict(self)
        if content['data'] is None:
            del content['data']
        return content
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import attr
from hypothesis import given
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.hypothesis_strategies import objects, object_dicts
# Strings accepted as 'target_type' in the dictionaries checked by
# test_dicts_generation below.
target_types = (
    'content', 'directory', 'revision', 'release', 'snapshot', 'alias')
@given(objects())
def test_generation(obj_type_and_obj):
    """Run the attrs validators on every generated model object."""
    _obj_type, instance = obj_type_and_obj
    attr.validate(instance)
@given(object_dicts())
def test_dicts_generation(obj_type_and_obj):
    """Check the shape of the dictionaries produced from generated objects.

    Contents must expose exactly the hash keys plus 'length' and 'status'
    (and 'data' when visible); releases and snapshot branches must carry a
    known target type string.
    """
    kind, as_dict = obj_type_and_obj
    assert isinstance(as_dict, dict)

    if kind == 'content':
        expected_keys = set(DEFAULT_ALGORITHMS) | {'length', 'status'}
        if as_dict['status'] == 'visible':
            # Only visible contents carry their data.
            expected_keys = expected_keys | {'data'}
        assert set(as_dict) == expected_keys
    elif kind == 'release':
        assert as_dict['target_type'] in target_types
    elif kind == 'snapshot':
        for branch in as_dict['branches'].values():
            assert branch['target_type'] in target_types
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment