Skip to content
Snippets Groups Projects
Commit b39436e3 authored by vlorentz's avatar vlorentz
Browse files

luigi: Remove copies of stamp files to/from S3

They are only useful while exporting the dataset -- after the export is
finished, meta.json is good enough and stamp files only save a couple
of minutes when only some object types are needed (i.e. never in practice)
parent 23853dbf
No related branches found
No related tags found
No related merge requests found
......@@ -39,6 +39,7 @@ Tasks in this module work on "export directories", which have this layout::
``stamps`` files are written after corresponding directories are written.
Their presence indicates the corresponding directory was fully generated/copied.
This allows skipping work that was already done, while ignoring interrupted jobs.
They are omitted after the initial export (i.e. when copying to/from other machines).
``meta/export.json`` contains information about the dataset, for provenance tracking.
For example:
......@@ -111,7 +112,7 @@ for readability; but `they can be used interchangeably <https://luigi.readthedoc
# control
import enum
from pathlib import Path
from typing import Hashable, Iterator, List, TypeVar
from typing import Hashable, Iterator, List, TypeVar, Union
import luigi
......@@ -212,6 +213,19 @@ def stamps_paths(formats: List[Format], object_types: List[ObjectType]) -> List[
]
def _export_metadata_has_object_types(
    export_metadata: Union[luigi.LocalTarget, "luigi.contrib.s3.S3Target"],
    object_types: List[ObjectType],
) -> bool:
    """Tells whether the export metadata lists all the given object types.

    Opens ``export_metadata``, parses it as JSON, and checks that the name of
    every item of ``object_types`` appears in its ``object_type`` entry.
    """
    import json

    with export_metadata.open() as metadata_file:
        metadata = json.load(metadata_file)

    exported_names = set(metadata["object_type"])
    requested_names = {object_type.name for object_type in object_types}
    # Extra object types in the metadata are fine; missing ones are not.
    return exported_names.issuperset(requested_names)
class ExportGraph(luigi.Task):
"""Exports the entire graph to the local filesystem.
......@@ -347,25 +361,20 @@ class UploadToS3(luigi.Task):
def output(self) -> List[luigi.Target]:
"""Returns the meta path on S3."""
return self._stamps() + [self._meta()]
def _stamps(self):
import luigi.contrib.s3
return [
luigi.contrib.s3.S3Target(f"{self.s3_export_path}/{path}")
for path in stamps_paths(self.formats, self.object_types)
]
return [self._meta()]
def _meta(self):
    """Returns the S3 target for the export's ``meta/export.json``."""
    import luigi.contrib.s3

    meta_url = f"{self.s3_export_path}/meta/export.json"
    return luigi.contrib.s3.S3Target(meta_url)
def complete(self) -> bool:
    """Tells whether this task is already done.

    It is when the parent implementation reports it complete and the export
    metadata on S3 lists every object type in ``self.object_types``.
    """
    if not super().complete():
        # Do not try to read the metadata in this case.
        return False
    return _export_metadata_has_object_types(self._meta(), self.object_types)
def run(self) -> None:
"""Copies all files: first the export itself, then stamps, then
:file:`meta.json`.
"""
"""Copies all files: first the export itself, then :file:`meta.json`."""
import os
import luigi.contrib.s3
......@@ -390,13 +399,6 @@ class UploadToS3(luigi.Task):
local_dir / file_, f"{s3_dir}/{file_}", ACL="public-read"
)
for stamp in stamps_paths(self.formats, self.object_types):
client.put_multipart(
self.local_export_path / stamp,
f"{self.s3_export_path}/{stamp}",
ACL="public-read",
)
client.put(
self.local_export_path / "meta" / "export.json",
self._meta().path,
......@@ -438,28 +440,25 @@ class DownloadFromS3(luigi.Task):
]
def output(self) -> List[luigi.Target]:
"""Returns stamp and meta paths on S3."""
return self._stamps() + [self._meta()]
"""Returns the meta path on the local filesystem."""
return [self._meta()]
def _stamps(self):
return [
luigi.LocalTarget(self.local_export_path / path)
for path in stamps_paths(self.formats, self.object_types)
]
def complete(self) -> bool:
    """Tells whether this task is already done.

    It is when the parent implementation reports it complete and the local
    export metadata lists every object type in ``self.object_types``.
    """
    if not super().complete():
        # Do not try to read the metadata in this case.
        return False
    return _export_metadata_has_object_types(self._meta(), self.object_types)
def _meta(self):
    """Returns the local target for the export's ``meta/export.json``."""
    meta_path = self.local_export_path / "meta" / "export.json"
    return luigi.LocalTarget(meta_path)
def run(self) -> None:
"""Copies all files: first the export itself, then stamps, then
:file:`meta.json`.
"""
"""Copies all files: first the export itself, then :file:`meta.json`."""
import luigi.contrib.s3
import tqdm
client = luigi.contrib.s3.S3Client()
# recursively copy local files to S3, and end with stamps and export metadata
# recursively copy files from S3 to the local filesystem, and end with export metadata
for format_ in self.formats:
for object_type in self.object_types:
local_dir = self.local_export_path / format_.name / object_type.name
......@@ -479,14 +478,6 @@ class DownloadFromS3(luigi.Task):
str(local_dir / file_),
)
for stamp in stamps_paths(self.formats, self.object_types):
stamp_path = self.local_export_path / stamp
stamp_path.parent.mkdir(parents=True, exist_ok=True)
client.get(
f"{self.s3_export_path}/{stamp}",
str(stamp_path),
)
export_json_path = self.local_export_path / "meta" / "export.json"
export_json_path.parent.mkdir(exist_ok=True)
client.get(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment