From a8bc684cceb5825323fe8bd61cae2531b3cc6d81 Mon Sep 17 00:00:00 2001 From: David Douard <david.douard@sdfa3.org> Date: Thu, 31 Jan 2019 12:14:54 +0100 Subject: [PATCH] cli: build the celery app from a celery section of the given configuration file for runner and listener commands. related to T1410 --- swh/scheduler/celery_backend/listener.py | 8 ++++---- swh/scheduler/cli.py | 14 ++++++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index ad2d9094..7ae3c061 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -13,9 +13,9 @@ import click from arrow import utcnow from kombu import Queue -from celery.events import EventReceiver -from .config import app as main_app +import celery +from celery.events import EventReceiver class ReliableEventReceiver(EventReceiver): @@ -166,8 +166,8 @@ def event_monitor(app, backend): }) recv = ReliableEventReceiver( - main_app.connection(), - app=main_app, + celery.current_app.connection(), + app=celery.current_app, handlers={ 'task-started': task_started, 'task-result': task_succeeded, diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py index e0237b76..925c1819 100644 --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -497,7 +497,10 @@ def runner(ctx, period): This process is responsible for checking for ready-to-run tasks and schedule them.""" from swh.scheduler.celery_backend.runner import run_ready_tasks - from swh.scheduler.celery_backend.config import app + from swh.scheduler.celery_backend.config import build_app + + app = build_app(ctx.obj['config'].get('celery')) + app.set_current() logger = logging.getLogger(__name__ + '.runner') scheduler = ctx.obj['scheduler'] @@ -529,9 +532,12 @@ def listener(ctx): if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') - from swh.scheduler.celery_backend.listener import ( - event_monitor, main_app) - event_monitor(main_app, backend=scheduler) + from swh.scheduler.celery_backend.config import build_app + app = build_app(ctx.obj['config'].get('celery')) + app.set_current() + + from swh.scheduler.celery_backend.listener import event_monitor + event_monitor(app, backend=scheduler) @cli.command('api-server') -- GitLab