Skip to content
Snippets Groups Projects
Commit 99a59937 authored by vlorentz's avatar vlorentz
Browse files

luigi: Rename classes to be globally unambiguous

Otherwise, UploadToS3, DownloadToS3, and RunAll would conflict with
tasks about to be defined in swh.graph; and Luigi requires task names
to be globally unique.
parent f8a13718
No related branches found
No related tags found
No related merge requests found
......@@ -97,14 +97,14 @@ files:
config=graph.staging.yml
processes=16
[RunAll]
[RunExportAll]
formats=edges,orc
s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/
And run this command, for example::
luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunAll \
--UploadToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunExportAll \
--UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
--s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
--athena-db-name=vlorentz_20221109_staging
......@@ -341,14 +341,13 @@ class ExportGraph(luigi.Task):
json.dump(meta, fd, indent=4)
class UploadToS3(luigi.Task):
class UploadExportToS3(luigi.Task):
"""Uploads a local dataset export to S3; creating automatically if it does
not exist.
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi UploadToS3 \
--config=graph.prod.yml \
luigi --local-scheduler --module swh.dataset.luigi UploadExportToS3 \
--local-export-path=export/ \
--formats=edges \
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
......@@ -419,15 +418,14 @@ class UploadToS3(luigi.Task):
)
class DownloadFromS3(luigi.Task):
class DownloadExportFromS3(luigi.Task):
"""Downloads a local dataset export from S3.
This performs the inverse operation of :class:`UploadToS3`
This performs the inverse operation of :class:`UploadExportToS3`
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi DownloadFromS3 \
--config=graph.prod.yml \
luigi --local-scheduler --module swh.dataset.luigi DownloadExportFromS3 \
--local-export-path=export/ \
--formats=edges \
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
......@@ -444,8 +442,7 @@ class DownloadFromS3(luigi.Task):
"""Returns a :class:`ExportGraph` task that writes local files at the
expected location."""
return [
UploadToS3(
local_export_path=self.local_export_path,
UploadExportToS3(
formats=self.formats,
object_types=self.object_types,
s3_export_path=self.s3_export_path,
......@@ -501,7 +498,7 @@ class DownloadFromS3(luigi.Task):
class LocalExport(luigi.Task):
"""Task that depends on a local dataset being present -- either directly from
:class:`ExportGraph` or via :class:`DownloadFromS3`.
:class:`ExportGraph` or via :class:`DownloadExportFromS3`.
"""
local_export_path = PathParameter(is_dir=True)
......@@ -510,16 +507,17 @@ class LocalExport(luigi.Task):
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
)
export_task_type = luigi.TaskParameter(
default=DownloadFromS3,
default=DownloadExportFromS3,
significant=False,
description="""The task used to get the dataset if it is not present.
Should be either ``swh.dataset.luigi.ExportGraph`` or
``swh.dataset.luigi.DownloadFromS3``.""",
``swh.dataset.luigi.DownloadExportFromS3``.""",
)
def requires(self) -> List[luigi.Task]:
"""Returns an instance of either :class:`ExportGraph` or :class:`DownloadFromS3`
depending on the value of :attr:`export_task_type`."""
"""Returns an instance of either :class:`ExportGraph` or
:class:`DownloadExportFromS3` depending on the value of
:attr:`export_task_type`."""
if issubclass(self.export_task_type, ExportGraph):
return [
......@@ -529,9 +527,9 @@ class LocalExport(luigi.Task):
object_types=self.object_types,
)
]
elif issubclass(self.export_task_type, DownloadFromS3):
elif issubclass(self.export_task_type, DownloadExportFromS3):
return [
DownloadFromS3(
DownloadExportFromS3(
local_export_path=self.local_export_path,
formats=self.formats,
object_types=self.object_types,
......@@ -611,10 +609,10 @@ class CreateAthena(luigi.Task):
)
def requires(self) -> List[luigi.Task]:
"""Returns the corresponding :class:`UploadToS3` instance, with ORC as only
format."""
"""Returns the corresponding :class:`UploadExportToS3` instance,
with ORC as only format."""
return [
UploadToS3(
UploadExportToS3(
formats=[Format.orc], # type: ignore[attr-defined]
object_types=self.object_types,
s3_export_path=self.s3_export_path,
......@@ -637,15 +635,15 @@ class CreateAthena(luigi.Task):
)
class RunAll(luigi.Task):
class RunExportAll(luigi.Task):
"""Runs both the S3 and Athena export.
Example invocation::
luigi --local-scheduler --module swh.dataset.luigi RunAll \
luigi --local-scheduler --module swh.dataset.luigi RunExportAll \
--ExportGraph-config=graph.staging.yml \
--ExportGraph-processes=12 \
--UploadToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
--UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
--formats=edges \
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
--athena-db-name=swh_20221108 \
......@@ -662,8 +660,8 @@ class RunAll(luigi.Task):
athena_db_name = luigi.Parameter()
def requires(self) -> List[luigi.Task]:
# CreateAthena depends on UploadToS3(formats=[edges]), so we need to
# explicitly depend on UploadToS3(formats=self.formats) here, to also
# CreateAthena depends on UploadExportToS3(formats=[edges]), so we need to
# explicitly depend on UploadExportToS3(formats=self.formats) here, to also
# export the formats requested by the user.
return [
CreateAthena(
......@@ -672,7 +670,7 @@ class RunAll(luigi.Task):
s3_athena_output_location=self.s3_athena_output_location,
athena_db_name=self.athena_db_name,
),
UploadToS3(
UploadExportToS3(
formats=self.formats,
object_types=self.object_types,
s3_export_path=self.s3_export_path,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment