From 8a3aad37c42edf6814cae0660f2e033e49aa3496 Mon Sep 17 00:00:00 2001
From: David Douard <david.douard@sdfa3.org>
Date: Fri, 8 Feb 2019 17:25:47 +0100
Subject: [PATCH] Normalize the configuration of VaultBackend and cookers

- Ensure every service that needs (remote or local) access uses the same
  config structure: a dict with 'cls' and 'args' keys.

- Extract the instantiation (and thus the configuration) of remote/external
  services (storage, cache, scheduler) from the VaultBackend constructor;
  they are now passed as __init__ arguments, and their
  instantiation/configuration is handled by swh.vault.api.server and
  swh.vault.cookers.get_cooker (with some support in swh.vault.get_vault).

---
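For illustration (this example is not part of the patch): after this change,
every service the vault needs, whether reached locally or remotely, is
described by the same {'cls': ..., 'args': {...}} dict. A local vault backend
could then be configured roughly as follows; the DSN, URLs and objstorage
layout are placeholders, not values mandated by the patch:

    # Hypothetical normalized configuration, as consumed by swh.vault.get_vault()
    vault_config = {
        'cls': 'local',
        'args': {
            # forwarded to VaultBackend as its `db` argument
            'db': 'dbname=swh-vault',
            # instantiated in get_vault() as VaultCache(**args['cache'])
            'cache': {'cls': 'pathslicing',
                      'args': {'root': '/srv/vault/cache', 'slicing': '0:1/1:5'}},
            # instantiated with get_storage(**args['storage'])
            'storage': {'cls': 'remote', 'args': {'url': 'http://localhost:5002/'}},
            # instantiated with get_scheduler(**args['scheduler'])
            'scheduler': {'cls': 'remote', 'args': {'url': 'http://localhost:5008/'}},
        },
    }
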
""" - if cls == 'remote': from .api.client import RemoteVaultClient as Vault + elif cls == 'local': + from swh.scheduler import get_scheduler + from swh.storage import get_storage + from swh.vault.cache import VaultCache + from swh.vault.backend import VaultBackend as Vault + args['cache'] = VaultCache(**args['cache']) + args['storage'] = get_storage(**args['storage']) + args['scheduler'] = get_scheduler(**args['scheduler']) else: raise ValueError('Unknown storage class `%s`' % cls) - + logger.debug('Instantiating %s with %s' % (Vault, args)) return Vault(**args) diff --git a/swh/vault/api/client.py b/swh/vault/api/client.py index 43cc0a3..fcdfb9d 100644 --- a/swh/vault/api/client.py +++ b/swh/vault/api/client.py @@ -17,9 +17,9 @@ class VaultAPIError(Exception): class RemoteVaultClient(SWHRemoteAPI): """Client to the Software Heritage vault cache.""" - def __init__(self, base_url, timeout=None): + def __init__(self, url, timeout=None): super().__init__( - api_exception=VaultAPIError, url=base_url, timeout=timeout) + api_exception=VaultAPIError, url=url, timeout=timeout) # Web API endpoints diff --git a/swh/vault/backend.py b/swh/vault/backend.py index 9d99f5b..3eb0670 100644 --- a/swh/vault/backend.py +++ b/swh/vault/backend.py @@ -11,10 +11,8 @@ from email.mime.text import MIMEText from swh.core.db import BaseDb from swh.core.db.common import db_transaction from swh.model import hashutil -from swh.scheduler import get_scheduler from swh.scheduler.utils import create_oneshot_task_dict -from swh.vault.cache import VaultCache -from swh.vault.cookers import get_cooker +from swh.vault.cookers import get_cooker_cls cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask' @@ -69,26 +67,21 @@ def batch_to_bytes(batch): for obj_type, obj_id in batch] -# TODO: This has to be factorized with other database base classes and helpers -# (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...) -# The three first methods are imported from swh.scheduler.backend. class VaultBackend: """ Backend for the Software Heritage vault. 
""" - def __init__(self, **config): + def __init__(self, db, cache, scheduler, storage=None, **config): self.config = config - self.cache = VaultCache(self.config['cache']) + self.cache = cache + self.scheduler = scheduler + self.storage = storage self.smtp_server = smtplib.SMTP() - self.scheduler = get_scheduler(**self.config['scheduler']) - cfg = config['vault'] - assert cfg['cls'] == 'local' - args = cfg['args'] self._pool = psycopg2.pool.ThreadedConnectionPool( - args.get('min_pool_conns', 1), - args.get('max_pool_conns', 10), - args['db'], + config.get('min_pool_conns', 1), + config.get('max_pool_conns', 10), + db, cursor_factory=psycopg2.extras.RealDictCursor, ) self._db = None @@ -112,7 +105,7 @@ class VaultBackend: res['object_id'] = bytes(res['object_id']) return res - def _send_task(self, args): + def _send_task(self, *args): """Send a cooking task to the celery scheduler""" task = create_oneshot_task_dict('swh-vault-cooking', *args) added_tasks = self.scheduler.create_tasks([task]) @@ -123,11 +116,10 @@ class VaultBackend: """Create and send a cooking task""" obj_id = hashutil.hash_to_bytes(obj_id) hex_id = hashutil.hash_to_hex(obj_id) - args = [obj_type, hex_id] - backend_storage_config = {'storage': self.config['storage']} - cooker_class = get_cooker(obj_type) - cooker = cooker_class(*args, override_cfg=backend_storage_config) + cooker_class = get_cooker_cls(obj_type) + cooker = cooker_class(obj_type, hex_id, + backend=self, storage=self.storage) if not cooker.check_exists(): raise NotFoundExc("Object {} was not found.".format(hex_id)) @@ -136,7 +128,7 @@ class VaultBackend: VALUES (%s, %s, %s)''', (obj_type, obj_id, sticky)) db.conn.commit() - task_id = self._send_task(args) + task_id = self._send_task(obj_type, hex_id) cur.execute(''' UPDATE vault_bundle diff --git a/swh/vault/cache.py b/swh/vault/cache.py index 4c06004..2a6b231 100644 --- a/swh/vault/cache.py +++ b/swh/vault/cache.py @@ -15,7 +15,7 @@ class VaultCache: internal identifiers used in the underlying objstorage. 
""" - def __init__(self, objstorage): + def __init__(self, **objstorage): self.objstorage = get_objstorage(**objstorage) def add(self, obj_type, obj_id, content): diff --git a/swh/vault/cli.py b/swh/vault/cli.py index 3037962..98dc6c4 100644 --- a/swh/vault/cli.py +++ b/swh/vault/cli.py @@ -2,33 +2,29 @@ import logging import click import aiohttp -from swh.core import config -from swh.vault import get_vault -from swh.vault.api.server import make_app, DEFAULT_CONFIG +from swh.vault.api.server import make_app_from_configfile CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) -@click.group(context_settings=CONTEXT_SETTINGS) +@click.command(context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") -@click.option('--database', '-d', default=None, - help="Scheduling database DSN (imply cls is 'local')") -@click.option('--url', '-u', default=None, - help="Scheduler's url access (imply cls is 'remote')") @click.option('--log-level', '-l', default='INFO', type=click.Choice(logging._nameToLevel.keys()), help="Log level (default to INFO)") @click.option('--no-stdout', is_flag=True, default=False, help="Do NOT output logs on the console") +@click.option('--host', default='0.0.0.0', help="Host to run the server") +@click.option('--port', default=5005, type=click.INT, + help="Binding port of the server") +@click.option('--debug/--nodebug', default=True, + help="Indicates if the server should run in debug mode") @click.pass_context -def cli(ctx, config_file, database, url, log_level, no_stdout): - """Software Heritage Scheduler CLI interface - - Default to use the the local scheduler instance (plugged to the - main scheduler db). +def cli(ctx, config_file, log_level, no_stdout, host, port, debug): + """Software Heritage Vault API server """ from swh.scheduler.celery_backend.config import setup_log_handler @@ -37,49 +33,12 @@ def cli(ctx, config_file, database, url, log_level, no_stdout): format='[%(levelname)s] %(name)s -- %(message)s', log_console=not no_stdout) - ctx.ensure_object(dict) - - logger = logging.getLogger(__name__) - vault = None - conf = config.read(config_file, DEFAULT_CONFIG) - if 'vault' not in conf: - raise ValueError("missing 'vault' configuration") - - if database: - conf['vault']['cls'] = 'local' - conf['vault']['args']['db'] = database - elif url: - conf['vault']['cls'] = 'remote' - conf['vault']['args'] = {'url': url} - sched_conf = conf['vault'] try: - logger.debug('Instanciating vault with %s' % ( - sched_conf)) - vault = get_vault(**conf) - except ValueError: - # it's the subcommand to decide whether not having a proper - # vault instance is a problem. 
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index aea710e..8e42f5f 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -7,12 +7,9 @@
 import abc
 import io
 import logging
-from swh.core import config
 from swh.model import hashutil
-from swh.storage import get_storage
-from swh.vault.api.client import RemoteVaultClient
-
 
+MAX_BUNDLE_SIZE = 2 ** 29  # 512 MiB
 DEFAULT_CONFIG_PATH = 'vault/cooker'
 DEFAULT_CONFIG = {
     'storage': ('dict', {
@@ -21,8 +18,13 @@ DEFAULT_CONFIG = {
             'url': 'http://localhost:5002/',
         },
     }),
-    'vault_url': ('str', 'http://localhost:5005/'),
-    'max_bundle_size': ('int', 2 ** 29),  # 512 MiB
+    'vault': ('dict', {
+        'cls': 'remote',
+        'args': {
+            'url': 'http://localhost:5005/',
+        },
+    }),
+    'max_bundle_size': ('int', MAX_BUNDLE_SIZE),
 }
 
 
@@ -61,7 +63,8 @@ class BaseVaultCooker(metaclass=abc.ABCMeta):
     """
     CACHE_TYPE_KEY = None
 
-    def __init__(self, obj_type, obj_id, *, override_cfg=None):
+    def __init__(self, obj_type, obj_id, backend, storage,
+                 max_bundle_size=MAX_BUNDLE_SIZE):
         """Initialize the cooker.
 
         The type of the object represented by the id depends on the
@@ -69,20 +72,17 @@
         own cooker class.
 
         Args:
-            storage: the storage object
-            cache: the cache where to store the bundle
+            obj_type: type of the object to be cooked into a bundle (directory,
+                      revision_flat or revision_gitfast; see
+                      swh.vault.cookers.COOKER_TYPES).
             obj_id: id of the object to be cooked into a bundle.
+            backend: the vault backend (swh.vault.backend.VaultBackend).
         """
-        self.config = config.load_named_config(DEFAULT_CONFIG_PATH,
-                                               DEFAULT_CONFIG)
-        if override_cfg is not None:
-            self.config.update(override_cfg)
-
         self.obj_type = obj_type
         self.obj_id = hashutil.hash_to_bytes(obj_id)
-        self.backend = RemoteVaultClient(self.config['vault_url'])
-        self.storage = get_storage(**self.config['storage'])
-        self.max_bundle_size = self.config['max_bundle_size']
+        self.backend = backend
+        self.storage = storage
+        self.max_bundle_size = max_bundle_size
 
     @abc.abstractmethod
     def check_exists(self):
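Since the cooker no longer instantiates its own vault client and storage,
callers (and tests) can inject whatever pair they want. A minimal,
self-contained sketch with placeholder URLs and a made-up object id:

    # Building a cooker by hand with injected clients (sketch only).
    from swh.storage import get_storage
    from swh.vault import get_vault
    from swh.vault.cookers.directory import DirectoryCooker

    backend = get_vault(cls='remote', args={'url': 'http://localhost:5005/'})
    storage = get_storage(cls='remote', args={'url': 'http://localhost:5002/'})

    cooker = DirectoryCooker('directory',
                             '0123456789abcdef0123456789abcdef01234567',
                             backend=backend, storage=storage)
    if cooker.check_exists():
        cooker.cook()
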
diff --git a/swh/vault/cooking_tasks.py b/swh/vault/cooking_tasks.py
index 5e4254b..4cf9ced 100644
--- a/swh/vault/cooking_tasks.py
+++ b/swh/vault/cooking_tasks.py
@@ -11,11 +11,11 @@ from swh.vault.cookers import get_cooker
 @app.task(name=__name__ + '.SWHCookingTask')
 def cook_bundle(obj_type, obj_id):
     """Main task to cook a bundle."""
-    get_cooker(obj_type)(obj_type, obj_id).cook()
+    get_cooker(obj_type, obj_id).cook()
 
 
 # TODO: remove once the scheduler handles priority tasks
 @app.task(name=__name__ + '.SWHBatchCookingTask')
 def batch_cook_bundle(obj_type, obj_id):
     """Temporary task for the batch queue."""
-    get_cooker(obj_type)(obj_type, obj_id).cook()
+    get_cooker(obj_type, obj_id).cook()
-- 
GitLab