Skip to content
Snippets Groups Projects
Commit 293404ca authored by vlorentz's avatar vlorentz Committed by vlorentz
Browse files

UploadGraphToS3: Add parallelism

parent 5688517a
No related branches found
Tags v6.3.1
No related merge requests found
......@@ -77,7 +77,7 @@ The layout is otherwise the same as the file layout.
import collections
from pathlib import Path
from typing import List, Set
from typing import Dict, List, Set
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
......@@ -596,6 +596,7 @@ class UploadGraphToS3(luigi.Task):
local_graph_path = luigi.PathParameter(significant=False)
s3_graph_path = S3PathParameter()
parallelism = luigi.IntParameter(default=10, significant=False)
def requires(self) -> List[luigi.Task]:
"""Returns a :class:`CompressGraph` task that writes local files at the
......@@ -617,8 +618,7 @@ class UploadGraphToS3(luigi.Task):
def run(self) -> None:
"""Copies all files: first the graph itself, then :file:`meta/compression.json`."""
import subprocess
import tempfile
import multiprocessing.dummy
import luigi.contrib.s3
import tqdm
......@@ -626,54 +626,74 @@ class UploadGraphToS3(luigi.Task):
compression_metadata_path = self.local_graph_path / "meta" / "compression.json"
seen_compression_metadata = False
client = luigi.contrib.s3.S3Client()
# recursively copy local files to S3, and end with compression metadata
paths = list(self.local_graph_path.glob("**/*"))
for (i, path) in tqdm.tqdm(
list(enumerate(paths)),
desc="Uploading compressed graph",
):
paths = []
for path in self.local_graph_path.glob("**/*"):
if path == compression_metadata_path:
# Write it last
seen_compression_metadata = True
continue
if path.is_dir():
continue
relative_path = path.relative_to(self.local_graph_path)
self.set_progress_percentage(int(i * 100 / len(paths)))
if path.suffix == ".bin":
# Large sparse file; store it compressed on S3.
with tempfile.NamedTemporaryFile(
prefix=path.stem, suffix=".bin.zst"
) as fd:
self.set_status_message(f"Compressing {relative_path}")
subprocess.run(
["zstdmt", "--force", "--keep", path, "-o", fd.name], check=True
)
self.set_status_message(f"Uploading {relative_path} (compressed)")
client.put_multipart(
fd.name,
f"{self.s3_graph_path}/{relative_path}.zst",
ACL="public-read",
)
else:
self.set_status_message(f"Uploading {relative_path}")
client.put_multipart(
path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
)
paths.append(path)
assert (
seen_compression_metadata
), "did not see meta/compression.json in directory listing"
# Write it last, to act as a stamp
client.put(
compression_metadata_path,
self._meta().path,
ACL="public-read",
)
self.__status_messages: Dict[Path, str] = {}
client = luigi.contrib.s3.S3Client()
with multiprocessing.dummy.Pool(self.parallelism) as p:
for (i, relative_path) in tqdm.tqdm(
enumerate(p.imap_unordered(self._upload_file, paths)),
total=len(paths),
desc="Uploading compressed graph",
):
self.set_progress_percentage(int(i * 100 / len(paths)))
self.set_status_message("\n".join(self.__status_messages.values()))
# Write it last, to act as a stamp
client.put(
compression_metadata_path,
self._meta().path,
ACL="public-read",
)
def _upload_file(self, path):
import subprocess
import tempfile
import luigi.contrib.s3
client = luigi.contrib.s3.S3Client()
relative_path = path.relative_to(self.local_graph_path)
if path.suffix == ".bin":
# Large sparse file; store it compressed on S3.
with tempfile.NamedTemporaryFile(prefix=path.stem, suffix=".bin.zst") as fd:
self.__status_messages[path] = f"Compressing {relative_path}"
subprocess.run(
["zstdmt", "--force", "--keep", path, "-o", fd.name], check=True
)
self.__status_messages[path] = f"Uploading {relative_path} (compressed)"
client.put_multipart(
fd.name,
f"{self.s3_graph_path}/{relative_path}.zst",
ACL="public-read",
)
else:
self.__status_messages[path] = f"Uploading {relative_path}"
client.put_multipart(
path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
)
del self.__status_messages[path]
return relative_path
class DownloadGraphFromS3(luigi.Task):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment