Skip to content
Snippets Groups Projects
Commit deea8a95 authored by Antoine Lambert's avatar Antoine Lambert Committed by Antoine Lambert
Browse files

deposit: Handle download and aggregation of uploaded tarballs

Instead of using the deposit API raw private endpoint to get
aggregated version of tarballs uploaded with a deposit, let the
loader handle this possibly costly operation by using the new
deposit API upload-urls private endpoint returning URLs for
downloading uploaded tarballs.

Fixes swh-deposit#4657.
parent 6a604da4
No related branches found
No related tags found
1 merge request!542deposit: Handle download and aggregation of uploaded tarballs
Pipeline #13578 passed
Showing
with 178 additions and 21 deletions
# Copyright (C) 2019-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -8,22 +8,26 @@ from datetime import timezone
from functools import lru_cache
import json
import logging
import os
import re
import shutil
from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse
import attr
import requests
import sentry_sdk
from swh.core import tarball
from swh.core.config import load_from_envvar
from swh.loader.core.loader import DEFAULT_CONFIG
from swh.loader.core.utils import download
from swh.loader.core.utils import DOWNLOAD_HASHES, download
from swh.loader.package.loader import (
BasePackageInfo,
PackageLoader,
RawExtrinsicMetadataCore,
)
from swh.model.hashutil import hash_to_hex
from swh.model.hashutil import MultiHash, hash_to_hex
from swh.model.model import (
MetadataAuthority,
MetadataAuthorityType,
......@@ -67,6 +71,63 @@ def build_branch_name(version: str) -> str:
return f"deposit/{version}"
def aggregate_tarballs(
    tmpdir: str, archive_urls: List[str], filename: str
) -> Tuple[str, Mapping]:
    """Download the tarballs uploaded with a deposit and, when there is more
    than one, aggregate them into a single tar archive.

    Args:
        tmpdir: Temporary directory used for downloads and aggregation
        archive_urls: URLs for downloading the tarballs uploaded with a deposit
        filename: Filename to report in the extrinsic metadata when an
            aggregated archive is built

    Returns:
        Tuple made of the path to the (possibly aggregated) archive and its
        extrinsic metadata (length, filename, checksums and source url)
    """
    download_tarball_rootdir = os.path.join(tmpdir, "download")
    os.makedirs(download_tarball_rootdir, exist_ok=True)

    if len(archive_urls) > 1:
        # root folder to build an aggregated tarball
        aggregated_tarball_rootdir = os.path.join(tmpdir, "aggregate")
        os.makedirs(aggregated_tarball_rootdir, exist_ok=True)

        # uncompress in a temporary location all client's deposit archives
        for archive_url in archive_urls:
            parsed_archive_url = urlparse(archive_url)
            archive_name = os.path.basename(parsed_archive_url.path)
            archive_path = os.path.join(download_tarball_rootdir, archive_name)
            download(archive_url, download_tarball_rootdir)
            tarball.uncompress(archive_path, aggregated_tarball_rootdir)

        # Aggregate into one big tarball the multiple smaller ones
        temp_tarpath = shutil.make_archive(
            aggregated_tarball_rootdir, "tar", aggregated_tarball_rootdir
        )
        # can already clean up temporary directory
        shutil.rmtree(aggregated_tarball_rootdir)

        # Compute the hashes by streaming the archive in chunks instead of
        # loading it entirely in memory (aggregated tarballs can be large)
        h = MultiHash(hash_names=DOWNLOAD_HASHES)
        with open(temp_tarpath, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                h.update(chunk)

        computed_hashes = h.hexdigest()
        length = computed_hashes.pop("length")
        extrinsic_metadata = {
            "length": length,
            "filename": filename,
            "checksums": computed_hashes,
            "url": ",".join(archive_urls),
        }
    else:
        # single uploaded tarball: download() already computes the
        # extrinsic metadata for us
        temp_tarpath, extrinsic_metadata = download(
            archive_urls[0], download_tarball_rootdir
        )

    return temp_tarpath, extrinsic_metadata
@attr.s
class DepositPackageInfo(BasePackageInfo):
filename = attr.ib(type=str) # instead of Optional[str]
......@@ -270,7 +331,9 @@ class DepositLoader(PackageLoader[DepositPackageInfo]):
self, p_info: DepositPackageInfo, tmpdir: str
) -> List[Tuple[str, Mapping]]:
"""Override to allow use of the dedicated deposit client"""
return [self.client.archive_get(p_info.id, tmpdir, p_info.filename)]
upload_urls = self.client.upload_urls_get(p_info.id)
assert upload_urls, f"No tarballs were uploaded for deposit {p_info.id}"
return [aggregate_tarballs(tmpdir, upload_urls, p_info.filename)]
def build_release(
self,
......@@ -432,12 +495,24 @@ class ApiClient:
kwargs["auth"] = self.auth
return method_fn(url, *args, **kwargs)
def archive_get(
    self, deposit_id: DepositId, tmpdir: str, filename: str
) -> Tuple[str, Dict]:
    """Fetch the raw archive artifact of a deposit into a local directory.

    Args:
        deposit_id: Identifier of the deposit
        tmpdir: Destination directory for the download
        filename: Name given to the downloaded file

    Returns:
        Tuple made of the local archive path and its extrinsic metadata
    """
    raw_archive_url = f"{self.base_url}/{deposit_id}/raw/"
    return download(raw_archive_url, dest=tmpdir, filename=filename, auth=self.auth)
def upload_urls_get(
    self,
    deposit_id: DepositId,
) -> List[str]:
    """List the download URLs of the tarballs uploaded with a deposit request.

    Args:
        deposit_id: a deposit id

    Returns:
        A list of URLs

    Raises:
        ValueError: the private API endpoint did not answer successfully
    """
    upload_urls_response = self.do(
        "get", f"{self.base_url}/{deposit_id}/upload-urls/"
    )
    if upload_urls_response.ok:
        return upload_urls_response.json()
    raise ValueError(
        f"Problem when retrieving deposit upload URLs at {upload_urls_response.url}"
    )
@lru_cache
def metadata_get(self, deposit_id: DepositId) -> Dict[str, Any]:
......
{
"origin": {
"url": "https://hal-test.archives-ouvertes.fr/some-external-id",
"type": "deposit"
},
"raw_metadata" : "<?xml version=\"1.0\"?><entry xmlns=\"http://www.w3.org/2005/Atom\" xmlns:codemeta=\"https://doi.org/10.5063/SCHEMA/CODEMETA-2.0\"><external_identifier>some-external-id</external_identifier><url>https://hal-test.archives-ouvertes.fr/some-external-id</url><codemeta:dateCreated>2017-10-07T15:17:08Z</codemeta:dateCreated><author>some awesome author</author><author>another one</author></entry>",
"provider": {
"provider_name": "hal",
"provider_type": "deposit_client",
"provider_url": "https://hal-test.archives-ouvertes.fr/",
"metadata": null
},
"tool": {
"name": "swh-deposit",
"version": "0.0.1",
"configuration": {
"sword_version": "2"
}
},
"deposit": {
"id": "555",
"client": "hal",
"collection": "hal",
"author": {
"name": "Software Heritage",
"fullname": "Software Heritage",
"email": "robot@softwareheritage.org"
},
"author_date": {
"timestamp": {
"seconds": 1507389428,
"microseconds": 0
},
"offset": 0
},
"committer": {
"name": "Software Heritage",
"fullname": "Software Heritage",
"email": "robot@softwareheritage.org"
},
"committer_date": {
"timestamp": {
"seconds": 1507389428,
"microseconds": 0
},
"offset": 0
},
"revision_parents": [],
"release_notes": null
}
}
["https://deposit.softwareheritage.org/uploads/hello-2.10.zip", "https://deposit.softwareheritage.org/uploads/hello-2.12.tar.gz"]
\ No newline at end of file
hello-2.10.zip
\ No newline at end of file
["https://deposit.softwareheritage.org/uploads/hello-2.10.zip"]
\ No newline at end of file
hello-2.10.zip
\ No newline at end of file
["https://deposit.softwareheritage.org/uploads/hello-2.10.zip"]
\ No newline at end of file
hello-2.12.tar.gz
\ No newline at end of file
["https://deposit.softwareheritage.org/uploads/hello-2.12.tar.gz"]
\ No newline at end of file
hello-2.10.zip
\ No newline at end of file
["https://deposit.softwareheritage.org/uploads/hello-2.10.zip"]
\ No newline at end of file
# Copyright (C) 2019-2025  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -75,7 +75,6 @@ def test_deposit_loading_unknown_deposit(
no origin, no visit, no snapshot
"""
# private api url form: 'https://deposit.s.o/1/private/hal/666/raw/'
url = "some-url"
unknown_deposit_id = 667
loader = DepositLoader(
......@@ -103,10 +102,10 @@ def test_deposit_loading_unknown_deposit(
} == stats
# Upload URL the mocked datadir will not serve, to simulate a deposit
# whose uploaded tarball cannot be retrieved (404)
NOT_FOUND_UPLOAD_URL = "https://deposit.softwareheritage.org/uploads/hello-2.10.zip"

requests_mock_datadir_missing_one = requests_mock_datadir_factory(
    ignore_urls=[NOT_FOUND_UPLOAD_URL]
)
......@@ -114,7 +113,7 @@ def test_deposit_loading_failure_to_retrieve_1_artifact(
swh_storage, deposit_client, requests_mock_datadir_missing_one, requests_mock
):
"""Deposit with missing artifact ends up with an uneventful/partial visit"""
# private api url form: 'https://deposit.s.o/1/private/hal/666/raw/'
url = "some-url-2"
deposit_id = 666
requests_mock_datadir_missing_one.put(re.compile("https"))
......@@ -161,7 +160,7 @@ def test_deposit_loading_failure_to_retrieve_1_artifact(
"status_detail": {
"loading": [
"Failed to load branch deposit/1 for some-url-2: 404 Client Error: None "
"for url: https://deposit.softwareheritage.org/1/private/666/raw/"
f"for url: {NOT_FOUND_UPLOAD_URL}"
]
},
}
......@@ -702,3 +701,34 @@ def test_generate_branch_name_uniqueness(swh_storage, deposit_client):
assert loader.generate_branch_name("A") == "deposit/a"
assert loader.generate_branch_name("a") == "deposit/a/1"
assert loader.generate_branch_name("a$") == "deposit/a/2"
def test_deposit_loading_ok_aggregate_tarballs(
    swh_storage, deposit_client, requests_mock_datadir
):
    """Multiple tarballs uploaded with a single deposit request must be
    aggregated into one archive by the loader."""
    deposit_id = 555
    external_id = "hal-123456"
    origin_url = f"https://hal-test.archives-ouvertes.fr/{external_id}"

    requests_mock_datadir.get(
        f"{DEPOSIT_URL}/{deposit_id}/releases/",
        json=[
            {"id": deposit_id, "software_version": "1", "origin_url": origin_url}
        ],
    )

    loader = DepositLoader(swh_storage, origin_url, deposit_id, deposit_client)

    expected_snapshot_id = "498ba94959ea0591690821c3bf74b7bed745e6eb"
    assert loader.load() == {
        "status": "eventful",
        "snapshot_id": expected_snapshot_id,
    }

    assert_last_visit_matches(
        loader.storage,
        origin_url,
        status="full",
        type="deposit",
        snapshot=hash_to_bytes(expected_snapshot_id),
    )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment