Skip to content
Snippets Groups Projects
Commit 584724d2 authored by David Douard's avatar David Douard
Browse files

Add a new SWHTask class to be used as base class for swh celery tasks

It is meant to be used to declare swh tasks via the task decorator
instead of subclassing the (now deprecated) Task class.

It is typically used like:

  from swh.scheduler.celery_backend.config import app
  from swh.scheduler.tasks import SWHTask

  @app.task(base=SWHTask)
  def ping():
      return 'pong'
parent b812434b
No related branches found
No related tags found
1 merge request!30Add a new SWHTask class to be used as base class for swh celery tasks
......@@ -7,9 +7,35 @@ import celery.app.task
from celery.utils.log import get_task_logger
class Task(celery.app.task.Task):
class SWHTask(celery.app.task.Task):
"""a schedulable task (abstract class)
Current implementation is based on Celery. See
http://docs.celeryproject.org/en/latest/reference/celery.app.task.html for
how to use tasks once instantiated
"""
_log = None
def on_failure(self, exc, task_id, args, kwargs, einfo):
self.send_event('task-result-exception')
def on_success(self, retval, task_id, args, kwargs):
self.send_event('task-result', result=retval)
@property
def log(self):
if self._log is None:
self._log = get_task_logger(self.name)
return self._log
class Task(SWHTask):
"""a schedulable task (abstract class)
DEPRECATED! Please use SWHTask as base for decorated functions instead.
Sub-classes must implement the run_task() method.
Current implementation is based on Celery. See
......@@ -25,14 +51,7 @@ class Task(celery.app.task.Task):
Should not be overridden as we need our special events to be sent for
the reccurrent scheduler. Override run_task instead."""
try:
result = self.run_task(*args, **kwargs)
except Exception as e:
self.send_event('task-result-exception')
raise e from None
else:
self.send_event('task-result', result=result)
return result
return self.run_task(*args, **kwargs)
def run_task(self, *args, **kwargs):
"""Perform the task.
......@@ -41,10 +60,3 @@ class Task(celery.app.task.Task):
scheduler using a celery event.
"""
raise NotImplementedError('tasks must implement the run_task() method')
@property
def log(self):
if not hasattr(self, '__log'):
self.__log = get_task_logger('%s.%s' %
(__name__, self.__class__.__name__))
return self.__log
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment