Skip to content
Snippets Groups Projects
Verified Commit 5b0fbd27 authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

storage: Add Buffering proxy storage implementation

parent 2a94112c
No related branches found
No related tags found
No related merge requests found
......@@ -296,6 +296,8 @@ class PackageLoader:
logger.debug('snapshot: %s', snapshot)
self.storage.snapshot_add([snapshot])
if hasattr(self.storage, 'flush'):
self.storage.flush()
except Exception as e:
logger.warning('Fail to load %s. Reason: %s' % (self.url, e))
status_visit = 'partial'
......
......@@ -3,14 +3,66 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Optional, Sequence, Dict, Set
from functools import partial
from collections import deque
from swh.core.utils import grouper
from swh.storage import get_storage
from typing import Sequence, Dict, Set
class BufferingProxyStorage:
    """Proxy storage accumulating objects before sending them in batches to
    the underlying ("main") storage.

    Calls to ``content_add``, ``directory_add`` and ``revision_add`` are
    intercepted and queued per object type. Once a queue grows past its
    configured size, every queue is flushed to the underlying storage. Any
    other attribute access is delegated to the underlying storage.

    Callers must call :meth:`flush` once done to send the remaining objects.

    """
    def __init__(self, **storage):
        """Instantiate the underlying storage and the per-type queues.

        Args:
            **storage: keyword arguments passed as is to ``get_storage`` to
                build the underlying storage.

        """
        self.storage = get_storage(**storage)
        # The original code read ``self.config``, which resolves (through
        # __getattr__) to the underlying storage's ``config`` attribute.
        # NOTE(review): fall back to the defaults when the underlying storage
        # exposes no such attribute instead of failing -- confirm where the
        # packet sizes are expected to be configured.
        config = getattr(self.storage, 'config', None) or {}
        self._max_size = {
            'content': config.get('content_packet_size', 10000),
            'directory': config.get('directory_packet_size', 25000),
            # was reading 'directory_packet_size' (copy-paste mistake)
            'revision': config.get('revision_packet_size', 100000),
        }
        self.object_types = ['content', 'directory', 'revision']
        self._objects = {k: deque() for k in self.object_types}

    def flush(self, object_types: Optional[Sequence[str]] = None) -> Dict:
        """Send the buffered objects to the underlying storage.

        Args:
            object_types: object types to flush, in that order. Defaults to
                all the buffered object types.

        Returns:
            The sum, key by key, of the summaries returned by the underlying
            storage's ``<object_type>_add`` calls (empty dict when nothing
            was buffered).

        Raises:
            KeyError: if one of ``object_types`` is not a buffered type.

        """
        if object_types is None:
            object_types = self.object_types
        summary = {}
        for object_type in object_types:
            queue = self._objects[object_type]
            if not queue:
                continue
            add_fn = getattr(self.storage, '%s_add' % object_type)
            # Guard against a misconfigured (<= 0) size which would otherwise
            # never drain the queue.
            batch_size = max(1, self._max_size[object_type])
            while queue:
                # Pop the objects out of the queue so they are sent only once.
                batch = [queue.popleft()
                         for _ in range(min(batch_size, len(queue)))]
                try:
                    result = add_fn(batch)
                except Exception:
                    # Put the unsent batch back (in order) so that nothing is
                    # silently lost, then let the caller deal with the error.
                    queue.extendleft(reversed(batch))
                    raise
                # Accumulate into the running summary; do not replace it,
                # otherwise counters of the previous batches are dropped.
                for key, count in (result or {}).items():
                    summary[key] = summary.get(key, 0) + count
        return summary

    def object_add(self, objects: Sequence[Dict], *, object_type: str) -> Dict:
        """Buffer ``objects`` and flush every queue when the queue of
        ``object_type`` grows past its configured size.

        Args:
            objects: objects to buffer.
            object_type: one of the buffered object types.

        Returns:
            The flush summary when a flush got triggered, an empty dict
            otherwise.

        Raises:
            KeyError: if ``object_type`` is not a buffered type.

        """
        queue = self._objects[object_type]
        queue.extend(objects)
        if len(queue) > self._max_size[object_type]:
            return self.flush()
        return {}

    def __getattr__(self, key):
        """Route ``<object_type>_add`` calls of the buffered types to
        :meth:`object_add`; delegate anything else to the underlying storage.

        """
        if key == 'storage':
            # Only reached when ``storage`` is not set (yet); delegating
            # would recurse endlessly through ``self.storage``.
            raise AttributeError(key)
        if key.endswith('_add'):
            # Strip only the trailing '_add' so that a method such as
            # 'content_metadata_add' is not mistaken for 'content_add'.
            object_type = key.rsplit('_', 1)[0]
            if object_type in self.object_types:
                return partial(
                    self.object_add, object_type=object_type
                )
        return getattr(self.storage, key)
class ProxyStorage:
class FilteringProxyStorage:
"""Storage implementation in charge of filtering existing objects prior to
calling the storage api for ingestion
calling the storage api for ingestion.
"""
def __init__(self, **storage):
......
......@@ -22,9 +22,12 @@ logger = logging.getLogger(__name__)
def get_storage(cls, args):
    """Instantiate the storage designated by ``cls``.

    The 'proxy', 'filter' and 'buffer' classes are resolved to the matching
    proxy storage of ``swh.loader.package.storage`` and built with ``args``
    as keyword arguments. Any other class is handed over, along with
    ``args``, to ``initial_get_storage``.

    """
    # The proxy class is imported lazily, only for the branch that needs it.
    if cls == 'proxy':
        from swh.loader.package.storage import ProxyStorage as proxy_class
    elif cls == 'filter':
        from swh.loader.package.storage import (
            FilteringProxyStorage as proxy_class
        )
    elif cls == 'buffer':
        from swh.loader.package.storage import (
            BufferingProxyStorage as proxy_class
        )
    else:
        return initial_get_storage(cls, args)
    return proxy_class(**args)
......
storage:
cls: proxy
cls: filter
args:
cls: memory
args: {}
cls: buffer
args:
cls: memory
args: {}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment