Skip to content
Snippets Groups Projects
Commit aedd323f authored by Nicolas Dandrimont's avatar Nicolas Dandrimont
Browse files

Replace swh-worker-control with a swh scheduler celery-monitor subcommand

This new subcommand has two commands:

 - ping-workers: checks whether the given worker instance answers within a given timeout
 - list-running: lists running tasks on the given worker instance
parent 8411335a
No related branches found
No related tags found
1 merge request!138Replace swh-worker-control with a swh scheduler celery-monitor subcommand
#!/usr/bin/env python3
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from fnmatch import fnmatch
from operator import itemgetter
import os
import sys
import click
def list_remote_workers(inspect):
    """List the workers answering a ping, keyed by worker name.

    Args:
        inspect: object whose ``ping()`` method returns a mapping keyed by
            worker name, or a falsy value when nothing answered.

    Returns:
        A dict mapping each supported worker name to a dict with the keys
        ``name``, ``host`` and ``type``. Supported names have the form
        ``celery@<type>.<host>``; any other name is reported on stderr and
        left out of the result.
    """
    ping_replies = inspect.ping()
    if not ping_replies:
        return {}

    prefix = "celery@"
    ret = {}
    for worker_name in sorted(ping_replies):
        worker_type, sep, host = "", "", ""
        if worker_name.startswith(prefix):
            # partition() never raises: a name without the "." separator is
            # reported below instead of crashing on tuple unpacking.
            worker_type, sep, host = worker_name[len(prefix):].partition(".")
        if not sep:
            print("Unsupported worker: %s" % worker_name, file=sys.stderr)
            continue
        ret[worker_name] = {
            "name": worker_name,
            "host": host,
            "type": worker_type,
        }
    return ret
def make_filters(filter_host, filter_type):
    """Build the list of worker predicates described by the given filters.

    Each entry of ``filter_host`` is matched against a worker's ``host``
    field and each entry of ``filter_type`` against its ``type`` field, using
    ``fnmatch``. An entry starting with ``-`` produces a predicate rejecting
    the workers that match the rest of the entry; any other entry produces a
    predicate accepting the workers that match it. Host predicates come
    first in the returned list, followed by type predicates.
    """

    def build(field, spec):
        # A leading "-" flips the match into an exclusion.
        negate = spec.startswith("-")
        pattern = spec[1:] if negate else spec

        def check(worker, field=field, value=pattern):
            matched = fnmatch(worker[field], value)
            return not matched if negate else matched

        return check

    host_checks = [build("host", spec) for spec in filter_host]
    type_checks = [build("type", spec) for spec in filter_type]
    return host_checks + type_checks
def filter_workers(workers, filters):
    """Return the workers accepted by every predicate in ``filters``.

    ``workers`` maps worker names to worker descriptions; each predicate is
    called with a description. The result is a new dict holding the entries
    for which all predicates returned a truthy value.
    """
    selected = {}
    for name, worker in workers.items():
        if not all(check(worker) for check in filters):
            continue
        selected[name] = worker
    return selected
def get_clock_offsets(workers, inspect):
    """Add a clock_offset entry for each worker that reports its clock.

    Sends a ``monotonic`` request through ``inspect`` and, for every reply
    coming from a worker present in ``workers``, stores under the
    ``clock_offset`` key the current UTC time minus the reported monotonic
    value. When a reply carries no ``monotonic`` value, a message is printed
    on stderr and an offset of zero seconds is used.

    Args:
        workers: dict mapping worker names to worker dicts; the worker dicts
            are modified in place.
        inspect: object whose ``_request("monotonic")`` call returns a
            mapping of worker name to reply dict, or a falsy value.
    """
    err_msg = "Could not get monotonic clock for {worker}"
    t = datetime.datetime.now(tz=datetime.timezone.utc)

    # Treat a missing/empty reply as "no worker answered" rather than
    # crashing on ``None.items()``.
    replies = inspect._request("monotonic") or {}
    for worker, clock in replies.items():
        if worker not in workers:
            # Reply from a worker that was not selected: nothing to update.
            continue
        monotonic = clock.get("monotonic")
        if monotonic is None:
            monotonic = 0
            click.echo(err_msg.format(worker=worker), err=True)
        dt = datetime.timedelta(seconds=monotonic)
        workers[worker]["clock_offset"] = t - dt
def worker_to_wallclock(worker, monotonic):
    """Translate a worker's monotonic timestamp into a wall clock time.

    The result is the worker's ``clock_offset`` entry shifted forward by
    ``monotonic`` seconds.
    """
    elapsed = datetime.timedelta(seconds=monotonic)
    origin = worker["clock_offset"]
    return origin + elapsed
@click.group()
@click.option(
    "--instance-config",
    metavar="CONFIG",
    default=None,
    help="Use this worker instance configuration",
)
@click.option(
    "--host",
    metavar="HOSTNAME_FILTER",
    multiple=True,
    help="Filter by hostname",
)
@click.option(
    "--type",
    metavar="WORKER_TYPE_FILTER",
    multiple=True,
    help="Filter by worker type",
)
@click.option(
    "--timeout",
    metavar="TIMEOUT",
    type=float,
    default=1.0,
    help="Timeout for remote control communication",
)
@click.option("--debug/--no-debug", default=False, help="Turn on debugging")
@click.pass_context
def cli(ctx, debug, timeout, instance_config, host, type):
    """Manage the Software Heritage workers

    Filters support globs; a filter starting with a "-" excludes the
    corresponding values.
    """
    if instance_config:
        os.environ["SWH_WORKER_INSTANCE"] = instance_config

    # NOTE(review): presumably imported here, after the environment variable
    # is set, so that the celery app picks it up on load -- confirm.
    from swh.scheduler.celery_backend.config import app

    control = app.control

    # First pass: ping every worker, then keep only those passing the filters.
    selected = filter_workers(
        list_remote_workers(control.inspect(timeout=timeout)),
        make_filters(host, type),
    )

    # Second pass: an inspect object restricted to the selected workers.
    destination = list(selected)
    inspect = control.inspect(destination=destination, timeout=timeout)
    get_clock_offsets(selected, inspect)

    # Share everything the subcommands need through the click context object.
    ctx.obj.update(
        workers=selected,
        inspect=inspect,
        control=control,
        destination=destination,
        timeout=timeout,
        debug=debug,
    )
@cli.command()
@click.pass_context
def list_workers(ctx):
    """List the currently running workers"""
    workers = ctx.obj["workers"]

    # Nothing to print when no worker was selected: exit with status 2.
    if not workers:
        sys.exit(2)

    for name in sorted(workers):
        click.echo("{type} alive on {host}".format(**workers[name]))
@cli.command()
@click.pass_context
def list_tasks(ctx):
    """List the tasks currently running on workers"""
    # One line per task. The start date uses %d for the day of the month
    # (the previous "%Y-%m-%m" spelling printed the month twice).
    task_template = (
        "{worker} {name}"
        "[{id} "
        "started={started:%Y-%m-%dT%H:%M:%S} "
        "pid={worker_pid}] {args} {kwargs}"
    )
    inspect = ctx.obj["inspect"]
    workers = ctx.obj["workers"]

    active = inspect.active()
    if not active:
        click.echo("No reply from workers", err=True)
        sys.exit(2)

    has_tasks = False
    for worker_name, tasks in sorted(active.items()):
        worker = workers[worker_name]
        if not tasks:
            click.echo("No active tasks on {name}".format(**worker), err=True)
        for task in sorted(tasks, key=itemgetter("time_start")):
            # Convert the task start timestamp to a wall clock time using the
            # clock offset recorded for this worker.
            task["started"] = worker_to_wallclock(worker, task["time_start"])
            click.echo(task_template.format(worker=worker_name, **task))
            has_tasks = True

    # Exit with status 2 when no worker reported any task.
    if not has_tasks:
        sys.exit(2)
@cli.command()
@click.pass_context
def list_queues(ctx):
    """List all the queues currently enabled on the workers"""
    replies = ctx.obj["inspect"].active_queues()
    if not replies:
        click.echo("No reply from workers", err=True)
        sys.exit(2)

    seen_any = False
    for worker_name in sorted(replies):
        names = sorted(entry["name"] for entry in replies[worker_name])
        if not names:
            click.echo("No queues for {worker}".format(worker=worker_name), err=True)
            continue
        seen_any = True
        click.echo(
            "{worker} {queues}".format(worker=worker_name, queues=" ".join(names))
        )

    # Exit with status 2 when no worker has any queue.
    if not seen_any:
        sys.exit(2)
@cli.command()
@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
@click.argument("queues", nargs=-1)
@click.pass_context
def remove_queues(ctx, noop, queues):
    """Cancel the queue for the given workers"""
    control = ctx.obj["control"]
    timeout = ctx.obj["timeout"]
    replies = ctx.obj["inspect"].active_queues()

    if not replies:
        click.echo("No reply from workers", err=True)
        sys.exit(2)

    # Without explicit queue patterns, every queue is selected.
    patterns = queues or ["*"]
    suffix = " (noop)" if noop else ""

    for worker in sorted(replies):
        for queue_name in sorted(entry["name"] for entry in replies[worker]):
            if not any(fnmatch(queue_name, pattern) for pattern in patterns):
                continue
            click.echo(
                "Canceling queue {queue} on worker {worker}{noop}".format(
                    queue=queue_name, worker=worker, noop=suffix
                ),
                err=True,
            )
            if noop:
                continue
            control.cancel_consumer(queue_name, destination=[worker], timeout=timeout)
@cli.command()
@click.option("--noop", is_flag=True, default=False, help="Do not proceed")
@click.argument("queues", nargs=-1)
@click.pass_context
def add_queues(ctx, noop, queues):
    """Start the queue for the given workers"""
    control = ctx.obj["control"]
    timeout = ctx.obj["timeout"]
    workers = ctx.obj["workers"]

    if not workers:
        click.echo("No reply from workers", err=True)
        sys.exit(2)

    suffix = " (noop)" if noop else ""
    # Every requested queue is started on every selected worker.
    pairs = [(worker, queue) for worker in sorted(workers) for queue in queues]
    for worker, queue in pairs:
        click.echo(
            "Starting queue {queue} on worker {worker}{noop}".format(
                queue=queue, worker=worker, noop=suffix
            ),
            err=True,
        )
        if noop:
            continue
        ret = control.add_consumer(queue, destination=[worker], timeout=timeout)
        print(ret)
# Script entry point: run the click group with an empty dict as context
# object, which the group callback fills in for the subcommands.
if __name__ == "__main__":
    cli(obj={})
......@@ -45,7 +45,6 @@ setup(
author_email="swh-devel@inria.fr",
url="https://forge.softwareheritage.org/diffusion/DSCH/",
packages=find_packages(),
scripts=["bin/swh-worker-control"],
setup_requires=["setuptools-scm"],
use_scm_version=True,
install_requires=parse_requirements() + parse_requirements("swh"),
......
......@@ -74,7 +74,7 @@ def cli(ctx, config_file, database, url, no_stdout):
ctx.obj["config"] = conf
from . import admin, task, task_type # noqa
from . import admin, celery_monitor, task, task_type # noqa
def main():
......
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from ast import literal_eval
import csv
import logging
import sys
import time
from typing import Any, Dict, Optional
import click
from . import cli
logger = logging.getLogger(__name__)
def destination_from_pattern(ctx: click.Context, pattern: Optional[str]):
    """Configure the inspect object in ``ctx.obj`` from a destination pattern.

    - ``None`` leaves the inspect object untouched.
    - A pattern containing ``*`` is stored as the inspect object's
      ``pattern``, with its ``matcher`` set to ``"glob"``.
    - Anything else is split on commas and stored as its ``destination``.

    The choice made is logged at debug level.
    """
    if pattern is None:
        logger.debug("Matching all workers")
        return

    inspect = ctx.obj["inspect"]

    if "*" not in pattern:
        destination = pattern.split(",")
        inspect.destination = destination
        logger.debug("Using destinations %s", ", ".join(destination))
        return

    inspect.pattern = pattern
    inspect.matcher = "glob"
    logger.debug("Using glob pattern %s", pattern)
@cli.group("celery-monitor")
@click.option(
    "--timeout",
    type=float,
    default=3.0,
    help="Timeout for celery remote control",
)
@click.option("--pattern", help="Celery destination pattern", default=None)
@click.pass_context
def celery_monitor(ctx: click.Context, timeout: float, pattern: Optional[str]) -> None:
    """Monitoring of Celery"""
    from swh.scheduler.celery_backend.config import app

    # Share the timeout and an inspect object with the subcommands, then
    # narrow the inspect object down according to --pattern.
    ctx.obj.update(timeout=timeout, inspect=app.control.inspect(timeout=timeout))
    destination_from_pattern(ctx, pattern)
@celery_monitor.command("ping-workers")
@click.pass_context
def ping_workers(ctx: click.Context) -> None:
    """Check which workers respond to the celery remote control"""
    inspect = ctx.obj["inspect"]
    rtt_by_worker = {}

    def record_rtt(response):
        # Called with each response; remembers how long it took to arrive.
        elapsed = time.monotonic() - started
        for destination in response:
            logger.debug("Got ping response from %s: %r", destination, response)
            rtt_by_worker[destination] = elapsed

    inspect.callback = record_rtt

    started = time.monotonic()
    replies = inspect.ping()

    if replies:
        for destination in replies:
            logger.info(
                "Got response from %s in %f seconds",
                destination,
                rtt_by_worker[destination],
            )
        ctx.exit(0)

    logger.info("No response in %f seconds", time.monotonic() - started)
    ctx.exit(1)
@celery_monitor.command("list-running")
@click.option(
    "--format",
    help="Output format",
    default="pretty",
    type=click.Choice(["pretty", "csv"]),
)
@click.pass_context
def list_running(ctx: click.Context, format: str):
    """List running tasks on the lister workers"""
    response_times = {}

    def active_callback(response):
        # Record how long each worker took to answer the request.
        rtt = time.monotonic() - active_time
        for destination in response:
            response_times[destination] = rtt

    ctx.obj["inspect"].callback = active_callback

    active_time = time.monotonic()
    ret = ctx.obj["inspect"].active()

    if not ret:
        logger.info("No response in %f seconds", time.monotonic() - active_time)
        ctx.exit(1)

    def decode_repr(value: Any) -> Any:
        """Decode ``value`` when it is the repr of a Python literal."""
        # literal_eval() raises ValueError on values that are not strings
        # (e.g. an actual list or dict), so only strings are decoded.
        # NOTE(review): whether args/kwargs arrive as repr strings or as
        # decoded values seems to depend on the Celery version -- confirm.
        if isinstance(value, str):
            return literal_eval(value)
        return value

    def pretty_task_arguments(task: Dict[str, Any]) -> str:
        """Render a task as ``name(arg, ..., key=value, ...)``."""
        arg_list = [repr(arg) for arg in task["args"]]
        arg_list.extend(f"{k}={v!r}" for k, v in task["kwargs"].items())
        return f'{task["name"]}({", ".join(arg_list)})'

    def get_task_data(worker: str, task: Dict[str, Any]) -> Dict[str, Any]:
        """Build the output record for a task running on ``worker``."""
        duration = time.time() - task["time_start"]
        return {
            "worker": worker,
            "name": task["name"],
            "args": decode_repr(task["args"]),
            "kwargs": decode_repr(task["kwargs"]),
            "duration": duration,
            "worker_pid": task["worker_pid"],
        }

    if format == "csv":
        writer = csv.DictWriter(
            sys.stdout, ["worker", "name", "args", "kwargs", "duration", "worker_pid"]
        )
        writer.writeheader()

        def output(data: Dict[str, Any]):
            writer.writerow(data)

    elif format == "pretty":

        def output(data: Dict[str, Any]):
            print(
                f"{data['worker']}: {pretty_task_arguments(data)} "
                f"[for {data['duration']:f}s, pid={data['worker_pid']}]"
            )

    else:
        # Defensive: click.Choice above already restricts the accepted values.
        logger.error("Unknown format %s", format)
        ctx.exit(127)

    for worker, active in sorted(ret.items()):
        if not active:
            logger.info("%s: no active tasks", worker)
            continue

        # Oldest task first.
        for task in sorted(active, key=lambda t: t["time_start"]):
            output(get_task_data(worker, task))

    ctx.exit(0)
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from click.testing import CliRunner
import pytest
from swh.scheduler.cli import cli
def invoke(*args, catch_exceptions=False):
    """Run ``celery-monitor`` with ``args`` through the click test runner.

    Returns the result object of the invocation; stderr is kept separate
    from stdout.
    """
    runner = CliRunner(mix_stderr=False)
    argv = ["celery-monitor", *args]
    return runner.invoke(cli, argv, catch_exceptions=catch_exceptions)
def test_celery_monitor():
    """Invoking celery-monitor without arguments prints its help text"""
    help_text = invoke().stdout
    for heading in ("Commands:", "Options:"):
        assert heading in help_text
def test_celery_monitor_ping(caplog, swh_app, celery_session_worker):
    """Pinging the session worker logs exactly one INFO response record"""
    caplog.set_level(logging.INFO, "swh.scheduler.cli.celery_monitor")
    hostname = celery_session_worker.hostname

    result = invoke("--pattern", hostname, "ping-workers")
    assert result.exit_code == 0

    records = caplog.records
    assert len(records) == 1
    record = records[0]
    assert record.levelname == "INFO"
    assert f"response from {hostname}" in record.message
@pytest.mark.parametrize(
    "filter_args,filter_message,exit_code",
    [
        ((), "Matching all workers", 0),
        (
            ("--pattern", "celery@*.test-host"),
            "Using glob pattern celery@*.test-host",
            1,
        ),
        (
            ("--pattern", "celery@test-type.test-host"),
            "Using destinations celery@test-type.test-host",
            1,
        ),
        (
            ("--pattern", "celery@test-type.test-host,celery@test-type2.test-host"),
            (
                "Using destinations "
                "celery@test-type.test-host, celery@test-type2.test-host"
            ),
            1,
        ),
    ],
)
def test_celery_monitor_ping_filter(
    caplog, swh_app, celery_session_worker, filter_args, filter_message, exit_code
):
    """Each --pattern form logs the matching filter message and exit code"""
    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")

    result = invoke("--timeout", "1.5", *filter_args, "ping-workers")
    assert result.exit_code == exit_code, result.stdout

    def logged(levelname, fragment):
        # True when some captured record at that level contains the fragment.
        return any(
            record.levelname == levelname and fragment in record.message
            for record in caplog.records
        )

    # Check the proper filter has been generated
    assert logged("DEBUG", filter_message)

    # With a filter, check that no worker responded
    if filter_args:
        assert logged("INFO", "No response in")
def test_celery_monitor_list_running(caplog, swh_app, celery_session_worker):
    """list-running on the session worker only logs "no active tasks" at INFO"""
    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
    hostname = celery_session_worker.hostname

    result = invoke("--pattern", hostname, "list-running")
    assert result.exit_code == 0, result.stdout

    info_messages = [
        record.message for record in caplog.records if record.levelname == "INFO"
    ]
    for message in info_messages:
        assert f"{hostname}: no active tasks" in message
@pytest.mark.parametrize("format", ["csv", "pretty"])
def test_celery_monitor_list_running_format(
    caplog, swh_app, celery_session_worker, format
):
    """list-running succeeds in both formats; csv prints only the header"""
    caplog.set_level(logging.DEBUG, "swh.scheduler.cli.celery_monitor")
    hostname = celery_session_worker.hostname

    result = invoke("--pattern", hostname, "list-running", "--format", format)
    assert result.exit_code == 0, result.stdout

    info_messages = [
        record.message for record in caplog.records if record.levelname == "INFO"
    ]
    for message in info_messages:
        assert f"{hostname}: no active tasks" in message

    if format == "csv":
        # No task is running, so only the header row is expected.
        assert result.stdout.splitlines() == [
            "worker,name,args,kwargs,duration,worker_pid"
        ]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment