Skip to content
Snippets Groups Projects
Commit b7680125 authored by vlorentz's avatar vlorentz
Browse files

Add CLI script to generate Luigi config and call it

It can be cumbersome to set paths for all (recursive) dependencies of
the task we want to run; this CLI endpoint takes care of most of them.
parent e65858a7
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,7 @@
import logging
from pathlib import Path
import shlex
from typing import TYPE_CHECKING, Any, Dict, Set, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
......@@ -248,6 +248,182 @@ def compress(ctx, input_dataset, output_directory, graph_name, steps):
webgraph.compress(graph_name, input_dataset, output_directory, steps, conf)
def get_all_subclasses(cls):
    """Return every direct and indirect subclass of ``cls``.

    Classes are listed in depth-first order (a subclass is immediately followed
    by its own descendants), and each class appears exactly once even when it
    is reachable through several parents (multiple inheritance).

    Args:
        cls: the class whose subclass hierarchy should be walked.

    Returns:
        A list of classes; empty if ``cls`` has no subclasses. ``cls`` itself
        is never included.
    """
    seen = set()
    all_subclasses = []
    # Explicit stack instead of recursion; children are pushed in reverse so
    # they are popped (and therefore listed) in ``__subclasses__()`` order.
    pending = list(reversed(cls.__subclasses__()))
    while pending:
        subclass = pending.pop()
        if subclass in seen:
            # Already listed through another parent; its descendants were
            # handled at that point too.
            continue
        seen.add(subclass)
        all_subclasses.append(subclass)
        pending.extend(reversed(subclass.__subclasses__()))
    return all_subclasses
@graph_cli_group.command()
@click.option(
    "--base-directory",
    required=True,
    type=PathlibPath(),
    help="""The base directory where all datasets and compressed graphs are.
    Its subdirectories should be named after a date (and optional flavor).
    For example: ``/poolswh/softwareheritage/``.""",
)
@click.option(
    "--base-sensitive-directory",
    required=False,
    type=PathlibPath(),
    help="""The base directory for any data that should not be publicly available
    (eg. because it contains people's names).
    For example: ``/poolswh/softwareheritage/``.""",
)
@click.option(
    "--athena-prefix",
    required=False,
    type=str,
    help="""A prefix for the Athena Database that will be created and/or used.
    For example: ``swh``.""",
)
@click.option(
    "--s3-prefix",
    required=False,
    type=str,
    help="""The base S3 "directory" where all datasets and compressed graphs are.
    Its subdirectories should be named after a date (and optional flavor).
    For example: ``s3://softwareheritage/graph/``.""",
)
@click.option(
    "--graph-base-directory",
    required=False,
    type=PathlibPath(),
    help="""Overrides the path of the graph to use. Defaults to the value of
    ``{base_directory}/{dataset_name}/{compressed}/``.
    For example: ``/dev/shm/swh-graph/default/``.""",
)
@click.option(
    "--dataset-name",
    required=True,
    type=str,
    help="""Should be a date and optionally a flavor, which will be used
    as directory name. For example: ``2022-04-25`` or ``2022-11-12_staging``.""",
)
@click.option(
    "--luigi-config",
    type=PathlibPath(),
    help="""Extra options to add to ``luigi.cfg``, following the same format.
    This overrides any option that would be other set automatically.""",
)
@click.argument("luigi_param", nargs=-1)
@click.pass_context
def luigi(
    ctx,
    base_directory: Path,
    graph_base_directory: Optional[Path],
    base_sensitive_directory: Optional[Path],
    s3_prefix: Optional[str],
    athena_prefix: Optional[str],
    dataset_name: str,
    luigi_config: Optional[Path],
    luigi_param: Tuple[str, ...],
):
    """
    Calls Luigi with the given task and params, and automatically
    configures paths based on --base-directory and --dataset-name.

    The list of Luigi params should be prefixed with ``--`` so they are not interpreted
    by the ``swh`` CLI. For example::

        swh graph luigi \
                --base-directory ~/tmp/ \
                --dataset-name 2022-12-05_test \
                -- \
                RunAll \
                --local-scheduler

    to pass ``RunAll --local-scheduler`` as Luigi params

    The command exits with the return code of the ``luigi`` process.
    """
    # Imported locally to keep the startup time of the CLI under control.
    import configparser
    import os
    import subprocess
    import sys
    import tempfile

    import luigi

    # Populate the list of subclasses of luigi.Task
    import swh.dataset.luigi  # noqa
    import swh.graph.luigi  # noqa

    config = configparser.ConfigParser()

    dataset_path = base_directory / dataset_name

    # Parameter name -> value, applied to every task declaring that parameter.
    default_values: Dict[str, Any] = dict(
        local_export_path=dataset_path,
        export_task_type="ExportGraph",
        compression_task_type="CompressGraph",
        local_graph_path=dataset_path / "compressed",
        topological_order_path=dataset_path / "topology/topological_order_dfs.csv.zst",
        origin_contributors_path=dataset_path / "datasets/contribution_graph.csv.zst",
    )

    if graph_base_directory:
        default_values["local_graph_path"] = graph_base_directory

    if s3_prefix:
        # Strip trailing slashes so the joined prefix has no doubled separator.
        dataset_s3_prefix = f"{s3_prefix.rstrip('/')}/{dataset_name}"
        default_values["s3_export_path"] = dataset_s3_prefix
        default_values["s3_graph_path"] = f"{dataset_s3_prefix}/compressed"

    if base_sensitive_directory:
        sensitive_path = base_sensitive_directory / dataset_name
        default_values["deanonymized_origin_contributors_path"] = (
            sensitive_path / "datasets/contribution_graph.deanonymized.csv.zst"
        )
        default_values["deanonymization_table_path"] = (
            sensitive_path / "persons_sha256_to_name.csv.zst"
        )

    if athena_prefix:
        # Dashes are removed from the dataset name when building the DB name.
        default_values[
            "athena_db_name"
        ] = f"{athena_prefix}_{dataset_name.replace('-', '')}"

    for task_cls in get_all_subclasses(luigi.Task):
        task_name = task_cls.__name__
        # If the task has an argument with one of the known name, add the default value
        # to its config.
        task_config = {
            arg_name: str(arg_value)
            for arg_name, arg_value in default_values.items()
            if hasattr(task_cls, arg_name)
        }
        if task_config:
            config[task_name] = task_config

    # If any config is provided, add it.
    # This may override default arguments configured above.
    if luigi_config is not None:
        # ConfigParser.read() silently skips files it cannot open and returns
        # the list of files it did parse; an empty list means the overrides
        # requested by the user would be dropped without notice.
        if not config.read(luigi_config):
            raise click.BadParameter(
                f"could not read Luigi configuration file {luigi_config}",
                param_hint="--luigi-config",
            )

    # The temporary file must outlive the subprocess, which reads it by name.
    with tempfile.NamedTemporaryFile(mode="w+t", prefix="luigi_", suffix=".cfg") as fd:
        config.write(fd)
        # Make sure the content is on disk before Luigi opens the file.
        fd.flush()

        proc = subprocess.run(
            [
                "luigi",
                "--module",
                "swh.dataset.luigi",
                "--module",
                "swh.graph.luigi",
                *luigi_param,
            ],
            # NOTE(review): os.environ is unpacked last, so a LUIGI_CONFIG_PATH
            # already set in the caller's environment takes precedence over
            # the generated file -- confirm this is intended.
            env={
                "LUIGI_CONFIG_PATH": fd.name,
                **os.environ,
            },
        )

    # sys.exit() rather than the exit() helper, which is only injected by the
    # ``site`` module and meant for interactive use.
    sys.exit(proc.returncode)
def main():
    """Entry point: run the ``graph`` click command group.

    The group is invoked with the ``SWH_GRAPH`` automatic environment variable
    prefix, and whatever it returns is handed back to the caller.
    """
    envvar_prefix = "SWH_GRAPH"
    return graph_cli_group(auto_envvar_prefix=envvar_prefix)
......
# Copyright (C) 2019 The Software Heritage developers
# Copyright (C) 2019-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict
from click.testing import CliRunner
import pytest
import yaml
from swh.graph.cli import graph_cli_group
......@@ -56,3 +58,49 @@ def test_pipeline():
assert int(properties["nodes"]) == 24
assert int(properties["arcs"]) == 28
@pytest.mark.parametrize("exit_code", [0, 1])
def test_luigi(mocker, tmpdir, exit_code):
    """Checks ``swh graph luigi`` runs the ``luigi`` executable with the expected
    arguments and environment, and exits with the subprocess' return code."""
    runner = CliRunner()

    # Replace subprocess.run so that no actual Luigi process is spawned; the
    # mock reports ``exit_code`` as the return code of the fake process.
    subprocess_run = mocker.patch("subprocess.run")
    subprocess_run.return_value.returncode = exit_code

    # Use the ``tmpdir`` fixture directly as the parent of the base directory.
    result = runner.invoke(
        graph_cli_group,
        [
            "luigi",
            "--base-directory",
            f"{tmpdir}/base_dir",
            "--dataset-name",
            "2022-12-07",
            "--",
            "foo",
            "bar",
            "--baz",
            "qux",
        ],
        catch_exceptions=False,
    )
    assert result.exit_code == exit_code, result

    # Fail with a clear message if the command never reached subprocess.run.
    subprocess_run.assert_called_once()

    # The config file name is randomly generated, so read it back from the
    # keyword arguments of the recorded call before comparing the full call.
    luigi_config_path = subprocess_run.call_args[1]["env"]["LUIGI_CONFIG_PATH"]
    subprocess_run.assert_called_once_with(
        [
            "luigi",
            "--module",
            "swh.dataset.luigi",
            "--module",
            "swh.graph.luigi",
            "foo",
            "bar",
            "--baz",
            "qux",
        ],
        env={"LUIGI_CONFIG_PATH": luigi_config_path, **os.environ},
    )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment