Skip to content
Snippets Groups Projects
Commit b3c109e4 authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

Adapt archiver director to read sha1 from stdin

Also, add a force_copy flag to the configuration file to skip checking
whether the sha1s already exist in the destination.

This makes the first-time copy into a new backend efficient.
parent 438de69c
No related branches found
No related tags found
No related merge requests found
......@@ -5,12 +5,12 @@
import abc
import click
import sys
from swh.core import config, utils
from swh.core import config, utils, hashutil
from swh.scheduler.celery_backend.config import app
from . import tasks # noqa
from ..storage import Storage
from .storage import ArchiverStorage
......@@ -148,7 +148,15 @@ class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase):
yield content_id
class ArchiverToBackendDirector(ArchiverDirectorBase):
def read_sha1_from_stdin():
    """Yield content identifiers read from standard input.

    Each input line is expected to hold one hexadecimal sha1; trailing
    whitespace (including the newline) is stripped, the hex string is
    converted to its binary form, and the result is wrapped in a dict
    under the 'content_id' key.
    """
    yield from ({'content_id': hashutil.hex_to_hash(line.rstrip())}
                for line in sys.stdin)
class ArchiverStdinToBackendDirector(ArchiverDirectorBase):
"""A cloud archiver director in charge of reading contents and send
them in batch in the cloud.
......@@ -158,13 +166,10 @@ class ArchiverToBackendDirector(ArchiverDirectorBase):
"""
ADDITIONAL_CONFIG = {
'storage': ('dict', {
'dbconn': 'dbname=softwareheritage-dev user=guest',
'objroot': '/srv/softwareheritage/storage',
}),
'destination': ('dict', {
'host': 'azure',
})
}),
'force_copy': ('bool', False),
}
CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
......@@ -173,51 +178,50 @@ class ArchiverToBackendDirector(ArchiverDirectorBase):
def __init__(self):
super().__init__()
storage = self.config['storage']
self.storage = Storage(storage['dbconn'], storage['objroot'])
self.destination_host = self.config['destination']['host']
def read_cache_content_from_storage_by_batch(self, batch_max_size):
for contents in utils.grouper(self.storage.cache_content_get(),
batch_max_size):
yield contents
self.force_copy = self.config['force_copy']
def get_contents_to_archive(self):
"""Create batch of contents that needs to be archived
gen_content_ids = (
ids for ids in utils.grouper(read_sha1_from_stdin(),
self.config['batch_max_size'])
)
Yields:
sha1 of content to archive
if self.force_copy:
for content_ids in gen_content_ids:
content_ids = list(content_ids)
"""
for contents in self.read_cache_content_from_storage_by_batch(
self.config['batch_max_size']):
content_ids = [{'content_id': c['sha1']} for c in contents]
if not content_ids:
continue
print('Send %s contents to archive' % len(content_ids))
if not content_ids:
continue
for content in content_ids:
yield content['content_id']
# Filter the known
content_ids = list(
self.archiver_storage.content_archive_get_missing(
content_ids=content_ids,
backend_name=self.destination_host))
else:
for content_ids in gen_content_ids:
content_ids = list(
self.archiver_storage.content_archive_get_missing(
content_ids=content_ids,
backend_name=self.destination_host))
if not content_ids:
continue
if not content_ids:
continue
print('Sending %s new contents for archive' % len(content_ids))
print('Send %s contents to archive' % len(content_ids))
for content in content_ids:
yield content
for content in content_ids:
yield content
@click.command()
@click.option('--direct', is_flag=True,
help="""With this flag, the archiver sends content for backup to
help="""The archiver sends content for backup to
one storage.""")
def launch(direct):
if direct:
archiver = ArchiverToBackendDirector()
archiver = ArchiverStdinToBackendDirector()
else:
archiver = ArchiverWithRetentionPolicyDirector()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment