From bd580500b6197dcf0eee070a9c62804146fdda1a Mon Sep 17 00:00:00 2001
From: "Antoine R. Dumont (@ardumont)" <antoine.romain.dumont@gmail.com>
Date: Thu, 14 Mar 2019 14:20:14 +0100
Subject: [PATCH] publisher: Initiate the journal cli to start running the
 publisher

This avoids the implicit configuration-loading pattern (SWHConfig) that we strive to remove.

Related T1410
---
 setup.py                            |  4 ++
 swh/journal/cli.py                  | 89 +++++++++++++++++++++++++++++
 swh/journal/publisher.py            | 55 ++----------------
 swh/journal/tests/test_publisher.py | 30 ++++++----
 4 files changed, 116 insertions(+), 62 deletions(-)
 create mode 100644 swh/journal/cli.py

diff --git a/setup.py b/setup.py
index db1d2c8..90b0b1f 100755
--- a/setup.py
+++ b/setup.py
@@ -45,6 +45,10 @@ setup(
     url='https://forge.softwareheritage.org/diffusion/DJNL/',
     packages=find_packages(),
     scripts=[],
+    entry_points='''
+        [console_scripts]
+        swh-journal=swh.journal.cli:main
+    ''',
     install_requires=parse_requirements() + parse_requirements('swh'),
     setup_requires=['vcversioner'],
     extras_require={'testing': parse_requirements('test')},
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
new file mode 100644
index 0000000..b498fbb
--- /dev/null
+++ b/swh/journal/cli.py
@@ -0,0 +1,89 @@
+# Copyright (C) 2016-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import click
+import logging
+import os
+
+from swh.core import config
+from swh.journal.publisher import JournalPublisher
+
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+
+@click.group(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False,),
+              help="Configuration file.")
+@click.option('--log-level', '-l', default='INFO',
+              type=click.Choice(logging._nameToLevel.keys()),
+              help="Log level (default to INFO)")
+@click.pass_context
+def cli(ctx, config_file, log_level):
+    """Software Heritage Journal CLI interface
+
+    The configuration is read from --config-file or, failing that, from
+    the file named by the SWH_CONFIG_FILENAME environment variable.
+
+    """
+    if not config_file:
+        config_file = os.environ.get('SWH_CONFIG_FILENAME')
+    if not config_file:
+        raise ValueError('You must either pass a config-file parameter '
+                         'or set SWH_CONFIG_FILENAME to target '
+                         'the config-file')
+
+    if not os.path.exists(config_file):
+        raise ValueError('%s does not exist' % config_file)
+
+    conf = config.read(config_file)
+    ctx.ensure_object(dict)
+
+    logger = logging.getLogger(__name__)
+    logger.setLevel(log_level)
+
+    _log = logging.getLogger('kafka')
+    _log.setLevel(logging.INFO)
+
+    ctx.obj['config'] = conf
+    ctx.obj['loglevel'] = log_level
+
+
+@cli.command()
+@click.pass_context
+def publisher(ctx):
+    """Run the journal publisher, polling until interrupted (Ctrl-C)
+
+    """
+    mandatory_keys = [
+        'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
+        'publisher_id', 'object_types', 'storage'
+    ]
+
+    conf = ctx.obj['config']
+    missing_keys = []
+    for key in mandatory_keys:
+        if not conf.get(key):
+            missing_keys.append(key)
+
+    if missing_keys:
+        raise click.ClickException(
+            'Configuration error: The following keys must be'
+            ' provided: %s' % (','.join(missing_keys), ))
+
+    publisher = JournalPublisher(conf)
+    try:
+        while True:
+            publisher.poll()
+    except KeyboardInterrupt:
+        ctx.exit(0)
+
+
+def main():
+    return cli(auto_envvar_prefix='SWH_JOURNAL')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
index 2e86125..ff0e0f4 100644
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2016-2018 The Software Heritage developers
+# Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -8,14 +8,13 @@ import logging
 
 from kafka import KafkaProducer, KafkaConsumer
 
-from swh.core.config import SWHConfig
 from swh.storage import get_storage
 from swh.storage.algos import snapshot
 
 from .serializers import kafka_to_key, key_to_kafka
 
 
-class JournalPublisher(SWHConfig):
+class JournalPublisher:
     """The journal publisher is a layer in charge of:
 
     - consuming messages from topics (1 topic per object_type)
@@ -26,37 +25,10 @@ class JournalPublisher(SWHConfig):
     The main entry point for this class is the 'poll' method.
 
     """
-    DEFAULT_CONFIG = {
-        'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-
-        'temporary_prefix': ('str', 'swh.tmp_journal.new'),
-        'final_prefix': ('str', 'swh.journal.objects'),
-
-        'consumer_id': ('str', 'swh.journal.publisher'),
-        'publisher_id': ('str', 'swh.journal.publisher'),
-
-        'object_types': ('list[str]', ['content', 'revision', 'release']),
-
-        'storage': ('dict', {
-            'cls': 'remote',
-            'args': {
-                'url': 'http://localhost:5002/',
-            }
-        }),
-
-        'max_messages': ('int', 10000),
-    }
-
-    CONFIG_BASE_FILENAME = 'journal/publisher'
-
-    def __init__(self, extra_configuration=None):
-        self.config = config = self.parse_config_file()
-        if extra_configuration:
-            config.update(extra_configuration)
-
+    def __init__(self, config):
+        self.config = config
         self._prepare_storage(config)
         self._prepare_journal(config)
-
         self.max_messages = self.config['max_messages']
 
     def _prepare_journal(self, config):
@@ -219,21 +191,4 @@ class JournalPublisher(SWHConfig):
 
 
 if __name__ == '__main__':
-    import click
-
-    @click.command()
-    @click.option('--verbose', is_flag=True, default=False,
-                  help='Be verbose if asked.')
-    def main(verbose):
-        logging.basicConfig(
-            level=logging.DEBUG if verbose else logging.INFO,
-            format='%(asctime)s %(process)d %(levelname)s %(message)s'
-        )
-        _log = logging.getLogger('kafka')
-        _log.setLevel(logging.INFO)
-
-        publisher = JournalPublisher()
-        while True:
-            publisher.poll()
-
-    main()
+    print('Please use the "swh-journal publisher run" command')
diff --git a/swh/journal/tests/test_publisher.py b/swh/journal/tests/test_publisher.py
index f634f1f..4040553 100644
--- a/swh/journal/tests/test_publisher.py
+++ b/swh/journal/tests/test_publisher.py
@@ -101,19 +101,18 @@ ORIGIN_VISITS = [
     }
 ]
 
+TEST_CONFIG = {
+    'brokers': ['localhost'],
+    'temporary_prefix': 'swh.tmp_journal.new',
+    'final_prefix': 'swh.journal.objects',
+    'consumer_id': 'swh.journal.test.publisher',
+    'publisher_id': 'swh.journal.test.publisher',
+    'object_types': ['content'],
+    'max_messages': 3,
+}
 
-class JournalPublisherTest(JournalPublisher):
-    def parse_config_file(self):
-        return {
-            'brokers': ['localhost'],
-            'temporary_prefix': 'swh.tmp_journal.new',
-            'final_prefix': 'swh.journal.objects',
-            'consumer_id': 'swh.journal.test.publisher',
-            'publisher_id': 'swh.journal.test.publisher',
-            'object_types': ['content'],
-            'max_messages': 3,
-        }
 
+class JournalPublisherTest(JournalPublisher):
     def _prepare_storage(self, config):
         self.storage = Storage()
         self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
@@ -130,6 +129,13 @@ class JournalPublisherTest(JournalPublisher):
 
         print("publisher.origin-visits", self.origin_visits)
 
+
+class JournalPublisherNoKafkaInMemoryStorage(JournalPublisherTest):
+    """A journal publisher with:
+    - no kafka dependency
+    - in-memory storage
+    """
+
     def _prepare_journal(self, config):
         """No journal for now
 
@@ -139,7 +145,7 @@ class JournalPublisherTest(JournalPublisher):
 
 class TestPublisher(unittest.TestCase):
     def setUp(self):
-        self.publisher = JournalPublisherTest()
+        self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG)
         self.contents = [{b'sha1': c['sha1']} for c in CONTENTS]
         self.revisions = [{b'id': c['id']} for c in REVISIONS]
         self.releases = [{b'id': c['id']} for c in RELEASES]
-- 
GitLab