From 99a599378f29f93b74f6ed86260369fddadc038d Mon Sep 17 00:00:00 2001
From: Valentin Lorentz <vlorentz@softwareheritage.org>
Date: Tue, 29 Nov 2022 11:47:05 +0100
Subject: [PATCH] luigi: Rename classes to be globally unambiguous

Otherwise, UploadToS3, DownloadToS3, and RunAll would conflict with
tasks about to be defined in swh.graph; and Luigi requires task names
to be globally unique.
---
 swh/dataset/luigi.py | 54 +++++++++++++++++++++-----------------------
 1 file changed, 26 insertions(+), 28 deletions(-)

diff --git a/swh/dataset/luigi.py b/swh/dataset/luigi.py
index cdcf173..d169e39 100644
--- a/swh/dataset/luigi.py
+++ b/swh/dataset/luigi.py
@@ -97,14 +97,14 @@ files:
     config=graph.staging.yml
     processes=16
 
-    [RunAll]
+    [RunExportAll]
     formats=edges,orc
     s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/
 
 And run this command, for example::
 
-    luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunAll \
-            --UploadToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
+    luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunExportAll \
+            --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \
             --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \
             --athena-db-name=vlorentz_20221109_staging
 
@@ -341,14 +341,13 @@ class ExportGraph(luigi.Task):
             json.dump(meta, fd, indent=4)
 
 
-class UploadToS3(luigi.Task):
+class UploadExportToS3(luigi.Task):
     """Uploads a local dataset export to S3; creating automatically if it does
     not exist.
 
     Example invocation::
 
-        luigi --local-scheduler --module swh.dataset.luigi UploadToS3 \
-                --config=graph.prod.yml \
+        luigi --local-scheduler --module swh.dataset.luigi UploadExportToS3 \
                 --local-export-path=export/ \
                 --formats=edges \
                 --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
@@ -419,15 +418,14 @@ class UploadToS3(luigi.Task):
         )
 
 
-class DownloadFromS3(luigi.Task):
+class DownloadExportFromS3(luigi.Task):
     """Downloads a local dataset export from S3.
 
-    This performs the inverse operation of :class:`UploadToS3`
+    This performs the inverse operation of :class:`UploadExportToS3`
 
     Example invocation::
 
-        luigi --local-scheduler --module swh.dataset.luigi DownloadFromS3 \
-                --config=graph.prod.yml \
+        luigi --local-scheduler --module swh.dataset.luigi DownloadExportFromS3 \
                 --local-export-path=export/ \
                 --formats=edges \
                 --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
@@ -444,8 +442,7 @@ class DownloadFromS3(luigi.Task):
         """Returns a :class:`ExportGraph` task that writes local files at the
         expected location."""
         return [
-            UploadToS3(
-                local_export_path=self.local_export_path,
+            UploadExportToS3(
                 formats=self.formats,
                 object_types=self.object_types,
                 s3_export_path=self.s3_export_path,
@@ -501,7 +498,7 @@ class DownloadFromS3(luigi.Task):
 
 class LocalExport(luigi.Task):
     """Task that depends on a local dataset being present -- either directly from
-    :class:`ExportGraph` or via :class:`DownloadFromS3`.
+    :class:`ExportGraph` or via :class:`DownloadExportFromS3`.
     """
 
     local_export_path = PathParameter(is_dir=True)
@@ -510,16 +507,17 @@ class LocalExport(luigi.Task):
         enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
     )
     export_task_type = luigi.TaskParameter(
-        default=DownloadFromS3,
+        default=DownloadExportFromS3,
         significant=False,
         description="""The task used to get the dataset if it is not present.
         Should be either ``swh.dataset.luigi.ExportGraph`` or
-        ``swh.dataset.luigi.DownloadFromS3``.""",
+        ``swh.dataset.luigi.DownloadExportFromS3``.""",
     )
 
     def requires(self) -> List[luigi.Task]:
-        """Returns an instance of either :class:`ExportGraph` or :class:`DownloadFromS3`
-        depending on the value of :attr:`export_task_type`."""
+        """Returns an instance of either :class:`ExportGraph` or
+        :class:`DownloadExportFromS3` depending on the value of
+        :attr:`export_task_type`."""
 
         if issubclass(self.export_task_type, ExportGraph):
             return [
@@ -529,9 +527,9 @@ class LocalExport(luigi.Task):
                     object_types=self.object_types,
                 )
             ]
-        elif issubclass(self.export_task_type, DownloadFromS3):
+        elif issubclass(self.export_task_type, DownloadExportFromS3):
             return [
-                DownloadFromS3(
+                DownloadExportFromS3(
                     local_export_path=self.local_export_path,
                     formats=self.formats,
                     object_types=self.object_types,
@@ -611,10 +609,10 @@ class CreateAthena(luigi.Task):
             )
 
     def requires(self) -> List[luigi.Task]:
-        """Returns the corresponding :class:`UploadToS3` instance, with ORC as only
-        format."""
+        """Returns the corresponding :class:`UploadExportToS3` instance,
+        with ORC as only format."""
         return [
-            UploadToS3(
+            UploadExportToS3(
                 formats=[Format.orc],  # type: ignore[attr-defined]
                 object_types=self.object_types,
                 s3_export_path=self.s3_export_path,
@@ -637,15 +635,15 @@ class CreateAthena(luigi.Task):
         )
 
 
-class RunAll(luigi.Task):
+class RunExportAll(luigi.Task):
     """Runs both the S3 and Athena export.
 
     Example invocation::
 
-        luigi --local-scheduler --module swh.dataset.luigi RunAll \
+        luigi --local-scheduler --module swh.dataset.luigi RunExportAll \
                 --ExportGraph-config=graph.staging.yml \
                 --ExportGraph-processes=12 \
-                --UploadToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
+                --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \
                 --formats=edges \
                 --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \
                 --athena-db-name=swh_20221108 \
@@ -662,8 +660,8 @@ class RunAll(luigi.Task):
     athena_db_name = luigi.Parameter()
 
     def requires(self) -> List[luigi.Task]:
-        # CreateAthena depends on UploadToS3(formats=[edges]), so we need to
-        # explicitly depend on UploadToS3(formats=self.formats) here, to also
+        # CreateAthena depends on UploadExportToS3(formats=[edges]), so we need to
+        # explicitly depend on UploadExportToS3(formats=self.formats) here, to also
         # export the formats requested by the user.
         return [
             CreateAthena(
@@ -672,7 +670,7 @@ class RunAll(luigi.Task):
                 s3_athena_output_location=self.s3_athena_output_location,
                 athena_db_name=self.athena_db_name,
             ),
-            UploadToS3(
+            UploadExportToS3(
                 formats=self.formats,
                 object_types=self.object_types,
                 s3_export_path=self.s3_export_path,
-- 
GitLab