Skip to content
Snippets Groups Projects

Fix scheduler listener on buster's celery version (4.1.0-4)

Compare and
1 file
+ 28
6
Compare changes
  • Side-by-side
  • Inline
@@ -5,6 +5,7 @@
import click
import datetime
import logging
import socket
from arrow import utcnow
@@ -26,22 +27,29 @@ class ReliableEventReceiver(EventReceiver):
exchange=self.exchange,
routing_key=self.routing_key,
auto_delete=False,
durable=True,
queue_arguments=self._get_queue_arguments())
durable=True)
def get_consumers(self, consumer, channel):
return [consumer(queues=[self.queue],
callbacks=[self._receive], no_ack=False,
accept=self.accept)]
def _receive(self, body, message):
type, body = self.event_from_message(body)
self.process(type, body, message)
def _receive(self, bodies, message):
logging.debug('## event-receiver: bodies: %s' % bodies)
logging.debug('## event-receiver: message: %s' % message)
if not isinstance(bodies, list): # celery<4 returned body as element
bodies = [bodies]
for body in bodies:
type, body = self.event_from_message(body)
self.process(type, body, message)
def process(self, type, event, message):
"""Process the received event by dispatching it to the appropriate
handler."""
handler = self.handlers.get(type) or self.handlers.get('*')
logging.debug('## event-receiver: type: %s' % type)
logging.debug('## event-receiver: event: %s' % event)
logging.debug('## event-receiver: handler: %s' % handler)
handler and handler(event, message)
@@ -93,6 +101,9 @@ def event_monitor(app, backend):
try_perform_actions()
def task_started(event, message):
logging.debug('#### task_started: event: %s' % event)
logging.debug('#### task_started: message: %s' % message)
queue_action({
'action': 'start_task_run',
'args': [event['uuid']],
@@ -106,8 +117,11 @@ def event_monitor(app, backend):
})
def task_succeeded(event, message):
logging.debug('#### task_succeeded: event: %s' % event)
logging.debug('#### task_succeeded: message: %s' % message)
result = event['result']
logging.debug('#### task_succeeded: result: %s' % result)
try:
status = result.get('status')
if status == 'success':
@@ -127,6 +141,9 @@ def event_monitor(app, backend):
})
def task_failed(event, message):
logging.debug('#### task_failed: event: %s' % event)
logging.debug('#### task_failed: message: %s' % message)
queue_action({
'action': 'end_task_run',
'args': [event['uuid']],
@@ -159,7 +176,12 @@ def event_monitor(app, backend):
'--database', '-d', help='Scheduling database DSN')
@click.option('--url', '-u',
help="(Optional) Scheduler's url access")
def main(cls, database, url):
@click.option('--verbose', is_flag=True, default=False,
help='Default to be silent')
def main(cls, database, url, verbose):
if verbose:
logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
scheduler = None
override_config = {}
if cls == 'local':
Loading