Skip to content
Snippets Groups Projects
Commit eea3e15b authored by vlorentz
Browse files

cli: Move the main code of export_graph to its own function

This allows the export logic to be reused by a Luigi task.
parent 5087a463
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,7 @@
import os
import pathlib
import sys
from typing import Any, Dict, List, Optional, Set
import click
......@@ -93,23 +94,10 @@ AVAILABLE_EXPORTERS = {
),
)
@click.pass_context
def export_graph(
ctx, export_path, export_id, formats, exclude, object_types, processes, margin
):
def export_graph(ctx, export_path, formats, exclude, object_types, **kwargs):
"""Export the Software Heritage graph as an edge dataset."""
from importlib import import_module
import logging
import resource
import uuid
from swh.dataset.journalprocessor import ParallelJournalProcessor
logger = logging.getLogger(__name__)
config = ctx.obj["config"]
if not export_id:
export_id = str(uuid.uuid4())
if object_types:
object_types = {o.strip() for o in object_types.split(",")}
invalid_object_types = object_types - set(MAIN_TABLES.keys())
......@@ -128,6 +116,38 @@ def export_graph(
option_name="formats", message=f"{f} is not an available format."
)
run_export_graph(
config,
pathlib.Path(export_path),
export_formats,
list(object_types),
exclude_obj_types=exclude_obj_types,
**kwargs,
)
def run_export_graph(
config: Dict[str, Any],
export_path: pathlib.Path,
export_formats: List[str],
object_types: List[str],
exclude_obj_types: Set[str],
export_id: Optional[str],
processes: int,
margin: Optional[float],
):
from importlib import import_module
import logging
import resource
import uuid
from swh.dataset.journalprocessor import ParallelJournalProcessor
logger = logging.getLogger(__name__)
if not export_id:
export_id = str(uuid.uuid4())
# Enforce order (from origin to contents) to reduce number of holes in the graph.
object_types = [
obj_type for obj_type in MAIN_TABLES.keys() if obj_type in object_types
......@@ -171,18 +191,14 @@ def export_graph(
if obj_type in exclude_obj_types:
continue
exporters = [
(
exporter_cls[f],
{"export_path": os.path.join(export_path, f)},
)
for f in export_formats
(exporter_cls[f], {"export_path": export_path / f}) for f in export_formats
]
parallel_exporter = ParallelJournalProcessor(
config,
exporters,
export_id,
obj_type,
node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
node_sets_path=export_path / ".node_sets" / obj_type,
processes=processes,
offset_margin=margin,
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment