diff --git a/swh/scheduler/celery_backend/listener.py b/swh/scheduler/celery_backend/listener.py index ad2d9094c5ed152b4ea6a9a9cca84cf249ad6794..7ae3c061fffe0b7bc214120e6fb9324044fe0ba1 100644 --- a/swh/scheduler/celery_backend/listener.py +++ b/swh/scheduler/celery_backend/listener.py @@ -13,9 +13,9 @@ import click from arrow import utcnow from kombu import Queue -from celery.events import EventReceiver -from .config import app as main_app +import celery +from celery.events import EventReceiver class ReliableEventReceiver(EventReceiver): @@ -166,8 +166,8 @@ def event_monitor(app, backend): }) recv = ReliableEventReceiver( - main_app.connection(), - app=main_app, + celery.current_app.connection(), + app=celery.current_app, handlers={ 'task-started': task_started, 'task-result': task_succeeded, diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py index e0237b76e9dbd320fca9ba94fd551c06842526ad..925c1819d81312ae2f2399690a17299896fee1ab 100644 --- a/swh/scheduler/cli.py +++ b/swh/scheduler/cli.py @@ -497,7 +497,10 @@ def runner(ctx, period): This process is responsible for checking for ready-to-run tasks and schedule them.""" from swh.scheduler.celery_backend.runner import run_ready_tasks - from swh.scheduler.celery_backend.config import app + from swh.scheduler.celery_backend.config import build_app + + app = build_app(ctx.obj['config'].get('celery')) + app.set_current() logger = logging.getLogger(__name__ + '.runner') scheduler = ctx.obj['scheduler'] @@ -529,9 +532,12 @@ def listener(ctx): if not scheduler: raise ValueError('Scheduler class (local/remote) must be instantiated') - from swh.scheduler.celery_backend.listener import ( - event_monitor, main_app) - event_monitor(main_app, backend=scheduler) + from swh.scheduler.celery_backend.config import build_app + app = build_app(ctx.obj['config'].get('celery')) + app.set_current() + + from swh.scheduler.celery_backend.listener import event_monitor + event_monitor(app, backend=scheduler) @cli.command('api-server')