Skip to content
Snippets Groups Projects
Commit 295e4138 authored by Antoine Lambert's avatar Antoine Lambert
Browse files

to_disk: Speedup directory cooking with multi-threading

Previously when cooking a directory, contents bytes were fetched
sequentially which could take a good amount of time for large
directories.

In order to speedup the cooking process, retrieve the contents bytes
in parallel with the help of the concurrent.futures module from the
Python standard library which fits particularly well for making loops
of I/O-bound tasks concurrent and for issuing tasks asynchronously.
parent dd4c7199
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,9 @@ class DirectoryCooker(BaseVaultCooker):
def prepare_bundle(self):
    """Cook a directory bundle: rebuild the directory on disk in a
    temporary location, then stream it as a gzipped tarball into
    ``self.fileobj``.

    The diff residue left both the old and the new ``DirectoryBuilder``
    constructor calls in place; this resolves to the post-commit version,
    which forwards ``self.thread_pool_size`` so content fetching is
    parallelized.
    """
    with tempfile.TemporaryDirectory(prefix="tmp-vault-directory-") as td:
        directory_builder = DirectoryBuilder(
            self.storage, td.encode(), self.obj_id, self.thread_pool_size
        )
        directory_builder.build()
        # arcname places the tree under its SWHID inside the tarball.
        with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar:
            tar.add(td, arcname=str(self.swhid))
......@@ -30,7 +30,10 @@ class RevisionFlatCooker(BaseVaultCooker):
revdir = root / hashutil.hash_to_hex(revision["id"])
revdir.mkdir()
directory_builder = DirectoryBuilder(
self.storage, str(revdir).encode(), revision["directory"]
self.storage,
str(revdir).encode(),
revision["directory"],
self.thread_pool_size,
)
directory_builder.build()
with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar:
......
# Copyright (C) 2017-2019 The Software Heritage developers
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -22,7 +22,7 @@ from swh.model.swhids import ObjectType
from swh.model.toposort import toposort
from swh.vault.cookers.base import BaseVaultCooker
from swh.vault.cookers.utils import revision_log
from swh.vault.to_disk import get_filtered_files_content
from swh.vault.to_disk import get_filtered_file_content
class RevisionGitfastCooker(BaseVaultCooker):
......@@ -82,9 +82,8 @@ class RevisionGitfastCooker(BaseVaultCooker):
obj_id = file_data["sha1"]
if obj_id in self.obj_done:
return
contents = list(get_filtered_files_content(self.storage, [file_data]))
content = contents[0]["content"]
self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content))
content = get_filtered_file_content(self.storage, file_data)
self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content["content"]))
self.obj_done.add(obj_id)
def _author_tuple_format(self, author, date):
......
# Copyright (C) 2020-2022 The Software Heritage developers
# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -7,7 +7,7 @@ import pytest
from swh.model.from_disk import DentryPerms
from swh.model.model import Content, Directory, DirectoryEntry, SkippedContent
from swh.vault.to_disk import DirectoryBuilder, get_filtered_files_content
from swh.vault.to_disk import DirectoryBuilder, get_filtered_file_content
def test_get_filtered_files_content(swh_storage):
......@@ -37,7 +37,9 @@ def test_get_filtered_files_content(swh_storage):
},
]
res = list(get_filtered_files_content(swh_storage, files_data))
res = [
get_filtered_file_content(swh_storage, file_data) for file_data in files_data
]
assert res == [
{
......@@ -76,7 +78,7 @@ def test_get_filtered_files_content__unknown_status(swh_storage):
]
with pytest.raises(AssertionError, match="unexpected status 'blah'"):
list(get_filtered_files_content(swh_storage, files_data))
[get_filtered_file_content(swh_storage, file_data) for file_data in files_data]
def _fill_storage(swh_storage, exclude_cnt3=False, exclude_dir1=False):
......
# Copyright (C) 2016-2020 The Software Heritage developers
# Copyright (C) 2016-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
import concurrent.futures
import functools
import os
from typing import Any, Dict, Iterator, List

from swh.model import hashutil
from swh.model.from_disk import DentryPerms, mode_to_perms
......@@ -26,18 +26,18 @@ SKIPPED_MESSAGE = (
HIDDEN_MESSAGE = b"This content is hidden."
def get_filtered_file_content(
    storage: StorageInterface, file_data: Dict[str, Any]
) -> Dict[str, Any]:
    """Retrieve the file specified by file_data and apply filters for skipped
    and missing content.

    Args:
        storage: the storage from which to retrieve the objects
        file_data: a file entry as returned by directory_ls()

    Returns:
        The entry given in file_data with a new 'content' key that points to
        the file content in bytes.

        The contents can be replaced by a specific message to indicate that
        they could not be retrieved (either due to privacy policy or because
        their sizes were too big for us to archive it).
    """
    status = file_data["status"]
    if status == "visible":
        hashes: HashDict = {
            "sha1": file_data["sha1"],
            "sha1_git": file_data["sha1_git"],
        }
        data = storage.content_get_data(hashes)
        if data is None:
            # Content is known but its bytes are not archived
            # (e.g. skipped because of size): substitute a placeholder.
            content = SKIPPED_MESSAGE
        else:
            content = data
    elif status == "absent":
        content = SKIPPED_MESSAGE
    elif status == "hidden":
        content = HIDDEN_MESSAGE
    elif status is None:
        # directory_ls() yields status None for contents missing from storage.
        content = MISSING_MESSAGE
    else:
        assert False, (
            f"unexpected status {status!r} "
            f"for content {hashutil.hash_to_hex(file_data['target'])}"
        )
    return {"content": content, **file_data}
class DirectoryBuilder:
"""Reconstructs the on-disk representation of a directory in the storage."""
def __init__(
    self,
    storage: StorageInterface,
    root: bytes,
    dir_id: bytes,
    thread_pool_size: int = 10,
):
    """Initialize the directory builder.

    Args:
        storage: the storage from which contents and directory entries
            are retrieved
        root: on-disk path (bytes) under which the directory tree is
            reconstructed
        dir_id: identifier of the directory to reconstruct
        thread_pool_size: number of threads used to fetch file contents
            concurrently (backward-compatible default: 10)
    """
    # NOTE(review): the middle of the original docstring is hidden by the
    # diff hunk cut; the argument descriptions above are reconstructed —
    # verify against the upstream file.
    self.storage = storage
    self.root = root
    self.dir_id = dir_id
    self.thread_pool_size = thread_pool_size
def build(self) -> None:
"""Perform the reconstruction of the directory in the given root."""
......@@ -121,13 +121,18 @@ class DirectoryBuilder:
def _create_files(self, files_data: List[Dict[str, Any]]) -> None:
    """Create the files in the tree and fetch their contents.

    Content bytes are fetched concurrently with a thread pool, since
    retrieving each file from storage is I/O bound.

    Args:
        files_data: file entries as returned by directory_ls()
    """

    def worker(file_data: Dict[str, Any]) -> None:
        # Fetch the (possibly filtered) content and write it to disk;
        # runs in a pool thread.
        file_data = get_filtered_file_content(self.storage, file_data)
        path = os.path.join(self.root, file_data["path"])
        self._create_file(path, file_data["content"], file_data["perms"])

    if not files_data:
        return

    # Bound the pool by the number of files so no idle threads are spawned.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=min(self.thread_pool_size, len(files_data))
    ) as executor:
        futures = [executor.submit(worker, file_data) for file_data in files_data]

    # Exiting the with-block waits for completion and shuts the pool down.
    # Calling result() re-raises any exception raised in a worker thread:
    # the previous concurrent.futures.wait(futures) call alone silently
    # swallowed worker errors, which could yield incomplete bundles.
    for future in futures:
        future.result()
def _create_revisions(self, revs_data: List[Dict[str, Any]]) -> None:
"""Create the revisions in the tree as broken symlinks to the target
identifier."""
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.