Skip to content
Snippets Groups Projects
Commit 4916c38c authored by Jenkins for Software Heritage's avatar Jenkins for Software Heritage
Browse files

New upstream version 0.0.54

parents d944d522 8ebbd216
No related branches found
Tags debian/upstream/0.0.54
No related merge requests found
Metadata-Version: 2.1
Name: swh.model
Version: 0.0.53
Version: 0.0.54
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
......
Metadata-Version: 2.1
Name: swh.model
Version: 0.0.53
Version: 0.0.54
Summary: Software Heritage data model
Home-page: https://forge.softwareheritage.org/diffusion/DMOD/
Author: Software Heritage developers
......
......@@ -16,7 +16,7 @@ from .from_disk import DentryPerms
from .model import (
Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit,
Snapshot, SnapshotBranch, TargetType, Release, Revision,
Directory, DirectoryEntry, Content
Directory, DirectoryEntry, Content, SkippedContent
)
from .identifiers import snapshot_identifier, identifier_to_bytes
......@@ -128,14 +128,12 @@ def directories():
entries=lists(directory_entries()))
@composite
def contents(draw):
(status, data, reason) = draw(one_of(
tuples(just('visible'), binary(), none()),
tuples(just('absent'), none(), pgsql_text()),
tuples(just('hidden'), binary(), none()),
))
def contents():
return one_of(present_contents(), skipped_contents())
@composite
def present_contents(draw):
return draw(builds(
Content,
length=integers(min_value=0, max_value=2**63-1),
......@@ -143,9 +141,25 @@ def contents(draw):
sha1_git=sha1_git(),
sha256=binary(min_size=32, max_size=32),
blake2s256=binary(min_size=32, max_size=32),
status=just(status),
data=just(data),
reason=just(reason),
status=one_of(just('visible'), just('hidden')),
data=binary(),
))
@composite
def skipped_contents(draw):
def optional(strategy):
return one_of(none(), strategy)
return draw(builds(
SkippedContent,
length=optional(integers(min_value=0, max_value=2**63-1)),
sha1=optional(sha1()),
sha1_git=optional(sha1_git()),
sha256=optional(binary(min_size=32, max_size=32)),
blake2s256=optional(binary(min_size=32, max_size=32)),
status=just('absent'),
reason=pgsql_text(),
))
......
......@@ -361,18 +361,42 @@ class Directory(BaseModel, HashableObject):
@attr.s(frozen=True)
class Content(BaseModel):
class BaseContent(BaseModel):
def to_dict(self):
content = super().to_dict()
if content['ctime'] is None:
del content['ctime']
return content
@classmethod
def from_dict(cls, d, use_subclass=True):
if use_subclass:
# Chooses a subclass to instantiate instead.
if d['status'] == 'absent':
return SkippedContent.from_dict(d)
else:
return Content.from_dict(d)
else:
return super().from_dict(d)
def get_hash(self, hash_name):
if hash_name not in DEFAULT_ALGORITHMS:
raise ValueError('{} is not a valid hash name.'.format(hash_name))
return getattr(self, hash_name)
@attr.s(frozen=True)
class Content(BaseContent):
sha1 = attr.ib(type=bytes)
sha1_git = attr.ib(type=Sha1Git)
sha256 = attr.ib(type=bytes)
blake2s256 = attr.ib(type=bytes)
length = attr.ib(type=int)
status = attr.ib(
type=str,
validator=attr.validators.in_(['visible', 'absent', 'hidden']))
reason = attr.ib(type=Optional[str],
default=None)
validator=attr.validators.in_(['visible', 'hidden']))
data = attr.ib(type=Optional[bytes],
default=None)
......@@ -382,29 +406,64 @@ class Content(BaseModel):
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive."""
if self.status == 'absent' and value < -1:
raise ValueError('Length must be positive or -1.')
elif self.status != 'absent' and value < 0:
raise ValueError('Length must be positive, unless status=absent.')
if self.status != 'absent' and value < 0:
raise ValueError('Length must be positive.')
def to_dict(self):
content = super().to_dict()
if content['data'] is None:
del content['data']
return content
@classmethod
def from_dict(cls, d):
return super().from_dict(d, use_subclass=False)
@attr.s(frozen=True)
class SkippedContent(BaseContent):
sha1 = attr.ib(type=Optional[bytes])
sha1_git = attr.ib(type=Optional[Sha1Git])
sha256 = attr.ib(type=Optional[bytes])
blake2s256 = attr.ib(type=Optional[bytes])
length = attr.ib(type=Optional[int])
status = attr.ib(
type=str,
validator=attr.validators.in_(['absent']))
reason = attr.ib(type=Optional[str],
default=None)
origin = attr.ib(type=Optional[Origin],
default=None)
ctime = attr.ib(type=Optional[datetime.datetime],
default=None)
@reason.validator
def check_reason(self, attribute, value):
"""Checks the reason is full if status != absent."""
assert self.reason == value
if self.status == 'absent' and value is None:
if value is None:
raise ValueError('Must provide a reason if content is absent.')
elif self.status != 'absent' and value is not None:
raise ValueError(
'Must not provide a reason if content is not absent.')
@length.validator
def check_length(self, attribute, value):
"""Checks the length is positive or -1."""
if value is not None and value < -1:
raise ValueError('Length must be positive or -1.')
def to_dict(self):
content = super().to_dict()
for field in ('data', 'reason', 'ctime'):
if content[field] is None:
del content[field]
if content['origin'] is None:
del content['origin']
return content
def get_hash(self, hash_name):
if hash_name not in DEFAULT_ALGORITHMS:
raise ValueError('{} is not a valid hash name.'.format(hash_name))
return getattr(self, hash_name)
@classmethod
def from_dict(cls, d):
d2 = d
d = d.copy()
if d.pop('data', None) is not None:
raise ValueError('SkippedContent has no "data" attribute %r' % d2)
return super().from_dict(d, use_subclass=False)
......@@ -55,7 +55,7 @@ def gen_content():
**h.digest()}
if status == 'absent':
content['reason'] = 'why not'
content['data'] = b''
content['data'] = None
return content
......
......@@ -5,7 +5,7 @@
from .generate_testdata import gen_contents, gen_origins, ORIGINS
from swh.model.model import Origin, Content
from swh.model.model import Origin, BaseContent
def test_gen_origins_empty():
......@@ -43,12 +43,12 @@ def test_gen_contents_empty():
def test_gen_contents_one():
contents = gen_contents(1)
assert len(contents) == 1
assert [Content.from_dict(d) for d in contents]
assert [BaseContent.from_dict(d) for d in contents]
def test_gen_contents_default():
contents = gen_contents()
assert len(contents) == 20
models = {Content.from_dict(d) for d in contents}
models = {BaseContent.from_dict(d) for d in contents}
# ensure we did not generate the same content twice
assert len(contents) == len(models)
v0.0.53-0-g4b779e1
\ No newline at end of file
v0.0.54-0-g8ebbd21
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment