Skip to content
Snippets Groups Projects
Verified Commit 03d5a2cf authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

swh.storage.buffer: Add buffering proxy storage implementation

Related T1389
parent c83f1f9d
No related tags found
1 merge request!840swh.storage.buffer: Add buffering proxy storage implementation
......@@ -13,7 +13,7 @@ class HashCollision(Exception):
pass
STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter'}
STORAGE_IMPLEMENTATION = {'local', 'remote', 'memory', 'filter', 'buffer'}
def get_storage(cls, args):
......@@ -22,7 +22,8 @@ def get_storage(cls, args):
Args:
storage (dict): dictionary with keys:
- cls (str): storage's class, either local, remote, memory, filter
- cls (str): storage's class, either local, remote, memory, filter,
buffer
- args (dict): dictionary with keys
Returns:
......@@ -44,5 +45,7 @@ def get_storage(cls, args):
from .in_memory import Storage
elif cls == 'filter':
from .filter import FilteringProxyStorage as Storage
elif cls == 'buffer':
from .buffer import BufferingProxyStorage as Storage
return Storage(**args)
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import deque
from functools import partial
from typing import Optional, Iterable, Dict
from swh.core.utils import grouper
from swh.storage import get_storage
class BufferingProxyStorage:
    """Storage implementation in charge of accumulating objects prior to
    discussing with the "main" storage.

    Objects are staged in one in-memory queue per object type and written
    to the backend storage in batches, either explicitly through
    :meth:`flush` or implicitly whenever a queue grows past its configured
    threshold (count-based for every type, plus a total-bytes threshold
    for contents).

    Sample configuration use case for buffering storage:

    .. code-block:: yaml

        storage:
          cls: buffer
          args:
            storage:
              cls: remote
              args: http://storage.internal.staging.swh.network:5002/
            min_batch_size:
              content: 10000
              content_bytes: 100000000
              directory: 5000
              revision: 1000

    """
    def __init__(self, storage, min_batch_size=None):
        # Backend ("main") storage every batch eventually lands in.
        self.storage = get_storage(**storage)

        if min_batch_size is None:
            min_batch_size = {}

        # Flush thresholds; unspecified entries fall back to the defaults
        # below. `content_bytes` bounds the total size of queued contents.
        self.min_batch_size = {
            'content': min_batch_size.get('content', 10000),
            'content_bytes': min_batch_size.get('content_bytes',
                                                100*1024*1024),
            'directory': min_batch_size.get('directory', 25000),
            'revision': min_batch_size.get('revision', 100000),
        }
        self.object_types = ['content', 'directory', 'revision']
        # One pending-objects queue per buffered object type.
        self._objects = {k: deque() for k in self.object_types}

    def __getattr__(self, key):
        # Route `<object_type>_add` calls for buffered types through the
        # buffering logic; proxy everything else untouched to the backend.
        if key.endswith('_add'):
            object_type = key.split('_')[0]
            if object_type in self.object_types:
                return partial(
                    self.object_add, object_type=object_type
                )
        return getattr(self.storage, key)

    def content_add(self, content: Iterable[Dict]) -> Dict:
        """Enqueue contents to write to the storage.

        Following policies apply:
        - First, check if the queue's threshold is hit. If it is flush content
          to the storage.

        - If not, check if the total size of enqueued contents's threshold is
          hit. If it is flush content to the storage.

        Returns:
            The backend's flush summary when a threshold was hit, an
            empty dict otherwise (contents stay buffered).

        """
        s = self.object_add(content, object_type='content')
        if not s:
            q = self._objects['content']
            total_size = sum(c['length'] for c in q)
            if total_size >= self.min_batch_size['content_bytes']:
                return self.flush(['content'])

        return s

    def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
        """Write every pending object of the given types (all buffered
        types by default) to the backend storage, in batches of at most
        `min_batch_size[object_type]` objects.

        Returns:
            A summary dict merging (by key-wise addition) the summaries
            returned by the backend's `<object_type>_add` endpoints.

        """
        if object_types is None:
            object_types = self.object_types
        summary = {}  # type: Dict[str, int]
        for object_type in object_types:
            pending = list(self._objects[object_type])
            batch_size = self.min_batch_size[object_type]
            add_fn = getattr(self.storage, '%s_add' % object_type)
            for start in range(0, len(pending), batch_size):
                s = add_fn(pending[start:start + batch_size])
                # Accumulate in place: the previous dict-comprehension
                # rebuild kept only the keys of the *current* batch's
                # summary, dropping counters from earlier object types.
                for k, v in s.items():
                    summary[k] = v + summary.get(k, 0)
            self._objects[object_type].clear()

        return summary

    def object_add(self, objects: Iterable[Dict], *, object_type: str) -> Dict:
        """Enqueue objects to write to the storage. This checks if the queue's
        threshold is hit. If it is actually write those to the storage.

        Returns:
            The flush summary when the count threshold was hit, an empty
            dict otherwise.

        """
        q = self._objects[object_type]
        threshold = self.min_batch_size[object_type]
        q.extend(objects)
        if len(q) >= threshold:
            return self.flush()

        return {}
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.storage.buffer import BufferingProxyStorage
def test_buffering_proxy_storage_content_threshold_not_hit(sample_data):
    """Contents below the count threshold stay buffered until flush()."""
    batch = [sample_data['content'][0], sample_data['content'][1]]
    proxy = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={
            'content': 10,
        }
    )
    summary = proxy.content_add(batch)
    assert summary == {}

    # Nothing reached the backing storage yet
    expected_missing = {c['sha1'] for c in batch}
    assert set(proxy.content_missing(batch)) == expected_missing

    summary = proxy.flush()
    assert summary == {
        'content:add': 1 + 1,
        'content:add:bytes': batch[0]['length'] + batch[1]['length'],
        'skipped_content:add': 0
    }

    assert list(proxy.content_missing(batch)) == []
def test_buffering_proxy_storage_content_threshold_nb_hit(sample_data):
    """Hitting the count threshold writes contents through immediately."""
    content = sample_data['content'][0]
    proxy = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={
            'content': 1,
        }
    )
    summary = proxy.content_add([content])
    assert summary == {
        'content:add': 1,
        'content:add:bytes': content['length'],
        'skipped_content:add': 0
    }
    assert list(proxy.content_missing([content])) == []

    # Queue was drained, so flushing again is a no-op
    assert proxy.flush() == {}
def test_buffering_proxy_storage_content_threshold_bytes_hit(sample_data):
    """A single content exceeding the byte threshold is flushed at once."""
    content = sample_data['content'][0]
    byte_threshold = 20
    proxy = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={
            'content': 10,
            'content_bytes': byte_threshold,
        }
    )
    # Precondition: this content alone crosses the byte threshold
    assert content['length'] > byte_threshold

    summary = proxy.content_add([content])
    assert summary == {
        'content:add': 1,
        'content:add:bytes': content['length'],
        'skipped_content:add': 0
    }
    assert list(proxy.content_missing([content])) == []

    # Queue already drained: nothing left to flush
    assert proxy.flush() == {}
def test_buffering_proxy_storage_directory_threshold_not_hit(sample_data):
    """A directory below the count threshold stays buffered until flush()."""
    directory = sample_data['directory'][0]
    proxy = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={
            'directory': 10,
        }
    )
    assert proxy.directory_add([directory]) == {}

    # Still missing from the backing storage
    dir_id = directory['id']
    assert list(proxy.directory_missing([dir_id])) == [dir_id]

    summary = proxy.flush()
    assert summary == {
        'directory:add': 1,
    }

    assert list(proxy.directory_missing([dir_id])) == []
def test_buffering_proxy_storage_directory_threshold_hit(sample_data):
    """Hitting the count threshold writes the directory through at once."""
    directory = sample_data['directory'][0]
    proxy = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={
            'directory': 1,
        }
    )
    summary = proxy.directory_add([directory])
    assert summary == {
        'directory:add': 1,
    }
    assert list(proxy.directory_missing([directory['id']])) == []

    # Queue already drained: nothing left to flush
    assert proxy.flush() == {}
def test_buffering_proxy_storage_revision_threshold_not_hit(sample_data):
    """A revision below the count threshold stays buffered until flush()."""
    revision = sample_data['revision'][0]
    proxy = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={
            'revision': 10,
        }
    )
    assert proxy.revision_add([revision]) == {}

    # Still missing from the backing storage
    rev_id = revision['id']
    assert list(proxy.revision_missing([rev_id])) == [rev_id]

    summary = proxy.flush()
    assert summary == {
        'revision:add': 1,
    }

    assert list(proxy.revision_missing([rev_id])) == []
def test_buffering_proxy_storage_revision_threshold_hit(sample_data):
    """Hitting the count threshold writes the revision through at once."""
    revision = sample_data['revision'][0]
    proxy = BufferingProxyStorage(
        storage={'cls': 'memory', 'args': {}},
        min_batch_size={
            'revision': 1,
        }
    )
    summary = proxy.revision_add([revision])
    assert summary == {
        'revision:add': 1,
    }
    assert list(proxy.revision_missing([revision['id']])) == []

    # Queue already drained: nothing left to flush
    assert proxy.flush() == {}
......@@ -12,6 +12,7 @@ from swh.storage import get_storage
from swh.storage.api.client import RemoteStorage
from swh.storage.storage import Storage as DbStorage
from swh.storage.in_memory import Storage as MemoryStorage
from swh.storage.buffer import BufferingProxyStorage
from swh.storage.filter import FilteringProxyStorage
......@@ -31,7 +32,10 @@ def test_get_storage(mock_pool):
}),
('filter', FilteringProxyStorage, {'storage': {
'cls': 'memory', 'args': {}}
})
}),
('buffer', BufferingProxyStorage, {'storage': {
'cls': 'memory', 'args': {}}
}),
]:
actual_storage = get_storage(cls, args=dummy_args)
assert actual_storage is not None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment