Commit c2483d7a authored by Quentin Campos, committed by Antoine R. Dumont

archiver: change the master's storage to an object storage

Summary:
Related T499

test.archiver: update the tests to follow changes in the archiver

Reviewers: #reviewers, qcampos

Reviewed By: #reviewers

Maniphest Tasks: T499

Differential Revision: https://forge.softwareheritage.org/D78

Closes D78
parent 903a75e9
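
In substance, the copier stops going through the master Storage API and reads blobs straight from the master object storage. Below is a minimal sketch of the new copy loop; the function name copy_batch and the backup_url argument are illustrative, and it assumes batch ids arrive in the same prefixed hex form that run() strips with x[2:]:

from swh.core import hashutil
from swh.objstorage.api.client import RemoteObjStorage

def copy_batch(master_objstorage, backup_url, content_ids):
    """Sketch: push each blob of a batch to one backup object storage."""
    server = RemoteObjStorage(backup_url)
    # Strip the two-character prefix and convert to binary hashes,
    # as ArchiverCopier.run() does.
    ids = (hashutil.hex_to_hash(i[2:]) for i in content_ids)
    for content_id in ids:
        content = master_objstorage.get(content_id)  # read from the master
        server.content_add(content, content_id)      # write to the backup

Previously the same loop asked the master Storage for content rows via content_get() and pushed content['data'] without an explicit identifier.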
......@@ -16,10 +16,10 @@ class ArchiverCopier():
has to archive.
server (RemoteArchive): The remote object storage that is used to
backup content.
master_storage (Storage): The master storage that contains the data
the copier needs to archive.
master_objstorage (ObjStorage): The master object storage that contains
the data the copier needs to archive.
"""
def __init__(self, destination, content, master_storage):
def __init__(self, destination, content, master_objstorage):
""" Create a Copier for the archiver
Args:
......@@ -33,7 +33,7 @@ class ArchiverCopier():
_name, self.url = destination
self.content_ids = content
self.server = RemoteObjStorage(self.url)
self.master_storage = master_storage
self.master_objstorage = master_objstorage
def run(self):
""" Do the copy on the backup storage.
......@@ -47,14 +47,12 @@ class ArchiverCopier():
Returns:
A boolean that indicates if the whole content has been copied.
"""
self.content_ids = list(map(lambda x: hashutil.hex_to_hash(x[2:]),
self.content_ids))
contents = self.master_storage.content_get(self.content_ids)
self.content_ids = map(lambda x: hashutil.hex_to_hash(x[2:]),
self.content_ids)
try:
for content in contents:
content_data = content['data']
self.server.content_add(content_data)
for content_id in self.content_ids:
content = self.master_objstorage.get(content_id)
self.server.content_add(content, content_id)
except:
return False
return True
# Copyright (C) 2015 The Software Heritage developers
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import swh
import logging
import click
from datetime import datetime
from swh.core import hashutil, config
from swh.objstorage import PathSlicingObjStorage
from swh.objstorage.api.client import RemoteObjStorage
from swh.scheduler.celery_backend.config import app
from . import tasks # NOQA
......@@ -17,14 +18,17 @@ from .storage import ArchiverStorage
DEFAULT_CONFIG = {
'objstorage_type': ('str', 'local_objstorage'),
'objstorage_path': ('str', '/tmp/swh-storage/objects'),
'objstorage_slicing': ('str', '0:2/2:4/4:6'),
'objstorage_url': ('str', 'http://localhost:5003/'),
'batch_max_size': ('int', 50),
'archival_max_age': ('int', 3600),
'retention_policy': ('int', 2),
'asynchronous': ('bool', True),
'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest'),
'dbconn_storage': ('str', 'dbname=softwareheritage-dev user=guest')
'dbconn': ('str', 'dbname=softwareheritage-archiver-dev user=guest')
}
task_name = 'swh.storage.archiver.tasks.SWHArchiverTask'
......@@ -39,34 +43,54 @@ class ArchiverDirector():
to know which one needs archival and it delegates this task to
archiver workers.
Attributes:
master_storage: the local storage of the master server.
slave_storages: Iterable of remote obj storages to the slaves servers
used for backup.
master_objstorage: the object storage of the master server.
master_objstorage_args (dict): arguments of the master objstorage
initialization.
archiver_storage: a wrapper for archiver db operations.
db_conn_archiver: Either a libpq connection string,
or a psycopg2 connection for the archiver db.
slave_objstorages: Iterable of remote obj storages to the slaves
servers used for backup.
config: Archiver_configuration. A dictionary that must contain
the following keys.
objstorage_path (string): master's objstorage path
batch_max_size (int): The number of content items that can be
given to the same archiver worker.
archival_max_age (int): Delay given to the worker to copy all
the files in a given batch.
retention_policy (int): Required number of copies for the
content to be considered safe.
asynchronous (boolean): Indicate whenever the archival should
run in asynchronous mode or not.
the following keys:
objstorage_type (str): type of objstorage used (local_objstorage
or remote_objstorage).
If the storage is local, the following keys must also be present:
objstorage_path (str): master's objstorage path
objstorage_slicing (str): master's objstorage slicing
Otherwise, if it's a remote objstorage, the keys must be:
objstorage_url (str): url of the remote objstorage
batch_max_size (int): The number of content items that can be
given to the same archiver worker.
archival_max_age (int): Delay given to the worker to copy all
the files in a given batch.
retention_policy (int): Required number of copies for the
content to be considered safe.
asynchronous (boolean): Indicate whether the archival should
run in asynchronous mode or not.
"""
def __init__(self, db_conn_archiver, db_conn_storage, config):
def __init__(self, db_conn_archiver, config):
""" Constructor of the archiver director.
Args:
db_conn_archiver: Either a libpq connection string,
or a psycopg2 connection for the archiver db connection.
db_conn_storage: Either a libpq connection string,
or a psycopg2 connection for the db storage connection.
or a psycopg2 connection for the archiver db.
config: Archiver_configuration. A dictionary that must contain
the following keys.
objstorage_path (string): master's objstorage path
the following keys:
objstorage_type (str): type of objstorage used
(local_objstorage or remote_objstorage).
If the storage is local, the following keys must also be present:
objstorage_path (str): master's objstorage path
objstorage_slicing (str): master's objstorage slicing
Otherwise, if it's a remote objstorage, the keys must be:
objstorage_url (str): url of the remote objstorage
batch_max_size (int): The number of content items that can be
given to the same archiver worker.
archival_max_age (int): Delay given to the worker to copy all
......@@ -76,25 +100,41 @@ class ArchiverDirector():
asynchronous (boolean): Indicate whether the archival should
run in asynchronous mode or not.
"""
# Get the local storage of the master and remote ones for the slaves.
# Get the slave storages
self.db_conn_archiver = db_conn_archiver
self.archiver_storage = ArchiverStorage(db_conn_archiver)
self.master_storage_args = [db_conn_storage, config['objstorage_path']]
master_storage = swh.storage.get_storage('local_storage',
self.master_storage_args)
slaves = {
self.slave_objstorages = {
id: url
for id, url
in self.archiver_storage.archive_ls()
}
# Check that there is enough backup servers for the retention policy
if config['retention_policy'] > len(self.slave_objstorages) + 1:
raise ValueError(
"Can't have a retention policy of %d with %d backup servers"
% (config['retention_policy'], len(self.slave_objstorages))
)
# TODO The database should be initialized somehow before going into
# production. For now, assume that the database contains
# data for all the current content.
# Get the master storage that contains content to be archived
if config['objstorage_type'] == 'local_objstorage':
master_objstorage_args = {
'root': config['objstorage_path'],
'slicing': config['objstorage_slicing']
}
master_objstorage = PathSlicingObjStorage(
**master_objstorage_args
)
elif config['objstorage_type'] == 'remote_objstorage':
master_objstorage_args = {'base_url': config['objstorage_url']}
master_objstorage = RemoteObjStorage(**master_objstorage_args)
else:
raise ValueError(
'Unknown objstorage class `%s`' % config['objstorage_type']
)
self.master_objstorage = master_objstorage
self.master_objstorage_args = master_objstorage_args
self.master_storage = master_storage
self.slave_storages = slaves
# Keep the full configuration
self.config = config
def run(self):
......@@ -107,6 +147,7 @@ class ArchiverDirector():
run_fn = self.run_async_worker
else:
run_fn = self.run_sync_worker
for batch in self.get_unarchived_content():
run_fn(batch)
......@@ -116,8 +157,8 @@ class ArchiverDirector():
task = app.tasks[task_name]
task.delay(batch,
archiver_args=self.db_conn_archiver,
master_storage_args=self.master_storage_args,
slave_storages=self.slave_storages,
master_objstorage_args=self.master_objstorage_args,
slave_objstorages=self.slave_objstorages,
config=self.config)
def run_sync_worker(self, batch):
......@@ -126,8 +167,8 @@ class ArchiverDirector():
task = app.tasks[task_name]
task(batch,
archiver_args=self.db_conn_archiver,
master_storage_args=self.master_storage_args,
slave_storages=self.slave_storages,
master_objstorage_args=self.master_objstorage_args,
slave_objstorages=self.slave_objstorages,
config=self.config)
def get_unarchived_content(self):
......@@ -168,7 +209,7 @@ class ArchiverDirector():
)
for _content_id, server_id, status, mtime in backups:
virtual_status = self.get_virtual_status(status, mtime)
server_data = (server_id, self.slave_storages[server_id])
server_data = (server_id, self.slave_objstorages[server_id])
missing_copy.setdefault(
db_content_id,
......@@ -177,7 +218,7 @@ class ArchiverDirector():
# Check the content before archival.
try:
self.master_storage.objstorage.check(content_id[0])
self.master_objstorage.check(content_id[0])
except Exception as e:
# Exception can be Error or ObjNotFoundError.
logger.error(e)
......@@ -234,8 +275,6 @@ class ArchiverDirector():
@click.argument('config-path', required=1)
@click.option('--dbconn', default=DEFAULT_CONFIG['dbconn'][1],
help="Connection string for the archiver database")
@click.option('--dbconn-storage', default=DEFAULT_CONFIG['dbconn_storage'][1],
help="Connection string for the storage database")
@click.option('--async/--sync', default=DEFAULT_CONFIG['asynchronous'][1],
help="Indicates if the archiver should run asynchronously")
def launch(config_path, dbconn, dbconn_storage, async):
......@@ -243,13 +282,12 @@ def launch(config_path, dbconn, dbconn_storage, async):
# command line > file config > default config
cl_config = {
'dbconn': dbconn,
'dbconn_storage': dbconn_storage,
'asynchronous': async
}
conf = config.read(config_path, DEFAULT_CONFIG)
conf.update(cl_config)
# Create connection data and run the archiver.
archiver = ArchiverDirector(conf['dbconn'], conf['dbconn_storage'], conf)
archiver = ArchiverDirector(conf['dbconn'], conf)
logger.info("Starting an archival at", datetime.now())
archiver.run()
......
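
For reference, these are the two configuration shapes the director now accepts for its master object storage, matching the docstring above; values are taken from DEFAULT_CONFIG and are purely illustrative:

from swh.storage.archiver import ArchiverDirector  # import path assumed

# Local master objstorage, handled by PathSlicingObjStorage:
config_local = {
    'objstorage_type': 'local_objstorage',
    'objstorage_path': '/tmp/swh-storage/objects',
    'objstorage_slicing': '0:2/2:4/4:6',
    'batch_max_size': 50,
    'archival_max_age': 3600,
    'retention_policy': 2,
    'asynchronous': True,
    'dbconn': 'dbname=softwareheritage-archiver-dev user=guest',
}

# Remote master objstorage, handled by RemoteObjStorage:
config_remote = dict(config_local,
                     objstorage_type='remote_objstorage',
                     objstorage_url='http://localhost:5003/')

director = ArchiverDirector(config_local['dbconn'], config_local)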
......@@ -12,9 +12,9 @@ class SWHArchiverTask(Task):
"""
task_queue = 'swh_storage_archive_worker'
def run(self, batch, archiver_args, master_storage_args, slave_storages,
config):
aw = ArchiverWorker(batch, archiver_args, master_storage_args,
slave_storages, config)
def run(self, batch, archiver_args, master_objstorage_args,
slave_objstorages, config):
aw = ArchiverWorker(batch, archiver_args, master_objstorage_args,
slave_objstorages, config)
if aw.run():
self.log("Successful backup for a batch of size %s" % len(batch))
......@@ -112,29 +112,27 @@ class TestArchiver(DbsTestFixture, ServerTestFixture,
def __create_director(self, batch_size=5000, archival_max_age=3600,
retention_policy=1, asynchronous=False):
config = {
'objstorage_type': 'local_objstorage',
'objstorage_path': self.objroot,
'objstorage_slicing': '0:2/2:4/4:6',
'batch_max_size': batch_size,
'archival_max_age': archival_max_age,
'retention_policy': retention_policy,
'asynchronous': asynchronous # Avoid depending on queue for tests.
}
director = ArchiverDirector(db_conn_archiver=self.conn,
db_conn_storage=self.conn_storage,
config=config)
return director
def __create_worker(self, batch={}, config={}):
mstorage_args = [
self.archiver.master_storage.db.conn, # master storage db
# connection
self.objroot # object storage path
]
mobjstorage_args = self.archiver.master_objstorage_args
if not config:
config = self.archiver.config
return ArchiverWorker(batch,
archiver_args=self.conn,
master_storage_args=mstorage_args,
slave_storages=[self.storage_data],
master_objstorage_args=mobjstorage_args,
slave_objstorages=[self.storage_data],
config=config)
# Integration test
......
......@@ -6,11 +6,14 @@
import random
import logging
from datetime import datetime
from swh.objstorage import PathSlicingObjStorage
from swh.objstorage.api.client import RemoteObjStorage
from .storage import ArchiverStorage
from .copier import ArchiverCopier
from .. import get_storage
from datetime import datetime
logger = logging.getLogger()
......@@ -26,10 +29,10 @@ class ArchiverWorker():
that associates a content's sha1 id to the list of servers where
the content is present or missing
(see ArchiverDirector::get_unarchived_content).
master_storage_args: The connection argument to initialize the
master storage with the db connection url & the object storage path.
master_objstorage_args: The arguments used to initialize the master
object storage (local path & slicing, or remote base url).
slave_storages: A map that associates server_id to the remote server.
slave_objstorages: A map that associates server_id to the remote server.
config: Archiver_configuration. A dictionary that must contain
the following keys.
objstorage_path (string): the path of the objstorage of the
......@@ -43,8 +46,8 @@ class ArchiverWorker():
asynchronous (boolean): Indicate whether the archival should
run in asynchronous mode or not.
"""
def __init__(self, batch, archiver_args, master_storage_args,
slave_storages, config):
def __init__(self, batch, archiver_args, master_objstorage_args,
slave_objstorages, config):
""" Constructor of the ArchiverWorker class.
Args:
......@@ -53,8 +56,8 @@ class ArchiverWorker():
is present.
archiver_args: The archiver's arguments to establish connection to
db.
master_storage_args: The master storage arguments.
slave_storages: A map that associates server_id to the remote
master_objstorage_args: The master object storage arguments.
slave_objstorages: A map that associates server_id to the remote
server.
config: Archiver_configuration. A dictionary that must contain
the following keys.
......@@ -71,11 +74,16 @@ class ArchiverWorker():
"""
self.batch = batch
self.archiver_storage = ArchiverStorage(archiver_args)
self.master_storage = get_storage('local_storage', master_storage_args)
self.slave_storages = slave_storages
self.slave_objstorages = slave_objstorages
self.config = config
def __choose_backup_servers(self, allowed_storage, backup_number):
if config['objstorage_type'] == 'local_objstorage':
master_objstorage = PathSlicingObjStorage(**master_objstorage_args)
else:
master_objstorage = RemoteObjStorage(**master_objstorage_args)
self.master_objstorage = master_objstorage
def _choose_backup_servers(self, allowed_storage, backup_number):
""" Choose the slave servers for archival.
Choose the given amount of servers among those which don't already
......@@ -88,8 +96,7 @@ class ArchiverWorker():
"""
# In case there are not enough backup servers to get all the backups
# we need, just do our best.
# TODO such situation can only be caused by an incorrect configuration
# setting. Do a verification previously.
# Such a situation should not happen.
backup_number = min(backup_number, len(allowed_storage))
# TODO Find a better (or a good) policy to choose the backup servers.
......@@ -98,7 +105,7 @@ class ArchiverWorker():
# capacities.
return random.sample(allowed_storage, backup_number)
def __get_archival_status(self, content_id, server):
def _get_archival_status(self, content_id, server):
""" Get the archival status of the required content.
Attributes:
......@@ -118,8 +125,8 @@ class ArchiverWorker():
'mtime': t[3]
}
def __content_archive_update(self, content_id, archive_id,
new_status=None):
def _content_archive_update(self, content_id, archive_id,
new_status=None):
""" Update the status of a archive content and set it's mtime to now()
Change the last modification time of an archived content and change
......@@ -148,7 +155,7 @@ class ArchiverWorker():
content (str): Sha1 of a content.
destination: Tuple (archive id, archive url).
"""
archival_status = self.__get_archival_status(
archival_status = self._get_archival_status(
content,
destination
)
......@@ -187,7 +194,7 @@ class ArchiverWorker():
server_data = self.batch[content_id]
nb_present = len(server_data['present'])
nb_backup = self.config['retention_policy'] - nb_present
backup_servers = self.__choose_backup_servers(
backup_servers = self._choose_backup_servers(
server_data['missing'],
nb_backup
)
......@@ -214,8 +221,8 @@ class ArchiverWorker():
slaves_copy[destination]
))
for content_id in slaves_copy[destination]:
self.__content_archive_update(content_id, destination[0],
new_status='ongoing')
self._content_archive_update(content_id, destination[0],
new_status='ongoing')
# Spawn a copier for each destination
for destination in slaves_copy:
......@@ -236,9 +243,9 @@ class ArchiverWorker():
destination: Tuple (archive_id, archive_url) of the destination.
contents: List of contents to archive.
"""
ac = ArchiverCopier(destination, contents, self.master_storage)
ac = ArchiverCopier(destination, contents, self.master_objstorage)
if ac.run():
# Once the archival is complete, update the database.
for content_id in contents:
self.__content_archive_update(content_id, destination[0],
new_status='present')
self._content_archive_update(content_id, destination[0],
new_status='present')
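
Putting the pieces together, this is roughly what a worker run now does for one backup destination: rebuild the master object storage from master_objstorage_args, then hand it to a copier. The function name and its arguments are illustrative, and the copier import path is assumed from this repository's layout:

from swh.objstorage import PathSlicingObjStorage
from swh.objstorage.api.client import RemoteObjStorage
from swh.storage.archiver.copier import ArchiverCopier  # path assumed

def archive_batch_to(destination, contents, master_objstorage_args,
                     objstorage_type):
    """Sketch: copy contents to one (archive_id, archive_url) destination."""
    if objstorage_type == 'local_objstorage':
        master_objstorage = PathSlicingObjStorage(**master_objstorage_args)
    else:
        master_objstorage = RemoteObjStorage(**master_objstorage_args)
    copier = ArchiverCopier(destination, contents, master_objstorage)
    # run() returns True only when the whole batch reached the destination;
    # the worker then flips each content's archive status to 'present'.
    return copier.run()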