Skip to content
Snippets Groups Projects
Verified Commit 864f86c9 authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

cli.journal_client: Adapt according to latest journal client change

- Rename --max-messages to --stop-after-objects
- Drop the explicit loop within the current implementation and let the base one
  do the job

Following the journal migration (v0.0.27).
parent 317e8009
No related branches found
No related tags found
1 merge request!19(ci: unstuck master build) search: cli: Rename --max-messages to --stop-after-objects
# Add here internal Software Heritage dependencies, one per line.
swh.core[http]
swh.journal
swh.journal >= 0.0.27
swh.model
# Copyright (C) 2019 The Software Heritage developers
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -46,17 +46,17 @@ def journal_client(ctx):
@journal_client.command('objects')
@click.option('--max-messages', '-m', default=None, type=int,
@click.option('--stop-after-objects', '-s', default=None, type=int,
help='Maximum number of objects to replay. Default is to '
'run forever.')
@click.pass_context
def journal_client_objects(ctx, max_messages):
def journal_client_objects(ctx, stop_after_objects):
"""Listens for new objects from the SWH Journal, and schedules tasks
to run relevant indexers (currently, only origin)
on these new objects."""
client = get_journal_client(
ctx, object_types=['origin', 'origin_visit'],
max_messages=max_messages)
stop_after_objects=stop_after_objects)
search = get_search(**ctx.obj['config']['search'])
worker_fn = functools.partial(
......@@ -65,13 +65,14 @@ def journal_client_objects(ctx, max_messages):
)
nb_messages = 0
try:
while not max_messages or nb_messages < max_messages:
nb_messages += client.process(worker_fn)
print('Processed %d messages.' % nb_messages)
nb_messages = client.process(worker_fn)
print('Processed %d messages.' % nb_messages)
except KeyboardInterrupt:
ctx.exit(0)
else:
print('Done.')
finally:
client.close()
@cli.command('rpc-serve')
......
# Copyright (C) 2019 The Software Heritage developers
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -73,7 +73,7 @@ class CliTestCase(BaseElasticsearchTest):
return_value=mock_consumer):
result = invoke(False, [
'journal-client', 'objects',
'--max-messages', '1',
'--stop-after-objects', '1',
], JOURNAL_OBJECTS_CONFIG,
elasticsearch_host=self._elasticsearch_host)
......@@ -113,7 +113,7 @@ class CliTestCase(BaseElasticsearchTest):
return_value=mock_consumer):
result = invoke(False, [
'journal-client', 'objects',
'--max-messages', '1',
'--stop-after-objects', '1',
], JOURNAL_OBJECTS_CONFIG,
elasticsearch_host=self._elasticsearch_host)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment