Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing 537 additions and 423 deletions.
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# Copyright (C) 2018-2022 The Software Heritage developers
# Copyright (C) 2018-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import importlib
import logging
from typing import Dict
import warnings
logger = logging.getLogger(__name__)
from typing import TYPE_CHECKING, Any, Dict
if TYPE_CHECKING:
from .interface import VaultInterface
BACKEND_TYPES: Dict[str, str] = {
"remote": ".api.client.RemoteVaultClient",
"postgresql": ".backend.VaultBackend",
"memory": ".in_memory_backend.InMemoryVaultBackend",
# deprecated
"local": ".backend.VaultBackend",
}
logger = logging.getLogger(__name__)
def get_vault(cls: str = "remote", **kwargs):
def get_vault(cls: str, **kwargs) -> "VaultInterface":
"""
Get a vault object of class `cls` with arguments
`kwargs`.
Args:
cls: vault's class, either 'remote' or 'local'
cls: vault's class
kwargs: arguments to pass to the class' constructor
Returns:
an instance of VaultBackend (either local or remote)
an instance of VaultBackend
Raises:
ValueError if passed an unknown storage class.
"""
if "args" in kwargs:
warnings.warn(
'Explicit "args" key is deprecated, use keys directly instead.',
DeprecationWarning,
)
kwargs = kwargs["args"]
class_path = BACKEND_TYPES.get(cls)
if class_path is None:
raise ValueError(
f"Unknown Vault class `{cls}`. " f"Supported: {', '.join(BACKEND_TYPES)}"
)
(module_path, class_name) = class_path.rsplit(".", 1)
module = importlib.import_module(module_path, package=__package__)
Vault = getattr(module, class_name)
from swh.core.config import get_swh_backend_module
_, Vault = get_swh_backend_module("vault", cls)
assert Vault is not None
return Vault(**kwargs)
get_datastore = get_vault
default_cfg = {
"default_interval": "1 day",
"min_interval": "1 day",
"max_interval": "1 day",
"backoff_factor": 1,
"max_queue_length": 10000,
}
def register_tasks() -> Dict[str, Any]:
return {
"task_modules": [f"{__name__}.cooking_tasks"],
"task_types": {
"vault-cook-bundle": default_cfg,
"vault-batch-cook-bundle": default_cfg,
},
}
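A minimal usage sketch of the backend selection above; the RPC URL and SWHID are hypothetical, and backend names are resolved through the "swh.vault.classes" entry points:

from swh.model.swhids import CoreSWHID
from swh.vault import get_vault

# "remote" resolves to RemoteVaultClient via the entry points
vault = get_vault("remote", url="http://localhost:5005/")  # hypothetical URL
swhid = CoreSWHID.from_string("swh:1:dir:" + "0" * 40)
print(vault.progress("git_bare", swhid))  # query a bundle's cooking status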
# Copyright (C) 2016-2022 The Software Heritage developers
# Copyright (C) 2016-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -11,7 +11,7 @@ from typing import Any, Dict, Optional
from swh.core.api import RPCServerApp
from swh.core.api import encode_data_server as encode_data
from swh.core.api import error_handler
from swh.core.config import config_basepath, merge_configs, read_raw_config
from swh.core.config import merge_configs, read_raw_config
from swh.vault import get_vault as get_swhvault
from swh.vault.backend import NotFoundExc
from swh.vault.interface import VaultInterface
......@@ -79,14 +79,11 @@ def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
raise ValueError("missing 'vault' configuration")
vcfg = cfg["vault"]
if vcfg["cls"] not in ("local", "postgresql"):
if vcfg["cls"] == "remote":
raise EnvironmentError(
"The vault backend can only be started with a 'postgresql' configuration",
"The vault backend of a vault server cannot be a 'remote' configuration"
)
# TODO: Soft-deprecation of args key. Remove when ready.
vcfg.update(vcfg.get("args", {}))
# Default to top-level value if any
vcfg = {**cfg, **vcfg}
......@@ -110,7 +107,7 @@ def make_app_from_configfile(
if not os.path.isfile(config_path):
raise ValueError(f"Configuration path {config_path} should exist.")
app_config = read_raw_config(config_basepath(config_path))
app_config = read_raw_config(config_path)
app_config["vault"] = check_config(app_config)
app.config.update(merge_configs(DEFAULT_CONFIG, app_config))
......
# Copyright (C) 2017-2022 The Software Heritage developers
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
from datetime import timedelta
from email.mime.text import MIMEText
import logging
import smtplib
from typing import Any, Dict, List, Optional, Tuple
import psycopg2.extras
import psycopg2.pool
from psycopg.rows import dict_row
import psycopg_pool
import sentry_sdk
from swh.core.db import BaseDb
from swh.core.db.common import db_transaction
from swh.model.swhids import CoreSWHID
from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task_dict
from swh.scheduler.utils import create_oneshot_task
from swh.storage import get_storage
from swh.vault.cache import VaultCache
from swh.vault.cookers import COOKER_TYPES, get_cooker_cls
......@@ -66,18 +67,15 @@ The Software Heritage Developers
"""
class VaultBackend:
class VaultDB:
"""
Backend for the Software Heritage Vault.
PostgreSQL backend for the Software Heritage Vault.
"""
current_version = 4
def __init__(self, **config):
self.config = config
self.cache = VaultCache(**config["cache"])
self.scheduler = get_scheduler(**config["scheduler"])
self.storage = get_storage(**config["storage"])
if "db" not in self.config:
raise ValueError(
......@@ -85,11 +83,11 @@ class VaultBackend:
"in the vault configuration file"
)
db_conn = config["db"]
self._pool = psycopg2.pool.ThreadedConnectionPool(
config.get("min_pool_conns", 1),
config.get("max_pool_conns", 10),
db_conn,
cursor_factory=psycopg2.extras.RealDictCursor,
self._pool = psycopg_pool.ConnectionPool(
conninfo=db_conn,
min_size=config.get("min_pool_conns", 1),
max_size=config.get("max_pool_conns", 10),
kwargs={"row_factory": dict_row},
)
self._db = None
......@@ -102,6 +100,18 @@ class VaultBackend:
if db is not self._db:
db.put_conn()
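For context, a self-contained sketch of the psycopg3 pool configured above; dict_row plays the role RealDictCursor had with psycopg2 (the DSN is hypothetical):

import psycopg_pool
from psycopg.rows import dict_row

pool = psycopg_pool.ConnectionPool(
    conninfo="dbname=swh-vault",  # hypothetical DSN
    min_size=1,
    max_size=10,
    kwargs={"row_factory": dict_row},  # every row comes back as a dict
)
with pool.connection() as conn:
    row = conn.execute("SELECT 1 AS one").fetchone()
    assert row == {"one": 1}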
class VaultBackend(VaultDB):
"""
Backend for the Software Heritage Vault.
"""
def __init__(self, **config):
super().__init__(**config)
self.cache = VaultCache(**config["cache"])
self.scheduler = get_scheduler(**config["scheduler"])
self.storage = get_storage(**config["storage"])
@db_transaction()
def progress(
self,
......@@ -129,9 +139,9 @@ class VaultBackend:
def _send_task(self, bundle_type: str, swhid: CoreSWHID):
"""Send a cooking task to the celery scheduler"""
task = create_oneshot_task_dict("cook-vault-bundle", bundle_type, str(swhid))
task = create_oneshot_task("cook-vault-bundle", bundle_type, str(swhid))
added_tasks = self.scheduler.create_tasks([task])
return added_tasks[0]["id"]
return added_tasks[0].id
@db_transaction()
def create_task(
......@@ -149,22 +159,13 @@ class VaultBackend:
if not cooker.check_exists():
raise NotFoundExc(f"{bundle_type} {swhid} was not found.")
cur.execute(
"""
INSERT INTO vault_bundle (type, swhid, sticky)
VALUES (%s, %s, %s)""",
(bundle_type, str(swhid), sticky),
)
db.conn.commit()
task_id = self._send_task(bundle_type, swhid)
cur.execute(
"""
UPDATE vault_bundle
SET task_id = %s
WHERE type = %s AND swhid = %s""",
(task_id, bundle_type, str(swhid)),
INSERT INTO vault_bundle (type, swhid, sticky, task_id)
VALUES (%s, %s, %s, %s)""",
(bundle_type, str(swhid), sticky, task_id),
)
@db_transaction()
......@@ -200,8 +201,14 @@ class VaultBackend:
if bundle_type not in COOKER_TYPES:
raise NotFoundExc(f"{bundle_type} is an unknown type.")
# If there's a failed bundle entry, delete it first.
if info is not None and info["task_status"] == "failed":
if info is not None and (
info["task_status"] == "failed"
or (
info["task_status"] == "done"
and not self.cache.is_cached(bundle_type, swhid)
)
):
# If there's a failed bundle entry or bundle no longer in cache, delete it first.
cur.execute(
"DELETE FROM vault_bundle WHERE type = %s AND swhid = %s",
(bundle_type, str(swhid)),
......@@ -229,10 +236,6 @@ class VaultBackend:
def batch_cook(
self, batch: List[Tuple[str, str]], db=None, cur=None
) -> Dict[str, int]:
# Import execute_values at runtime only, because it requires
# psycopg2 >= 2.7 (only available on postgresql servers)
from psycopg2.extras import execute_values
for bundle_type, _ in batch:
if bundle_type not in COOKER_TYPES:
raise NotFoundExc(f"{bundle_type} is an unknown type.")
......@@ -255,11 +258,10 @@ class VaultBackend:
)
# Insert all the bundles, return the new ones
execute_values(
cur,
cur.executemany(
"""
INSERT INTO vault_bundle (type, swhid)
VALUES %s ON CONFLICT DO NOTHING""",
VALUES (%s, %s) ON CONFLICT DO NOTHING""",
batch,
)
......@@ -274,11 +276,10 @@ class VaultBackend:
# Insert the batch-bundle entries
batch_id_bundle_ids = [(batch_id, row["id"]) for row in bundles]
execute_values(
cur,
cur.executemany(
"""
INSERT INTO vault_batch_bundle (batch_id, bundle_id)
VALUES %s ON CONFLICT DO NOTHING""",
VALUES (%s, %s) ON CONFLICT DO NOTHING""",
batch_id_bundle_ids,
)
db.conn.commit()
......@@ -294,25 +295,23 @@ class VaultBackend:
args_batch = [(bundle_type, swhid) for bundle_type, swhid in batch_new]
# TODO: change once the scheduler handles priority tasks
tasks = [
create_oneshot_task_dict("swh-vault-batch-cooking", *args)
for args in args_batch
create_oneshot_task("swh-vault-batch-cooking", *args) for args in args_batch
]
added_tasks = self.scheduler.create_tasks(tasks)
tasks_ids_bundle_ids = [
(task_id, bundle_type, swhid)
for task_id, (bundle_type, swhid) in zip(
[task["id"] for task in added_tasks], batch_new
[task.id for task in added_tasks], batch_new
)
]
# Update the task ids
execute_values(
cur,
cur.executemany(
"""
UPDATE vault_bundle
SET task_id = s_task_id
FROM (VALUES %s) AS sub (s_task_id, s_type, s_swhid)
FROM (VALUES (%s, %s, %s)) AS sub (s_task_id, s_type, s_swhid)
WHERE type = s_type::cook_type AND swhid = s_swhid """,
tasks_ids_bundle_ids,
)
......@@ -370,6 +369,25 @@ class VaultBackend:
self.update_access_ts(bundle_type, swhid, cur=cur)
return self.cache.get(bundle_type, swhid)
@db_transaction()
def download_url(
self,
bundle_type: str,
swhid: CoreSWHID,
content_disposition: Optional[str] = None,
expiry: Optional[timedelta] = None,
raise_notfound=True,
db=None,
cur=None,
) -> Optional[str]:
"""Obtain a bundle direct download link from the cache if supported"""
available = self.is_available(bundle_type, swhid, cur=cur)
if not available:
if raise_notfound:
raise NotFoundExc(f"{bundle_type} {swhid} is not available.")
return None
return self.cache.download_url(bundle_type, swhid, content_disposition, expiry)
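A call sketch for the new method, assuming `backend` is a VaultBackend instance, `swhid` a cooked CoreSWHID, and a cache objstorage that can produce signed URLs (an HTTP- or S3-like backend; with pathslicing the cache returns None):

from datetime import timedelta

url = backend.download_url(
    "git_bare",
    swhid,
    content_disposition='attachment; filename="bundle.tar.gz"',  # hypothetical
    expiry=timedelta(hours=1),
)
# with raise_notfound=True (the default), an unavailable bundle raises NotFoundExc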
@db_transaction()
def update_access_ts(self, bundle_type: str, swhid: CoreSWHID, db=None, cur=None):
"""Update the last access timestamp of a bundle"""
......@@ -450,9 +468,12 @@ class VaultBackend:
# * send this url as part of the cook request and store it in
# the table
# * use this url for the notification e-mail
url = "https://archive.softwareheritage.org/api/1/vault/{}/{}/" "raw".format(
bundle_type.replace("_", "-"), swhid
# UPDATE: for now, let's just retrieve the URL from a config entry, if
# any, so we can use it on mirror instances
base_url = self.config.get("notification", {}).get(
"api_url", "https://archive.softwareheritage.org/api/1"
)
url = f"{base_url}/vault/{bundle_type.replace('_', '-')}/{swhid}/raw"
if status == "done":
text = NOTIF_EMAIL_BODY_SUCCESS.strip()
......@@ -475,7 +496,7 @@ class VaultBackend:
"send_notification called on a '{}' bundle".format(status)
)
msg["From"] = NOTIF_EMAIL_FROM
msg["From"] = self.config.get("notification", {}).get("from", NOTIF_EMAIL_FROM)
msg["To"] = email
self._smtp_send(msg)
......
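The notification settings read above map onto the vault configuration; a hedged sketch with hypothetical values, mirroring the swh_vault_custom_notif test fixture further down:

from swh.vault import get_vault

vault = get_vault(
    "postgresql",
    db="dbname=swh-vault",  # hypothetical DSN
    cache={"cls": "memory"},
    storage={"cls": "memory"},
    scheduler={"cls": "remote", "url": "http://scheduler.local/"},  # hypothetical
    notification={
        "api_url": "https://mirror.example.org/api/1",  # base URL used in e-mails
        "from": "Vault <vault@mirror.example.org>",  # overrides NOTIF_EMAIL_FROM
    },
)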
# Copyright (C) 2016-2022 The Software Heritage developers
# Copyright (C) 2016-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from typing import Optional
from swh.model import hashutil
from swh.model.swhids import CoreSWHID
from swh.objstorage.factory import get_objstorage
......@@ -27,6 +30,16 @@ class VaultCache:
sid = self._get_internal_id(bundle_type, swhid)
return self.objstorage.get(sid)
def download_url(
self,
bundle_type,
swhid: CoreSWHID,
content_disposition: Optional[str] = None,
expiry: Optional[timedelta] = None,
) -> Optional[str]:
sid = self._get_internal_id(bundle_type, swhid)
return self.objstorage.download_url(sid, content_disposition, expiry)
def delete(self, bundle_type, swhid: CoreSWHID):
sid = self._get_internal_id(bundle_type, swhid)
return self.objstorage.delete(sid)
......
......@@ -84,7 +84,7 @@ def cook(
conf = config.read(config_file)
try:
from swh.graph.client import RemoteGraphClient # optional dependency
from swh.graph.http_client import RemoteGraphClient # optional dependency
graph = RemoteGraphClient(**conf["graph"]) if conf.get("graph") else None
except ModuleNotFoundError:
......
......@@ -11,8 +11,8 @@ from typing import Any, Dict, List, Type
from swh.core.config import load_named_config
from swh.core.config import read as read_config
from swh.model.swhids import CoreSWHID, ObjectType
from swh.objstorage.factory import get_objstorage
from swh.storage import get_storage
from swh.vault import get_vault
from swh.vault.cookers.base import DEFAULT_CONFIG, DEFAULT_CONFIG_PATH, BaseVaultCooker
from swh.vault.cookers.directory import DirectoryCooker
from swh.vault.cookers.git_bare import GitBareCooker
......@@ -73,12 +73,10 @@ def check_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
"This vault backend can only be a 'remote' configuration"
)
# TODO: Soft-deprecation of args key. Remove when ready.
vcfg.update(vcfg.get("args", {}))
# Default to top-level value if any
if "storage" not in vcfg:
vcfg["storage"] = cfg.get("storage")
for key in ("storage", "objstorage", "graph"):
if key not in vcfg and key in cfg:
vcfg[key] = cfg[key]
if not vcfg.get("storage"):
raise ValueError("invalid configuration: missing 'storage' config entry.")
......@@ -98,6 +96,8 @@ def get_cooker(bundle_type: str, swhid: CoreSWHID):
EnvironmentError in case the vault configuration reference a non remote class.
"""
from swh.vault import get_vault
if "SWH_CONFIG_FILENAME" in os.environ:
cfg = read_config(os.environ["SWH_CONFIG_FILENAME"], DEFAULT_CONFIG)
else:
......@@ -111,7 +111,7 @@ def get_cooker(bundle_type: str, swhid: CoreSWHID):
backend = get_vault(**vcfg)
try:
from swh.graph.client import RemoteGraphClient # optional dependency
from swh.graph.http_client import RemoteGraphClient # optional dependency
graph = RemoteGraphClient(**vcfg["graph"]) if vcfg.get("graph") else None
except ModuleNotFoundError:
......@@ -122,6 +122,11 @@ def get_cooker(bundle_type: str, swhid: CoreSWHID):
else:
graph = None
if vcfg.get("objstorage"):
objstorage = get_objstorage(**vcfg["objstorage"])
else:
objstorage = None
kwargs = {
k: v for (k, v) in cfg.items() if k in ("max_bundle_size", "thread_pool_size")
}
......@@ -130,6 +135,7 @@ def get_cooker(bundle_type: str, swhid: CoreSWHID):
swhid,
backend=backend,
storage=storage,
objstorage=objstorage,
graph=graph,
**kwargs,
)
# Copyright (C) 2016-2018 The Software Heritage developers
# Copyright (C) 2016-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -7,13 +7,14 @@ import abc
import io
import logging
import traceback
from typing import ClassVar, Set
from typing import ClassVar, Optional, Set
from psycopg2.extensions import QueryCanceledError
from psycopg.errors import QueryCanceled
import sentry_sdk
import swh.model.swhids
from swh.model.swhids import CoreSWHID, ObjectType
from swh.objstorage.interface import ObjStorageInterface
from swh.storage.interface import StorageInterface
MAX_BUNDLE_SIZE = 2**29 # 512 MiB
......@@ -71,7 +72,7 @@ class BaseVaultCooker(metaclass=abc.ABCMeta):
backend,
storage: StorageInterface,
graph=None,
objstorage=None,
objstorage: Optional[ObjStorageInterface] = None,
max_bundle_size: int = MAX_BUNDLE_SIZE,
thread_pool_size: int = 10,
):
......@@ -132,7 +133,7 @@ class BaseVaultCooker(metaclass=abc.ABCMeta):
try:
try:
self.prepare_bundle()
except QueryCanceledError:
except QueryCanceled:
raise PolicyError(
"Timeout reached while assembling the requested bundle"
)
......
......@@ -22,7 +22,13 @@ class DirectoryCooker(BaseVaultCooker):
def prepare_bundle(self):
with tempfile.TemporaryDirectory(prefix="tmp-vault-directory-") as td:
directory_builder = DirectoryBuilder(self.storage, td.encode(), self.obj_id)
directory_builder = DirectoryBuilder(
storage=self.storage,
root=td.encode(),
dir_id=self.obj_id,
thread_pool_size=self.thread_pool_size,
objstorage=self.objstorage,
)
directory_builder.build()
with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar:
tar.add(td, arcname=str(self.swhid))
# Copyright (C) 2021-2022 The Software Heritage developers
# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -29,17 +29,17 @@ When swh-graph is not available, steps 1 and 2 are merged, because revisions nee
to be loaded in order to compute the subgraph.
"""
import concurrent
import datetime
import enum
import glob
import logging
import multiprocessing.dummy
import os.path
import re
import subprocess
import tarfile
import tempfile
from typing import Any, Dict, Iterable, Iterator, List, NoReturn, Optional, Set, Tuple
from typing import Any, Dict, Iterable, List, NoReturn, Optional, Set
import zlib
import sentry_sdk
......@@ -48,19 +48,21 @@ from swh.core.api.classes import stream_results_optional
from swh.model import git_objects
from swh.model.hashutil import hash_to_bytehex, hash_to_hex
from swh.model.model import (
Directory,
DirectoryEntry,
Person,
Release,
ReleaseTargetType,
Revision,
RevisionType,
Sha1Git,
Snapshot,
SnapshotBranch,
TargetType,
SnapshotTargetType,
TimestampWithTimezone,
)
from swh.model.model import Content, Directory, DirectoryEntry
from swh.model.model import ObjectType as ModelObjectType
from swh.model.swhids import CoreSWHID, ObjectType
from swh.objstorage.interface import objid_from_dict
from swh.storage.algos.revisions_walker import DFSRevisionsWalker
from swh.storage.algos.snapshot import snapshot_get_all_branches
from swh.vault.cookers.base import BaseVaultCooker
......@@ -103,6 +105,9 @@ class GitBareCooker(BaseVaultCooker):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.obj_type = RootObjectType[self.swhid.object_type.name]
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.thread_pool_size
)
def check_exists(self) -> bool:
"""Returns whether the root object is present in the archive."""
......@@ -131,7 +136,7 @@ class GitBareCooker(BaseVaultCooker):
stack[-n:] = []
return obj_ids
def prepare_bundle(self):
def prepare_bundle(self) -> None:
"""Main entry point. Initializes the state, creates the bundle, and
sends it to the backend."""
# Objects we will visit soon (aka. "todo-lists"):
......@@ -145,7 +150,7 @@ class GitBareCooker(BaseVaultCooker):
self._walker_state: Optional[Any] = None
# Set of errors we expect git-fsck to raise at the end:
self._expected_fsck_errors = set()
self._expected_fsck_errors: Set[str] = set()
with tempfile.TemporaryDirectory(prefix="swh-vault-gitbare-") as workdir:
# Initialize a Git directory
......@@ -264,20 +269,21 @@ class GitBareCooker(BaseVaultCooker):
return revision.id
def write_refs(self, snapshot=None):
def write_refs(self, snapshot=None) -> None:
"""Writes all files in :file:`.git/refs/`.
For non-snapshot objects, this is only ``master``."""
refs: Dict[bytes, bytes] # ref name -> target
if self.obj_type == RootObjectType.DIRECTORY:
if self.obj_type is RootObjectType.DIRECTORY:
# We need a synthetic revision pointing to the directory
rev_id = self._make_stub_directory_revision(self.obj_id)
refs = {b"refs/heads/master": hash_to_bytehex(rev_id)}
elif self.obj_type == RootObjectType.REVISION:
elif self.obj_type is RootObjectType.REVISION:
refs = {b"refs/heads/master": hash_to_bytehex(self.obj_id)}
elif self.obj_type == RootObjectType.RELEASE:
elif self.obj_type is RootObjectType.RELEASE:
(release,) = self.storage.release_get([self.obj_id])
assert release, self.obj_id
if release.name and re.match(rb"^[a-zA-Z0-9_.-]+$", release.name):
release_name = release.name
......@@ -288,17 +294,17 @@ class GitBareCooker(BaseVaultCooker):
b"refs/tags/" + release_name: hash_to_bytehex(self.obj_id),
}
if release.target_type.value == ModelObjectType.REVISION:
if release.target_type.value == ReleaseTargetType.REVISION.value:
# Not necessary, but makes it easier to browse
refs[b"ref/heads/master"] = hash_to_bytehex(release.target)
refs[b"refs/heads/master"] = hash_to_bytehex(release.target)
# TODO: synthesize a master branch for other target types
elif self.obj_type == RootObjectType.SNAPSHOT:
elif self.obj_type is RootObjectType.SNAPSHOT:
if snapshot is None:
# refs were already written in a previous step
return
branches = []
for (branch_name, branch) in snapshot.branches.items():
for branch_name, branch in snapshot.branches.items():
if branch is None:
logging.error(
"%s has dangling branch: %r", snapshot.swhid(), branch_name
......@@ -308,7 +314,7 @@ class GitBareCooker(BaseVaultCooker):
refs = {
branch_name: (
b"ref: " + branch.target
if branch.target_type == TargetType.ALIAS
if branch.target_type == SnapshotTargetType.ALIAS
else hash_to_bytehex(branch.target)
)
for (branch_name, branch) in branches
......@@ -316,7 +322,7 @@ class GitBareCooker(BaseVaultCooker):
else:
assert_never(self.obj_type, f"Unexpected root object type: {self.obj_type}")
for (ref_name, ref_target) in refs.items():
for ref_name, ref_target in refs.items():
path = os.path.join(self.gitdir.encode(), ref_name)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as fd:
......@@ -378,8 +384,9 @@ class GitBareCooker(BaseVaultCooker):
def load_objects(self) -> None:
"""Repeatedly loads objects in the todo-lists, until all lists are empty."""
while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack:
futures = []
while self._rel_stack or self._rev_stack or self._dir_stack or self._cnt_stack:
nb_remaining = (
len(self._rel_stack)
+ len(self._rev_stack)
......@@ -413,9 +420,19 @@ class GitBareCooker(BaseVaultCooker):
content_ids = self._pop(self._cnt_stack, CONTENT_BATCH_SIZE)
if content_ids:
self.load_contents(content_ids)
futures += [
self.executor.submit(self.load_content, content_id)
for content_id in content_ids
]
self.nb_loaded += len(content_ids)
self.backend.set_progress(
self.BUNDLE_TYPE,
self.swhid,
"Fetching contents bytes ...",
)
concurrent.futures.wait(futures)
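A standalone sketch of the submit-then-wait pattern introduced above (stdlib only; pow stands in for the per-content load_content calls):

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
futures = [executor.submit(pow, 2, n) for n in range(8)]
concurrent.futures.wait(futures)  # block until every submitted future has finished
assert [f.result() for f in futures] == [2 ** n for n in range(8)]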
def push_revision_subgraph(self, obj_id: Sha1Git) -> None:
"""Fetches the graph of revisions induced by the given ``obj_id`` and adds
them to ``self._rev_stack``.
......@@ -425,7 +442,7 @@ class GitBareCooker(BaseVaultCooker):
loaded_from_graph = False
if self.graph:
from swh.graph.client import GraphArgumentException
from swh.graph.http_client import GraphArgumentException
# First, try to cook using swh-graph, as it is more efficient than
# swh-storage for querying the history
......@@ -457,13 +474,16 @@ class GitBareCooker(BaseVaultCooker):
# swh-graph, fall back to self.storage.revision_log.
# self.storage.revision_log also gives us the full revisions,
# so we load them right now instead of just pushing them on the stack.
walker = DFSRevisionsWalker(
self.storage, obj_id, state=self._walker_state, ignore_displayname=True
)
for revision in walker:
self.write_revision_node(Revision.from_dict(revision))
walker = DFSRevisionsWalker(self.storage, obj_id, state=self._walker_state)
for rev_d in walker:
if isinstance(rev_d, Revision):
# TODO: Remove this conditional after swh-storage v3.0.0 is released
revision = rev_d
else:
revision = Revision.from_dict(rev_d)
self.write_revision_node(revision)
self.nb_loaded += 1
self._push(self._dir_stack, [revision["directory"]])
self._push(self._dir_stack, [revision.directory])
# Save the state, so the next call to the walker won't return the same
# revisions
self._walker_state = walker.export_state()
......@@ -482,7 +502,7 @@ class GitBareCooker(BaseVaultCooker):
directory_ids = []
content_ids = []
from swh.graph.client import GraphArgumentException
from swh.graph.http_client import GraphArgumentException
# First, try to cook using swh-graph, as it is more efficient than
# swh-storage for querying the history
......@@ -536,19 +556,19 @@ class GitBareCooker(BaseVaultCooker):
logging.warning("Dangling branch: %r", branch)
continue
assert isinstance(branch, SnapshotBranch) # for mypy
if branch.target_type is TargetType.REVISION:
if branch.target_type is SnapshotTargetType.REVISION:
self.push_revision_subgraph(branch.target)
elif branch.target_type is TargetType.RELEASE:
elif branch.target_type is SnapshotTargetType.RELEASE:
self.push_releases_subgraphs([branch.target])
elif branch.target_type is TargetType.ALIAS:
elif branch.target_type is SnapshotTargetType.ALIAS:
# Nothing to do, this for loop also iterates on the target branch
# (if it exists)
pass
elif branch.target_type is TargetType.DIRECTORY:
elif branch.target_type is SnapshotTargetType.DIRECTORY:
self._push(self._dir_stack, [branch.target])
elif branch.target_type is TargetType.CONTENT:
elif branch.target_type is SnapshotTargetType.CONTENT:
self._push(self._cnt_stack, [branch.target])
elif branch.target_type is TargetType.SNAPSHOT:
elif branch.target_type is SnapshotTargetType.SNAPSHOT:
if swhid.object_id != obj_id:
raise NotImplementedError(
f"{swhid} has a snapshot as a branch."
......@@ -563,9 +583,7 @@ class GitBareCooker(BaseVaultCooker):
def load_revisions(self, obj_ids: List[Sha1Git]) -> None:
"""Given a list of revision ids, loads these revisions and their directories;
but not their parent revisions (ie. this is not recursive)."""
ret: List[Optional[Revision]] = self.storage.revision_get(
obj_ids, ignore_displayname=True
)
ret: List[Optional[Revision]] = self.storage.revision_get(obj_ids)
revisions: List[Revision] = list(filter(None, ret))
if len(ret) != len(revisions):
......@@ -582,7 +600,7 @@ class GitBareCooker(BaseVaultCooker):
def load_releases(self, obj_ids: List[Sha1Git]) -> List[Release]:
"""Loads release objects, and returns them."""
ret = self.storage.release_get(obj_ids, ignore_displayname=True)
ret = self.storage.release_get(obj_ids)
releases = list(filter(None, ret))
if len(ret) != len(releases):
......@@ -599,15 +617,15 @@ class GitBareCooker(BaseVaultCooker):
for release in self.load_releases(obj_ids):
self.nb_loaded += 1
assert release.target, f"{release.swhid()} has no target"
if release.target_type is ModelObjectType.REVISION:
if release.target_type is ReleaseTargetType.REVISION:
self.push_revision_subgraph(release.target)
elif release.target_type is ModelObjectType.DIRECTORY:
elif release.target_type is ReleaseTargetType.DIRECTORY:
self._push(self._dir_stack, [release.target])
elif release.target_type is ModelObjectType.CONTENT:
elif release.target_type is ReleaseTargetType.CONTENT:
self._push(self._cnt_stack, [release.target])
elif release.target_type is ModelObjectType.RELEASE:
elif release.target_type is ReleaseTargetType.RELEASE:
self.push_releases_subgraphs([release.target])
elif release.target_type is ModelObjectType.SNAPSHOT:
elif release.target_type is ReleaseTargetType.SNAPSHOT:
raise NotImplementedError(
f"{release.swhid()} targets a snapshot: {release.target!r}"
)
......@@ -628,12 +646,10 @@ class GitBareCooker(BaseVaultCooker):
raw_manifests = self.storage.directory_get_raw_manifest(obj_ids)
with multiprocessing.dummy.Pool(min(self.thread_pool_size, len(obj_ids))) as p:
for _ in p.imap_unordered(
lambda obj_id: self.load_directory(obj_id, raw_manifests.get(obj_id)),
obj_ids,
):
pass
concurrent.futures.wait(
self.executor.submit(self.load_directory, obj_id, raw_manifests.get(obj_id))
for obj_id in obj_ids
)
def load_directory(self, obj_id: Sha1Git, raw_manifest: Optional[bytes]) -> None:
# Load the directory
......@@ -662,49 +678,38 @@ class GitBareCooker(BaseVaultCooker):
if stack is not None:
self._push(stack, [entry.target])
def load_contents(self, obj_ids: List[Sha1Git]) -> None:
def load_content(self, obj_id: Sha1Git) -> None:
# TODO: add support of filtered objects, somehow?
# It's tricky, because, by definition, we can't write a git object with
# the expected hash, so git-fsck *will* choke on it.
contents = self.storage.content_get(obj_ids, "sha1_git")
visible_contents = []
for (obj_id, content) in zip(obj_ids, contents):
if content is None:
# FIXME: this may also happen for missing content
self.write_content(obj_id, SKIPPED_MESSAGE)
self._expect_mismatched_object_error(obj_id)
elif content.status == "visible":
visible_contents.append(content)
elif content.status == "hidden":
self.write_content(obj_id, HIDDEN_MESSAGE)
self._expect_mismatched_object_error(obj_id)
elif content.status == "absent":
assert False, f"content_get returned absent content {content.swhid()}"
content = self.storage.content_get([obj_id], "sha1_git")[0]
if content is None:
# FIXME: this may also happen for missing content
self.write_content(obj_id, SKIPPED_MESSAGE)
self._expect_mismatched_object_error(obj_id)
elif content.status == "visible":
hashes = objid_from_dict(content.hashes())
if self.objstorage is None:
datum = self.storage.content_get_data(hashes)
else:
# TODO: When content.status will have type Literal, replace this with
# assert_never
assert False, f"{content.swhid} has status: {content.status!r}"
contents_and_data: Iterator[Tuple[Content, Optional[bytes]]]
if self.objstorage is None:
contents_and_data = (
(content, self.storage.content_get_data(content.sha1))
for content in visible_contents
)
else:
contents_and_data = zip(
visible_contents,
self.objstorage.get_batch(c.hashes() for c in visible_contents),
)
datum = self.objstorage.get(hashes)
for (content, datum) in contents_and_data:
if datum is None:
logger.error(
"%s is visible, but is missing data. Skipping.", content.swhid()
)
continue
self.write_content(content.sha1_git, datum)
else:
self.write_content(content.sha1_git, datum)
elif content.status == "hidden":
self.write_content(obj_id, HIDDEN_MESSAGE)
self._expect_mismatched_object_error(obj_id)
elif content.status == "absent":
assert False, f"content_get returned absent content {content.swhid()}"
else:
# TODO: When content.status will have type Literal, replace this with
# assert_never
assert False, f"{content.swhid} has status: {content.status!r}"
def write_content(self, obj_id: Sha1Git, content: bytes) -> None:
header = git_objects.git_object_header("blob", len(content))
......
......@@ -30,7 +30,11 @@ class RevisionFlatCooker(BaseVaultCooker):
revdir = root / hashutil.hash_to_hex(revision["id"])
revdir.mkdir()
directory_builder = DirectoryBuilder(
self.storage, str(revdir).encode(), revision["directory"]
storage=self.storage,
root=str(revdir).encode(),
dir_id=revision["directory"],
thread_pool_size=self.thread_pool_size,
objstorage=self.objstorage,
)
directory_builder.build()
with tarfile.open(fileobj=self.fileobj, mode="w:gz") as tar:
......
# Copyright (C) 2017-2019 The Software Heritage developers
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -22,7 +22,7 @@ from swh.model.swhids import ObjectType
from swh.model.toposort import toposort
from swh.vault.cookers.base import BaseVaultCooker
from swh.vault.cookers.utils import revision_log
from swh.vault.to_disk import get_filtered_files_content
from swh.vault.to_disk import get_filtered_file_content
class RevisionGitfastCooker(BaseVaultCooker):
......@@ -82,9 +82,8 @@ class RevisionGitfastCooker(BaseVaultCooker):
obj_id = file_data["sha1"]
if obj_id in self.obj_done:
return
contents = list(get_filtered_files_content(self.storage, [file_data]))
content = contents[0]["content"]
self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content))
content = get_filtered_file_content(self.storage, file_data)
self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content["content"]))
self.obj_done.add(obj_id)
def _author_tuple_format(self, author, date):
......
......@@ -10,13 +10,13 @@ from swh.vault.cookers import get_cooker
@app.task(name=__name__ + ".SWHCookingTask")
def cook_bundle(bundle_type, swhid):
def cook_vault_bundle(bundle_type, swhid):
"""Main task to cook a bundle."""
get_cooker(bundle_type, CoreSWHID.from_string(swhid)).cook()
# TODO: remove once the scheduler handles priority tasks
@app.task(name=__name__ + ".SWHBatchCookingTask")
def batch_cook_bundle(bundle_type, swhid):
def cook_vault_bundle_batch(bundle_type, swhid):
"""Temporary task for the batch queue."""
get_cooker(bundle_type, CoreSWHID.from_string(swhid)).cook()
# Copyright (C) 2017-2021 The Software Heritage developers
# Copyright (C) 2017-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple
from swh.model.swhids import CoreSWHID
......@@ -19,6 +20,15 @@ class InMemoryVaultBackend:
def fetch(self, bundle_type: str, swhid: CoreSWHID) -> Optional[bytes]:
return self._cache.get(bundle_type, swhid)
def download_url(
self,
bundle_type: str,
swhid: CoreSWHID,
content_disposition: Optional[str] = None,
expiry: Optional[timedelta] = None,
) -> Optional[str]:
return None
def cook(
self, bundle_type: str, swhid: CoreSWHID, email: Optional[str] = None
) -> Dict[str, Any]:
......@@ -48,4 +58,4 @@ class InMemoryVaultBackend:
raise NotImplementedError("InMemoryVaultBackend.batch_cook()")
def batch_progress(self, batch_id: int) -> Dict[str, Any]:
pass
return {}
# Copyright (C) 2017-2020 The Software Heritage developers
# Copyright (C) 2017-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple
from typing_extensions import Protocol, runtime_checkable
......@@ -22,6 +23,17 @@ class VaultInterface(Protocol):
"""Fetch information from a bundle"""
...
@remote_api_endpoint("download_url")
def download_url(
self,
bundle_type: str,
swhid: CoreSWHID,
content_disposition: Optional[str] = None,
expiry: Optional[timedelta] = None,
) -> Optional[str]:
"""Obtain bundle direct download link if the vault cache backend supports it."""
...
@remote_api_endpoint("cook")
def cook(
self, bundle_type: str, swhid: CoreSWHID, email: Optional[str] = None
......@@ -31,8 +43,7 @@ class VaultInterface(Protocol):
...
@remote_api_endpoint("progress")
def progress(self, bundle_type: str, swhid: CoreSWHID):
...
def progress(self, bundle_type: str, swhid: CoreSWHID): ...
# Cookers endpoints
......
# Copyright (C) 2020-2022 The Software Heritage developers
# Copyright (C) 2020-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -7,11 +7,10 @@ from functools import partial
import os
from typing import Any, Dict
import pkg_resources.extern.packaging.version
import pytest
from pytest_postgresql import factories
from swh.core.db.pytest_plugin import initialize_database_for_module
from swh.core.db.db_utils import initialize_database_for_module
from swh.vault import get_vault
from swh.vault.backend import VaultBackend
......@@ -22,17 +21,6 @@ os.environ["LC_ALL"] = "C.UTF-8"
# a different one.
os.umask(0o022)
pytest_v = pkg_resources.get_distribution("pytest").parsed_version
if pytest_v < pkg_resources.extern.packaging.version.parse("3.9"):
@pytest.fixture
def tmp_path():
import pathlib
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
yield pathlib.Path(tmpdir)
vault_postgresql_proc = factories.postgresql_proc(
load=[
......@@ -43,11 +31,31 @@ vault_postgresql_proc = factories.postgresql_proc(
postgres_vault = factories.postgresql("vault_postgresql_proc")
def pytest_collection_modifyitems(items):
"""Skip tests using httpserver fixture if pytest-httpserver is
not available (debian < 12 for instance)"""
try:
from pytest_httpserver import HTTPServer # noqa
except ImportError:
pytest_httpserver_available = False
else:
pytest_httpserver_available = True
for item in items:
try:
fixtures = item.fixturenames
if "httpserver" in fixtures and not pytest_httpserver_available:
item.add_marker(
pytest.mark.skip(reason="pytest-httpserver not installed")
)
except Exception:
pass
@pytest.fixture
def swh_vault_config(postgres_vault, tmp_path) -> Dict[str, Any]:
tmp_path = str(tmp_path)
return {
"db": postgres_vault.dsn,
"db": postgres_vault.info.dsn,
"storage": {
"cls": "memory",
},
......@@ -66,7 +74,16 @@ def swh_vault_config(postgres_vault, tmp_path) -> Dict[str, Any]:
@pytest.fixture
def swh_vault(swh_vault_config):
return get_vault("local", **swh_vault_config)
return get_vault("postgresql", **swh_vault_config)
@pytest.fixture
def swh_vault_custom_notif(swh_vault_config):
notif_cfg = {
"from": "Someone from somewhere <nobody@nowhere.local>",
"api_url": "http://test.local/api/1",
}
return get_vault("postgresql", notification=notif_cfg, **swh_vault_config)
@pytest.fixture
......
# Copyright (C) 2017-2022 The Software Heritage developers
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -10,15 +10,20 @@ import smtplib
from unittest.mock import MagicMock, patch
import attr
import psycopg2
from backports.entry_points_selectable import entry_points as get_entry_points
import psycopg
import pytest
import requests
from swh.core.api import APIError
from swh.core.sentry import init_sentry
from swh.model.model import Content
from swh.model.swhids import CoreSWHID
from swh.vault.exc import NotFoundExc
from swh.vault.tests.vault_testing import hash_content
SENTRY_DSN = "https://user@example.org/1234"
@contextlib.contextmanager
def mock_cooking(vault_backend):
......@@ -109,7 +114,7 @@ def test_create_task_simple(swh_vault):
def test_create_fail_duplicate_task(swh_vault):
with mock_cooking(swh_vault):
swh_vault.create_task(TEST_TYPE, TEST_SWHID)
with pytest.raises(psycopg2.IntegrityError):
with pytest.raises(psycopg.IntegrityError):
swh_vault.create_task(TEST_TYPE, TEST_SWHID)
......@@ -173,6 +178,20 @@ def test_create_update_access_ts(swh_vault):
assert access_ts_2 < access_ts_3
def test_create_scheduler_rpc_failure(swh_vault, mocker):
mocker.patch.object(swh_vault, "_send_task").side_effect = APIError(
"Could not connect to scheduler RPC service"
)
# exception should bubble up when requesting to cook a bundle
with pytest.raises(APIError):
cooking_info = swh_vault.cook(TEST_TYPE, TEST_SWHID)
# once the scheduler RPC issue is fixed, the task should be created
with mock_cooking(swh_vault):
cooking_info = swh_vault.cook(TEST_TYPE, TEST_SWHID)
assert cooking_info["task_status"] == "new"
def test_cook_idempotent(swh_vault, sample_data):
with mock_cooking(swh_vault):
info1 = swh_vault.cook(TEST_TYPE, TEST_SWHID)
......@@ -183,10 +202,11 @@ def test_cook_idempotent(swh_vault, sample_data):
def test_cook_email_pending_done(swh_vault):
with mock_cooking(swh_vault), patch.object(
swh_vault, "add_notif_email"
) as madd, patch.object(swh_vault, "send_notification") as msend:
with (
mock_cooking(swh_vault),
patch.object(swh_vault, "add_notif_email") as madd,
patch.object(swh_vault, "send_notification") as msend,
):
swh_vault.cook(TEST_TYPE, TEST_SWHID)
madd.assert_not_called()
msend.assert_not_called()
......@@ -202,6 +222,7 @@ def test_cook_email_pending_done(swh_vault):
msend.reset_mock()
swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done")
swh_vault.cache.add(TEST_TYPE, TEST_SWHID, b"content")
swh_vault.cook(TEST_TYPE, TEST_SWHID, email=TEST_EMAIL)
msend.assert_called_once_with(None, TEST_EMAIL, TEST_TYPE, TEST_SWHID, "done")
madd.assert_not_called()
......@@ -242,9 +263,45 @@ def test_send_all_emails(swh_vault):
m.assert_not_called()
def test_send_all_emails_custom_notif(swh_vault_custom_notif):
swh_vault = swh_vault_custom_notif
with mock_cooking(swh_vault):
emails = ("a@example.com", "billg@example.com", "test+42@example.org")
for email in emails:
swh_vault.cook(TEST_TYPE, TEST_SWHID, email=email)
swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done")
with patch.object(swh_vault, "_smtp_send") as m:
swh_vault.send_notif(TEST_TYPE, TEST_SWHID)
sent_emails = {k[0][0] for k in m.call_args_list}
assert {k["To"] for k in sent_emails} == set(emails)
download_url = (
"http://test.local/api/1/vault/"
f"{TEST_TYPE.replace('_', '-')}/{str(TEST_SWHID)}/raw"
)
for e in sent_emails:
assert "nobody@nowhere.local" in e["From"]
assert TEST_TYPE in e["Subject"]
assert TEST_SWHID.object_id.hex()[:5] in e["Subject"]
assert TEST_TYPE in str(e)
assert download_url in str(e)
assert TEST_SWHID.object_id.hex()[:5] in str(e)
assert "--\x20\n" in str(e) # Well-formated signature!!!
# Check that the entries have been deleted and recalling the
# function does not re-send the e-mails
m.reset_mock()
swh_vault.send_notif(TEST_TYPE, TEST_SWHID)
m.assert_not_called()
def test_send_email_error_no_smtp(swh_vault):
reports = []
init_sentry("http://example.org", extra_kwargs={"transport": reports.append})
init_sentry(SENTRY_DSN, extra_kwargs={"transport": reports.append})
emails = ("a@example.com", "billg@example.com", "test+42@example.org")
with mock_cooking(swh_vault):
......@@ -270,7 +327,7 @@ def test_send_email_error_no_smtp(swh_vault):
def test_send_email_error_send_failed(swh_vault):
reports = []
init_sentry("http://example.org", extra_kwargs={"transport": reports.append})
init_sentry(SENTRY_DSN, extra_kwargs={"transport": reports.append})
emails = ("a@example.com", "billg@example.com", "test+42@example.org")
with mock_cooking(swh_vault):
......@@ -417,3 +474,74 @@ def test_retry_failed_bundle(swh_vault):
swh_vault.cook(TEST_TYPE, TEST_SWHID)
info = swh_vault.progress(TEST_TYPE, TEST_SWHID)
assert info["task_status"] == "new"
def test_download_url_cache_pathslicing_backend(swh_vault):
swhid, content = fake_cook(swh_vault, TEST_TYPE, b"content")
# download URL feature is not available with pathslicing backend for vault cache
assert swh_vault.download_url(TEST_TYPE, swhid) is None
@pytest.fixture
def swh_vault_config_http_cache(swh_vault_config, httpserver):
swh_vault_config["cache"] = {
"cls": "http",
"url": httpserver.url_for("/"),
"compression": "none",
}
return swh_vault_config
@pytest.fixture
def swh_vault_http_cache(swh_vault_config_http_cache):
from swh.vault import get_vault
return get_vault("postgresql", **swh_vault_config_http_cache)
def test_download_url_cache_http_backend(swh_vault_http_cache, mocker, httpserver):
unknown_swhid = Content.from_data(b"foo").swhid()
with pytest.raises(
NotFoundExc, match=f"{TEST_TYPE} {unknown_swhid} is not available."
):
swh_vault_http_cache.download_url(TEST_TYPE, unknown_swhid)
mocker.patch.object(
swh_vault_http_cache, "progress", return_value={"task_status": "done"}
)
content = b"content"
swhid = Content.from_data(content).swhid()
objid = swh_vault_http_cache.cache._get_internal_id(TEST_TYPE, swhid)["sha1"]
httpserver.expect_request(f"/{objid.hex()}").respond_with_data(content)
download_url = swh_vault_http_cache.download_url(TEST_TYPE, swhid)
assert download_url is not None
assert requests.get(download_url).content == content
def test_cook_if_status_done_but_bundle_not_in_cache(swh_vault, mocker):
with mock_cooking(swh_vault):
swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com")
swh_vault.cache.add(TEST_TYPE, TEST_SWHID, b"content")
swh_vault.set_status(TEST_TYPE, TEST_SWHID, "done")
create_task = mocker.spy(swh_vault, "create_task")
with mock_cooking(swh_vault):
swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com")
create_task.assert_not_called()
swh_vault.cache.delete(TEST_TYPE, TEST_SWHID)
with mock_cooking(swh_vault):
swh_vault.cook(TEST_TYPE, TEST_SWHID, email="a@example.com")
create_task.assert_called_once_with(TEST_TYPE, TEST_SWHID, False)
def test_registered_backends():
entry_points = get_entry_points(group="swh.vault.classes")
assert {ep.name for ep in entry_points} == {"remote", "postgresql", "memory"}
for ep in entry_points:
assert ep.load()
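The names asserted above come from the package's entry points; a stdlib sketch of the same lookup (using the Python >= 3.10 signature rather than the backports package):

from importlib.metadata import entry_points

for ep in entry_points(group="swh.vault.classes"):
    print(ep.name, "->", ep.value)
# expected names: remote, postgresql, memory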
# Copyright (C) 2021 The Software Heritage developers
# Copyright (C) 2021-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -71,7 +71,7 @@ def test_cook_directory(bundle_type, cooker_name_suffix, swhid_type, mocker):
runner = click.testing.CliRunner()
swhid = CoreSWHID.from_string(f"swh:1:{swhid_type}:{'0'*40}")
swhid = CoreSWHID.from_string(f"swh:1:{swhid_type}:{'0' * 40}")
with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd:
config_fd.write('{"storage": {}}')
......@@ -81,7 +81,7 @@ def test_cook_directory(bundle_type, cooker_name_suffix, swhid_type, mocker):
vault_cli_group,
[
"cook",
f"swh:1:{swhid_type}:{'0'*40}",
f"swh:1:{swhid_type}:{'0' * 40}",
"-",
"-C",
config_fd.name,
......@@ -143,7 +143,7 @@ vault:
result = cli_runner.invoke(swhdb, ["-C", cfgfile, "init", module_name])
assert result.exit_code == 0, f"Unexpected output: {result.output}"
assert swh_db_module(conninfo) == "vault"
assert swh_db_module(conninfo) == "vault:postgresql"
assert swh_db_version(conninfo) == VaultBackend.current_version
with BaseDb.connect(conninfo).cursor() as cur:
......
# Copyright (C) 2017-2022 The Software Heritage developers
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -25,29 +25,31 @@ import dulwich.porcelain
import dulwich.repo
import pytest
from swh.loader.git.from_disk import GitLoaderFromDisk
from swh.loader.git.loader import GitLoader
from swh.model import from_disk, hashutil
from swh.model.model import (
Content,
Directory,
DirectoryEntry,
Person,
Release,
ReleaseTargetType,
Revision,
RevisionType,
SkippedContent,
Snapshot,
SnapshotBranch,
TargetType,
SnapshotTargetType,
Timestamp,
TimestampWithTimezone,
)
from swh.model.model import Content, Directory, DirectoryEntry
from swh.model.model import ObjectType as ModelObjectType
from swh.model.swhids import CoreSWHID, ObjectType
from swh.vault.cookers import DirectoryCooker, GitBareCooker, RevisionGitfastCooker
from swh.vault.tests.vault_testing import hash_content
from swh.vault.to_disk import HIDDEN_MESSAGE, SKIPPED_MESSAGE
class TestRepo:
class _TestRepo:
"""A tiny context manager for a test git repository, with some utility
functions to perform basic git stuff.
"""
......@@ -169,22 +171,27 @@ def git_loader(
"""Instantiate a Git Loader using the storage instance as storage."""
def _create_loader(directory):
return GitLoaderFromDisk(
return GitLoader(
swh_storage,
"fake_origin",
directory=directory,
visit_date=datetime.datetime.now(datetime.timezone.utc),
directory,
)
return _create_loader
@contextlib.contextmanager
def cook_extract_directory_dircooker(storage, swhid, fsck=True):
def cook_extract_directory_dircooker(
storage, swhid, fsck=True, direct_objstorage=False
):
"""Context manager that cooks a directory and extract it."""
backend = unittest.mock.MagicMock()
backend.storage = storage
cooker = DirectoryCooker(swhid, backend=backend, storage=storage)
cooker = DirectoryCooker(
swhid,
backend=backend,
storage=storage,
objstorage=storage.objstorage if direct_objstorage else None,
)
cooker.fileobj = io.BytesIO()
assert cooker.check_exists()
cooker.prepare_bundle()
......@@ -197,10 +204,10 @@ def cook_extract_directory_dircooker(storage, swhid, fsck=True):
@contextlib.contextmanager
def cook_extract_directory_gitfast(storage, swhid, fsck=True):
def cook_extract_directory_gitfast(storage, swhid, fsck=True, direct_objstorage=False):
"""Context manager that cooks a revision containing a directory and extract it,
using RevisionGitfastCooker"""
test_repo = TestRepo()
test_repo = _TestRepo()
with test_repo as p:
date = TimestampWithTimezone.from_datetime(
datetime.datetime.now(datetime.timezone.utc)
......@@ -217,9 +224,10 @@ def cook_extract_directory_gitfast(storage, swhid, fsck=True):
)
storage.revision_add([revision])
with cook_stream_revision_gitfast(
storage, revision.swhid()
) as stream, test_repo as p:
with (
cook_stream_revision_gitfast(storage, revision.swhid()) as stream,
test_repo as p,
):
processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
processor.import_stream(stream)
test_repo.checkout(b"HEAD")
......@@ -300,7 +308,7 @@ def cook_stream_revision_gitfast(storage, swhid):
def cook_extract_revision_gitfast(storage, swhid, fsck=True):
"""Context manager that cooks a revision and extract it,
using RevisionGitfastCooker"""
test_repo = TestRepo()
test_repo = _TestRepo()
with cook_stream_revision_gitfast(storage, swhid) as stream, test_repo as p:
processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
processor.import_stream(stream)
......@@ -338,7 +346,7 @@ def cook_extract_git_bare(storage, swhid, fsck=True):
clone_dir,
]
)
test_repo = TestRepo(clone_dir)
test_repo = _TestRepo(clone_dir)
with test_repo:
yield test_repo, clone_dir
......@@ -390,8 +398,13 @@ TEST_EXECUTABLE = b"\x42\x40\x00\x00\x05"
class TestDirectoryCooker:
def test_directory_simple(self, git_loader, cook_extract_directory):
repo = TestRepo()
@pytest.mark.parametrize(
"direct_objstorage", [True, False], ids=["use objstorage", "storage only"]
)
def test_directory_simple(
self, git_loader, cook_extract_directory, direct_objstorage
):
repo = _TestRepo()
with repo as rp:
(rp / "file").write_text(TEST_CONTENT)
(rp / "executable").write_bytes(TEST_EXECUTABLE)
......@@ -407,7 +420,9 @@ class TestDirectoryCooker:
obj_id = hashutil.hash_to_bytes(obj_id_hex)
swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id)
with cook_extract_directory(loader.storage, swhid) as p:
with cook_extract_directory(
loader.storage, swhid, direct_objstorage=direct_objstorage
) as p:
assert (p / "file").stat().st_mode == 0o100644
assert (p / "file").read_text() == TEST_CONTENT
assert (p / "executable").stat().st_mode == 0o100755
......@@ -420,8 +435,13 @@ class TestDirectoryCooker:
directory = from_disk.Directory.from_disk(path=bytes(p))
assert obj_id_hex == hashutil.hash_to_hex(directory.hash)
def test_directory_filtered_objects(self, git_loader, cook_extract_directory):
repo = TestRepo()
@pytest.mark.parametrize(
"direct_objstorage", [True, False], ids=["use objstorage", "storage only"]
)
def test_directory_filtered_objects(
self, git_loader, cook_extract_directory, direct_objstorage
):
repo = _TestRepo()
with repo as rp:
file_1, id_1 = hash_content(b"test1")
file_2, id_2 = hash_content(b"test2")
......@@ -459,16 +479,23 @@ class TestDirectoryCooker:
for hashkey in loader.storage._cql_runner._content_indexes:
loader.storage._cql_runner._content_indexes[hashkey].pop(cnt3[hashkey])
with cook_extract_directory(loader.storage, swhid) as p:
with cook_extract_directory(
loader.storage, swhid, direct_objstorage=direct_objstorage
) as p:
assert (p / "file").read_bytes() == b"test1"
assert (p / "hidden_file").read_bytes() == HIDDEN_MESSAGE
assert (p / "absent_file").read_bytes() == SKIPPED_MESSAGE
def test_directory_bogus_perms(self, git_loader, cook_extract_directory):
@pytest.mark.parametrize(
"direct_objstorage", [True, False], ids=["use objstorage", "storage only"]
)
def test_directory_bogus_perms(
self, git_loader, cook_extract_directory, direct_objstorage
):
# Some early git repositories have 664/775 permissions... let's check
# if all the weird modes are properly normalized in the directory
# cooker.
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file").write_text(TEST_CONTENT)
(rp / "file").chmod(0o664)
......@@ -507,7 +534,9 @@ class TestDirectoryCooker:
obj_id = hashutil.hash_to_bytes(obj_id_hex)
swhid = CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=obj_id)
with cook_extract_directory(loader.storage, swhid) as p:
with cook_extract_directory(
loader.storage, swhid, direct_objstorage=direct_objstorage
) as p:
assert (p / "file").stat().st_mode == 0o100644
assert (p / "executable").stat().st_mode == 0o100755
assert (p / "wat").stat().st_mode == 0o100644
......@@ -518,7 +547,7 @@ class TestDirectoryCooker:
):
"""Like test_directory_simple, but using swh_objstorage directly, without
going through swh_storage.content_get_data()"""
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file").write_text(TEST_CONTENT)
(rp / "executable").write_bytes(TEST_EXECUTABLE)
......@@ -539,7 +568,7 @@ class TestDirectoryCooker:
swh_storage, "content_get_data", wraps=swh_storage.content_get_data
)
objstorage_content_batch = mocker.patch.object(
swh_storage.objstorage, "get_batch", wraps=swh_storage.objstorage.get_batch
swh_storage.objstorage, "get", wraps=swh_storage.objstorage.get
)
with cook_extract_directory_git_bare(
......@@ -582,8 +611,8 @@ class TestDirectoryCooker:
with cook_extract_directory_dircooker(
swh_storage, dir.swhid(), fsck=False
) as p:
assert (p / "submodule").is_symlink()
assert os.readlink(str(p / "submodule")) == target_rev
assert (p / "submodule").is_dir()
assert list((p / "submodule").iterdir()) == []
class RepoFixtures:
......@@ -594,7 +623,7 @@ class RepoFixtures:
#
# 1--2--3--4--5--6--7
#
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file1").write_text(TEST_CONTENT)
repo.commit("add file1")
......@@ -637,7 +666,7 @@ class RepoFixtures:
# /
# 2----
#
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file1").write_text(TEST_CONTENT)
c1 = repo.commit("Add file1")
......@@ -666,7 +695,7 @@ class RepoFixtures:
# \
# ----3 <-- b2
#
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file1").write_text(TEST_CONTENT)
repo.commit("Add file1")
......@@ -711,7 +740,7 @@ class RepoFixtures:
# / / /
# 1---3---5
#
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file1").write_text(TEST_CONTENT)
c1 = repo.commit("Add file1") # create commit 1
......@@ -764,7 +793,7 @@ class RepoFixtures:
# / / /
# 1---.---.
#
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file1").write_text(TEST_CONTENT)
c1 = repo.commit("Commit 1")
......@@ -806,7 +835,7 @@ class RepoFixtures:
)
def load_repo_filtered_objects(self, git_loader):
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
file_1, id_1 = hash_content(b"test1")
file_2, id_2 = hash_content(b"test2")
......@@ -854,7 +883,7 @@ class RepoFixtures:
def load_repo_null_fields(self, git_loader):
# Our schema doesn't enforce a lot of non-null revision fields. We need
# to check these cases don't break the cooker.
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file").write_text(TEST_CONTENT)
c = repo.commit("initial commit")
......@@ -892,7 +921,7 @@ class RepoFixtures:
# \
# ----3----4 <-- t4a (annotated)
#
repo = TestRepo()
repo = _TestRepo()
with repo as rp:
(rp / "file1").write_text(TEST_CONTENT)
repo.commit("Add file1")
......@@ -1170,7 +1199,7 @@ class TestSnapshotCooker(RepoFixtures):
author=Person.from_fullname(b"me <test@example.org>"),
date=date,
target=revision.id,
target_type=ModelObjectType.REVISION,
target_type=ReleaseTargetType.REVISION,
synthetic=True,
raw_manifest=f"tag {len(malformed_rel_manifest)}\x00".encode()
+ malformed_rel_manifest,
......@@ -1180,10 +1209,10 @@ class TestSnapshotCooker(RepoFixtures):
snapshot = Snapshot(
branches={
b"refs/tags/v1.1.0": SnapshotBranch(
target=release.id, target_type=TargetType.RELEASE
target=release.id, target_type=SnapshotTargetType.RELEASE
),
b"HEAD": SnapshotBranch(
target=revision.id, target_type=TargetType.REVISION
target=revision.id, target_type=SnapshotTargetType.REVISION
),
}
)
......
......@@ -13,18 +13,18 @@ import datetime
import enum
from functools import partial
import io
import os
import subprocess
import tarfile
import tempfile
import unittest.mock
import attr
import dulwich.repo
import pytest
from pytest import param
from pytest_postgresql import factories
from swh.core.db.pytest_plugin import initialize_database_for_module
from swh.core.db.db_utils import initialize_database_for_module
from swh.model.from_disk import DentryPerms
from swh.model.model import (
Content,
......@@ -37,8 +37,7 @@ from swh.model.model import (
RevisionType,
Snapshot,
SnapshotBranch,
TargetType,
Timestamp,
SnapshotTargetType,
TimestampWithTimezone,
)
from swh.storage import get_storage
......@@ -55,7 +54,9 @@ storage_postgresql = factories.postgresql("storage_postgresql_proc")
@pytest.fixture
def swh_storage(storage_postgresql):
return get_storage("local", db=storage_postgresql.dsn, objstorage={"cls": "memory"})
return get_storage(
"postgresql", db=storage_postgresql.info.dsn, objstorage={"cls": "memory"}
)
class RootObjects(enum.Enum):
......@@ -175,7 +176,7 @@ def test_graph_revisions(
If weird_branches is False, dir4, cnt4, rel3, rel4, and cnt5 are excluded.
"""
from swh.graph.naive_client import NaiveClient as GraphClient
from swh.graph.http_naive_client import NaiveClient as GraphClient
# Create objects:
......@@ -289,22 +290,22 @@ def test_graph_revisions(
branches = {
b"refs/heads/master": SnapshotBranch(
target=rev2.id, target_type=TargetType.REVISION
target=rev2.id, target_type=SnapshotTargetType.REVISION
),
}
if tag:
branches[b"refs/tags/1.0.0"] = SnapshotBranch(
target=rel2.id, target_type=TargetType.RELEASE
target=rel2.id, target_type=SnapshotTargetType.RELEASE
)
if weird_branches:
branches[b"refs/heads/tree-ref"] = SnapshotBranch(
target=dir4.id, target_type=TargetType.DIRECTORY
target=dir4.id, target_type=SnapshotTargetType.DIRECTORY
)
branches[b"refs/heads/blob-ref"] = SnapshotBranch(
target=cnt4.sha1_git, target_type=TargetType.CONTENT
target=cnt4.sha1_git, target_type=SnapshotTargetType.CONTENT
)
branches[b"refs/tags/1.0.0-weird"] = SnapshotBranch(
target=rel4.id, target_type=TargetType.RELEASE
target=rel4.id, target_type=SnapshotTargetType.RELEASE
)
snp = Snapshot(branches=branches)
......@@ -401,6 +402,15 @@ def test_graph_revisions(
with tarfile.open(fileobj=io.BytesIO(bundle)) as tf:
tf.extractall(tempdir)
if root_object != RootObjects.WEIRD_RELEASE:
# check master ref exists in repository
master_ref_path = os.path.join(
tempdir, f"{cooked_swhid}.git/refs/heads/master"
)
assert os.path.exists(master_ref_path)
with open(master_ref_path, "r") as master_ref:
assert master_ref.read() == branches[b"refs/heads/master"].target.hex()
if root_object in (RootObjects.SNAPSHOT, RootObjects.REVISION):
log_head = "master"
elif root_object == RootObjects.RELEASE:
......@@ -559,139 +569,3 @@ def test_checksum_mismatch(swh_storage, mismatch_on):
)
assert output.decode() == f"{rev2.id.hex()} msg2\n{rev1.id.hex()} msg1\n"
@pytest.mark.parametrize(
"use_graph",
[
pytest.param(False, id="without-graph"),
pytest.param(True, id="with-graph", marks=pytest.mark.graph),
],
)
def test_ignore_displayname(swh_storage, use_graph):
"""Tests the original authorship information is used instead of
configured display names; otherwise objects would not match their hash,
and git-fsck/git-clone would fail.
This tests both with and without swh-graph, as both configurations use different
code paths to fetch revisions.
"""
date = TimestampWithTimezone.from_numeric_offset(Timestamp(1643882820, 0), 0, False)
legacy_person = Person.from_fullname(b"old me <old@example.org>")
current_person = Person.from_fullname(b"me <me@example.org>")
content = Content.from_data(b"foo")
swh_storage.content_add([content])
directory = Directory(
entries=(
DirectoryEntry(
name=b"file1", type="file", perms=0o100644, target=content.sha1_git
),
),
)
swh_storage.directory_add([directory])
revision = Revision(
message=b"rev",
author=legacy_person,
date=date,
committer=legacy_person,
committer_date=date,
parents=(),
type=RevisionType.GIT,
directory=directory.id,
synthetic=True,
)
swh_storage.revision_add([revision])
release = Release(
name=b"v1.1.0",
message=None,
author=legacy_person,
date=date,
target=revision.id,
target_type=ObjectType.REVISION,
synthetic=True,
)
swh_storage.release_add([release])
snapshot = Snapshot(
branches={
b"refs/tags/v1.1.0": SnapshotBranch(
target=release.id, target_type=TargetType.RELEASE
),
b"HEAD": SnapshotBranch(
target=revision.id, target_type=TargetType.REVISION
),
}
)
swh_storage.snapshot_add([snapshot])
# Add all objects to graph
if use_graph:
from swh.graph.naive_client import NaiveClient as GraphClient
nodes = [
str(x.swhid()) for x in [content, directory, revision, release, snapshot]
]
edges = [
(str(x.swhid()), str(y.swhid()))
for (x, y) in [
(directory, content),
(revision, directory),
(release, revision),
(snapshot, release),
(snapshot, revision),
]
]
swh_graph = unittest.mock.Mock(wraps=GraphClient(nodes=nodes, edges=edges))
else:
swh_graph = None
# Set a display name
with swh_storage.db() as db:
with db.transaction() as cur:
cur.execute(
"UPDATE person set displayname = %s where fullname = %s",
(current_person.fullname, legacy_person.fullname),
)
# Check the display name did apply in the storage
assert swh_storage.revision_get([revision.id])[0] == attr.evolve(
revision,
author=current_person,
committer=current_person,
)
# Cook
cooked_swhid = snapshot.swhid()
backend = InMemoryVaultBackend()
cooker = GitBareCooker(
cooked_swhid,
backend=backend,
storage=swh_storage,
graph=swh_graph,
)
cooker.cook()
# Get bundle
bundle = backend.fetch("git_bare", cooked_swhid)
# Extract bundle and make sure both revisions are in it
with tempfile.TemporaryDirectory("swh-vault-test-bare") as tempdir:
with tarfile.open(fileobj=io.BytesIO(bundle)) as tf:
tf.extractall(tempdir)
# If we are here, it means git-fsck succeeded when called by cooker.cook(),
# so we already know the original person was used. Let's double-check.
repo = dulwich.repo.Repo(f"{tempdir}/{cooked_swhid}.git")
tag = repo[b"refs/tags/v1.1.0"]
assert tag.tagger == legacy_person.fullname
commit = repo[tag.object[1]]
assert commit.author == legacy_person.fullname