Commit f5132713 authored by David Douard

hypothesis: split hypothesis strategies into dict + entity instance variants

for each entity model `Model`, provide a `models_d` strategy that
produces dicts suitable for use as the argument of the `Model.from_dict`
factory method, and reimplement the `models` strategy on top of this
dict-producing strategy.

This is needed to help write low-level tests for model entities.
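
A minimal usage sketch (not part of this commit) of the kind of low-level test these dict strategies enable; it assumes the strategies live in swh.model.hypothesis_strategies and that Origin dicts round-trip unchanged through from_dict()/to_dict():

from hypothesis import given

from swh.model.hypothesis_strategies import origins_d
from swh.model.model import Origin


@given(origins_d())
def test_origin_dict_roundtrip(origin_dict):
    # Build the model entity from the generated dict...
    origin = Origin.from_dict(origin_dict)
    # ...and check that serializing it back yields the same dict.
    assert origin.to_dict() == origin_dict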
parent 10b06992
@@ -3,22 +3,20 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import attr
import datetime
from hypothesis import assume
from hypothesis.strategies import (
binary, booleans, builds, characters, composite, dictionaries,
from_regex, integers, just, lists, none, one_of,
sampled_from, sets, text, tuples,
)
binary, booleans, builds, characters,
composite, datetimes, dictionaries, from_regex, integers, just, lists,
none, one_of, sampled_from, sets, text, )
from .from_disk import DentryPerms
from .model import (
Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit,
OriginVisitUpdate, Snapshot, SnapshotBranch, TargetType, Release,
Revision, Directory, DirectoryEntry, Content, SkippedContent
)
Person, Timestamp, TimestampWithTimezone, Origin,
OriginVisit, OriginVisitUpdate, Snapshot, SnapshotBranch, ObjectType,
TargetType, Release, Revision, RevisionType, BaseContent, Directory,
DirectoryEntry, Content, SkippedContent, )
from .identifiers import snapshot_identifier, identifier_to_bytes
@@ -51,178 +49,262 @@ def urls(draw):
return '%s://%s' % (protocol, domain)
def persons_d():
return builds(
dict,
fullname=binary(),
email=optional(binary()),
name=optional(binary()),
)
def persons():
return builds(Person, email=optional(binary()), name=optional(binary()))
return persons_d().map(Person.from_dict)
def timestamps():
def timestamps_d():
max_seconds = datetime.datetime.max.replace(
tzinfo=datetime.timezone.utc).timestamp()
min_seconds = datetime.datetime.min.replace(
tzinfo=datetime.timezone.utc).timestamp()
return builds(
Timestamp,
dict,
seconds=integers(min_seconds, max_seconds),
microseconds=integers(0, 1000000))
def timestamps():
return timestamps_d().map(Timestamp.from_dict)
@composite
def timestamps_with_timezone(
def timestamps_with_timezone_d(
draw,
timestamp=timestamps(),
timestamp=timestamps_d(),
offset=integers(min_value=-14*60, max_value=14*60),
negative_utc=booleans()):
timestamp = draw(timestamp)
offset = draw(offset)
negative_utc = draw(negative_utc)
assume(not (negative_utc and offset))
return TimestampWithTimezone(
return dict(
timestamp=timestamp,
offset=offset,
negative_utc=negative_utc)
def origins():
def timestamps_with_timezone():
return timestamps_with_timezone_d().map(
TimestampWithTimezone.from_dict)
def origins_d():
return builds(
Origin,
dict,
url=urls())
def origin_visits():
def origins():
return origins_d().map(Origin.from_dict)
def origin_visits_d():
return builds(
OriginVisit,
dict,
visit=integers(0, 1000),
origin=urls(),
date=datetimes(),
status=sampled_from(['ongoing', 'full', 'partial']),
type=pgsql_text(),
snapshot=optional(sha1_git()),
)
def origin_visits():
return origin_visits_d().map(OriginVisit.from_dict)
def metadata_dicts():
return dictionaries(pgsql_text(), pgsql_text())
def origin_visit_updates():
def origin_visit_updates_d():
return builds(
OriginVisitUpdate,
dict,
visit=integers(0, 1000),
origin=urls(),
status=sampled_from(['ongoing', 'full', 'partial']),
date=datetimes(),
snapshot=optional(sha1_git()),
metadata=one_of(none(), metadata_dicts()))
def origin_visit_updates():
return origin_visit_updates_d().map(OriginVisitUpdate.from_dict)
@composite
def releases(draw):
(date, author) = draw(one_of(
tuples(none(), none()),
tuples(timestamps_with_timezone(), persons())))
rel = draw(builds(
Release,
author=none(),
date=none(),
target=sha1_git()))
return attr.evolve(
rel,
date=date,
author=author)
def releases_d(draw):
target_type = sampled_from([x.value for x in ObjectType])
name = binary()
message = binary()
synthetic = booleans()
target = sha1_git()
metadata = one_of(none(), revision_metadata())
return draw(one_of(
builds(
dict,
name=name,
message=message,
synthetic=synthetic,
author=none(),
date=none(),
target=target,
target_type=target_type,
metadata=metadata,
),
builds(
dict,
name=name,
message=message,
synthetic=synthetic,
date=timestamps_with_timezone_d(),
author=persons_d(),
target=target,
target_type=target_type,
metadata=metadata,
),
))
def releases():
return releases_d().map(Release.from_dict)
revision_metadata = metadata_dicts
def revisions():
def revisions_d():
return builds(
Revision,
author=persons(),
committer=persons(),
date=timestamps_with_timezone(),
committer_date=timestamps_with_timezone(),
dict,
message=binary(),
synthetic=booleans(),
author=persons_d(),
committer=persons_d(),
date=timestamps_with_timezone_d(),
committer_date=timestamps_with_timezone_d(),
parents=lists(sha1_git()),
directory=sha1_git(),
type=sampled_from([x.value for x in RevisionType]),
metadata=one_of(none(), revision_metadata()))
# TODO: metadata['extra_headers'] can have binary keys and values
def directory_entries():
def revisions():
return revisions_d().map(Revision.from_dict)
def directory_entries_d():
return builds(
DirectoryEntry,
dict,
name=binary(),
target=sha1_git(),
type=sampled_from(['file', 'dir', 'rev']),
perms=sampled_from([perm.value for perm in DentryPerms]))
def directories():
def directory_entries():
return directory_entries_d().map(DirectoryEntry.from_dict)
def directories_d():
return builds(
Directory,
entries=lists(directory_entries()))
dict,
entries=lists(directory_entries_d()))
def directories():
return directories_d().map(Directory.from_dict)
def contents_d():
return one_of(present_contents_d(), skipped_contents_d())
def contents():
return one_of(present_contents(), skipped_contents())
def present_contents():
def present_contents_d():
return builds(
Content.from_data,
binary(max_size=4096),
dict,
data=binary(max_size=4096),
status=one_of(just('visible'), just('hidden')),
)
def present_contents():
return present_contents_d().map(lambda d: Content.from_data(**d))
@composite
def skipped_contents(draw):
def skipped_contents_d(draw):
result = BaseContent._hash_data(draw(binary(max_size=4096)))
result.pop('data')
nullify_attrs = draw(
sets(sampled_from(['sha1', 'sha1_git', 'sha256', 'blake2s256']))
)
for k in nullify_attrs:
result[k] = None
result['reason'] = draw(pgsql_text())
result['status'] = 'absent'
return result
new_attrs = {
k: None
for k in nullify_attrs
}
ret = draw(builds(
SkippedContent.from_data,
binary(max_size=4096),
reason=pgsql_text(),
))
return attr.evolve(ret, **new_attrs)
def skipped_contents():
return skipped_contents_d().map(SkippedContent.from_dict)
def branch_names():
return binary(min_size=1)
def branch_targets_object():
def branch_targets_object_d():
return builds(
SnapshotBranch,
dict,
target=sha1_git(),
target_type=sampled_from([
TargetType.CONTENT, TargetType.DIRECTORY, TargetType.REVISION,
TargetType.RELEASE, TargetType.SNAPSHOT]))
x.value for x in TargetType
if x.value not in ('alias', )]))
def branch_targets_alias():
def branch_targets_alias_d():
return builds(
SnapshotBranch,
target_type=just(TargetType.ALIAS))
dict,
target=sha1_git(),
target_type=just('alias'))  # i.e. TargetType.ALIAS.value
def branch_targets(*, only_objects=False):
def branch_targets_d(*, only_objects=False):
if only_objects:
return branch_targets_object()
return branch_targets_object_d()
else:
return one_of(branch_targets_alias(), branch_targets_object())
return one_of(branch_targets_alias_d(), branch_targets_object_d())
def branch_targets(*, only_objects=False):
return builds(
SnapshotBranch.from_dict,
branch_targets_d(only_objects=only_objects))
@composite
def snapshots(draw, *, min_size=0, max_size=100, only_objects=False):
def snapshots_d(draw, *, min_size=0, max_size=100, only_objects=False):
branches = draw(dictionaries(
keys=branch_names(),
values=one_of(
none(),
branch_targets(only_objects=only_objects)
branch_targets_d(only_objects=only_objects)
),
min_size=min_size,
max_size=max_size,
@@ -231,33 +313,38 @@ def snapshots(draw, *, min_size=0, max_size=100, only_objects=False):
if not only_objects:
# Make sure aliases point to actual branches
unresolved_aliases = {
target.target
target['target']
for target in branches.values()
if (target
and target.target_type == 'alias'
and target.target not in branches)
and target['target_type'] == 'alias'
and target['target'] not in branches)
}
for alias in unresolved_aliases:
branches[alias] = draw(branch_targets(only_objects=True))
branches[alias] = draw(branch_targets_d(only_objects=True))
# Ensure no cycles between aliases
while True:
try:
id_ = snapshot_identifier({
'branches': {
name: branch.to_dict() if branch else None
name: branch or None
for (name, branch) in branches.items()}})
except ValueError as e:
for (source, target) in e.args[1]:
branches[source] = draw(branch_targets(only_objects=True))
branches[source] = draw(branch_targets_d(only_objects=True))
else:
break
return Snapshot(
return dict(
id=identifier_to_bytes(id_),
branches=branches)
def snapshots(*, min_size=0, max_size=100, only_objects=False):
return snapshots_d(
min_size=min_size, max_size=max_size,
only_objects=only_objects).map(Snapshot.from_dict)
def objects():
return one_of(
origins().map(lambda x: ('origin', x)),
@@ -272,4 +359,16 @@ def objects():
def object_dicts():
return objects().map(lambda x: (x[0], x[1].to_dict()))
"""generates a random couple (type, dict)
which dict is suitable for <ModelForType>.from_dict() factory methods.
"""
return one_of(
origins_d().map(lambda x: ('origin', x)),
origin_visits_d().map(lambda x: ('origin_visit', x)),
snapshots_d().map(lambda x: ('snapshot', x)),
releases_d().map(lambda x: ('release', x)),
revisions_d().map(lambda x: ('revision', x)),
directories_d().map(lambda x: ('directory', x)),
contents_d().map(lambda x: ('content', x)),
)
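
A second hedged sketch, not from the commit: because the dict strategies keep their keyword parameters, a test can draw constrained snapshot dicts directly and promote them to model instances. It assumes the module path swh.model.hypothesis_strategies and that Snapshot.from_dict() keeps the precomputed id verbatim:

from hypothesis import given

from swh.model.hypothesis_strategies import snapshots_d
from swh.model.model import Snapshot, TargetType


@given(snapshots_d(min_size=1, max_size=10, only_objects=True))
def test_snapshot_from_dict(snapshot_dict):
    snapshot = Snapshot.from_dict(snapshot_dict)
    # The identifier computed by the strategy is kept on the model instance.
    assert snapshot.id == snapshot_dict['id']
    # min_size=1 guarantees at least one branch; only_objects=True means
    # no branch is an alias (branches may still be None, i.e. dangling).
    assert len(snapshot.branches) >= 1
    for branch in snapshot.branches.values():
        assert branch is None or branch.target_type != TargetType.ALIAS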
@@ -45,13 +45,13 @@ def test_dicts_generation(obj_type_and_obj):
assert_nested_dict(object_)
if obj_type == 'content':
if object_['status'] == 'visible':
assert set(object_) == \
assert set(object_) <= \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
elif object_['status'] == 'absent':
assert set(object_) == \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'reason'}
elif object_['status'] == 'hidden':
assert set(object_) == \
assert set(object_) <= \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
else:
assert False, object_
@@ -60,3 +60,27 @@ def test_dicts_generation(obj_type_and_obj):
elif obj_type == 'snapshot':
for branch in object_['branches'].values():
assert branch is None or branch['target_type'] in target_types
@given(objects())
def test_model_to_dicts(obj_type_and_obj):
(obj_type, object_) = obj_type_and_obj
obj_dict = object_.to_dict()
assert_nested_dict(obj_dict)
if obj_type == 'content':
if obj_dict['status'] == 'visible':
assert set(obj_dict) == \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
elif obj_dict['status'] == 'absent':
assert set(obj_dict) == \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'reason'}
elif obj_dict['status'] == 'hidden':
assert set(obj_dict) == \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
else:
assert False, obj_dict
elif obj_type == 'release':
assert obj_dict['target_type'] in target_types
elif obj_type == 'snapshot':
for branch in obj_dict['branches'].values():
assert branch is None or branch['target_type'] in target_types