Skip to content
Snippets Groups Projects
Verified Commit c83f1f9d authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

swh.storage.filter: Add filtering storage implementation

Also add a sample_data fixture to read default test data from.

Related T1389
parent 02b25f07
No related tags found
1 merge request: !839 swh.storage.filter: Add filtering storage implementation
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
from hypothesis import settings
from typing import Dict
# Register the named Hypothesis settings profiles used by the tests.
# Full documentation is at:
# https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
#
# "fast" generates at most 5 examples per test and "slow" at most 20; both
# use deadline=5000 (milliseconds, per the Hypothesis settings documentation).
# NOTE(review): registering a profile does not select it; presumably one is
# loaded elsewhere (configuration or command line) -- confirm.
settings.register_profile("fast", max_examples=5, deadline=5000)
settings.register_profile("slow", max_examples=20, deadline=5000)
@pytest.fixture
def sample_data() -> Dict:
    """Provide canned storage objects for tests to manipulate.

    Returns:
        Dict mapping an object type (content, person, directory, revision)
        to the list of sample objects of that type.

    """
    # Shared by the revision's author date and committer date.
    timestamp = 1567591673

    john_doe = {
        'name': b'John Doe',
        'email': b'john.doe@institute.org',
        'fullname': b'John Doe <john.doe@institute.org>'
    }

    empty_directory = {
        'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
        'entries': []
    }

    first_content = {
        'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
        'sha1': b'g\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
        'sha1_git': b'\xf2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
        'sha256': b"\x87\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
        'length': 48,
        'data': b'temp file for testing content storage conversion',
        'status': 'visible',
    }

    second_content = {
        'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
        'sha1': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
        'sha1_git': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
        'sha256': b"\x77\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
        'length': 50,
        'data': b'temp file for testing content storage conversion 2',
        'status': 'visible',
    }

    # Author and committer deliberately reference the very same person dict.
    tar_revision = {
        'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
        'message': b'something',
        'author': john_doe,
        'committer': john_doe,
        'date': timestamp,
        'committer_date': timestamp,
        'type': 'tar',
        'directory': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
        'synthetic': False,
        'metadata': {},
        'parents': [],
    }

    return {
        'content': [first_content, second_content],
        'person': [john_doe],
        'directory': [empty_directory],
        'revision': [tar_revision],
    }
......@@ -13,7 +13,7 @@ class HashCollision(Exception):
pass
STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory'}
STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter'}
def get_storage(cls, args):
......@@ -22,7 +22,7 @@ def get_storage(cls, args):
Args:
storage (dict): dictionary with keys:
- cls (str): storage's class, either local, remote, memory
- cls (str): storage's class, either local, remote, memory, filter
- args (dict): dictionary with keys
Returns:
......@@ -42,5 +42,7 @@ def get_storage(cls, args):
from .storage import Storage
elif cls == 'memory':
from .in_memory import Storage
elif cls == 'filter':
from .filter import FilteringProxyStorage as Storage
return Storage(**args)
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Dict, Generator, Sequence, Set
from swh.storage import get_storage
class FilteringProxyStorage:
    """Filtering Storage implementation. This is in charge of transparently
    filtering out known objects prior to adding them to storage.

    The proxy wraps another storage (built through ``get_storage``) and
    overrides ``content_add``, ``directory_add`` and ``revision_add``; every
    other attribute access is delegated untouched to the wrapped storage.

    Objects are recorded in ``objects_seen`` *before* the wrapped storage is
    asked to add them, so an object whose add failed is not re-submitted by
    the same proxy instance.

    Sample configuration use case for filtering storage:

    .. code-block:: yaml

        storage:
          cls: filter
          args:
            storage:
              cls: remote
              args:
                url: http://storage.internal.staging.swh.network:5002/

    """

    def __init__(self, storage):
        """Build the wrapped storage and the per-type caches of seen ids.

        Args:
            storage (dict): configuration of the wrapped storage, expanded
                as keyword arguments to ``get_storage`` (keys ``cls`` and
                ``args``)

        """
        self.storage = get_storage(**storage)
        self.objects_seen = {
            'content': set(),  # set of content hashes (sha256) seen
            'directory': set(),
            'revision': set(),
        }

    def __getattr__(self, key):
        """Delegate any attribute not defined on the proxy to the storage."""
        # __getattr__ only runs when normal lookup failed.  If 'storage'
        # itself is missing (instance created without __init__, e.g. through
        # __new__, copy or unpickling), evaluating self.storage below would
        # re-enter this method forever; fail cleanly instead.
        if key == 'storage':
            raise AttributeError(key)
        return getattr(self.storage, key)

    def content_add(self, content: Sequence[Dict]) -> Dict:
        """Add only the contents the wrapped storage does not know yet.

        Args:
            content: content dicts; each must hold a 'sha256' key

        Returns:
            Whatever the wrapped storage's ``content_add`` returns

        """
        contents = list(content)
        contents_to_add = self._filter_missing_contents(contents)
        # Forward a concrete list rather than a one-shot generator so the
        # wrapped storage receives the sequence the signature advertises.
        return self.storage.content_add(
            [x for x in contents if x['sha256'] in contents_to_add]
        )

    def directory_add(self, directories: Sequence[Dict]) -> Dict:
        """Add only the directories the wrapped storage does not know yet.

        Args:
            directories: directory dicts; each must hold an 'id' key

        Returns:
            Whatever the wrapped storage's ``directory_add`` returns

        """
        directories = list(directories)
        missing_ids = self._filter_missing_ids(
            'directory',
            (d['id'] for d in directories)
        )
        return self.storage.directory_add(
            [d for d in directories if d['id'] in missing_ids]
        )

    def revision_add(self, revisions: Sequence[Dict]) -> Dict:
        """Add only the revisions the wrapped storage does not know yet.

        Args:
            revisions: revision dicts; each must hold an 'id' key

        Returns:
            Whatever the wrapped storage's ``revision_add`` returns

        """
        revisions = list(revisions)
        missing_ids = self._filter_missing_ids(
            'revision',
            (r['id'] for r in revisions)
        )
        return self.storage.revision_add(
            [r for r in revisions if r['id'] in missing_ids]
        )

    def _filter_missing_contents(
            self, content_hashes: Sequence[Dict]) -> Set[bytes]:
        """Return only the content keys missing from swh

        Contents whose sha256 was already seen by this proxy are skipped;
        the remaining ones are marked as seen and checked against the
        wrapped storage.

        Args:
            content_hashes: List of content dicts (each holding a 'sha256'
                key) to check for existence in swh storage

        Returns:
            Set of the values yielded by the wrapped storage's
            ``content_missing`` called with ``key_hash='sha256'``
            (presumably the missing sha256 hashes)

        """
        objects_seen = self.objects_seen['content']
        missing_hashes = []
        for hashes in content_hashes:
            if hashes['sha256'] in objects_seen:
                continue
            objects_seen.add(hashes['sha256'])
            missing_hashes.append(hashes)

        return set(self.storage.content_missing(
            missing_hashes,
            key_hash='sha256',
        ))

    def _filter_missing_ids(
            self,
            object_type: str,
            ids: Generator[bytes, None, None]) -> Set[bytes]:
        """Filter missing ids from the storage for a given object type.

        Ids already seen by this proxy are skipped; the remaining ones are
        marked as seen and checked against the wrapped storage.

        Args:
            object_type: object type to use {revision, directory}
            ids: Iterable of object_type ids

        Returns:
            Missing ids from the storage for object_type

        Raises:
            KeyError: if object_type is neither 'revision' nor 'directory'

        """
        objects_seen = self.objects_seen[object_type]
        missing_ids = []
        for object_id in ids:
            if object_id in objects_seen:
                continue
            objects_seen.add(object_id)
            missing_ids.append(object_id)

        # Map to method *names* so only the method actually needed is looked
        # up on the wrapped storage.
        fn_name_by_object_type = {
            'revision': 'revision_missing',
            'directory': 'directory_missing',
        }
        fn = getattr(self.storage, fn_name_by_object_type[object_type])
        return set(fn(missing_ids))
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.storage.filter import FilteringProxyStorage
def test_filtering_proxy_storage_content(sample_data):
    """A content added twice through the proxy is only reported once."""
    content = sample_data['content'][0]
    sha1 = content['sha1']
    proxy = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})

    # Before any add, looking the content up yields a falsy entry.
    assert not next(proxy.content_get([sha1]))

    first_summary = proxy.content_add([content])
    assert first_summary == {
        'content:add': 1,
        'content:add:bytes': 48,
        'skipped_content:add': 0
    }

    # The content can now be retrieved.
    assert next(proxy.content_get([sha1])) is not None

    # Adding the very same content again reports nothing added.
    second_summary = proxy.content_add([content])
    assert second_summary == {
        'content:add': 0,
        'content:add:bytes': 0,
        'skipped_content:add': 0
    }
def test_filtering_proxy_storage_revision(sample_data):
    """A revision added twice through the proxy is only reported once."""
    rev = sample_data['revision'][0]
    rev_id = rev['id']
    proxy = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})

    # Before any add, looking the revision up yields a falsy entry.
    assert not next(proxy.revision_get([rev_id]))

    first_summary = proxy.revision_add([rev])
    assert first_summary == {
        'revision:add': 1,
    }

    # The revision can now be retrieved.
    assert next(proxy.revision_get([rev_id])) is not None

    # Adding the very same revision again reports nothing added.
    second_summary = proxy.revision_add([rev])
    assert second_summary == {
        'revision:add': 0,
    }
def test_filtering_proxy_storage_directory(sample_data):
    """A directory added twice through the proxy is only reported once."""
    dir_ = sample_data['directory'][0]
    dir_id = dir_['id']
    proxy = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})

    # Before any add, the directory is reported as missing.
    assert next(proxy.directory_missing([dir_id]))

    first_summary = proxy.directory_add([dir_])
    assert first_summary == {
        'directory:add': 1,
    }

    # Once added, nothing is reported missing any more.
    assert not list(proxy.directory_missing([dir_id]))

    # Adding the very same directory again reports nothing added.
    second_summary = proxy.directory_add([dir_])
    assert second_summary == {
        'directory:add': 0,
    }
......@@ -12,6 +12,7 @@ from swh.storage import get_storage
from swh.storage.api.client import RemoteStorage
from swh.storage.storage import Storage as DbStorage
from swh.storage.in_memory import Storage as MemoryStorage
from swh.storage.filter import FilteringProxyStorage
@patch('swh.storage.storage.psycopg2.pool')
......@@ -28,6 +29,9 @@ def test_get_storage(mock_pool):
'cls': 'memory', 'args': {},
},
}),
('filter', FilteringProxyStorage, {'storage': {
'cls': 'memory', 'args': {}}
})
]:
actual_storage = get_storage(cls, args=dummy_args)
assert actual_storage is not None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment