Skip to content
Snippets Groups Projects
Verified Commit dbf7f3dc authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

Add Directory Loader to allow tarball ingestion as Directory

In some marginal listing cases (Nix or Guix for now), we can receive raw tarballs to
ingest. This commit adds a loader to ingest those. The output of the ingestion is a
snapshot with a single HEAD branch targeting the ingested directory (contained
within the tarball).

This expects to receive a mandatory 'integrity' field. It is used to check the tarball
received out of the origin.

This can also optionally receive a list of mirror urls in case the main origin url is no
longer available. Those mirror urls are solely used as fallback to retrieve the tarball.

Related to T3781
parent 5482a48e
No related branches found
No related tags found
1 merge request!436Add Directory Loader to allow tarball ingestion as Directory
......@@ -8,17 +8,21 @@ import datetime
import hashlib
import logging
import os
import tempfile
import time
from typing import Any, ContextManager, Dict, Iterable, List, Optional, Union
from urllib.parse import urlparse
from requests.exceptions import HTTPError
import sentry_sdk
from swh.core.config import load_from_envvar
from swh.core.statsd import Statsd
from swh.core.tarball import uncompress
from swh.loader.core.metadata_fetchers import CredentialsType, get_fetchers_for_lister
from swh.loader.exception import NotFound
from swh.loader.package.utils import get_url_body
from swh.loader.package.utils import download, get_url_body
from swh.model import from_disk
from swh.model.model import (
BaseContent,
Content,
......@@ -648,15 +652,47 @@ class DVCSLoader(BaseLoader):
self.loaded_snapshot_id = snapshot.id
class ContentLoader(BaseLoader):
"""Basic loader for edge case content ingestion.
class NodeLoader(BaseLoader):
    """Common abstract class for the Content and Directory loaders.

    The "integrity" field is normalized information about the checksum used and
    the corresponding base64 hash encoded value of the object retrieved (content
    or directory).

    The multiple "fallback" urls received are mirror urls, so there is no need
    to keep them. We only use them to fetch the actual object if the main origin
    is no longer available.

    """

    def __init__(
        self,
        *args,
        integrity: str,
        fallback_urls: Optional[List[str]] = None,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # Snapshot built by the concrete loader once the object is ingested
        self.snapshot: Optional[Snapshot] = None
        # Determine the checksum algorithm and value stored in the integrity
        # field, of the form "hash-<b64-encoded-checksum>", see
        # https://w3c.github.io/webappsec-subresource-integrity/#grammardef-hash-algo
        self.checksum_algo, checksum_value_b64 = integrity.split("-")
        self.expected_checksum: bytes = base64.decodebytes(checksum_value_b64.encode())
        # Urls to try in order when fetching the object; the main origin url
        # comes first, mirror urls are only fallbacks.
        fallback_urls_ = fallback_urls or []
        self.mirror_urls: List[str] = [self.origin.url, *fallback_urls_]

    def prepare(self) -> None:
        """Retrieve the latest known snapshot for the origin, used by
        :meth:`load_status` to decide whether this visit is eventful."""
        self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url)

    def load_status(self) -> Dict[str, Any]:
        """Return "uneventful" when the new snapshot equals the last known one,
        "eventful" otherwise."""
        return {
            "status": "uneventful"
            if self.last_snapshot == self.snapshot
            else "eventful"
        }

    def cleanup(self) -> None:
        """Nothing to clean up for these loaders."""
        self.log.debug("cleanup")
The multiple "fallback" urls received for the same content are mirror urls so no
need to keep those. We only use them to fetch the actual content if the main origin
is no longer available.
class ContentLoader(NodeLoader):
"""Basic loader for edge case content ingestion.
The output snapshot is of the form:
......@@ -672,29 +708,14 @@ class ContentLoader(BaseLoader):
visit_type = "content"
def __init__(
self, *args, integrity: str, fallback_urls: List[str] = None, **kwargs
):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fallback_urls = fallback_urls or []
self.integrity: str = integrity
self.content: Optional[Content] = None
self.snapshot: Optional[Snapshot] = None
self.last_snapshot: Optional[Snapshot] = None
def prepare(self) -> None:
self.last_snapshot = snapshot_get_latest(self.storage, self.origin.url)
def fetch_data(self) -> bool:
"""Retrieve the content file as a Content Object"""
urls = {self.origin.url, *self.fallback_urls}
# Determine the content checksum stored in the integrity field
# hash-<b64-encoded-checksum>
# https://w3c.github.io/webappsec-subresource-integrity/#grammardef-hash-algo
hash_algo, hash_value_b64 = self.integrity.split("-")
expected_checksum = base64.decodebytes(hash_value_b64.encode())
data: Optional[bytes] = None
for url in urls:
for url in self.mirror_urls:
url_ = urlparse(url)
self.log.debug(
"prepare; origin_url=%s fallback=%s scheme=%s path=%s",
......@@ -708,8 +729,8 @@ class ContentLoader(BaseLoader):
self.content = Content.from_data(data)
# Ensure content received matched the integrity field received
actual_checksum = self.content.get_hash(hash_algo)
if actual_checksum == expected_checksum:
actual_checksum = self.content.get_hash(self.checksum_algo)
if actual_checksum == self.expected_checksum:
# match, we have found our content to ingest, exit loop
break
# otherwise continue
......@@ -747,12 +768,116 @@ class ContentLoader(BaseLoader):
def visit_status(self):
return "full" if self.content and self.snapshot is not None else "partial"
def load_status(self) -> Dict[str, Any]:
return {
"status": "uneventful"
if self.last_snapshot == self.snapshot
else "eventful"
}
def cleanup(self) -> None:
self.log.debug("cleanup")
class DirectoryLoader(NodeLoader):
    """Basic loader for edge case directory ingestion (through one tarball).

    The output snapshot is of the form:

    .. code::

       id: <bytes>
       branches:
         HEAD:
           target_type: directory
           target: <directory-id>

    """

    visit_type = "directory"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Top-level directory computed from the uncompressed tarball
        self.directory: Optional[from_disk.Directory] = None
        # Objects of the merkle dag, populated by fetch_data and consumed by
        # store_data. None until a tarball has been successfully fetched.
        self.cnts: Optional[List[Content]] = None
        self.skipped_cnts: Optional[List[SkippedContent]] = None
        self.dirs: Optional[List[Directory]] = None

    def fetch_data(self) -> bool:
        """Fetch directory as a tarball amongst the self.mirror_urls.

        The first mirror url serving a tarball whose checksum matches the
        expected integrity wins; the tarball is then uncompressed and the
        merkle dag (contents, skipped contents, directories) computed from it.

        Raises:
            NotFound: if no tarball matching the integrity field is found

        """
        # NOTE(review): the base64-decoded integrity value is used here as the
        # hex digest expected by `download`'s hashes check — confirm callers
        # always provide the integrity field in that form.
        expected_checksum_hashhex = self.expected_checksum.decode("utf-8")
        for url in self.mirror_urls:
            url_ = urlparse(url)
            self.log.debug(
                "prepare; origin_url=%s fallback=%s scheme=%s path=%s",
                self.origin.url,
                url,
                url_.scheme,
                url_.path,
            )
            with tempfile.TemporaryDirectory() as tmpdir:
                try:
                    tarball_path, _ = download(
                        url,
                        tmpdir,
                        # Ensure content received matched the integrity field received
                        hashes={self.checksum_algo: expected_checksum_hashhex},
                        extra_request_headers={"Accept-Encoding": "identity"},
                    )
                except ValueError as e:
                    # Checksum mismatch
                    self.log.debug("Error: %s", e)
                    continue
                except HTTPError:
                    self.log.debug(
                        "Not found %s, continue on next mirror url if any", url
                    )
                    # mirror url not found, continue on the next mirror url if any
                    continue

                directory_path = os.path.join(tmpdir, "src")
                os.makedirs(directory_path, exist_ok=True)
                uncompress(tarball_path, dest=directory_path)
                self.log.debug("uncompressed path to directory: %s", directory_path)

                # Both the directory scan and the merkle dag computation must
                # happen inside the `with` block, while tmpdir still exists.
                self.directory = from_disk.Directory.from_disk(
                    path=directory_path.encode("utf-8"),
                    max_content_length=self.max_content_size,
                )
                # Compute the merkle dag from the top-level directory
                self.cnts, self.skipped_cnts, self.dirs = from_disk.iter_directory(
                    self.directory
                )

            if self.directory is not None:
                return False  # no more data to fetch

        # if we reach here, we did not find any proper tarball, so consider the origin
        # not found
        raise NotFound(f"Unknown origin {self.origin.url}.")

    def process_data(self) -> bool:
        """Build the snapshot out of the Directory retrieved."""
        assert self.directory is not None
        # Build the snapshot: one HEAD branch targeting the ingested directory
        self.snapshot = Snapshot(
            branches={
                b"HEAD": SnapshotBranch(
                    target=self.directory.hash,
                    target_type=TargetType.DIRECTORY,
                ),
            }
        )
        return False  # no more data to process

    def store_data(self) -> None:
        """Store newly retrieved Content and Snapshot."""
        # fetch_data must have succeeded before store_data is called
        assert self.cnts is not None
        assert self.skipped_cnts is not None
        assert self.dirs is not None
        self.log.debug("Number of skipped contents: %s", len(self.skipped_cnts))
        self.storage.skipped_content_add(self.skipped_cnts)
        self.log.debug("Number of contents: %s", len(self.cnts))
        self.storage.content_add(self.cnts)
        self.log.debug("Number of directories: %s", len(self.dirs))
        self.storage.directory_add(self.dirs)
        assert self.snapshot is not None
        self.storage.snapshot_add([self.snapshot])
        self.loaded_snapshot_id = self.snapshot.id

    def visit_status(self):
        """Full visit only when both the directory and the snapshot exist."""
        return "full" if self.directory and self.snapshot is not None else "partial"
File added
......@@ -17,6 +17,7 @@ from swh.loader.core.loader import (
SENTRY_VISIT_TYPE_TAG_NAME,
BaseLoader,
ContentLoader,
DirectoryLoader,
DVCSLoader,
)
from swh.loader.core.metadata_fetchers import MetadataFetcherProtocol
......@@ -313,7 +314,7 @@ def _check_load_failure(
):
"""Check whether a failed load properly logged its exception, and that the
snapshot didn't get referenced in storage"""
assert isinstance(loader, (DVCSLoader, ContentLoader)) # was implicit so far
assert isinstance(loader, (DVCSLoader, ContentLoader, DirectoryLoader))
for record in caplog.records:
if record.levelname != "ERROR":
continue
......@@ -525,7 +526,9 @@ def test_content_loader_missing_field(swh_storage):
def test_content_loader_404(caplog, swh_storage, requests_mock_datadir):
unknown_origin = Origin(f"{CONTENT_MIRROR}/project/asdf/archives/unknown.lisp")
loader = ContentLoader(
swh_storage, unknown_origin.url, integrity="sha256-unusedfornow"
swh_storage,
unknown_origin.url,
integrity=CONTENT_INTEGRITY,
)
result = loader.load()
......@@ -548,7 +551,7 @@ def test_content_loader_404_with_fallback(caplog, swh_storage, requests_mock_dat
swh_storage,
unknown_origin.url,
fallback_urls=[fallback_url_ko],
integrity="sha256-unusedfornow",
integrity=CONTENT_INTEGRITY,
)
result = loader.load()
......@@ -599,3 +602,124 @@ def test_content_loader_ok_simple(swh_storage, requests_mock_datadir):
result2 = loader.load()
assert result2 == {"status": "uneventful"}
# Mirror base url and tarball url served by the requests_mock_datadir fixture
DIRECTORY_MIRROR = "https://example.org"
DIRECTORY_URL = f"{DIRECTORY_MIRROR}/archives/dummy-hello.tar.gz"
# Hex sha256 digest (as bytes) of the dummy-hello.tar.gz test tarball
DIRECTORY_SHA256 = b"7608099df00abcf23ba84f36ce63ad4c8802c3b7d254ddec657488eaf5ffb8d2"
# SRI-style integrity string: "<algo>-<base64 of the checksum>"
DIRECTORY_INTEGRITY = f"sha256-{base64.encodebytes(DIRECTORY_SHA256).decode()}"
def test_directory_loader_missing_field(swh_storage):
    """Omitting the mandatory 'integrity' keyword raises a TypeError."""
    with pytest.raises(TypeError, match="missing"):
        DirectoryLoader(swh_storage, Origin(DIRECTORY_URL).url)
def test_directory_loader_404(caplog, swh_storage, requests_mock_datadir):
    """Loading an unknown tarball url ends up as an uneventful, not-found visit."""
    missing_origin = Origin(f"{DIRECTORY_MIRROR}/archives/unknown.tar.gz")
    loader = DirectoryLoader(
        swh_storage,
        missing_origin.url,
        integrity=DIRECTORY_INTEGRITY,
    )

    assert loader.load() == {"status": "uneventful"}

    _check_load_failure(
        caplog,
        loader,
        NotFound,
        "Unknown origin",
        status="not_found",
        origin=missing_origin,
    )
def test_directory_loader_404_with_fallback(caplog, swh_storage, requests_mock_datadir):
    """When neither the origin nor its mirror serve the tarball, the visit is
    uneventful and the origin reported as not found."""
    missing_origin = Origin(f"{DIRECTORY_MIRROR}/archives/unknown.tbz2")
    broken_mirror = f"{DIRECTORY_MIRROR}/archives/elsewhere-unknown2.tbz2"
    loader = DirectoryLoader(
        swh_storage,
        missing_origin.url,
        fallback_urls=[broken_mirror],
        integrity=DIRECTORY_INTEGRITY,
    )

    assert loader.load() == {"status": "uneventful"}

    _check_load_failure(
        caplog,
        loader,
        NotFound,
        "Unknown origin",
        status="not_found",
        origin=missing_origin,
    )
def test_directory_loader_404_with_integrity_check_failure(
    caplog, swh_storage, requests_mock_datadir
):
    """Tarball with mismatched integrity is not ingested."""
    origin = Origin(DIRECTORY_URL)
    # Mangle the checksum so the integrity verification cannot succeed
    bogus_hash = DIRECTORY_SHA256.decode().replace("e", "a").encode()
    bogus_integrity = f"sha256-{base64.encodebytes(bogus_hash).decode()}"
    loader = DirectoryLoader(
        swh_storage,
        origin.url,
        integrity=bogus_integrity,  # making the integrity check fail
    )

    assert loader.load() == {"status": "uneventful"}

    _check_load_failure(
        caplog,
        loader,
        NotFound,
        "Unknown origin",
        status="not_found",
        origin=origin,
    )
def test_directory_loader_ok_with_fallback(caplog, swh_storage, requests_mock_datadir):
    """When the main origin is dead, the tarball is fetched from a mirror url."""
    dead_origin = Origin(f"{DIRECTORY_MIRROR}/dead-origin-url")
    working_mirror = DIRECTORY_URL
    broken_mirror = f"{DIRECTORY_MIRROR}/archives/unknown2.tgz"
    loader = DirectoryLoader(
        swh_storage,
        dead_origin.url,
        fallback_urls=[working_mirror, broken_mirror],
        integrity=DIRECTORY_INTEGRITY,
    )

    assert loader.load() == {"status": "eventful"}
def test_directory_loader_ok_simple(swh_storage, requests_mock_datadir):
    """A proper tarball yields an eventful full visit; a second identical
    visit is uneventful."""
    origin = Origin(DIRECTORY_URL)
    loader = DirectoryLoader(
        swh_storage,
        origin.url,
        integrity=DIRECTORY_INTEGRITY,
    )

    assert loader.load() == {"status": "eventful"}

    visit_status = assert_last_visit_matches(
        swh_storage, origin.url, status="full", type="directory"
    )
    assert visit_status.snapshot is not None

    # Reloading the same origin brings nothing new
    assert loader.load() == {"status": "uneventful"}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment