Skip to content
Snippets Groups Projects
Commit 0bf9c88d authored by vlorentz's avatar vlorentz
Browse files

luigi: Add LocalExport task

It allows other packages (eg. swh-graph) to depend on the presence of the local
dataset, with a configurable way to obtain it if missing
parent b39436e3
No related branches found
No related tags found
No related merge requests found
......@@ -486,6 +486,62 @@ class DownloadFromS3(luigi.Task):
)
class LocalExport(luigi.Task):
"""Task that depends on a local dataset being present -- either directly from
:class:`ExportGraph` or via :class:`DownloadFromS3`.
"""
local_export_path = PathParameter(is_dir=True)
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
object_types = luigi.EnumListParameter(
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
export_task_type = luigi.TaskParameter(
default=DownloadFromS3,
significant=False,
description="""The task used to get the dataset if it is not present.
Should be either ``swh.dataset.luigi.ExportGraph`` or
``swh.dataset.luigi.DownloadFromS3``.""",
)
def requires(self) -> List[luigi.Task]:
"""Returns an instance of either :class:`ExportGraph` or :class:`DownloadFromS3`
depending on the value of :attr:`export_task_type`."""
if issubclass(self.export_task_type, ExportGraph):
return [
ExportGraph(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
elif issubclass(self.export_task_type, DownloadFromS3):
return [
DownloadFromS3(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
)
]
else:
raise ValueError(
f"Unexpected export_task_type: {self.export_task_type.__name__}"
)
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on the local filesystem."""
return [self._meta()]
def _meta(self):
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
def complete(self) -> bool:
return super().complete() and _export_metadata_has_object_types(
self._meta(), self.object_types
)
class AthenaDatabaseTarget(luigi.Target):
"""Target for the existence of a database on Athena."""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment