Skip to content
Snippets Groups Projects
Commit e6c2a869 authored by Nicolas Dandrimont's avatar Nicolas Dandrimont
Browse files

Implement listener on top of pika instead of celery

parent 68c42fb3
No related branches found
Tags v0.0.72
1 merge request: !131 Implement listener on top of pika instead of celery
......@@ -17,6 +17,9 @@ ignore_missing_imports = True
[mypy-kombu.*]
ignore_missing_imports = True
[mypy-pika.*]
ignore_missing_imports = True
[mypy-pkg_resources.*]
ignore_missing_imports = True
......
......@@ -7,6 +7,7 @@ celery >= 4
Click
elasticsearch > 5.4
flask
pika >= 1.1.0
psycopg2
pyyaml
vcversioner
......
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import json
import logging
import sys
import pika
from swh.core.statsd import statsd
from swh.scheduler import get_scheduler
logger = logging.getLogger(__name__)
def utcnow():
    """Return the current moment as a timezone-aware datetime in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
def get_listener(broker_url, queue_name, scheduler_backend):
    """Open a pika channel consuming celery events for the scheduler.

    Declares a durable queue named *queue_name*, binds it to the
    ``celeryev`` exchange with a catch-all routing key, and registers a
    consumer callback that forwards events to *scheduler_backend*.

    Returns the configured channel; the caller is expected to call
    ``start_consuming`` on it.
    """
    params = pika.URLParameters(broker_url)
    channel = pika.BlockingConnection(params).channel()

    channel.queue_declare(queue=queue_name, durable=True)
    # '#' matches every routing key, so all celery events are received.
    channel.queue_bind(queue=queue_name, exchange='celeryev',
                       routing_key='#')
    # Bound the number of unacked deliveries pushed to this consumer.
    channel.basic_qos(prefetch_count=1000)
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=get_on_message(scheduler_backend),
    )
    return channel
def get_on_message(scheduler_backend):
    """Build a pika message callback bound to *scheduler_backend*.

    The returned callback decodes each delivery body as JSON — either a
    single event or a list of events — hands every event to
    :func:`process_event`, then acks the delivery.  Bodies that fail to
    parse are logged and acked as an empty batch.
    """
    def on_message(channel, method_frame, properties, body):
        try:
            payload = json.loads(body)
        except Exception:
            logger.warning('Could not parse body %r', body)
            payload = []

        # Normalize: a single event dict becomes a one-element batch.
        batch = payload if isinstance(payload, list) else [payload]
        for event in batch:
            logger.debug('Received event %r', event)
            process_event(event, scheduler_backend)

        channel.basic_ack(delivery_tag=method_frame.delivery_tag)

    return on_message
def process_event(event, scheduler_backend):
    """Record one celery task event in the scheduler backend.

    Handles ``task-started``, ``task-result`` and ``task-failed`` events
    by updating the corresponding task run; other event types only bump
    the statsd counter.  Events lacking a ``uuid`` or a ``type`` are
    ignored.

    :param event: decoded celery event dict
    :param scheduler_backend: scheduler storage exposing
        ``start_task_run`` / ``end_task_run``
    """
    uuid = event.get('uuid')
    if not uuid:
        return

    # Fix: event['type'] would raise KeyError on a malformed event; the
    # consumer callback does not guard this call, so the exception would
    # escape before the delivery is acked and kill the listener.
    event_type = event.get('type')
    if not event_type:
        return

    statsd.increment('swh_scheduler_listener_handled_event_total',
                     tags={'event_type': event_type})

    if event_type == 'task-started':
        scheduler_backend.start_task_run(
            uuid, timestamp=utcnow(),
            metadata={'worker': event.get('hostname')},
        )
    elif event_type == 'task-result':
        result = event['result']
        status = None
        if isinstance(result, dict) and 'status' in result:
            status = result['status']
        # Celery reports bare 'success'; refine it using the task's own
        # eventfulness flag when present.
        if status == 'success':
            status = 'eventful' if result.get('eventful') else 'uneventful'
        if status is None:
            # No explicit status: fall back on the result's truthiness.
            status = 'eventful' if result else 'uneventful'
        scheduler_backend.end_task_run(uuid, timestamp=utcnow(),
                                       status=status, result=result)
    elif event_type == 'task-failed':
        scheduler_backend.end_task_run(uuid, timestamp=utcnow(),
                                       status='failed')
if __name__ == '__main__':
    # Ad-hoc entry point: consume events using a local scheduler db.
    logging.basicConfig(level=logging.DEBUG)
    broker_url = sys.argv[1]
    backend = get_scheduler('local', args={
        'db': 'service=swh-scheduler'
    })
    channel = get_listener(broker_url, 'celeryev.test', backend)
    logger.info('Start consuming')
    channel.start_consuming()
......@@ -53,16 +53,21 @@ def listener(ctx):
This service is responsible for listening at task lifecycle events and
handle their workflow status in the database."""
scheduler = ctx.obj['scheduler']
if not scheduler:
scheduler_backend = ctx.obj['scheduler']
if not scheduler_backend:
raise ValueError('Scheduler class (local/remote) must be instantiated')
from swh.scheduler.celery_backend.config import build_app
app = build_app(ctx.obj['config'].get('celery'))
app.set_current()
broker = ctx.obj['config']\
.get('celery', {})\
.get('task_broker', 'amqp://guest@localhost/%2f')
from swh.scheduler.celery_backend.pika_listener import get_listener
from swh.scheduler.celery_backend.listener import event_monitor
event_monitor(app, backend=scheduler)
listener = get_listener(broker, 'celeryev.listener', scheduler_backend)
try:
listener.start_consuming()
finally:
listener.stop_consuming()
@cli.command('rpc-serve')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment