Skip to content
Snippets Groups Projects
hypothesis_strategies.py 12.3 KiB
Newer Older
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from hypothesis import assume
from hypothesis.extra.dateutil import timezones
from hypothesis.strategies import (
David Douard's avatar
David Douard committed
    binary,
    booleans,
    builds,
    characters,
    composite,
    datetimes,
    dictionaries,
    from_regex,
    integers,
    just,
David Douard's avatar
David Douard committed
    none,
    one_of,
    sampled_from,
    sets,
    text,
David Douard's avatar
David Douard committed
)
from .identifiers import identifier_to_bytes, snapshot_identifier
    BaseContent,
    Content,
    Directory,
    DirectoryEntry,
    ObjectType,
David Douard's avatar
David Douard committed
    Origin,
    OriginVisit,
David Douard's avatar
David Douard committed
    Release,
    Revision,
    RevisionType,
    SkippedContent,
    Snapshot,
    SnapshotBranch,
    TargetType,
    Timestamp,
    TimestampWithTimezone,
David Douard's avatar
David Douard committed
)
David Douard's avatar
David Douard committed
    blacklist_categories=("Cs",), blacklist_characters=["\u0000"]
)  # postgresql does not like these
def optional(strategy):
    """Return a strategy producing either ``None`` or a value of `strategy`."""
    nothing = none()
    return one_of(nothing, strategy)


def pgsql_text():
    """Return a text strategy whose characters come from ``pgsql_alphabet``."""
    allowed_characters = pgsql_alphabet
    return text(alphabet=allowed_characters)


def sha1_git():
    """Return a strategy generating byte strings of exactly 20 bytes."""
    digest_size = 20
    return binary(min_size=digest_size, max_size=digest_size)


def sha1():
    """Return a strategy generating byte strings of exactly 20 bytes."""
    digest_size = 20
    return binary(min_size=digest_size, max_size=digest_size)


    # datetimes in Software Heritage are not used for software artifacts
    # (which may be much older than 2000), but only for objects like scheduler
    # task runs, and origin visits, which were created by Software Heritage,
    # so at least in 2015.
    # We're forbidding old datetimes, because until 1956, many timezones had seconds
    # in their "UTC offsets" (see
    # <https://en.wikipedia.org/wiki/Time_zone#Worldwide_time_zones>), which is not
    # encodable in ISO8601; and we need our datetimes to be ISO8601-encodable in the
    # RPC protocol
    min_value = datetime.datetime(2000, 1, 1, 0, 0, 0)
    return datetimes(min_value=min_value, timezones=timezones())
David Douard's avatar
David Douard committed
    protocol = draw(sampled_from(["git", "http", "https", "deb"]))
    domain = draw(from_regex(r"\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z"))
David Douard's avatar
David Douard committed
    return "%s://%s" % (protocol, domain)
@composite
def persons_d(draw):
    """Generate a person as a dict with keys ``fullname``, ``name``, ``email``.

    ``fullname`` is always bytes; ``name`` and ``email`` are bytes or ``None``.
    """
    fullname = draw(binary())
    email = draw(optional(binary()))
    name = draw(optional(binary()))
    # Reject examples made of a 32-byte fullname with neither name nor email
    # (presumably because they would look like an anonymized person -- to be
    # confirmed against the model).
    assume(len(fullname) != 32 or email is not None or name is not None)
    return {"fullname": fullname, "name": name, "email": email}
    return persons_d().map(Person.from_dict)
    max_seconds = datetime.datetime.max.replace(
David Douard's avatar
David Douard committed
        tzinfo=datetime.timezone.utc
    ).timestamp()
    min_seconds = datetime.datetime.min.replace(
David Douard's avatar
David Douard committed
        tzinfo=datetime.timezone.utc
    ).timestamp()
        seconds=integers(min_seconds, max_seconds),
David Douard's avatar
David Douard committed
        microseconds=integers(0, 1000000),
    )
def timestamps():
    """Return a strategy of ``Timestamp`` objects built from ``timestamps_d()``."""
    timestamp_dicts = timestamps_d()
    return timestamp_dicts.map(Timestamp.from_dict)


David Douard's avatar
David Douard committed
    draw,
    timestamp=timestamps_d(),
    offset=integers(min_value=-14 * 60, max_value=14 * 60),
    negative_utc=booleans(),
):
    timestamp = draw(timestamp)
    offset = draw(offset)
    negative_utc = draw(negative_utc)
    assume(not (negative_utc and offset))
David Douard's avatar
David Douard committed
    return dict(timestamp=timestamp, offset=offset, negative_utc=negative_utc)
# Strategy of TimestampWithTimezone objects, obtained by converting the dicts
# generated by timestamps_with_timezone_d(). Note that this name is bound to a
# strategy instance, not to a function returning one.
timestamps_with_timezone = timestamps_with_timezone_d().map(
    TimestampWithTimezone.from_dict
)
David Douard's avatar
David Douard committed
    return builds(dict, url=urls())
def origins():
    """Return a strategy of ``Origin`` objects built from ``origins_d()``."""
    origin_dicts = origins_d()
    return origin_dicts.map(Origin.from_dict)


def origin_visits_d():
        visit=integers(1, 1000),
        type=pgsql_text(),
def origin_visits():
    """Return a strategy of ``OriginVisit`` objects built from ``origin_visits_d()``."""
    visit_dicts = origin_visits_d()
    return visit_dicts.map(OriginVisit.from_dict)


def metadata_dicts():
    """Return a strategy of dicts whose keys and values are both ``pgsql_text()``."""
    return dictionaries(keys=pgsql_text(), values=pgsql_text())


def origin_visit_statuses_d():
        visit=integers(1, 1000),
        type=optional(sampled_from(["git", "svn", "pypi", "debian"])),
        status=sampled_from(
            ["created", "ongoing", "full", "partial", "not_found", "failed"]
        ),
        metadata=optional(metadata_dicts()),
David Douard's avatar
David Douard committed
    )
def origin_visit_statuses():
    """Return a strategy of ``OriginVisitStatus`` objects.

    Objects are built from the dicts generated by ``origin_visit_statuses_d()``.
    """
    status_dicts = origin_visit_statuses_d()
    return status_dicts.map(OriginVisitStatus.from_dict)
def releases_d(draw):
    """Generate a release as a dict.

    Two variants are generated: one where both ``author`` and ``date`` are
    ``None``, and one where both are set (from ``persons_d()`` and
    ``timestamps_with_timezone_d()`` respectively).

    Args:
        draw: function used to draw a concrete value from a strategy.

    Returns:
        a dict with keys ``name``, ``message``, ``synthetic``, ``author``,
        ``date``, ``target``, ``target_type`` and ``metadata``.
    """
    # Strategies shared by both variants; nothing is drawn before the final
    # draw() call below.
    names = binary()
    messages = optional(binary())
    synthetic_flags = booleans()
    targets = sha1_git()
    target_types = sampled_from([x.value for x in ObjectType])
    metadata = optional(revision_metadata())

    # Variant without author nor date.
    anonymous_release = builds(
        dict,
        name=names,
        message=messages,
        synthetic=synthetic_flags,
        author=none(),
        date=none(),
        target=targets,
        target_type=target_types,
        metadata=metadata,
    )
    # Variant with both an author and a date.
    authored_release = builds(
        dict,
        name=names,
        message=messages,
        synthetic=synthetic_flags,
        date=timestamps_with_timezone_d(),
        author=persons_d(),
        target=targets,
        target_type=target_types,
        metadata=metadata,
    )
    return draw(one_of(anonymous_release, authored_release))


def releases():
    """Return a strategy of ``Release`` objects built from ``releases_d()``."""
    release_dicts = releases_d()
    return release_dicts.map(Release.from_dict)
# Alias: the metadata of revisions and releases is generated with the same
# strategy factory as generic metadata dicts.
revision_metadata = metadata_dicts
def extra_headers():
    """Return a strategy of tuples of ``(key, value)`` pairs of bytes.

    Keys are at most 50 bytes long and values at most 500 bytes long.
    """
    header_key = binary(min_size=0, max_size=50)
    header_value = binary(min_size=0, max_size=500)
    header_lists = lists(tuples(header_key, header_value))
    return header_lists.map(tuple)


        message=optional(binary()),
        synthetic=booleans(),
        author=persons_d(),
        committer=persons_d(),
        date=timestamps_with_timezone_d(),
        committer_date=timestamps_with_timezone_d(),
        parents=tuples(sha1_git()),
        type=sampled_from([x.value for x in RevisionType]),
        metadata=optional(revision_metadata()),
David Douard's avatar
David Douard committed
    )
    # TODO: metadata['extra_headers'] can have binary keys and values
def revisions():
    """Return a strategy of ``Revision`` objects built from ``revisions_d()``."""
    revision_dicts = revisions_d()
    return revision_dicts.map(Revision.from_dict)


def directory_entries_d():
David Douard's avatar
David Douard committed
        type=sampled_from(["file", "dir", "rev"]),
        perms=sampled_from([perm.value for perm in DentryPerms]),
    )
def directory_entries():
    """Return a strategy of ``DirectoryEntry`` objects.

    Objects are built from the dicts generated by ``directory_entries_d()``.
    The conversion goes through ``DirectoryEntry.from_dict``, like every other
    model strategy in this module: calling ``DirectoryEntry`` directly would
    pass the whole dict as a single positional argument instead of unpacking
    its fields.
    """
    return directory_entries_d().map(DirectoryEntry.from_dict)


def directories_d():
    """Return a strategy of directory dicts with a single ``entries`` key.

    ``entries`` is drawn from ``tuples(directory_entries_d())``.
    """
    entries = tuples(directory_entries_d())
    return builds(dict, entries=entries)


def directories():
    """Return a strategy of ``Directory`` objects built from ``directories_d()``."""
    directory_dicts = directories_d()
    return directory_dicts.map(Directory.from_dict)


def contents_d():
    """Return a strategy of content dicts, either present or skipped."""
    present = present_contents_d()
    skipped = skipped_contents_d()
    return one_of(present, skipped)
def contents():
    """Return a strategy of content objects, either present or skipped."""
    present = present_contents()
    skipped = skipped_contents()
    return one_of(present, skipped)

        ctime=optional(aware_datetimes()),
David Douard's avatar
David Douard committed
        status=one_of(just("visible"), just("hidden")),
def present_contents():
    """Return a strategy of ``Content`` objects.

    Each dict generated by ``present_contents_d()`` is passed as keyword
    arguments to ``Content.from_data``.
    """

    def _to_content(content_dict):
        return Content.from_data(**content_dict)

    return present_contents_d().map(_to_content)


def skipped_contents_d(draw):
    result = BaseContent._hash_data(draw(binary(max_size=4096)))
David Douard's avatar
David Douard committed
    result.pop("data")
David Douard's avatar
David Douard committed
        sets(sampled_from(["sha1", "sha1_git", "sha256", "blake2s256"]))
    for k in nullify_attrs:
        result[k] = None
David Douard's avatar
David Douard committed
    result["reason"] = draw(pgsql_text())
    result["status"] = "absent"
    result["ctime"] = draw(optional(aware_datetimes()))
def skipped_contents():
    """Return a strategy of ``SkippedContent`` objects.

    Objects are built from the dicts generated by ``skipped_contents_d()``.
    """
    skipped_dicts = skipped_contents_d()
    return skipped_dicts.map(SkippedContent.from_dict)
    return binary(min_size=1)
David Douard's avatar
David Douard committed
        target_type=sampled_from(
            [x.value for x in TargetType if x.value not in ("alias",)]
        ),
    )
David Douard's avatar
David Douard committed
        dict, target=sha1_git(), target_type=just("alias")
    )  # TargetType.ALIAS.value))
def branch_targets_d(*, only_objects=False):
        return branch_targets_object_d()
        return one_of(branch_targets_alias_d(), branch_targets_object_d())


def branch_targets(*, only_objects=False):
    """Return a strategy of ``SnapshotBranch`` objects.

    Args:
        only_objects: forwarded as-is to ``branch_targets_d()``.
    """
    target_dicts = branch_targets_d(only_objects=only_objects)
    return builds(SnapshotBranch.from_dict, target_dicts)
def snapshots_d(draw, *, min_size=0, max_size=100, only_objects=False):
David Douard's avatar
David Douard committed
    branches = draw(
        dictionaries(
            keys=branch_names(),
            values=optional(branch_targets_d(only_objects=only_objects)),
David Douard's avatar
David Douard committed
            min_size=min_size,
            max_size=max_size,
        )
    )

    if not only_objects:
        # Make sure aliases point to actual branches
        unresolved_aliases = {
David Douard's avatar
David Douard committed
            branch: target["target"]
            for branch, target in branches.items()
David Douard's avatar
David Douard committed
            if (
                target
                and target["target_type"] == "alias"
                and target["target"] not in branches
            )
        }
        for alias_name, alias_target in unresolved_aliases.items():
            # Override alias branch with one pointing to a real object
            # if max_size constraint is reached
            alias = alias_target if len(branches) < max_size else alias_name
            branches[alias] = draw(branch_targets_d(only_objects=True))
    # Ensure no cycles between aliases
David Douard's avatar
David Douard committed
            id_ = snapshot_identifier(
                {
                    "branches": {
                        name: branch or None for (name, branch) in branches.items()
                    }
                }
            )
        except ValueError as e:
            for (source, target) in e.args[1]:
                branches[source] = draw(branch_targets_d(only_objects=True))
David Douard's avatar
David Douard committed
    return dict(id=identifier_to_bytes(id_), branches=branches)
def snapshots(*, min_size=0, max_size=100, only_objects=False):
    """Return a strategy of ``Snapshot`` objects built from ``snapshots_d()``.

    All keyword arguments are forwarded unchanged to ``snapshots_d()``.
    """
    snapshot_dicts = snapshots_d(
        min_size=min_size, max_size=max_size, only_objects=only_objects
    )
    return snapshot_dicts.map(Snapshot.from_dict)
def objects(blacklist_types=("origin_visit_status",), split_content=False):
    """Generate a random couple (obj_type, obj),

    in which obj is an instance of the Model class corresponding to obj_type.

    `blacklist_types` is a list of obj_type to exclude from the strategy.

    If `split_content` is True, generates Content and SkippedContent under different
    obj_type, resp. "content" and "skipped_content".
    """
    if split_content:
        content_strategies = [
            ("content", present_contents),
            ("skipped_content", skipped_contents),
        ]
    else:
        content_strategies = [("content", contents)]

    strategies = [
        ("origin", origins),
        ("origin_visit", origin_visits),
        ("origin_visit_status", origin_visit_statuses),
        ("snapshot", snapshots),
        ("release", releases),
        ("revision", revisions),
        ("directory", directories),
        *content_strategies,
    ]

    args = []
    for obj_type, obj_gen in strategies:
        if obj_type in blacklist_types:
            continue
        # obj_type is bound as a default argument so that each lambda keeps
        # its own type instead of the last value of the loop variable.
        args.append(obj_gen().map(lambda x, obj_type=obj_type: (obj_type, x)))
    return one_of(*args)
def object_dicts(blacklist_types=("origin_visit_status",), split_content=False):
    """Generate a random couple (obj_type, dict),

    in which dict is suitable for <ModelForType>.from_dict() factory methods.

    `blacklist_types` is a list of obj_type to exclude from the strategy.

    If `split_content` is True, generates Content and SkippedContent under different
    obj_type, resp. "content" and "skipped_content".
    """
    if split_content:
        content_strategies = [
            ("content", present_contents_d),
            ("skipped_content", skipped_contents_d),
        ]
    else:
        content_strategies = [("content", contents_d)]

    strategies = [
        ("origin", origins_d),
        ("origin_visit", origin_visits_d),
        ("origin_visit_status", origin_visit_statuses_d),
        ("snapshot", snapshots_d),
        ("release", releases_d),
        ("revision", revisions_d),
        ("directory", directories_d),
        *content_strategies,
    ]

    args = []
    for obj_type, obj_gen in strategies:
        if obj_type in blacklist_types:
            continue
        # obj_type is bound as a default argument so that each lambda keeps
        # its own type instead of the last value of the loop variable.
        args.append(obj_gen().map(lambda x, obj_type=obj_type: (obj_type, x)))
    return one_of(*args)