From b3c109e43dcb50a9f74badd2ad817b8c8874cb23 Mon Sep 17 00:00:00 2001
From: "Antoine R. Dumont (@ardumont)" <antoine.romain.dumont@gmail.com>
Date: Fri, 16 Sep 2016 22:10:46 +0200
Subject: [PATCH] Adapt archiver director to read sha1 from stdin

Also add a force_copy flag to the configuration file to skip checking
whether each sha1 already exists in the destination. This makes the
first copy into a new, empty backend more efficient.
---
 swh/archiver/director.py | 76 +++++++++++++++++++++-------------------
 1 file changed, 40 insertions(+), 36 deletions(-)

diff --git a/swh/archiver/director.py b/swh/archiver/director.py
index 2a27334..f7977a0 100644
--- a/swh/archiver/director.py
+++ b/swh/archiver/director.py
@@ -5,12 +5,12 @@
 
 import abc
 import click
+import sys
 
-from swh.core import config, utils
+from swh.core import config, utils, hashutil
 from swh.scheduler.celery_backend.config import app
 
 from . import tasks  # noqa
-from ..storage import Storage
 from .storage import ArchiverStorage
 
 
@@ -148,7 +148,15 @@ class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase):
             yield content_id
 
 
-class ArchiverToBackendDirector(ArchiverDirectorBase):
+def read_sha1_from_stdin():
+    """Read hex-encoded sha1s from stdin, one per line.
+
+    """
+    for sha1 in sys.stdin:
+        yield {'content_id': hashutil.hex_to_hash(sha1.rstrip())}
+
+
+class ArchiverStdinToBackendDirector(ArchiverDirectorBase):
     """A cloud archiver director in charge of reading contents and send
     them in batch in the cloud.
 
@@ -158,13 +166,10 @@
     """
 
     ADDITIONAL_CONFIG = {
-        'storage': ('dict', {
-            'dbconn': 'dbname=softwareheritage-dev user=guest',
-            'objroot': '/srv/softwareheritage/storage',
-        }),
         'destination': ('dict', {
             'host': 'azure',
-        })
+        }),
+        'force_copy': ('bool', False),
     }
 
     CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
@@ -173,51 +178,50 @@
 
     def __init__(self):
         super().__init__()
-        storage = self.config['storage']
-        self.storage = Storage(storage['dbconn'], storage['objroot'])
         self.destination_host = self.config['destination']['host']
-
-    def read_cache_content_from_storage_by_batch(self, batch_max_size):
-        for contents in utils.grouper(self.storage.cache_content_get(),
-                                      batch_max_size):
-            yield contents
+        self.force_copy = self.config['force_copy']
 
     def get_contents_to_archive(self):
-        """Create batch of contents that needs to be archived
+        gen_content_ids = (
+            ids for ids in utils.grouper(read_sha1_from_stdin(),
+                                         self.config['batch_max_size'])
+        )
 
-        Yields:
-            sha1 of content to archive
+        if self.force_copy:
+            for content_ids in gen_content_ids:
+                content_ids = list(content_ids)
 
-        """
-        for contents in self.read_cache_content_from_storage_by_batch(
-                self.config['batch_max_size']):
-            content_ids = [{'content_id': c['sha1']} for c in contents]
+                if not content_ids:
+                    continue
+
+                print('Send %s contents to archive' % len(content_ids))
 
-            if not content_ids:
-                continue
+                for content in content_ids:
+                    yield content['content_id']
 
-            # Filter the known
-            content_ids = list(
-                self.archiver_storage.content_archive_get_missing(
-                    content_ids=content_ids,
-                    backend_name=self.destination_host))
+        else:
+            for content_ids in gen_content_ids:
+                content_ids = list(
+                    self.archiver_storage.content_archive_get_missing(
+                        content_ids=content_ids,
+                        backend_name=self.destination_host))
 
-            if not content_ids:
-                continue
+                if not content_ids:
+                    continue
 
-            print('Sending %s new contents for archive' % len(content_ids))
+                print('Send %s contents to archive' % len(content_ids))
 
-            for content in content_ids:
-                yield content
+                for content in content_ids:
+                    yield content
 
 
 @click.command()
 @click.option('--direct', is_flag=True,
               help="""With this flag, the archiver sends content for backup to
               one storage.""")
 def launch(direct):
     if direct:
-        archiver = ArchiverToBackendDirector()
+        archiver = ArchiverStdinToBackendDirector()
     else:
         archiver = ArchiverWithRetentionPolicyDirector()
-- 
GitLab
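
Note for reviewers: below is a minimal, runnable sketch of the flow this
patch sets up. It assumes swh.core.utils.grouper batches an iterable into
tuples of at most n items and that hashutil.hex_to_hash turns a hex digest
into bytes; both are approximated here (itertools.islice, bytes.fromhex) so
the snippet stands alone, and the batch size of 10 is illustrative only.

import sys
from itertools import islice


def grouper(iterable, n):
    # Stand-in for swh.core.utils.grouper: yield tuples of at most
    # n items until the underlying iterable is exhausted.
    it = iter(iterable)
    while True:
        batch = tuple(islice(it, n))
        if not batch:
            return
        yield batch


def read_sha1_from_stdin():
    # One hex-encoded sha1 per line; bytes.fromhex stands in for
    # swh.core.hashutil.hex_to_hash.
    for line in sys.stdin:
        yield {'content_id': bytes.fromhex(line.rstrip())}


def contents_to_archive(force_copy, batch_max_size=10):
    # Mirrors ArchiverStdinToBackendDirector.get_contents_to_archive:
    # with force_copy every batch is sent as-is; without it each batch
    # would first be filtered through content_archive_get_missing.
    for batch in grouper(read_sha1_from_stdin(), batch_max_size):
        content_ids = list(batch)
        if not content_ids:
            continue
        print('Send %s contents to archive' % len(content_ids))
        for content in content_ids:
            yield content['content_id']


if __name__ == '__main__':
    for content_id in contents_to_archive(force_copy=True):
        pass  # each content_id would be handed off to an archival task

Driving it from a shell would look something like
`cut -f1 sha1s.txt | python3 sketch.py` (file and script names are
hypothetical); the real director is launched through the click `launch`
entry point with --direct, with the exact module invocation depending on
deployment and not shown in this patch.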