Skip to content
Snippets Groups Projects
Commit a196c85d authored by Kumar Shivendu
Browse files

feat: Incremental RPM loader implementation

parent 31ab1aa6
No related branches found
No related tags found
1 merge request: !344 "feat: Introduce RPM loader"
......@@ -75,6 +75,7 @@ setup(
loader.pypi=swh.loader.package.pypi:register
loader.maven=swh.loader.package.maven:register
loader.rubygems=swh.loader.package.rubygems:register
loader.rpm=swh.loader.package.rpm:register
""",
classifiers=[
"Programming Language :: Python :: 3",
......
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Mapping
def register() -> Mapping[str, Any]:
    """Describe the RPM loader plugin exposed by this package.

    Returns:
        A mapping with two entries: ``task_modules``, a one-element list
        holding the dotted name of this package's ``tasks`` module, and
        ``loader``, the :class:`RpmLoader` class itself.
    """
    # Imported inside the function: the loader module is only loaded when
    # register() is actually called.
    from .loader import RpmLoader

    tasks_module = f"{__name__}.tasks"
    return dict(task_modules=[tasks_module], loader=RpmLoader)
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import logging
from os import path, walk
import string
import subprocess
import tempfile
from typing import Any, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple
import attr
from packaging.version import parse as parse_version
from swh.core.tarball import uncompress
from swh.loader.package.loader import BasePackageInfo, PackageLoader
from swh.loader.package.utils import EMPTY_AUTHOR, release_name
from swh.model import from_disk
from swh.model.model import ObjectType, Release, Sha1Git, TimestampWithTimezone
from swh.storage.interface import StorageInterface
logger = logging.getLogger(__name__)
@attr.s
class RpmPackageInfo(BasePackageInfo):
    """Information about one RPM source package artifact to load."""

    name = attr.ib(type=str)
    """Name of the package (e.g. nginx)"""

    intrinsic_version = attr.ib(type=str)
    """Intrinsic version of the package, independent from the distribution (e.g. 1.18.0-5)"""

    build_time = attr.ib(type=str, default=None)
    """Build time of the package in iso format. (e.g. 2017-02-10T04:59:31+00:00)"""

    EXTID_TYPE = "rpm-sha256"
    MANIFEST_FORMAT = string.Template("$name $intrinsic_version $build_time")

    @classmethod
    def from_metadata(cls, a_metadata: Dict[str, Any], version: str) -> RpmPackageInfo:
        """Build a package info from one entry of the loader's ``packages`` input.

        Args:
            a_metadata: metadata of a single package; the keys ``name``,
                ``url``, ``version``, ``buildTime`` and ``checksums`` are read.
            version: key under which the package is listed
                (e.g. fedora34/everything/1.18.0-5).

        Returns:
            The corresponding :class:`RpmPackageInfo`.

        Raises:
            KeyError: if one of the expected keys is missing from ``a_metadata``.
            ValueError: if the last path component of the url does not end
                with ``.rpm``.
        """
        url = a_metadata["url"]
        filename = url.split("/")[-1]
        # Explicit check rather than `assert`: assertions are stripped when
        # Python runs with -O, which would let non-rpm artifacts through.
        if not filename.endswith(".rpm"):
            raise ValueError(
                f"Expected the url of a .rpm file, got {url!r} (filename {filename!r})"
            )

        return cls(
            name=a_metadata["name"],  # nginx
            url=url,  # url of the .rpm file
            filename=filename,  # nginx-1.18.0-5.fc34.src.rpm
            version=version,  # fedora34/everything/1.18.0-5
            intrinsic_version=a_metadata["version"],  # 1.18.0-5
            build_time=a_metadata["buildTime"],
            checksums=a_metadata["checksums"],
        )
class RpmLoader(PackageLoader[RpmPackageInfo]):
    """Package loader for RPM source packages.

    Every entry of ``packages`` yields one synthetic release. Tarballs found at
    the root of an extracted RPM are additionally loaded as directories and
    exposed as extra branches named after the tarball file.
    """

    visit_type = "rpm"

    def __init__(
        self,
        storage: StorageInterface,
        url: str,
        packages: Dict[str, Dict[str, Any]],
        **kwargs: Any,
    ):
        """RPM Loader implementation.

        Args:
            storage: storage instance, forwarded to the parent loader
            url: Origin url (e.g. rpm://Fedora/packages/nginx)
            packages: versioned packages and associated artifacts, example::

                {
                    'fedora34/everything/1.18.0-5': {
                        'name': 'nginx',
                        'version': '1.18.0-5',
                        'release': 34,
                        'edition': 'Everything',
                        'buildTime': '2022-11-01T12:00:55+00:00',
                        'url': 'https://archives.fedoraproject.org/nginx-1.18.0-5.fc34.src.rpm',
                        'checksums': {
                            'sha256': '<sha256 hex digest of the .rpm file>',
                        }
                    },
                    # ...
                }

            kwargs: extra keyword arguments, forwarded to the parent loader
        """
        super().__init__(storage=storage, url=url, **kwargs)
        self.url = url
        self.packages = packages
        # Filled by build_release(); this class never resets it, so entries
        # accumulate over the lifetime of the loader instance.
        self.tarball_branches: Dict[bytes, Mapping[str, Any]] = {}

    def get_versions(self) -> Sequence[str]:
        """Return the keys of ``packages`` (e.g. fedora34/everything/1.18.0-5),
        sorted with ``packaging``'s version parser as sort key."""
        # NOTE(review): these keys are not PEP 440 version strings; confirm the
        # installed `packaging` release still accepts them in parse().
        versions = list(self.packages)
        versions.sort(key=parse_version)
        return versions

    def get_default_version(self) -> str:
        """Return the last element of :meth:`get_versions`, i.e. the latest
        release version of the rpm package."""
        ordered_versions = self.get_versions()
        return ordered_versions[-1]

    def get_package_info(self, version: str) -> Iterator[Tuple[str, RpmPackageInfo]]:
        """Yield the single (branch name, package info) pair for ``version``."""
        branch_name = release_name(version)
        package_info = RpmPackageInfo.from_metadata(self.packages[version], version)
        yield branch_name, package_info

    def uncompress(
        self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str
    ) -> str:
        """Extract the first downloaded artifact (the .rpm file) below ``dest``
        and return the path of the extracted tree."""
        first_artifact = dl_artifacts[0]
        rpm_path, _ = first_artifact
        return extract_rpm_package(rpm_path, dest=dest)

    def build_release(
        self, p_info: RpmPackageInfo, uncompressed_path: str, directory: Sha1Git
    ) -> Optional[Release]:
        """Build the synthetic release of an extracted RPM package.

        As a side effect, every file at the root of ``uncompressed_path`` that
        can be uncompressed is loaded into the storage as a directory and
        recorded in ``self.tarball_branches`` under the file name.

        Args:
            p_info: information about the package being loaded
            uncompressed_path: root of the extracted RPM package
            directory: identifier of the directory the release targets

        Returns:
            The release pointing at ``directory``.
        """
        # Only the top level of the extracted tree is inspected.
        top_dir, _, top_level_files = next(walk(uncompressed_path))
        for filename in top_level_files:
            candidate = path.join(top_dir, filename)
            with tempfile.TemporaryDirectory() as scratch_dir:
                try:
                    uncompress(candidate, scratch_dir)
                except Exception:
                    # not a tarball
                    continue

                tarball_dir = from_disk.Directory.from_disk(
                    path=scratch_dir.encode("utf-8"),
                    max_content_length=self.max_content_size,
                )
                objects = from_disk.iter_directory(tarball_dir)
                contents, skipped_contents, directories = objects
                self.storage.skipped_content_add(skipped_contents)
                self.storage.content_add(contents)
                self.storage.directory_add(directories)

                branch = dict(target_type="directory", target=tarball_dir.hash)
                self.tarball_branches[filename.encode()] = branch

        message = (
            f"Synthetic release for Rpm source package {p_info.name}"
            f" version {p_info.version}\n"
        )
        release_name_bytes = p_info.intrinsic_version.encode()
        message_bytes = message.encode()
        build_date = TimestampWithTimezone.from_iso8601(p_info.build_time)

        return Release(
            name=release_name_bytes,
            message=message_bytes,
            author=EMPTY_AUTHOR,
            date=build_date,
            target=directory,
            target_type=ObjectType.DIRECTORY,
            synthetic=True,
        )

    def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]:
        """Return the tarball branches recorded so far by :meth:`build_release`."""
        return self.tarball_branches
def extract_rpm_package(rpm_path: str, dest: str) -> str:
    """Extract an RPM package by piping ``rpm2cpio`` into ``cpio``.

    The package content is unpacked in ``<dest>/extracted``. The stderr of
    ``rpm2cpio`` and the stdout/stderr of ``cpio`` are written to
    ``<dest>/extract.log``.

    Args:
        rpm_path: path of the .rpm file to extract
        dest: existing directory receiving the ``extracted`` directory and
            the ``extract.log`` file

    Returns:
        The path of the directory holding the extracted files.

    Raises:
        FileNotFoundError: if ``rpm_path`` does not exist.
        ValueError: if ``cpio`` exits with a non-zero status; the message
            embeds the content of the log file.
    """
    logger.debug("rpm path: %s", rpm_path)
    if not path.exists(rpm_path):
        raise FileNotFoundError(f"RPM package {rpm_path} not found")

    destdir = path.join(dest, "extracted")
    logfile = path.join(dest, "extract.log")
    logger.debug(
        "extract RPM source package %s in %s",
        rpm_path,
        destdir,
        extra={
            "swh_type": "rpm_extract",
            "swh_rpm": rpm_path,
            "swh_destdir": destdir,
        },
    )

    try:
        with open(logfile, "w") as log:
            # The context manager closes our end of the pipe and reaps the
            # rpm2cpio process on every exit path, including a cpio failure.
            with subprocess.Popen(
                ("rpm2cpio", rpm_path), stdout=subprocess.PIPE, stderr=log
            ) as rpm2cpio:
                subprocess.check_call(
                    ("cpio", "-idmv", "-D", destdir),
                    stdin=rpm2cpio.stdout,
                    stdout=log,
                    stderr=log,
                )
    except subprocess.CalledProcessError as e:
        with open(logfile, "r") as log:
            logdata = log.read()
        raise ValueError(
            "rpm2cpio | cpio exited with code %s: %s" % (e.returncode, logdata)
        ) from None

    return destdir
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from celery import shared_task
from swh.loader.package.rpm.loader import RpmLoader
@shared_task(name=__name__ + ".LoadRpm")
def load_rpm(**kwargs):
    """Celery task loading an RPM origin.

    The keyword arguments are forwarded to ``RpmLoader.from_configfile`` and
    the result of the loader's ``load()`` call is returned.
    """
    return RpmLoader.from_configfile(**kwargs).load()
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import tempfile
import pytest
from swh.loader.package.rpm.loader import RpmLoader, extract_rpm_package
from swh.loader.package.utils import EMPTY_AUTHOR, download
from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats
from swh.model.hashutil import hash_to_bytes
from swh.model.model import (
ObjectType,
Release,
Snapshot,
SnapshotBranch,
TargetType,
TimestampWithTimezone,
)
# Origin and artifact url shared by the loader tests below.
ORIGIN = "rpm://Fedora/packages/nginx"
RPM_URL = "https://archives.fedoraproject.org/nginx-1.18.0-5.fc34.src.rpm"

# Both package entries reference the same .rpm file, hence the same checksum.
_RPM_SHA256 = "ac68fa26886c661b77bfb97bbe234a6c37d36a16c1eca126eabafbfc7fcbece4"

# `packages` argument describing a single version.
PACKAGES = {
    "fedora34/everything/1.18.0-5": dict(
        name="nginx",
        version="1.18.0-5",
        release=34,
        edition="Everything",
        buildTime="2022-11-01T12:00:55+00:00",
        url=RPM_URL,
        checksums={"sha256": _RPM_SHA256},
    )
}

# Same as PACKAGES (entries are shared, not copied) plus one more version.
NEW_PACKAGES = dict(PACKAGES)
# using the same .rpm file but for a new branch
NEW_PACKAGES["fedora35/everything/1.20.0-5"] = dict(
    name="nginx",
    version="1.20.0-5",
    release=35,
    edition="Everything",
    buildTime="2022-11-01T12:00:55+00:00",
    url=RPM_URL,
    checksums={"sha256": _RPM_SHA256},
)
@pytest.fixture()
def expected_stats():
    """Return a fresh dict of the per-object-type counts the tests compare
    against ``get_stats(swh_storage)``; tests adjust it after each visit."""
    stats = dict(
        content=421,
        directory=40,
        origin=1,
        origin_visit=1,
        release=1,
        revision=0,
        skipped_content=0,
        snapshot=1,
    )
    return stats
# Values shared by the objects expected after the first and the later visits.
_TARBALL_DIRECTORY_ID = hash_to_bytes("b0d583b0c289290294657b4c975b2094b9b6803b")
_RELEASE_TARGET_ID = hash_to_bytes("044965ae8affff6fd0bcb908bb345e626ca99ef6")
_BUILD_TIME = "2022-11-01T12:00:55+00:00"

# Objects expected after loading PACKAGES.
snapshot_id = "e3b199390a96f70afe73137f5082e34f0deb4872"
release_id = hash_to_bytes("5aafaa6f753002fc1b87e603c5e42f582f777f6d")

snapshot = Snapshot(
    id=hash_to_bytes(snapshot_id),
    branches={
        b"releases/fedora34/everything/1.18.0-5": SnapshotBranch(
            target_type=TargetType.RELEASE,
            target=release_id,
        ),
        b"HEAD": SnapshotBranch(
            target_type=TargetType.ALIAS,
            # hex encoding of "releases/fedora34/everything/1.18.0-5"
            target=hash_to_bytes(
                "72656c65617365732f6665646f726133342f65766572797468696e672f312e31382e302d35"
            ),
        ),
        b"nginx-1.18.0.tar.gz": SnapshotBranch(
            target_type=TargetType.DIRECTORY,
            target=_TARBALL_DIRECTORY_ID,
        ),
    },
)

release = Release(
    id=release_id,
    name=b"1.18.0-5",
    message=(
        b"Synthetic release for Rpm source package "
        b"nginx version fedora34/everything/1.18.0-5\n"
    ),
    author=EMPTY_AUTHOR,
    date=TimestampWithTimezone.from_iso8601(_BUILD_TIME),
    target_type=ObjectType.DIRECTORY,
    target=_RELEASE_TARGET_ID,
    synthetic=True,
)

# Objects expected after loading NEW_PACKAGES.
new_snapshot_id = "ec0c636be12a8dd26e9697ea79b30e7ef43f5ca7"
new_release_id = hash_to_bytes("4a554d436472947f0e325f0b24140c9616645a25")

new_snapshot = Snapshot(
    id=hash_to_bytes(new_snapshot_id),
    branches={
        b"releases/fedora34/everything/1.18.0-5": SnapshotBranch(
            target_type=TargetType.RELEASE,
            target=release_id,
        ),
        b"releases/fedora35/everything/1.20.0-5": SnapshotBranch(
            target_type=TargetType.RELEASE,
            target=new_release_id,
        ),
        b"HEAD": SnapshotBranch(
            target_type=TargetType.ALIAS,
            # hex encoding of "releases/fedora35/everything/1.20.0-5"
            target=hash_to_bytes(
                "72656c65617365732f6665646f726133352f65766572797468696e672f312e32302e302d35"
            ),
        ),
        b"nginx-1.18.0.tar.gz": SnapshotBranch(
            target_type=TargetType.DIRECTORY,
            target=_TARBALL_DIRECTORY_ID,
        ),
    },
)

new_release = Release(
    id=new_release_id,
    name=b"1.20.0-5",
    message=(
        b"Synthetic release for Rpm source package "
        b"nginx version fedora35/everything/1.20.0-5\n"
    ),
    author=EMPTY_AUTHOR,
    date=TimestampWithTimezone.from_iso8601(_BUILD_TIME),
    target_type=ObjectType.DIRECTORY,
    target=_RELEASE_TARGET_ID,
    synthetic=True,
)
def test_download_and_extract_rpm_package(requests_mock_datadir):
    """Download the fixture RPM, extract it, then check the extracted files
    and the beginning of the extraction log."""
    rpm_url = RPM_URL

    with tempfile.TemporaryDirectory() as tmpdir:
        downloaded = download(rpm_url, tmpdir)
        rpm_path, _ = downloaded
        extract_rpm_package(rpm_path, tmpdir)

        # .spec and .tar.gz should be extracted from .rpm
        for extracted_name in ("nginx.spec", "nginx-1.18.0.tar.gz"):
            assert os.path.exists(f"{tmpdir}/extracted/{extracted_name}")

        with open(f"{tmpdir}/extract.log", "r") as log_file:
            logs = log_file.read()
        assert logs.startswith("404.html")
def test_extract_non_rpm_package(requests_mock_datadir):
    """Extracting a file that is not an RPM package must raise ValueError."""
    rpm_url = RPM_URL

    with tempfile.TemporaryDirectory() as tmpdir:
        # A first, valid extraction provides a non-RPM file (the .spec file).
        rpm_path, _ = download(rpm_url, tmpdir)
        extract_rpm_package(rpm_path, tmpdir)
        not_an_rpm = f"{tmpdir}/extracted/nginx.spec"

        with pytest.raises(ValueError):
            extract_rpm_package(not_an_rpm, tmpdir)
def test_extract_non_existent_rpm_package():
    """Extracting a path that does not exist must raise FileNotFoundError
    with a message naming the missing file."""
    with tempfile.TemporaryDirectory() as tmpdir:
        missing_rpm = f"{tmpdir}/non-existent.src.rpm"
        with pytest.raises(FileNotFoundError) as e:
            extract_rpm_package(missing_rpm, tmpdir)
        # Inspect the raised exception itself (e.value), not the ExceptionInfo
        # wrapper, and do it once the `raises` block has completed.
        assert f"RPM package {missing_rpm} not found" in str(e.value)
def assert_stored(swh_storage, release: Release, snapshot: Snapshot, stats: dict):
    """Check the storage state after a visit of ORIGIN.

    Verifies, in order: the last visit is a "full" rpm visit pointing at
    ``snapshot``, the snapshot content, the stored ``release`` and the
    storage statistics.
    """
    expected_snapshot_id = hash_to_bytes(snapshot.id)
    assert_last_visit_matches(
        swh_storage,
        ORIGIN,
        status="full",
        type="rpm",
        snapshot=expected_snapshot_id,
    )

    check_snapshot(snapshot, swh_storage)

    stored_releases = swh_storage.release_get([release.id])
    assert stored_releases[0] == release

    assert get_stats(swh_storage) == stats
def test_rpm_first_visit(swh_storage, requests_mock_datadir, expected_stats):
    """A first visit is eventful, downloads the RPM once and stores the
    expected release and snapshot."""
    loader = RpmLoader(swh_storage, ORIGIN, packages=PACKAGES)

    actual_load_status = loader.load()
    assert actual_load_status == {"status": "eventful", "snapshot_id": snapshot_id}

    requested_urls = [m.url for m in requests_mock_datadir.request_history]
    assert requested_urls == [RPM_URL]

    assert_stored(swh_storage, release, snapshot, expected_stats)
def test_rpm_multiple_visits(swh_storage, requests_mock_datadir, expected_stats):
    """Three successive visits with the same loader: eventful, uneventful,
    then eventful again once a new package version is listed."""

    def requested_urls():
        return [m.url for m in requests_mock_datadir.request_history]

    loader = RpmLoader(swh_storage, ORIGIN, packages=PACKAGES)

    # First run: Discovered exactly 1 package
    first_status = loader.load()
    assert first_status == {"status": "eventful", "snapshot_id": snapshot_id}

    # Second run: No updates
    second_status = loader.load()
    expected_stats["origin_visit"] += 1  # a new visit occurred but no new snapshot
    assert second_status == {"status": "uneventful", "snapshot_id": snapshot_id}
    assert requested_urls() == [RPM_URL]
    assert_stored(swh_storage, release, snapshot, expected_stats)

    # Third run: New release (Updated snapshot)
    loader.packages = NEW_PACKAGES
    third_status = loader.load()
    expected_stats["origin_visit"] += 1  # same rpm:// origin
    expected_stats["release"] += 1  # new release (1.20.0-5)
    expected_stats["snapshot"] += 1  # updated metadata (`packages` param)
    assert third_status == {"status": "eventful", "snapshot_id": new_snapshot_id}
    assert requested_urls() == [RPM_URL, RPM_URL]
    assert_stored(swh_storage, new_release, new_snapshot, expected_stats)
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import uuid
import pytest
from swh.scheduler.model import ListedOrigin, Lister
# Dotted path of the package holding the RPM loader and its tasks.
NAMESPACE = "swh.loader.package.rpm"

# Origin url and `packages` loader argument used for the listed origin below.
RPM_ORIGIN_URL = "https://src.fedoraproject.org/rpms/0xFFFF"
RPM_PACKAGES = {
    "fedora36/everything/0.10-4": dict(
        name="0xFFFF",
        version="0.10-4",
        release=36,
        edition="Everything",
        buildTime="2022-01-19T19:13:53+00:00",
        url=(
            "https://archives.fedoraproject.org/pub/archive/fedora/linux/"
            "releases/36/Everything/source/tree/Packages/0/"
            "0xFFFF-0.10-4.fc36.src.rpm"
        ),
        checksums={
            "sha256": "45eee8d990d502324ae665233c320b8a5469c25d735f1862e094c1878d6ff2cd"
        },
    )
}
@pytest.fixture
def fedora_lister():
    """Return a ``Lister`` named "fedora" with a freshly generated random id."""
    lister_id = uuid.uuid4()
    return Lister(name="fedora", instance_name="fedora", id=lister_id)
@pytest.fixture
def fedora_listed_origin(fedora_lister):
    """Return an rpm ``ListedOrigin`` attached to the fedora lister and
    carrying RPM_PACKAGES as extra loader arguments."""
    loader_arguments = dict(packages=RPM_PACKAGES)
    return ListedOrigin(
        lister_id=fedora_lister.id,
        url=RPM_ORIGIN_URL,
        visit_type="rpm",
        extra_loader_arguments=loader_arguments,
    )
def test_rpm_loader_task_for_listed_origin(
    loading_task_creation_for_listed_origin_test,
    fedora_lister,
    fedora_listed_origin,
):
    """Run the shared task-creation check for the RPM loader, using the
    fedora lister and its listed origin."""
    loader_class_name = f"{NAMESPACE}.loader.RpmLoader"
    task_function_name = f"{NAMESPACE}.tasks.LoadRpm"

    loading_task_creation_for_listed_origin_test(
        loader_class_name=loader_class_name,
        task_function_name=task_function_name,
        lister=fedora_lister,
        listed_origin=fedora_listed_origin,
    )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment