Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • lunar/swh-deposit
  • anlambert/swh-deposit
  • swh/devel/swh-deposit
  • douardda/swh-deposit
  • ardumont/swh-deposit
  • marmoute/swh-deposit
  • rboyer/swh-deposit
7 results
Show changes
Showing
with 1967 additions and 959 deletions
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from rest_framework.request import Request
from swh.deposit.api.common import APIDelete, APIPut, ParsedRequestHeaders
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from swh.deposit.errors import BAD_REQUEST, DepositError, ParserError
from swh.deposit.models import Deposit
from swh.deposit.parsers import SWHAtomEntryParser, SWHMultiPartParser
from swh.model.swhids import QualifiedSWHID
class EditAPI(APIPut, APIDelete):
    """Deposit request class defining api endpoints for sword deposit.

    What's known as 'Edit-IRI' in the sword specification.

    HTTP verbs supported: PUT, DELETE

    """

    parser_classes = (SWHMultiPartParser, SWHAtomEntryParser)

    def restrict_access(
        self, request: Request, headers: ParsedRequestHeaders, deposit: Deposit
    ) -> None:
        """Relax restriction access to allow metadata update on deposit with
        status "done" when a swhid is provided.

        Args:
            request: Incoming request; only its HTTP method is inspected here
            headers: Parsed request headers; only ``swhid`` is inspected here
            deposit: Deposit targeted by the request

        """
        is_metadata_update_on_loaded_deposit = (
            request.method == "PUT"
            and headers.swhid is not None
            and deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS
        )
        if is_metadata_update_on_loaded_deposit:
            # Allow metadata update on deposit with status "done" when swhid
            # provided
            return
        # otherwise, let the standard access restriction check occur
        super().restrict_access(request, headers, deposit)

    def process_put(
        self,
        request,
        headers: ParsedRequestHeaders,
        collection_name: str,
        deposit: Deposit,
    ) -> None:
        """This allows the following scenarios:

        - multipart: replace all the deposit (status partial) metadata and archive
          with the provided ones.
        - atom: replace all the deposit (status partial) metadata with the
          provided ones.
        - with swhid, atom: Add new metadata to deposit (status done) with provided ones
          and push such metadata to the metadata storage directly.

        source:
        - http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_editingcontent_metadata
        - http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_editingcontent_multipart

        Args:
            request: Incoming request (its content type and data are read)
            headers: Parsed request headers (``swhid`` selects the scenario)
            collection_name: Name of the collection owning the deposit
            deposit: Deposit to update

        Raises:
            400 if any of the following occur:
            - the swhid provided and the deposit swhid do not match
            - the provided metadata xml file is malformed
            - the provided xml atom entry is empty
            - the provided swhid does not exist in the archive

        """  # noqa
        swhid = headers.swhid
        if swhid is None:
            if request.content_type.startswith("multipart/"):
                self._multipart_upload(
                    request,
                    headers,
                    collection_name,
                    deposit=deposit,
                    replace_archives=True,
                    replace_metadata=True,
                )
            else:
                # standard metadata update (replace all metadata already
                # provided to the deposit by the new ones)
                self._atom_entry(
                    request,
                    headers,
                    collection_name,
                    deposit=deposit,
                    replace_metadata=True,
                )
            return

        # Update metadata on a deposit already ingested
        # Write to the metadata storage (and the deposit backend)
        # no ingestion triggered
        assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS

        if swhid != deposit.swhid:
            raise DepositError(
                BAD_REQUEST,
                f"Mismatched provided SWHID {swhid} with deposit's {deposit.swhid}.",
                "The provided SWHID does not match the deposit to update. "
                "Please ensure you send the correct deposit SWHID.",
            )

        try:
            raw_metadata, metadata_tree = self._read_metadata(request.data)
        except ParserError as parser_error:
            # Keep the parser failure attached as the explicit cause
            raise DepositError(
                BAD_REQUEST,
                "Malformed xml metadata",
                "The xml received is malformed. "
                "Please ensure your metadata file is correctly formatted.",
            ) from parser_error

        # Explicit length check on purpose: this counts the entries of the
        # parsed metadata rather than relying on its truthiness.
        if len(metadata_tree) == 0:
            raise DepositError(
                BAD_REQUEST,
                "Empty body request is not supported",
                "Atom entry deposit is supposed to send for metadata. "
                "If the body is empty, there is no metadata.",
            )

        # The returned values are not needed here: this method returns None.
        self._store_metadata_deposit(
            deposit,
            QualifiedSWHID.from_string(swhid),
            metadata_tree,
            raw_metadata,
            deposit.origin_url,
        )

    def process_delete(self, req, collection_name: str, deposit: Deposit) -> None:
        """Delete the container (deposit).

        source: http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_deleteconteiner  # noqa

        Args:
            req: Incoming request (unused)
            collection_name: Name of the collection owning the deposit
            deposit: Deposit to delete

        """
        self._delete_deposit(collection_name, deposit)
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Optional, Tuple
from rest_framework import status
from swh.deposit.api.common import (
ACCEPT_ARCHIVE_CONTENT_TYPES,
APIDelete,
APIPost,
APIPut,
ParsedRequestHeaders,
Receipt,
)
from swh.deposit.config import CONT_FILE_IRI
from swh.deposit.errors import BAD_REQUEST, DepositError
from swh.deposit.models import Deposit
from swh.deposit.parsers import SWHFileUploadTarParser, SWHFileUploadZipParser
class EditMediaAPI(APIPost, APIPut, APIDelete):
    """Deposit request class defining api endpoints for sword deposit.

    What's known as 'EM IRI' in the sword specification.

    HTTP verbs supported: PUT, POST, DELETE

    """

    parser_classes = (
        SWHFileUploadZipParser,
        SWHFileUploadTarParser,
    )

    @staticmethod
    def _check_packaging(req) -> None:
        """Ensure the request content type is one of the accepted archive types.

        Shared by the PUT and POST handlers so both reject unsupported
        packaging with the same error.

        Raises:
            DepositError (BAD_REQUEST) when ``req.content_type`` is not in
            ACCEPT_ARCHIVE_CONTENT_TYPES

        """
        if req.content_type not in ACCEPT_ARCHIVE_CONTENT_TYPES:
            msg = "Packaging format supported is restricted to %s" % (
                ", ".join(ACCEPT_ARCHIVE_CONTENT_TYPES)
            )
            raise DepositError(BAD_REQUEST, msg)

    def process_put(
        self, req, headers: ParsedRequestHeaders, collection_name: str, deposit: Deposit
    ) -> None:
        """Replace existing content for the existing deposit.

        source: http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_editingcontent_binary  # noqa

        Args:
            req: Incoming request carrying the archive
            headers: Parsed request headers
            collection_name: Name of the collection owning the deposit
            deposit: Deposit whose archives get replaced

        Returns:
            204 No content

        Raises:
            DepositError (BAD_REQUEST) on unsupported packaging format

        """
        self._check_packaging(req)
        self._binary_upload(
            req, headers, collection_name, deposit=deposit, replace_archives=True
        )

    def process_post(
        self,
        req,
        headers: ParsedRequestHeaders,
        collection_name: str,
        deposit: Optional[Deposit] = None,
    ) -> Tuple[int, str, Receipt]:
        """Add new content to the existing deposit.

        source: http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_addingcontent_mediaresource  # noqa

        Args:
            req: Incoming request carrying the archive
            headers: Parsed request headers
            collection_name: Name of the collection owning the deposit
            deposit: Deposit to add content to (must be provided)

        Returns:
            201 Created
            Headers: Location: [Cont-File-IRI]

            Body: [optional Deposit Receipt]

        Raises:
            DepositError (BAD_REQUEST) on unsupported packaging format

        """
        assert deposit is not None
        self._check_packaging(req)
        return (
            status.HTTP_201_CREATED,
            CONT_FILE_IRI,
            self._binary_upload(req, headers, collection_name, deposit),
        )

    def process_delete(self, req, collection_name: str, deposit: Deposit) -> None:
        """Delete content (archives) from existing deposit.

        source: http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_deletingcontent  # noqa

        Args:
            req: Incoming request (unused)
            collection_name: Name of the collection owning the deposit
            deposit: Deposit whose archives get deleted

        Returns:
            204 No Content

        """
        self._delete_archives(collection_name, deposit)
# Copyright (C) 2017-2018 The Software Heritage developers
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.deposit import utils
from typing import Optional
from ...config import METADATA_TYPE
from ...models import DepositRequest, Deposit
from rest_framework.permissions import AllowAny
from rest_framework.views import APIView
from swh.deposit.config import METADATA_TYPE, APIConfig
from swh.deposit.models import Deposit, DepositRequest
class DepositReadMixin:
"""Deposit Read mixin
"""
class DepositReadMixin:
"""Deposit Read mixin"""
def _deposit_requests(self, deposit, request_type):
def _deposit_requests(self, deposit: Deposit, request_type: str):
"""Given a deposit, yields its associated deposit_request
Args:
deposit (Deposit): Deposit to list requests for
request_type (str): 'archive' or 'metadata'
deposit: Deposit to list requests for
request_type: 'archive' or 'metadata'
Yields:
deposit requests of type request_type associated to the deposit
deposit requests of type request_type associated to the deposit,
most recent first
"""
if isinstance(deposit, int):
deposit = Deposit.objects.get(pk=deposit)
deposit_requests = DepositRequest.objects.filter(
type=request_type,
deposit=deposit).order_by('id')
type=request_type, deposit=deposit
).order_by("-id")
for deposit_request in deposit_requests:
yield deposit_request
def _metadata_get(self, deposit):
"""Given a deposit, aggregate all metadata requests.
def _metadata_get(self, deposit: Deposit) -> Optional[bytes]:
"""Retrieve the last non-empty raw metadata object for that deposit, if any
Args:
deposit (Deposit): The deposit instance to extract
metadata from.
Returns:
metadata dict from the deposit.
deposit: The deposit instance to extract metadata from
"""
metadata = (m.metadata for m in self._deposit_requests(
deposit, request_type=METADATA_TYPE))
return utils.merge(*metadata)
for deposit_request in self._deposit_requests(
deposit, request_type=METADATA_TYPE
):
if deposit_request.raw_metadata is not None:
return deposit_request.raw_metadata
return None
class APIPrivateView(APIConfig, APIView):
    """Base view for the private API endpoints.

    Instances are configured without any authentication class and with the
    ``AllowAny`` permission.

    """

    def __init__(self):
        super().__init__()
        # Private endpoints: no authentication, everyone is allowed.
        self.permission_classes = (AllowAny,)
        self.authentication_classes = ()

    def checks(self, req, collection_name, deposit=None):
        """Override default checks implementation to allow empty collection.

        Returns:
            A dict holding the parsed request headers under the "headers" key.

        """
        parsed_headers = self._read_headers(req)
        self.additional_checks(req, parsed_headers, collection_name, deposit)
        return {"headers": parsed_headers}

    def get(self, request, collection_name=None, deposit_id=None, *args, **kwargs):
        """Delegate to the parent ``get``; extra arguments are not forwarded."""
        return super().get(request, collection_name, deposit_id)

    def put(self, request, collection_name=None, deposit_id=None, *args, **kwargs):
        """Delegate to the parent ``put``; extra arguments are not forwarded."""
        return super().put(request, collection_name, deposit_id)
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import re
import tarfile
import zipfile
from rest_framework import status
from . import DepositReadMixin
from ..common import SWHGetDepositAPI, SWHPrivateAPIView
from ...config import DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED
from ...config import ARCHIVE_TYPE
from ...models import Deposit
# Summaries reported in a rejected deposit's status detail
MANDATORY_FIELDS_MISSING = 'Mandatory fields are missing'
ALTERNATE_FIELDS_MISSING = 'Mandatory alternate fields are missing'
MANDATORY_ARCHIVE_UNREADABLE = 'At least one of its associated archives is not readable'  # noqa
MANDATORY_ARCHIVE_INVALID = 'Mandatory archive is invalid (i.e contains only one archive)'  # noqa
MANDATORY_ARCHIVE_UNSUPPORTED = 'Mandatory archive type is not supported'
MANDATORY_ARCHIVE_MISSING = 'Deposit without archive is rejected'

# File extensions identifying an archive file
ARCHIVE_EXTENSIONS = [
    'zip', 'tar', 'tar.gz', 'xz', 'tar.xz', 'bz2',
    'tar.bz2', 'Z', 'tar.Z', 'tgz', '7z'
]

# Matches any name ending with a dot followed by one of ARCHIVE_EXTENSIONS.
# Extensions are escaped so that the '.' in e.g. 'tar.gz' only matches a
# literal dot (otherwise 'foo.tarXgz' would be seen as an archive).
PATTERN_ARCHIVE_EXTENSION = re.compile(
    r'.*\.(%s)$' % '|'.join(re.escape(ext) for ext in ARCHIVE_EXTENSIONS))
class SWHChecksDeposit(SWHGetDepositAPI, SWHPrivateAPIView, DepositReadMixin):
    """Dedicated class to check a deposit's archives and metadata, and to
    update the deposit status according to the outcome.

    Only GET is supported.

    """

    def _check_deposit_archives(self, deposit):
        """Given a deposit, check each deposit request of type archive.

        Args:
            The deposit to check archives for

        Returns
            tuple (status, error_detail): True, None if all archives
            are ok, (False, <detailed-error>) otherwise.

        """
        requests = list(self._deposit_requests(
            deposit, request_type=ARCHIVE_TYPE))
        if not requests:  # no associated archive is refused
            return False, {
                'archive': [{
                    'summary': MANDATORY_ARCHIVE_MISSING,
                }]
            }

        errors = []
        for archive_request in requests:
            check, error_message = self._check_archive(archive_request)
            if not check:
                errors.append({
                    'summary': error_message,
                    'fields': [archive_request.id]
                })

        if not errors:
            return True, None
        return False, {
            'archive': errors
        }

    def _check_archive(self, archive_request):
        """Check that a deposit associated archive is ok:
        - readable
        - supported archive format
        - valid content: the archive is not empty and does not contain a
          single archive file

        If any of those checks are not ok, return the corresponding
        failing check.

        Args:
            archive_request (DepositRequest): Archive request to check

        Returns:
            (True, None) if archive is check compliant, (False,
            <detail-error>) otherwise.

        """
        archive_path = archive_request.archive.path
        try:
            if zipfile.is_zipfile(archive_path):
                with zipfile.ZipFile(archive_path) as f:
                    files = f.namelist()
            elif tarfile.is_tarfile(archive_path):
                with tarfile.open(archive_path) as f:
                    files = f.getnames()
            else:
                return False, MANDATORY_ARCHIVE_UNSUPPORTED
        except Exception:
            # Best effort on purpose: any failure while listing the archive
            # is reported as an unreadable archive.
            return False, MANDATORY_ARCHIVE_UNREADABLE

        if not files:
            # An empty archive has no content to check; report it as invalid
            # rather than failing with an IndexError on files[0] below.
            return False, MANDATORY_ARCHIVE_INVALID
        if len(files) > 1:
            return True, None

        element = files[0]
        if PATTERN_ARCHIVE_EXTENSION.match(element):
            # archive in archive!
            return False, MANDATORY_ARCHIVE_INVALID
        return True, None

    def _check_metadata(self, metadata):
        """Check to execute on all metadata for mandatory field presence.

        A field is considered present when its name appears as a substring
        of at least one metadata key.

        Args:
            metadata (dict): Metadata dictionary to check for mandatory fields

        Returns:
            tuple (status, error_detail): True, None if metadata are
            ok (False, <detailed-error>) otherwise.

        """
        required_fields = ('author',)
        # alternate fields: at least one name of each group must be present
        alternate_fields = (('name', 'title'),)

        def is_present(name):
            return any(name in field for field in metadata)

        mandatory_result = [
            name for name in required_fields if not is_present(name)]
        optional_result = [
            ' or '.join(names) for names in alternate_fields
            if not any(is_present(name) for name in names)]

        if not mandatory_result and not optional_result:
            return True, None

        detail = []
        if mandatory_result:
            detail.append({
                'summary': MANDATORY_FIELDS_MISSING,
                'fields': mandatory_result
            })
        if optional_result:
            detail.append({
                'summary': ALTERNATE_FIELDS_MISSING,
                'fields': optional_result,
            })
        return False, {
            'metadata': detail
        }

    def process_get(self, req, collection_name, deposit_id):
        """Check the deposit's archives and metadata, then update and save
        the deposit status accordingly (rejected with details, or verified).

        Args:
            req (Request):
            collection_name (str): Collection owning the deposit
            deposit_id (id): Deposit concerned by the checks

        Returns:
            Tuple status, json-serialized check result, content-type

        """
        deposit = Deposit.objects.get(pk=deposit_id)
        metadata = self._metadata_get(deposit)
        problems = {}
        # will check each deposit's associated request (both of type
        # archive and metadata) for errors
        archives_status, error_detail = self._check_deposit_archives(deposit)
        if not archives_status:
            problems.update(error_detail)

        metadata_status, error_detail = self._check_metadata(metadata)
        if not metadata_status:
            problems.update(error_detail)

        deposit_status = archives_status and metadata_status

        # if any problems arose, the deposit is rejected
        if not deposit_status:
            deposit.status = DEPOSIT_STATUS_REJECTED
            deposit.status_detail = problems
            response = {
                'status': deposit.status,
                'details': deposit.status_detail,
            }
        else:
            deposit.status = DEPOSIT_STATUS_VERIFIED
            response = {
                'status': deposit.status,
            }

        deposit.save()

        return status.HTTP_200_OK, json.dumps(response), 'application/json'
# Copyright (C) 2018 The Software Heritage developers
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Dict
from xml.etree import ElementTree
from rest_framework.fields import _UnvalidatedField
from django.conf import settings
from django.core.paginator import Paginator
from django.db.models import CharField, Q, TextField
from django.http import JsonResponse
from rest_framework.decorators import (
api_view,
authentication_classes,
permission_classes,
)
from rest_framework.generics import ListAPIView
from rest_framework.pagination import PageNumberPagination
from rest_framework import serializers
from rest_framework.permissions import AllowAny
from rest_framework.request import Request
import sentry_sdk
from ..common import SWHPrivateAPIView
from ..converters import convert_status_detail
from ...models import Deposit
from swh.deposit.api.private import APIPrivateView
from swh.deposit.api.utils import DefaultPagination, DepositSerializer
from swh.deposit.models import Deposit
from swh.deposit.utils import parse_swh_deposit_origin, parse_swh_metadata_provenance
from swh.model.swhids import QualifiedSWHID
class DefaultPagination(PageNumberPagination):
page_size = 100
page_size_query_param = 'page_size'
def _enrich_deposit_with_metadata(deposit: Deposit) -> Deposit:
    """Attach the most recent raw metadata to the deposit, when there is some.

    Only the latest deposit request of type "metadata" (highest id) is looked
    at; the deposit is left untouched when there is no such request or when
    its raw metadata is empty.

    Args:
        deposit: Deposit to enrich (modified in place)

    Returns:
        The same deposit instance

    """
    # Fetch only the most recent metadata request instead of evaluating the
    # whole queryset just to read its first row.
    latest_request = (
        deposit.depositrequest_set.filter(type="metadata").order_by("-id").first()
    )
    if latest_request is not None:
        raw_meta = latest_request.raw_metadata
        if raw_meta:
            deposit.set_raw_metadata(raw_meta)
    return deposit
class StatusDetailField(_UnvalidatedField):
"""status_detail field is a dict, we want a simple message instead.
So, we reuse the convert_status_detail from deposit_status
endpoint to that effect.
class APIList(ListAPIView, APIPrivateView):
"""Deposit request class to list the deposit's status per page.
HTTP verbs supported: GET
"""
def to_representation(self, value):
return convert_status_detail(value)
serializer_class = DepositSerializer
pagination_class = DefaultPagination
class DepositSerializer(serializers.ModelSerializer):
status_detail = StatusDetailField()
def paginate_queryset(self, queryset):
"""Return a single page of results. This enriches the queryset results with
metadata if any.
class Meta:
model = Deposit
fields = '__all__'
"""
page_result = self.paginator.paginate_queryset(
queryset, self.request, view=self
)
deposits = []
for deposit in page_result:
_enrich_deposit_with_metadata(deposit)
deposits.append(deposit)
class DepositList(ListAPIView, SWHPrivateAPIView):
"""Deposit request class to list the deposit's status per page.
return deposits
HTTP verbs supported: GET
def get_queryset(self):
"""Retrieve queryset of deposits (with some optional filtering)."""
params = self.request.query_params
exclude_like = params.get("exclude")
username = params.get("username")
"""
queryset = Deposit.objects.all().order_by('id')
serializer_class = DepositSerializer
pagination_class = DefaultPagination
if username:
deposits_qs = Deposit.objects.select_related("client").filter(
client__username=username
)
else:
deposits_qs = Deposit.objects.all()
if exclude_like:
# sql injection: A priori, nothing to worry about, django does it for
# queryset
# https://docs.djangoproject.com/en/3.0/topics/security/#sql-injection-protection # noqa
deposits_qs = deposits_qs.exclude(external_id__startswith=exclude_like)
return deposits_qs.order_by("id")
def _deposit_search_query(search_value: str) -> Q:
    """Build a query matching deposits for which any CharField or TextField
    contains ``search_value`` (case-insensitive).

    Args:
        search_value: Text to look for

    Returns:
        The OR-combination of one ``icontains`` lookup per text field

    """
    search_query = Q()
    for field in Deposit._meta.fields:
        if not isinstance(field, (CharField, TextField)):
            continue
        lookup = {field.name + "__icontains": search_value}
        search_query = search_query | Q(**lookup)
    return search_query
# Serialized deposit fields copied as is into each datatables row
_DATATABLES_DEPOSIT_FIELDS = (
    "id",
    "type",
    "external_id",
    "raw_metadata",
    "reception_date",
    "status",
    "status_detail",
    "swhid",
    "swhid_context",
)


def _deposit_uri(d: Dict[str, Any]) -> Any:
    """Determine the uri to display for a serialized deposit, if any.

    Args:
        d: Serialized deposit (as produced by DepositSerializer)

    Returns:
        The provenance uri, or None when it could not be determined

    """
    deposit_type = d["type"]
    raw_metadata = d["raw_metadata"]
    provenance = None

    # for meta deposit, the uri should be the url provenance
    if raw_metadata and deposit_type == "meta":  # metadata provenance
        provenance = parse_swh_metadata_provenance(
            ElementTree.fromstring(raw_metadata)
        )
    # For code deposits the uri is the origin
    # First, trying to determine it out of the raw metadata associated with the
    # deposit
    elif raw_metadata and deposit_type == "code":
        create_origin_url, add_to_origin_url = parse_swh_deposit_origin(
            ElementTree.fromstring(raw_metadata)
        )
        provenance = create_origin_url or add_to_origin_url

    # For code deposits, if not provided, use the origin_url
    if not provenance and deposit_type == "code" and d["origin_url"]:
        provenance = d["origin_url"]

    # If still not found, fallback using the swhid context
    if not provenance and d["swhid_context"]:
        provenance = QualifiedSWHID.from_string(d["swhid_context"]).origin

    return provenance


@api_view()
@authentication_classes([])
@permission_classes([AllowAny])
def deposit_list_datatables(request: Request) -> JsonResponse:
    """Special API view to list and filter deposits, produced responses are intended
    to be consumed by datatables js framework used in deposits admin Web UI.

    Query parameters read: ``draw``, ``username``, ``search[value]``,
    ``excludePattern``, ``order[0][column]``, ``columns[<n>][name]``,
    ``order[0][dir]``, ``length`` and ``start``.

    Returns:
        A JSON response holding ``draw`` plus either the table content
        (``recordsTotal``, ``recordsFiltered``, ``data``) or an ``error``
        message when anything failed while building it.

    """
    table_data: Dict[str, Any] = {}
    table_data["draw"] = int(request.GET.get("draw", 1))
    try:
        username = request.GET.get("username")
        if username:
            deposits = Deposit.objects.select_related("client").filter(
                client__username=username
            )
        else:
            deposits = Deposit.objects.all()

        # total is computed before search/exclude filters are applied
        deposits_count = deposits.count()

        search_value = request.GET.get("search[value]")
        if search_value:
            deposits = deposits.filter(_deposit_search_query(search_value))

        exclude_pattern = request.GET.get("excludePattern")
        if exclude_pattern:
            deposits = deposits.exclude(_deposit_search_query(exclude_pattern))

        column_order = request.GET.get("order[0][column]")
        field_order = request.GET.get("columns[%s][name]" % column_order, "id")
        order_dir = request.GET.get("order[0][dir]", "desc")
        if order_dir == "desc":
            field_order = "-" + field_order
        deposits = deposits.order_by(field_order)

        # datatables sends an offset ("start"); convert it to a 1-based page
        length = int(request.GET.get("length", 10))
        page = int(request.GET.get("start", 0)) // length + 1
        paginator = Paginator(deposits, length)

        data = [
            DepositSerializer(_enrich_deposit_with_metadata(d)).data
            for d in paginator.page(page).object_list
        ]

        table_data["recordsTotal"] = deposits_count
        table_data["recordsFiltered"] = deposits.count()

        data_list = []
        for d in data:
            data_dict = {field: d[field] for field in _DATATABLES_DEPOSIT_FIELDS}
            data_dict["uri"] = _deposit_uri(d)  # could be None
            data_list.append(data_dict)

        table_data["data"] = data_list
    except Exception as exc:
        # Top-level boundary of the view: report the failure and answer with
        # an error payload rather than failing the request.
        sentry_sdk.capture_exception(exc)
        table_data["error"] = (
            "An error occurred while retrieving the list of deposits !"
        )
        if settings.DEBUG:
            table_data["error"] += "\n" + str(exc)

    return JsonResponse(table_data)
# Copyright (C) 2017-2019 The Software Heritage developers
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from contextlib import contextmanager
import os
from pathlib import Path
import shutil
import tempfile
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xml.etree import ElementTree
from contextlib import contextmanager
from django.http import FileResponse
from rest_framework import status
from swh.core import tarball
from swh.model import identifiers
from swh.deposit.utils import normalize_date
from swh.deposit import utils
from . import DepositReadMixin
from ...config import SWH_PERSON, ARCHIVE_TYPE
from ..common import SWHGetDepositAPI, SWHPrivateAPIView
from ...models import Deposit
from swh.deposit.api.common import APIGet
from swh.deposit.api.private import APIPrivateView, DepositReadMixin
from swh.deposit.config import ARCHIVE_TYPE, SWH_PERSON
from swh.deposit.models import Deposit
from swh.deposit.utils import NAMESPACES, normalize_date
from swh.model.hashutil import hash_to_hex
from swh.model.model import MetadataAuthorityType
from swh.model.swhids import CoreSWHID
@contextmanager
def aggregate_tarballs(extraction_dir, archive_paths):
def aggregate_tarballs(extraction_dir: str, archives: List) -> Iterator[str]:
"""Aggregate multiple tarballs into one and returns this new archive's
path.
Args:
extraction_dir (path): Path to use for the tarballs computation
archive_paths ([str]): Deposit's archive paths
extraction_dir: Path to use for the tarballs computation
archive_paths: Deposit's archive paths
Returns:
Tuple (directory to clean up, archive path (aggregated or not))
"""
if len(archive_paths) > 1:
# need to rebuild one archive from multiple ones
os.makedirs(extraction_dir, 0o755, exist_ok=True)
dir_path = tempfile.mkdtemp(prefix='swh.deposit-',
dir=extraction_dir)
# root folder to build an aggregated tarball
aggregated_tarball_rootdir = os.path.join(dir_path, 'aggregate')
os.makedirs(aggregated_tarball_rootdir, 0o755, exist_ok=True)
# uncompress in a temporary location all archives
for archive_path in archive_paths:
tarball.uncompress(archive_path, aggregated_tarball_rootdir)
# Aggregate into one big tarball the multiple smaller ones
temp_tarpath = tarball.compress(
aggregated_tarball_rootdir + '.zip',
nature='zip',
dirpath_or_files=aggregated_tarball_rootdir)
# can already clean up temporary directory
shutil.rmtree(aggregated_tarball_rootdir)
try:
yield temp_tarpath
finally:
shutil.rmtree(dir_path)
else: # only 1 archive, no need to do fancy actions (and no cleanup step)
yield archive_paths[0]
class SWHDepositReadArchives(SWHGetDepositAPI, SWHPrivateAPIView,
DepositReadMixin):
# rebuild one zip archive from (possibly) multiple ones
os.makedirs(extraction_dir, 0o755, exist_ok=True)
dir_path = tempfile.mkdtemp(prefix="swh.deposit-", dir=extraction_dir)
# root folder to build an aggregated tarball
aggregated_tarball_rootdir = os.path.join(dir_path, "aggregate")
download_tarball_rootdir = os.path.join(dir_path, "download")
# uncompress in a temporary location all client's deposit archives
for archive in archives:
with archive.open("rb") as archive_fp:
try:
# For storage which supports the path method access, let's retrieve it
archive_path = archive.path
except NotImplementedError:
# otherwise for remote backend which do not support it, let's download
# the tarball locally first
tarball_path = Path(archive.name)
tarball_path_dir = Path(download_tarball_rootdir) / tarball_path.parent
tarball_path_dir.mkdir(0o755, parents=True, exist_ok=True)
archive_path = str(tarball_path_dir / tarball_path.name)
with open(archive_path, "wb") as f:
while chunk := archive_fp.read(10 * 1024 * 1024):
f.write(chunk)
tarball.uncompress(archive_path, aggregated_tarball_rootdir)
# Aggregate into one big tarball the multiple smaller ones
temp_tarpath = shutil.make_archive(
aggregated_tarball_rootdir, "tar", aggregated_tarball_rootdir
)
# can already clean up temporary directory
shutil.rmtree(aggregated_tarball_rootdir)
try:
yield temp_tarpath
finally:
shutil.rmtree(dir_path)
class APIReadArchives(APIPrivateView, APIGet, DepositReadMixin):
"""Dedicated class to read a deposit's raw archives content.
Only GET is supported.
"""
ADDITIONAL_CONFIG = {
'extraction_dir': ('str', '/tmp/swh-deposit/archive/'),
}
def __init__(self):
super().__init__()
self.extraction_dir = self.config['extraction_dir']
self.extraction_dir = self.config["extraction_dir"]
if not os.path.exists(self.extraction_dir):
os.makedirs(self.extraction_dir)
def process_get(self, req, collection_name, deposit_id):
def process_get(
self, request, collection_name: str, deposit: Deposit
) -> Tuple[int, Any, str]:
"""Build a unique tarball from the multiple received and stream that
content to the client.
Args:
req (Request):
collection_name (str): Collection owning the deposit
deposit_id (id): Deposit concerned by the reading
request (Request):
collection_name: Collection owning the deposit
deposit: Deposit concerned by the reading
Returns:
Tuple status, stream of content, content-type
"""
archive_paths = [r.archive.path for r in self._deposit_requests(
deposit_id, request_type=ARCHIVE_TYPE)]
with aggregate_tarballs(self.extraction_dir, archive_paths) as path:
return FileResponse(open(path, 'rb'),
status=status.HTTP_200_OK,
content_type='application/octet-stream')
class SWHDepositReadMetadata(SWHGetDepositAPI, SWHPrivateAPIView,
DepositReadMixin):
"""Class in charge of aggregating metadata on a deposit.
"""
ADDITIONAL_CONFIG = {
'provider': ('dict', {
# 'provider_name': '', # those are not set since read from the
# 'provider_url': '', # deposit's client
'provider_type': 'deposit_client',
'metadata': {}
}),
'tool': ('dict', {
'name': 'swh-deposit',
'version': '0.0.1',
'configuration': {
'sword_version': '2'
}
})
}
archives = [
r.archive
for r in self._deposit_requests(deposit, request_type=ARCHIVE_TYPE)
]
return (
status.HTTP_200_OK,
aggregate_tarballs(self.extraction_dir, archives),
"swh/generator",
)
def __init__(self):
super().__init__()
self.provider = self.config['provider']
self.tool = self.config['tool']
def _normalize_dates(self, deposit, metadata):
class APIReadMetadata(APIPrivateView, APIGet, DepositReadMixin):
"""Class in charge of aggregating metadata on a deposit."""
def _parse_dates(
self, deposit: Deposit, metadata: ElementTree.Element
) -> Tuple[dict, dict]:
"""Normalize the date to use as a tuple of author date, committer date
from the incoming metadata.
Args:
deposit (Deposit): Deposit model representation
metadata (Dict): Metadata dict representation
Returns:
Tuple of author date, committer date. Those dates are
swh normalized.
"""
commit_date = metadata.get('codemeta:datePublished')
author_date = metadata.get('codemeta:dateCreated')
if author_date and commit_date:
pass
elif commit_date:
author_date = commit_date
elif author_date:
commit_date = author_date
commit_date_elt = metadata.find("codemeta:datePublished", namespaces=NAMESPACES)
author_date_elt = metadata.find("codemeta:dateCreated", namespaces=NAMESPACES)
author_date: Any
commit_date: Any
if author_date_elt is None and commit_date_elt is None:
author_date = commit_date = deposit.complete_date
elif commit_date_elt is None:
author_date = commit_date = author_date_elt.text # type: ignore
elif author_date_elt is None:
author_date = commit_date = commit_date_elt.text
else:
author_date = deposit.complete_date
commit_date = deposit.complete_date
return (
normalize_date(author_date),
normalize_date(commit_date)
)
author_date = author_date_elt.text
commit_date = commit_date_elt.text
return (normalize_date(author_date), normalize_date(commit_date))
def metadata_read(self, deposit):
"""Read and aggregate multiple data on deposit into one unified data
dictionary.
def metadata_read(self, deposit: Deposit) -> Dict[str, Any]:
"""Read and aggregate multiple deposit information into one unified dictionary.
Args:
deposit (Deposit): Deposit concerned by the data aggregation.
deposit: Deposit to retrieve information from
Returns:
Dictionary of data representing the deposit to inject in swh.
Dictionary of deposit information read by the deposit loader, with the
following keys:
"""
metadata = self._metadata_get(deposit)
# Read information metadata
data = {
'origin': {
'type': 'deposit',
'url': utils.origin_url_from(deposit),
}
}
# revision
**origin** (Dict): Information about the origin
fullname = deposit.client.username
author_committer = SWH_PERSON
**raw_metadata** (str): List of raw metadata received for the
deposit
# metadata provider
self.provider['provider_name'] = deposit.client.last_name
self.provider['provider_url'] = deposit.client.provider_url
**provider** (Dict): the metadata provider information about the
deposit client
revision_type = 'tar'
revision_msg = '%s: Deposit %s in collection %s' % (
fullname, deposit.id, deposit.collection.name)
author_date, commit_date = self._normalize_dates(deposit, metadata)
data['revision'] = {
'synthetic': True,
'date': author_date,
'committer_date': commit_date,
'author': author_committer,
'committer': author_committer,
'type': revision_type,
'message': revision_msg,
'metadata': metadata,
}
**tool** (Dict): the deposit information
if deposit.parent:
swh_persistent_id = deposit.parent.swh_id
persistent_identifier = identifiers.parse_persistent_identifier(
swh_persistent_id)
parent_revision = persistent_identifier.object_id
**deposit** (Dict): deposit information relevant to build the revision
(author_date, committer_date, etc...)
data['revision']['parents'] = [parent_revision]
"""
raw_metadata = self._metadata_get(deposit)
author_date: Optional[dict]
commit_date: Optional[dict]
if raw_metadata:
metadata_tree = ElementTree.fromstring(raw_metadata)
author_date, commit_date = self._parse_dates(deposit, metadata_tree)
release_notes_elements = metadata_tree.findall(
"codemeta:releaseNotes", namespaces=NAMESPACES
)
else:
author_date = commit_date = None
release_notes_elements = []
if deposit.parent and deposit.parent.swhid:
parent_swhid = deposit.parent.swhid
assert parent_swhid is not None
swhid = CoreSWHID.from_string(parent_swhid)
parent_revision = hash_to_hex(swhid.object_id)
parents = [parent_revision]
else:
parents = []
data['branch_name'] = 'master'
data['origin_metadata'] = {
'provider': self.provider,
'tool': self.tool,
'metadata': metadata
release_notes: Optional[str]
if release_notes_elements:
release_notes = "\n\n".join(
element.text for element in release_notes_elements if element.text
)
else:
release_notes = None
return {
"origin": {"type": "deposit", "url": deposit.origin_url},
"provider": {
"provider_name": deposit.client.last_name,
"provider_url": deposit.client.provider_url,
"provider_type": MetadataAuthorityType.DEPOSIT_CLIENT.value,
"metadata": {},
},
"tool": self.tool,
"raw_metadata": raw_metadata,
"deposit": {
"id": deposit.id,
"client": deposit.client.username,
"collection": deposit.collection.name,
"author": SWH_PERSON,
"author_date": author_date,
"committer": SWH_PERSON,
"committer_date": commit_date,
"revision_parents": parents,
"release_notes": release_notes,
},
}
return data
def process_get(self, req, collection_name, deposit_id):
deposit = Deposit.objects.get(pk=deposit_id)
def process_get(
self, request, collection_name: str, deposit: Deposit
) -> Tuple[int, Dict, str]:
data = self.metadata_read(deposit)
d = {}
if data:
d = json.dumps(data)
return status.HTTP_200_OK, d, 'application/json'
return status.HTTP_200_OK, data if data else {}, "application/json"
# Copyright (C) 2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Tuple
from rest_framework import status
from swh.deposit.api.common import APIGet
from swh.deposit.api.private import APIPrivateView
from swh.deposit.api.utils import DepositSerializer
from swh.deposit.models import Deposit
from swh.deposit.utils import get_releases
class APIReleases(APIPrivateView, APIGet):
    """Private endpoint listing the releases related to a deposit.

    HTTP verbs supported: GET

    """

    def process_get(
        self, request, collection_name: str, deposit: Deposit
    ) -> Tuple[int, Any, str]:
        """Serialize the releases related to ``deposit``.

        Args:
            request: incoming HTTP request (not read by this method)
            collection_name: Collection owning the deposit (not read by this
                method)
            deposit: Deposit whose releases are looked up

        Returns:
            Tuple of HTTP status, serialized output of ``get_releases(deposit)``,
            content-type

        """
        related_deposits = get_releases(deposit)
        serializer = DepositSerializer(related_deposits, many=True)
        payload = serializer.data
        return status.HTTP_200_OK, payload, "application/json"
# Copyright (C) 2017-2018 The Software Heritage developers
# Copyright (C) 2017-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from rest_framework.parsers import JSONParser
from swh.model.identifiers import (
persistent_identifier, REVISION, DIRECTORY
from swh.deposit.api.common import APIPut, ParsedRequestHeaders
from swh.deposit.api.private import APIPrivateView
from swh.deposit.errors import BAD_REQUEST, DepositError
from swh.deposit.models import (
DEPOSIT_STATUS_DETAIL,
DEPOSIT_STATUS_LOAD_SUCCESS,
DEPOSIT_STATUS_VERIFIED,
Deposit,
)
from swh.model.hashutil import hash_to_bytes
from swh.model.swhids import CoreSWHID, ObjectType, QualifiedSWHID
from swh.scheduler.utils import create_oneshot_task
from ..common import SWHPutDepositAPI, SWHPrivateAPIView
from ...errors import make_error_dict, BAD_REQUEST
from ...models import Deposit, DEPOSIT_STATUS_DETAIL
from ...models import DEPOSIT_STATUS_LOAD_SUCCESS
MANDATORY_KEYS = ["origin_url", "release_id", "directory_id", "snapshot_id"]
class SWHUpdateStatusDeposit(SWHPutDepositAPI, SWHPrivateAPIView):
class APIUpdateStatus(APIPrivateView, APIPut):
"""Deposit request class to update the deposit's status.
HTTP verbs supported: PUT
"""
parser_classes = (JSONParser, )
def additional_checks(self, req, headers, collection_name,
deposit_id=None):
parser_classes = (JSONParser,)
def additional_checks(
self, request, headers: ParsedRequestHeaders, collection_name, deposit=None
):
"""Enrich existing checks to the default ones.
New checks:
- Ensure the status is provided
- Ensure it exists
- no missing information on load success update
"""
data = req.data
status = data.get('status')
data = request.data
status = data.get("status")
if not status:
msg = 'The status key is mandatory with possible values %s' % list(
DEPOSIT_STATUS_DETAIL.keys())
return make_error_dict(BAD_REQUEST, msg)
msg = "The status key is mandatory with possible values %s" % list(
DEPOSIT_STATUS_DETAIL.keys()
)
raise DepositError(BAD_REQUEST, msg)
if status not in DEPOSIT_STATUS_DETAIL:
msg = 'Possible status in %s' % list(DEPOSIT_STATUS_DETAIL.keys())
return make_error_dict(BAD_REQUEST, msg)
msg = "Possible status in %s" % list(DEPOSIT_STATUS_DETAIL.keys())
raise DepositError(BAD_REQUEST, msg)
if status == DEPOSIT_STATUS_LOAD_SUCCESS:
swh_id = data.get('revision_id')
if not swh_id:
msg = 'Updating status to %s requires a revision_id key' % (
status, )
return make_error_dict(BAD_REQUEST, msg)
missing_keys = []
for key in MANDATORY_KEYS:
value = data.get(key)
if value is None:
missing_keys.append(key)
if missing_keys:
msg = (
f"Updating deposit status to {status}"
f" requires information {','.join(missing_keys)}"
)
raise DepositError(BAD_REQUEST, msg)
return {}
def restrict_access(self, req, deposit=None):
"""Remove restriction modification to 'partial' deposit.
Update is possible regardless of the existing status.
"""
return None
def process_put(self, req, headers, collection_name, deposit_id):
"""Update the deposit's status
def process_put(
self,
request,
headers: ParsedRequestHeaders,
collection_name: str,
deposit: Deposit,
) -> None:
"""Update the deposit with status, SWHIDs and release infos.
Returns:
204 No content
400 Bad request if checks fail
"""
deposit = Deposit.objects.get(pk=deposit_id)
deposit.status = req.data['status'] # checks already done before
data = request.data
origin_url = req.data.get('origin_url')
dir_id = req.data.get('directory_id')
if dir_id:
deposit.swh_id = persistent_identifier(DIRECTORY, dir_id)
deposit.swh_id_context = persistent_identifier(
DIRECTORY, dir_id, metadata={'origin': origin_url})
rev_id = req.data.get('revision_id')
if rev_id:
deposit.swh_anchor_id = persistent_identifier(
REVISION, rev_id)
deposit.swh_anchor_id_context = persistent_identifier(
REVISION, rev_id, metadata={'origin': origin_url})
status = data["status"]
deposit.status = status
if status == DEPOSIT_STATUS_LOAD_SUCCESS:
origin_url = data["origin_url"]
directory_id = data["directory_id"]
release_id = data["release_id"]
dir_id = CoreSWHID(
object_type=ObjectType.DIRECTORY, object_id=hash_to_bytes(directory_id)
)
snp_id = CoreSWHID(
object_type=ObjectType.SNAPSHOT,
object_id=hash_to_bytes(data["snapshot_id"]),
)
rel_id = CoreSWHID(
object_type=ObjectType.RELEASE, object_id=hash_to_bytes(release_id)
)
deposit.swhid = str(dir_id)
# new id with contextual information
deposit.swhid_context = str(
QualifiedSWHID(
object_type=ObjectType.DIRECTORY,
object_id=hash_to_bytes(directory_id),
origin=origin_url,
visit=snp_id,
anchor=rel_id,
path="/",
)
)
elif (
status == DEPOSIT_STATUS_VERIFIED
and not deposit.load_task_id
and self.config["checks"]
):
# Deposit ok, then we schedule the deposit loading task (if not already done)
url = deposit.origin_url
task = create_oneshot_task(
"load-deposit", url=url, deposit_id=deposit.id, retries_left=3
)
load_task_id = self.scheduler.create_tasks([task])[0].id
deposit.load_task_id = str(load_task_id)
if "status_detail" in data:
deposit.status_detail = data["status_detail"]
deposit.save()
return {}
# Copyright (C) 2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import List, Tuple
from django.db.models import FileField
from django.http.request import HttpRequest
from rest_framework import status
from swh.deposit.api.common import APIGet
from swh.deposit.api.private import APIPrivateView, DepositReadMixin
from swh.deposit.config import ARCHIVE_TYPE
from swh.deposit.models import Deposit
class APIUploadURLs(APIPrivateView, APIGet, DepositReadMixin):
    """Private API endpoint listing the download URLs of the tarballs that
    were uploaded with a deposit request.

    Only GET is supported.

    """

    @classmethod
    def _get_archive_url(cls, archive: FileField, request: HttpRequest) -> str:
        """Return the download URL of ``archive``.

        The URL comes from the archive's storage backend; when it starts
        with ``/`` it is expanded to an absolute URI using ``request``.

        """
        location = archive.storage.url(archive.name)
        if not location.startswith("/"):
            return location
        return request.build_absolute_uri(location)

    def process_get(
        self, request: HttpRequest, collection_name: str, deposit: Deposit
    ) -> Tuple[int, List[str], str]:
        """List the URLs for downloading the tarballs of a deposit.

        Args:
            request: input HTTP request
            collection_name: Collection owning the deposit
            deposit: Deposit to get tarball download URLs

        Returns:
            Tuple status, list of URLs, content-type

        """
        archive_requests = list(
            self._deposit_requests(deposit, request_type=ARCHIVE_TYPE)
        )
        # The deposit requests are walked in reverse so that tarball URLs end up
        # sorted in ascending order of their upload dates: the deposit loader
        # aggregates tarball contents into a single tarball and the files they
        # contain can overlap.
        archive_requests.reverse()
        upload_urls = [
            self._get_archive_url(deposit_request.archive, request)
            for deposit_request in archive_requests
        ]
        return status.HTTP_200_OK, upload_urls, "application/json"
# Copyright (C) 2017-2018 The Software Heritage developers
# Copyright (C) 2017-2025 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.conf.urls import url
from django.urls import path
from django.urls import re_path as url
from ...config import (
PRIVATE_GET_RAW_CONTENT, PRIVATE_PUT_DEPOSIT, PRIVATE_GET_DEPOSIT_METADATA,
PRIVATE_CHECK_DEPOSIT, PRIVATE_LIST_DEPOSITS
from swh.deposit.api.private.deposit_list import APIList, deposit_list_datatables
from swh.deposit.api.private.deposit_read import APIReadArchives, APIReadMetadata
from swh.deposit.api.private.deposit_releases import APIReleases
from swh.deposit.api.private.deposit_update_status import APIUpdateStatus
from swh.deposit.api.private.deposit_upload_urls import APIUploadURLs
from swh.deposit.config import (
PRIVATE_GET_DEPOSIT_METADATA,
PRIVATE_GET_RAW_CONTENT,
PRIVATE_GET_RELEASES,
PRIVATE_GET_UPLOAD_URLS,
PRIVATE_LIST_DEPOSITS,
PRIVATE_LIST_DEPOSITS_DATATABLES,
PRIVATE_PUT_DEPOSIT,
)
from .deposit_read import SWHDepositReadArchives
from .deposit_read import SWHDepositReadMetadata
from .deposit_update_status import SWHUpdateStatusDeposit
from .deposit_check import SWHChecksDeposit
from .deposit_list import DepositList
urlpatterns = [
# Retrieve deposit's raw archives' content
# -> GET
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/raw/$',
SWHDepositReadArchives.as_view(),
name=PRIVATE_GET_RAW_CONTENT),
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/raw/$",
APIReadArchives.as_view(),
name=PRIVATE_GET_RAW_CONTENT,
),
# Update deposit's status
# -> PUT
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/update/$',
SWHUpdateStatusDeposit.as_view(),
name=PRIVATE_PUT_DEPOSIT),
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/update/$",
APIUpdateStatus.as_view(),
name=PRIVATE_PUT_DEPOSIT,
),
# Retrieve metadata information on a specific deposit
# -> GET
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/meta/$',
SWHDepositReadMetadata.as_view(),
name=PRIVATE_GET_DEPOSIT_METADATA),
# Check archive and metadata information on a specific deposit
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/meta/$",
APIReadMetadata.as_view(),
name=PRIVATE_GET_DEPOSIT_METADATA,
),
# Retrieve deposit's raw archives' content
# -> GET
url(
r"^(?P<deposit_id>[^/]+)/raw/$",
APIReadArchives.as_view(),
name=PRIVATE_GET_RAW_CONTENT + "-nc",
),
# Update deposit's status
# -> PUT
url(
r"^(?P<deposit_id>[^/]+)/update/$",
APIUpdateStatus.as_view(),
name=PRIVATE_PUT_DEPOSIT + "-nc",
),
# Retrieve metadata information on a specific deposit
# -> GET
url(
r"^(?P<deposit_id>[^/]+)/meta/$",
APIReadMetadata.as_view(),
name=PRIVATE_GET_DEPOSIT_METADATA + "-nc",
),
url(r"^deposits/$", APIList.as_view(), name=PRIVATE_LIST_DEPOSITS),
url(
r"^deposits/datatables/$",
deposit_list_datatables,
name=PRIVATE_LIST_DEPOSITS_DATATABLES,
),
# Retrieve all releases for a specific deposit
# -> GET
path(
"<int:deposit_id>/releases/",
APIReleases.as_view(),
name=PRIVATE_GET_RELEASES,
),
# Retrieve download URLs for the tarballs uploaded with a deposit
# -> GET
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/check/$',
SWHChecksDeposit.as_view(),
name=PRIVATE_CHECK_DEPOSIT),
url(r'^deposits/$', DepositList.as_view(),
name=PRIVATE_LIST_DEPOSITS)
path(
"<int:deposit_id>/upload-urls/",
APIUploadURLs.as_view(),
name=PRIVATE_GET_UPLOAD_URLS,
),
]
# Copyright (C) 2017-2019 The Software Heritage developers
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -6,28 +6,37 @@
from django.shortcuts import render
from django.urls import reverse
from .common import SWHBaseDeposit, ACCEPT_PACKAGINGS
from .common import ACCEPT_ARCHIVE_CONTENT_TYPES
from ..config import COL_IRI
from ..models import DepositClient, DepositCollection
from swh.deposit.api.common import (
ACCEPT_ARCHIVE_CONTENT_TYPES,
ACCEPT_PACKAGINGS,
APIBase,
)
from swh.deposit.config import COL_IRI
from swh.deposit.models import DepositClient, DepositCollection
class SWHServiceDocument(SWHBaseDeposit):
def get(self, req, *args, **kwargs):
client = DepositClient.objects.get(username=req.user)
class ServiceDocumentAPI(APIBase):
def get(self, request, *args, **kwargs):
if isinstance(request.user, DepositClient):
client = request.user
else:
client = DepositClient.objects.get(username=request.user)
collections = {}
for col_id in client.collections:
col = DepositCollection.objects.get(pk=col_id)
col_uri = req.build_absolute_uri(reverse(COL_IRI, args=[col.name]))
col_uri = request.build_absolute_uri(reverse(COL_IRI, args=[col.name]))
collections[col.name] = col_uri
context = {
'max_upload_size': self.config['max_upload_size'],
'accept_packagings': ACCEPT_PACKAGINGS,
'accept_content_types': ACCEPT_ARCHIVE_CONTENT_TYPES,
'collections': collections,
"max_upload_size": self.config["max_upload_size"],
"accept_packagings": ACCEPT_PACKAGINGS,
"accept_content_types": ACCEPT_ARCHIVE_CONTENT_TYPES,
"collections": collections,
}
return render(req, 'deposit/service_document.xml',
context, content_type='application/xml')
return render(
request,
"deposit/service_document.xml",
context,
content_type="application/xml",
)
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.http import HttpResponse
from django.shortcuts import render
from rest_framework import status
from swh.deposit.api.common import APIBase, get_deposit_by_id
from swh.deposit.api.converters import convert_status_detail
from swh.deposit.models import DEPOSIT_STATUS_DETAIL
class StateAPI(APIBase):
    """Deposit status endpoint.

    What's known as 'State-IRI' in the sword specification.

    HTTP verbs supported: GET

    """

    def get(  # type: ignore
        self, req, collection_name: str, deposit_id: int
    ) -> HttpResponse:
        """Render the state of a deposit as an XML document.

        Args:
            req: incoming HTTP request
            collection_name: name of the collection the deposit is looked up in
            deposit_id: identifier of the deposit

        Returns:
            The rendered ``deposit/state.xml`` template, with a 200 status

        """
        deposit = get_deposit_by_id(deposit_id, collection_name)
        self.checks(req, collection_name, deposit)

        # Fall back on the generic description of the status when the deposit
        # holds no (convertible) status detail
        status_detail = (
            convert_status_detail(deposit.status_detail)
            or DEPOSIT_STATUS_DETAIL[deposit.status]
        )

        exposed_fields = (
            "status",
            "swhid",
            "swhid_context",
            "external_id",
            "origin_url",
        )
        context = {
            "deposit_id": deposit.id,
            "status_detail": status_detail,
            # attributes missing on the deposit are exposed as None
            **{field: getattr(deposit, field, None) for field in exposed_fields},
        }

        return render(
            req,
            "deposit/state.xml",
            context=context,
            content_type="application/xml",
            status=status.HTTP_200_OK,
        )
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Optional, Tuple
from rest_framework import status
from swh.deposit.api.common import APIPost, ParsedRequestHeaders, Receipt
from swh.deposit.config import EDIT_IRI, EM_IRI
from swh.deposit.models import Deposit
from swh.deposit.parsers import SWHAtomEntryParser, SWHMultiPartParser
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
class SwordEditAPI(APIPost):
    """Deposit request class defining api endpoints for sword deposit.

    What's known as 'SE-IRI' in the sword specification.

    HTTP verbs supported: POST

    """

    parser_classes = (SWHMultiPartParser, SWHAtomEntryParser)

    def __init__(self):
        super().__init__()
        # storage instantiated from the "storage_metadata" configuration entry
        storage_config = self.config["storage_metadata"]
        self.storage_metadata: StorageInterface = get_storage(**storage_config)

    def process_post(
        self,
        request,
        headers: ParsedRequestHeaders,
        collection_name: str,
        deposit: Optional[Deposit] = None,
    ) -> Tuple[int, str, Receipt]:
        """Add new metadata/archive to an existing deposit.

        The request is dispatched on its shape:

        - multipart: add the provided metadata and archive to the deposit
        - empty body with In-Progress set to false: final empty post,
          finalizing the deposit
        - anything else: handled as an atom entry

        source:
        - http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_addingcontent_metadata
        - http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#protocoloperations_addingcontent_multipart
        - http://swordapp.github.io/SWORDv2-Profile/SWORDProfile.html#continueddeposit_complete

        Returns:
            Tuple of HTTP status, IRI name, deposit receipt. Multipart and
            atom-entry requests yield a 201 Created with the EM-IRI; the final
            empty post yields a 200 with the Edit-IRI.

        """  # noqa
        assert deposit is not None

        if request.content_type.startswith("multipart/"):
            receipt = self._multipart_upload(
                request, headers, collection_name, deposit=deposit
            )
            response_status, iri_name = status.HTTP_201_CREATED, EM_IRI
        elif (headers.content_length or 0) == 0 and headers.in_progress is False:
            # final empty post
            receipt = self._empty_post(request, headers, collection_name, deposit)
            response_status, iri_name = status.HTTP_200_OK, EDIT_IRI
        else:
            receipt = self._atom_entry(
                request, headers, collection_name, deposit=deposit
            )
            response_status, iri_name = status.HTTP_201_CREATED, EM_IRI

        return (response_status, iri_name, receipt)
# Copyright (C) 2017-2018 The Software Heritage developers
# Copyright (C) 2017-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""swh URL Configuration
"""SWH's deposit api URL Configuration"""
"""
from django.shortcuts import render
from django.urls import re_path as url
from django.conf.urls import url
from swh.deposit.api.collection import CollectionAPI
from swh.deposit.api.content import ContentAPI
from swh.deposit.api.edit import EditAPI
from swh.deposit.api.edit_media import EditMediaAPI
from swh.deposit.api.service_document import ServiceDocumentAPI
from swh.deposit.api.state import StateAPI
from swh.deposit.api.sword_edit import SwordEditAPI
from swh.deposit.config import (
COL_IRI,
CONT_FILE_IRI,
EDIT_IRI,
EM_IRI,
SD_IRI,
SE_IRI,
STATE_IRI,
)
from ..config import EDIT_SE_IRI, EM_IRI, CONT_FILE_IRI
from ..config import SD_IRI, COL_IRI, STATE_IRI
from .deposit import SWHDeposit
from .deposit_status import SWHDepositStatus
from .deposit_update import SWHUpdateMetadataDeposit
from .deposit_update import SWHUpdateArchiveDeposit
from .deposit_content import SWHDepositContent
from .service_document import SWHServiceDocument
urlpatterns = [
# PUBLIC API
def api_view(req):
return render(req, "api.html")
# PUBLIC API
urlpatterns = [
# simple view on the api
url(r"^$", api_view, name="api"),
# SD IRI - Service Document IRI
# -> GET
url(r'^servicedocument/', SWHServiceDocument.as_view(),
name=SD_IRI),
# Col IRI - Collection IRI
url(r"^servicedocument/", ServiceDocumentAPI.as_view(), name=SD_IRI),
# Col-IRI - Collection IRI
# -> POST
url(r'^(?P<collection_name>[^/]+)/$', SWHDeposit.as_view(),
name=COL_IRI),
url(r"^(?P<collection_name>[^/]+)/$", CollectionAPI.as_view(), name=COL_IRI),
# EM IRI - Atom Edit Media IRI (update archive IRI)
# -> PUT (update-in-place existing archive)
# -> POST (add new archive)
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/media/$',
SWHUpdateArchiveDeposit.as_view(),
name=EM_IRI),
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/media/$",
EditMediaAPI.as_view(),
name=EM_IRI,
),
# Edit IRI - Atom Entry Edit IRI (update metadata IRI)
# SE IRI - Sword Edit IRI ;; possibly same as Edit IRI
# -> PUT (update in place)
# -> DELETE (delete container)
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/atom/$",
EditAPI.as_view(),
name=EDIT_IRI,
),
# SE IRI - Sword Edit IRI ;; possibly same as Edit IRI
# -> POST (add new metadata)
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/metadata/$',
SWHUpdateMetadataDeposit.as_view(),
name=EDIT_SE_IRI),
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/metadata/$",
SwordEditAPI.as_view(),
name=SE_IRI,
),
# State IRI
# -> GET
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/status/$',
SWHDepositStatus.as_view(),
name=STATE_IRI),
# Cont/File IRI
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/status/$",
StateAPI.as_view(),
name=STATE_IRI,
),
# Cont-IRI
# -> GET
url(r'^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/content/$',
SWHDepositContent.as_view(),
name=CONT_FILE_IRI), # specification is not clear about
# FILE-IRI, we assume it's the same as
# the CONT-IRI one
url(
r"^(?P<collection_name>[^/]+)/(?P<deposit_id>[^/]+)/content/$",
ContentAPI.as_view(),
name=CONT_FILE_IRI,
), # specification is not clear about
# File-IRI, we assume it's the same as
# the Cont-IRI one
]
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from rest_framework import serializers
from rest_framework.fields import _UnvalidatedField
from rest_framework.pagination import PageNumberPagination
from swh.deposit.api.converters import convert_status_detail
from swh.deposit.models import Deposit
class DefaultPagination(PageNumberPagination):
    """Page-number pagination with the deposit API defaults."""

    # name of the query parameter declared for the page size
    page_size_query_param = "page_size"
    # default page size
    page_size = 100
class StatusDetailField(_UnvalidatedField):
    """Serializer field rendering a deposit's ``status_detail``.

    The value is passed through ``convert_status_detail`` so that the
    serialized output holds the converted form rather than the raw stored
    value.

    """

    def to_representation(self, value):
        """Return ``value`` converted by ``convert_status_detail``."""
        converted = convert_status_detail(value)
        return converted
class DepositSerializer(serializers.ModelSerializer):
    """Model serializer exposing every field of the ``Deposit`` model."""

    # rendered through convert_status_detail (see StatusDetailField)
    status_detail = StatusDetailField()
    # declared as an unvalidated field
    raw_metadata = _UnvalidatedField()

    class Meta:
        model = Deposit
        fields = "__all__"
......@@ -7,10 +7,5 @@ from django.apps import AppConfig
class DepositConfig(AppConfig):
name = 'swh.deposit'
def ready(self):
super().ready()
# install the signal permitting to trigger the status' check
from .signals import post_deposit_save # noqa
name = "swh.deposit"
label = "deposit"
# Copyright (C) 2017 The Software Heritage developers
# Copyright (C) 2017-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Optional
from django.core.cache import cache
from django.utils import timezone
from rest_framework import status
from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.permissions import BasePermission
from sentry_sdk import capture_exception
from swh.auth.django.models import OIDCUser
from swh.auth.django.utils import oidc_user_from_profile
from swh.auth.keycloak import (
KeycloakError,
KeycloakOpenIDConnect,
keycloak_error_message,
)
from swh.deposit.errors import UNAUTHORIZED, make_error_response
from swh.deposit.models import DepositClient
logger = logging.getLogger(__name__)
from .errors import UNAUTHORIZED, make_error_response
OIDC_DEPOSIT_CLIENT_ID = "swh-deposit"
DEPOSIT_PERMISSION = "swh.deposit.api"
def convert_response(request, content):
"""Convert response from drf's basic authentication mechanism to a
swh-deposit one.
swh-deposit one.
Args:
request (Request): Use to build the response
content (bytes): The drf's answer
Args:
request (Request): Use to build the response
content (bytes): The drf's answer
Returns:
Returns:
Response with the same status error as before, only the
body is now an swh-deposit compliant one.
Response with the same status error as before, only the
body is now an swh-deposit compliant one.
"""
from json import loads
content = loads(content.decode('utf-8'))
detail = content.get('detail')
content = loads(content.decode("utf-8"))
detail = content.get("detail")
if detail:
verbose_description = 'API is protected by basic authentication'
verbose_description = "API is protected by basic authentication"
else:
detail = 'API is protected by basic authentication'
detail = "API is protected by basic authentication"
verbose_description = None
response = make_error_response(
request,
UNAUTHORIZED,
summary=detail,
verbose_description=verbose_description)
response['WWW-Authenticate'] = 'Basic realm=""'
request, UNAUTHORIZED, summary=detail, verbose_description=verbose_description
)
response["WWW-Authenticate"] = 'Basic realm=""'
return response
class WrapBasicAuthenticationResponseMiddleware:
"""Middleware to capture potential authentication error and convert
them to standard deposit response.
them to standard deposit response.
This is to be installed in django's settings.py module.
This is to be installed in django's settings.py module.
"""
def __init__(self, get_response):
super().__init__()
self.get_response = get_response
......@@ -57,8 +79,98 @@ class WrapBasicAuthenticationResponseMiddleware:
response = self.get_response(request)
if response.status_code is status.HTTP_401_UNAUTHORIZED:
content_type = response._headers.get('content-type')
if content_type == ('Content-Type', 'application/json'):
content_type = response.get("content-type")
if content_type == "application/json":
return convert_response(request, response.content)
return response
class HasDepositPermission(BasePermission):
    """Grant access to authenticated deposit clients whose OIDC user holds
    the DEPOSIT_PERMISSION.

    """

    def has_permission(self, request, view):
        """Tell whether the requesting deposit client holds DEPOSIT_PERMISSION."""
        client = request.user
        assert isinstance(client, DepositClient)
        oidc_user = client.oidc_user
        return oidc_user.has_perm(DEPOSIT_PERMISSION)
class KeycloakBasicAuthentication(BasicAuthentication):
    """Keycloak authentication against username/password.

    Deposit users keep sending `Basic authentication` queries to the deposit
    server, which does not check the credentials itself: it delegates that
    check to the keycloak instance.

    Technically, this reuses :class:`rest_framework.BasicAuthentication` and
    overrides the :func:`authenticate_credentials` method to talk to keycloak.

    As an implementation detail, the OIDC profile obtained on login is stored
    in the django cache, from where :func:`get_user` can read it back.

    """

    _client: Optional[KeycloakOpenIDConnect] = None

    @property
    def client(self):
        """Keycloak client, instantiated from the configuration file on first
        access."""
        if self._client is not None:
            return self._client
        self._client = KeycloakOpenIDConnect.from_configfile(
            client_id=OIDC_DEPOSIT_CLIENT_ID
        )
        return self._client

    def _cache_key(self, user_id: str) -> str:
        """Internal key under which the OIDC profile of a user is cached."""
        keycloak = self.client
        return f"oidc_user_{keycloak.realm_name}_{keycloak.client_id}_{user_id}"

    def get_user(self, user_id: str) -> Optional[OIDCUser]:
        """Retrieve the user from the cache, if any.

        Returns:
            The user built out of the cached OIDC profile, None when nothing
            usable is cached (no entry, or the entry failed to convert).

        """
        cached_profile = cache.get(self._cache_key(user_id))
        if not cached_profile:
            return None
        try:
            return oidc_user_from_profile(self.client, cached_profile)
        except Exception as exc:
            # best effort: an unusable cache entry is reported, not raised
            logger.warning("Error during cache token retrieval: %s", exc)
            capture_exception(exc)
            return None

    def authenticate_credentials(self, user_id, password, request):
        """Authenticate the user_id/password against keycloak.

        Raises:
            AuthenticationFailed when keycloak rejects the credentials, when
            no deposit client matches ``user_id`` or when that client is
            deactivated

        Returns:
            Tuple of deposit_client, None.

        """
        try:
            oidc_profile = self.client.login(user_id, password)
        except KeycloakError as exc:
            logger.debug("KeycloakError: e: %s", exc)
            raise AuthenticationFailed(keycloak_error_message(exc))

        oidc_user = oidc_user_from_profile(self.client, oidc_profile)
        # time left (in seconds) before the refresh token expires
        expires_at = oidc_user.refresh_expires_at.timestamp()
        ttl = int(expires_at - timezone.now().timestamp())

        # Making sure the associated deposit client is correctly configured in
        # backend
        try:
            deposit_client = DepositClient.objects.get(username=user_id)
        except DepositClient.DoesNotExist:
            raise AuthenticationFailed(f"Unknown user {user_id}")

        if not deposit_client.is_active:
            raise AuthenticationFailed(f"Deactivated user {user_id}")

        deposit_client.oidc_user = oidc_user

        if ttl:
            # cache the oidc_profile user while it's valid
            cache_key = self._cache_key(user_id)
            cache.set(cache_key, oidc_profile, timeout=max(0, ttl))

        return (deposit_client, None)
......@@ -3,36 +3,41 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
import logging
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import click
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
logger = logging.getLogger(__name__)
@swh_cli_group.group(context_settings=CONTEXT_SETTINGS)
@click.pass_context
def deposit(ctx):
    """Deposit main command"""
    ctx.ensure_object(dict)
    # Align this module's logger with the level stored in the click context
    # (INFO when none was set).
    logger.setLevel(ctx.obj.get("log_level", logging.INFO))
def main():
    """Set up basic logging, then run the ``deposit`` command group.

    The group is invoked with ``SWH_DEPOSIT`` as automatic environment
    variable prefix.
    """
    logging.basicConfig()
    outcome = deposit(auto_envvar_prefix="SWH_DEPOSIT")
    return outcome
# These import statements MUST be executed after defining the 'deposit' group
# since the subcommands in these are defined using this 'deposit' group.
from . import client # noqa
from swh.deposit.cli import client # noqa
try:
from . import admin # noqa
from swh.deposit.cli import admin # noqa
except ImportError: # server part is optional
logger.debug('admin subcommand not loaded')
logger.debug("admin subcommand not loaded")
if __name__ == '__main__':
if __name__ == "__main__":
main()
# Copyright (C) 2017-2019 The Software Heritage developers
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from __future__ import annotations
from typing import TYPE_CHECKING
import click
from swh.deposit.config import setup_django_for
from swh.deposit.cli import deposit
if TYPE_CHECKING:
from swh.deposit.models import DepositCollection
@deposit.group("admin")
@click.option(
    "--config-file",
    "-C",
    default=None,
    type=click.Path(exists=True, dir_okay=False),
    help="Optional extra configuration file.",
)
@click.option(
    "--platform",
    default="development",
    type=click.Choice(["development", "production"]),
    help="development or production platform",
)
@click.pass_context
def admin(ctx, config_file: str, platform: str):
    """Server administration tasks (manipulate user or collections)"""
    # Imported here rather than at module level to keep cli startup time
    # under control.
    from swh.deposit.config import setup_django_for

    # configuration happens here
    setup_django_for(platform, config_file=config_file)
@admin.group('user')
@admin.group("user")
@click.pass_context
def user(ctx):
"""Manipulate user."""
......@@ -31,15 +51,14 @@ def user(ctx):
pass
def _create_collection(name):
def _create_collection(name: str) -> DepositCollection:
"""Create the collection with name if it does not exist.
Args:
name (str): collection's name
name: collection name
Returns:
collection (DepositCollection): the existing collection object
(created or not)
collection: the existing collection object
"""
# to avoid loading too early django namespaces
......@@ -47,33 +66,41 @@ def _create_collection(name):
try:
collection = DepositCollection.objects.get(name=name)
click.echo('Collection %s exists, nothing to do.' % name)
click.echo(f"Collection '{name}' exists, skipping.")
except DepositCollection.DoesNotExist:
click.echo('Create new collection %s' % name)
click.echo(f"Create collection '{name}'.")
collection = DepositCollection.objects.create(name=name)
click.echo('Collection %s created' % name)
click.echo(f"Collection '{name}' created.")
return collection
@user.command('create')
@click.option('--username', required=True, help="User's name")
@click.option('--password', required=True,
help="Desired user's password (plain).")
@click.option('--firstname', default='', help="User's first name")
@click.option('--lastname', default='', help="User's last name")
@click.option('--email', default='', help="User's email")
@click.option('--collection', help="User's collection")
@click.option('--provider-url', default='', help="Provider URL")
@click.option('--domain', help="The domain")
@user.command("create")
@click.option("--username", required=True, help="User's name")
@click.option("--password", help="(Deprecated) Desired user password (plain).")
@click.option("--firstname", default="", help="User's first name")
@click.option("--lastname", default="", help="User's last name")
@click.option("--email", default="", help="User's email")
@click.option("--collection", help="User's collection")
@click.option("--provider-url", default="", help="Provider URL")
@click.option("--domain", default="", help="The domain")
@click.pass_context
def user_create(ctx, username, password, firstname, lastname, email,
collection, provider_url, domain):
def user_create(
ctx,
username: str,
password: str,
firstname: str,
lastname: str,
email: str,
collection: str,
provider_url: str,
domain: str,
):
"""Create a user with some needed information (password, collection)
If the collection does not exist, the collection is then created
alongside.
The password is stored encrypted using django's utilies.
The password is stored encrypted using django's utilities.
"""
# to avoid loading too early django namespaces
......@@ -82,22 +109,23 @@ def user_create(ctx, username, password, firstname, lastname, email,
# If collection is not provided, fallback to username
if not collection:
collection = username
click.echo('collection: %s' % collection)
# create the collection if it does not exist
collection = _create_collection(collection)
collection_ = _create_collection(collection)
# user create/update
try:
user = DepositClient.objects.get(username=username)
click.echo('User %s exists, updating information.' % user)
user.set_password(password)
click.echo(f"Update user '{username}'.")
action_done = "updated"
except DepositClient.DoesNotExist:
click.echo('Create new user %s' % username)
user = DepositClient.objects.create_user(
username=username,
password=password)
click.echo(f"Create user '{username}'.")
user = DepositClient(username=username)
user.save()
action_done = "created"
user.collections = [collection.id]
if password:
user.set_password(password)
user.collections = [collection_.id]
user.first_name = firstname
user.last_name = lastname
user.email = email
......@@ -106,89 +134,91 @@ def user_create(ctx, username, password, firstname, lastname, email,
user.domain = domain
user.save()
click.echo('Information registered for user %s' % user)
click.echo(f"User '{username}' {action_done}.")
@user.command("list")
@click.pass_context
def user_list(ctx):
    """List existing users.

    This entrypoint is not paginated yet as there is not a lot of
    entry.

    """
    # to avoid loading too early django namespaces
    from swh.deposit.models import DepositClient

    users = DepositClient.objects.all()
    # One username per line, or a placeholder message when there is no user
    output = (
        "\n".join(account.username for account in users)
        if users
        else "Empty user list"
    )
    click.echo(output)
@user.command("exists")
@click.argument("username", required=True)
@click.pass_context
def user_exists(ctx, username: str):
    """Check if user exists."""
    # to avoid loading too early django namespaces
    from swh.deposit.models import DepositClient

    try:
        DepositClient.objects.get(username=username)
    except DepositClient.DoesNotExist:
        click.echo(f"User {username} does not exist.")
        ctx.exit(1)
    else:
        # Exit code 0 signals the user was found
        click.echo(f"User {username} exists.")
        ctx.exit(0)
@admin.group("collection")
@click.pass_context
def collection(ctx):
    """Manipulate collections."""
    # Nothing to do here: this group only hosts the collection subcommands.
@collection.command("create")
@click.option("--name", required=True, help="Collection's name")
@click.pass_context
def collection_create(ctx, name):
    """Create the collection with the given name if it does not exist."""
    _create_collection(name)
@collection.command("list")
@click.pass_context
def collection_list(ctx):
    """List existing collections.

    This entrypoint is not paginated yet as there is not a lot of
    entry.

    """
    # to avoid loading too early django namespaces
    from swh.deposit.models import DepositCollection

    collections = DepositCollection.objects.all()
    # One collection name per line, or a placeholder message when there is none
    output = (
        "\n".join(entry.name for entry in collections)
        if collections
        else "Empty collection list"
    )
    click.echo(output)
@admin.group("deposit")
@click.pass_context
def adm_deposit(ctx):
    """Manipulate deposit."""
    # Nothing to do here: this group only hosts the deposit subcommands.
@deposit.command('reschedule')
@click.option('--deposit-id', required=True, help="Deposit identifier")
@adm_deposit.command("reschedule")
@click.option("--deposit-id", required=True, help="Deposit identifier")
@click.pass_context
def deposit_reschedule(ctx, deposit_id):
def adm_deposit_reschedule(ctx, deposit_id):
"""Reschedule the deposit loading
This will:
......@@ -206,49 +236,54 @@ def deposit_reschedule(ctx, deposit_id):
"""
# to avoid loading too early django namespaces
from datetime import datetime
from swh.deposit.models import Deposit
import datetime
from swh.deposit.config import (
DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE,
DEPOSIT_STATUS_VERIFIED, SWHDefaultConfig,
DEPOSIT_STATUS_LOAD_FAILURE,
DEPOSIT_STATUS_LOAD_SUCCESS,
DEPOSIT_STATUS_VERIFIED,
APIConfig,
)
from swh.deposit.models import Deposit
try:
deposit = Deposit.objects.get(pk=deposit_id)
except Deposit.DoesNotExist:
click.echo('Deposit %s does not exist.' % deposit_id)
click.echo(f"Deposit {deposit_id} does not exist.")
ctx.exit(1)
# Check the deposit is in a reasonable state
accepted_statuses = [
DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE
]
accepted_statuses = [DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_LOAD_FAILURE]
if deposit.status == DEPOSIT_STATUS_VERIFIED:
click.echo('Deposit %s\'s status already set for rescheduling.' % (
deposit_id))
click.echo(f"Deposit {deposit_id} already set for rescheduling.")
ctx.exit(0)
if deposit.status not in accepted_statuses:
click.echo('Deposit %s\'s status be one of %s.' % (
deposit_id, ', '.join(accepted_statuses)))
click.echo(
f"Deposit {deposit_id} cannot be rescheduled (status: {deposit.status}).\n"
"Rescheduling deposit is only accepted for deposit with status: "
f"{', '.join(accepted_statuses)}."
)
ctx.exit(1)
task_id = deposit.load_task_id
if not task_id:
click.echo('Deposit %s cannot be rescheduled. It misses the '
'associated task.' % deposit_id)
click.echo(
f"Deposit {deposit_id} cannot be rescheduled. It misses the "
"associated scheduler task id (field load_task_id)."
)
ctx.exit(1)
# Reset the deposit's state
deposit.swh_id = None
deposit.swh_id_context = None
deposit.swh_anchor_id = None
deposit.swh_anchor_id_context = None
deposit.swhid = None
deposit.swhid_context = None
deposit.status = DEPOSIT_STATUS_VERIFIED
deposit.save()
# Trigger back the deposit
scheduler = SWHDefaultConfig().scheduler
# Schedule back the deposit loading task
scheduler = APIConfig().scheduler
scheduler.set_status_tasks(
[task_id], status='next_run_not_scheduled',
next_run=datetime.now())
[task_id],
status="next_run_not_scheduled",
next_run=datetime.datetime.now(tz=datetime.timezone.utc),
)
# Copyright (C) 2017-2019 The Software Heritage developers
# Copyright (C) 2017-2024 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from __future__ import annotations
from contextlib import contextmanager
from datetime import datetime, timezone
import logging
import tempfile
import uuid
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
import sys
from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional
import warnings
import xml.etree.ElementTree as ET
import click
import xmltodict
from swh.deposit.client import PublicApiDepositClient
from swh.deposit.cli import deposit
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
    # Only needed for type annotations; not imported at runtime.
    from swh.deposit.client import PublicApiDepositClient


class InputError(ValueError):
    """Input script error"""
@contextmanager
def trap_and_report_exceptions():
    """Trap and report exceptions (InputError, MaintenanceError) in a unified way.

    Either exception raised by the wrapped code is logged as an error, then
    the process exits with status 1.
    """
    # Imported here rather than at module level to keep cli startup time
    # under control.
    from swh.deposit.client import MaintenanceError

    try:
        yield
    except InputError as input_error:
        logger.error("Problem during parsing options: %s", input_error)
        sys.exit(1)
    except MaintenanceError as maintenance_error:
        logger.error(maintenance_error)
        sys.exit(1)
This generates a xml file in a temporary location and returns the
path to that file.
This is up to the client of that function to clean up the
temporary file.
def _url(url: str) -> str:
    """Force the /1 api version at the end of the url (avoiding confusing
    issues without it).

    Trailing slashes are dropped first, so that "https://host/" and
    "https://host/1/" both resolve to "https://host/1" instead of gaining a
    doubled slash or a second version segment.

    Args:
        url (str): api url used by cli users

    Returns:
        Top level api url to actually request

    """
    url = url.rstrip("/")
    if not url.endswith("/1"):
        url = f"{url}/1"
    return url
# generate a metadata file with the minimum required metadata
codemetadata = {
'entry': {
'@xmlns': "http://www.w3.org/2005/Atom",
'@xmlns:codemeta': "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0",
'codemeta:name': name,
'codemeta:identifier': external_id,
'codemeta:author': [{
'codemeta:name': author_name
} for author_name in authors],
},
}
logging.debug('Temporary file: %s', tmpfile)
logging.debug('Metadata dict to generate as xml: %s', codemetadata)
s = xmltodict.unparse(codemetadata, pretty=True)
logging.debug('Metadata dict as xml generated: %s', s)
with open(tmpfile, 'w') as fp:
fp.write(s)
return tmpfile
def generate_metadata(
    deposit_client: str,
    name: str,
    authors: List[str],
    external_id: Optional[str] = None,
    create_origin: Optional[str] = None,
    metadata_provenance_url: Optional[str] = None,
) -> str:
    """Generate sword compliant xml metadata with the minimum required metadata.

    The Atom spec, https://tools.ietf.org/html/rfc4287, says that:

    - atom:entry elements MUST contain one or more atom:author elements
    - atom:entry elements MUST contain exactly one atom:title element.
    - atom:entry elements MUST contain exactly one atom:updated element.

    However, we are also using CodeMeta, so we want some basic information to be
    mandatory.

    Therefore, we generate the following mandatory fields:

    - http://www.w3.org/2005/Atom#updated
    - http://www.w3.org/2005/Atom#author
    - http://www.w3.org/2005/Atom#title
    - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#name (yes, in addition to
      http://www.w3.org/2005/Atom#title, even if they have somewhat the same
      meaning)
    - https://doi.org/10.5063/SCHEMA/CODEMETA-2.0#author

    Args:
        deposit_client: Deposit client username,
        name: Software name
        authors: List of author names
        external_id: Optional identifier, emitted as codemeta:identifier when set
        create_origin: Origin concerned by the deposit
        metadata_provenance_url: Provenance metadata url

    Returns:
        metadata xml string

    """
    # Imported here rather than at module level to keep cli startup time
    # under control.
    from swh.deposit.utils import NAMESPACES as NS

    def tag(prefix: str, local_name: str) -> str:
        """Build the ElementTree qualified name ``{namespace}local_name``."""
        return f"{{{NS[prefix]}}}{local_name}"

    # generate a metadata file with the minimum required metadata
    document = ET.Element(tag("atom", "entry"))
    now = datetime.now(tz=timezone.utc)
    ET.SubElement(document, tag("atom", "updated")).text = str(now)
    ET.SubElement(document, tag("atom", "author")).text = deposit_client
    ET.SubElement(document, tag("atom", "title")).text = name
    ET.SubElement(document, tag("codemeta", "name")).text = name
    for author_name in authors:
        author = ET.SubElement(document, tag("codemeta", "author"))
        ET.SubElement(author, tag("codemeta", "name")).text = author_name

    if external_id:
        ET.SubElement(document, tag("codemeta", "identifier")).text = external_id

    # The swh:deposit element is only attached when it ends up with children
    swh_deposit_elt = ET.Element(tag("swh", "deposit"))

    if create_origin:
        elt = ET.SubElement(swh_deposit_elt, tag("swh", "create_origin"))
        ET.SubElement(elt, tag("swh", "origin")).set("url", create_origin)

    if metadata_provenance_url:
        elt = ET.SubElement(swh_deposit_elt, tag("swh", "metadata-provenance"))
        ET.SubElement(elt, tag("schema", "url")).text = metadata_provenance_url

    if len(swh_deposit_elt):
        document.append(swh_deposit_elt)

    s = ET.tostring(document, encoding="utf-8").decode()
    # Use the module logger (not the root one) like the rest of this module
    logger.debug("Atom entry dict to generate as xml: %s", s)
    return s
def _collection(client: PublicApiDepositClient) -> str:
    """Retrieve the client's collection

    Raises:
        InputError when the service document reports an error

    """
    # retrieve user's collection
    service_document = client.service_document()
    if "error" in service_document:
        msg = service_document["error"]
        raise InputError(f"Service document retrieval: {msg}")
    # The collection name is read from the first workspace of the document
    first_workspace = service_document["app:service"]["app:workspace"][0]
    return first_workspace["app:collection"]["sword:name"]
def client_command_parse_input(
username, password, archive, metadata,
archive_deposit, metadata_deposit,
collection, slug, partial, deposit_id, replace,
url, name, authors):
client,
username: str,
archive: Optional[str],
metadata: Optional[str],
collection: Optional[str],
slug: Optional[str],
create_origin: Optional[str],
metadata_provenance_url: Optional[str],
partial: bool,
deposit_id: Optional[int],
swhid: Optional[str],
replace: bool,
url: str,
name: Optional[str],
authors: List[str],
temp_dir: str,
) -> Dict[str, Any]:
"""Parse the client subcommand options and make sure the combination
is acceptable*. If not, an InputError exception is raised
explaining the issue.
......@@ -152,233 +195,441 @@ def client_command_parse_input(
errors are already dealt with by the underlying api client.
Raises:
InputError explaining the issue
InputError explaining the user input related issue
MaintenanceError explaining the api status
Returns:
dict with the following keys:
'archive': the software archive to deposit
'username': username
'password': associated password
'metadata': the metadata file to deposit
'collection': the username's associated client
'slug': the slug or external id identifying the deposit to make
'partial': if the deposit is partial or not
'client': instantiated class
'url': deposit's server main entry point
'deposit_type': deposit's type (binary, multipart, metadata)
'deposit_id': optional deposit identifier
"archive": the software archive to deposit
"username": username
"metadata": the metadata file to deposit
"collection": the user's collection under which to put the deposit
"create_origin": the origin concerned by the deposit
"metadata_provenance_url": the metadata provenance url
"in_progress": if the deposit is partial or not
"url": deposit's server main entry point
"deposit_id": optional deposit identifier
"swhid": optional deposit swhid
"replace": whether the given deposit is to be replaced or not
"""
cleanup_tempfile = False
try:
if archive_deposit and metadata_deposit:
# too many flags use, remove redundant ones (-> multipart deposit)
archive_deposit = False
metadata_deposit = False
if archive and not os.path.exists(archive):
raise InputError('Software Archive %s must exist!' % archive)
if not slug: # generate one as this is mandatory
slug = generate_slug()
if not metadata and name and authors:
metadata = generate_metadata_file(name, slug, authors)
cleanup_tempfile = True
if metadata_deposit:
archive = None
if archive_deposit:
metadata = None
if metadata_deposit and not metadata:
if not metadata:
if name and authors:
metadata_path = os.path.join(temp_dir, "metadata.xml")
logging.debug("Temporary file: %s", metadata_path)
metadata_xml = generate_metadata(
username,
name,
authors,
external_id=slug,
create_origin=create_origin,
metadata_provenance_url=metadata_provenance_url,
)
logging.debug("Metadata xml generated: %s", metadata_xml)
with open(metadata_path, "w") as f:
f.write(metadata_xml)
metadata = metadata_path
elif archive is not None and not partial and not deposit_id:
# If we meet all the following conditions:
# * this is not an archive-only deposit request
# * it is not part of a multipart deposit (either create/update
# or finish)
# * it misses either name or authors
raise InputError(
"For metadata deposit request, either a metadata file with "
"--metadata or both --author and --name must be provided. "
)
elif name or authors:
# If we are generating metadata, then all mandatory metadata
# must be present
raise InputError(
"Metadata deposit filepath must be provided for metadata "
"deposit")
"For metadata deposit request, either a metadata file with "
"--metadata or both --author and --name must be provided."
)
else:
# TODO: this is a multipart deposit, we might want to check that
# metadata are deposited at some point
pass
elif name or authors or create_origin:
raise InputError(
"Using --metadata flag is incompatible with "
"--author and --name and --create-origin (those are used to generate one "
"metadata file)."
)
if not archive and not metadata:
raise InputError(
"Please provide an actionable command. See --help for more information"
)
if metadata:
from xml.etree import ElementTree
from swh.deposit.utils import (
parse_swh_deposit_origin,
parse_swh_metadata_provenance,
)
metadata_tree = ElementTree.fromstring(open(metadata).read())
(create_origin, add_to_origin) = parse_swh_deposit_origin(metadata_tree)
if create_origin and add_to_origin:
logger.error(
"The metadata file provided must not contain both "
'"<swh:create_origin>" and "<swh:add_to_origin>" tags',
)
elif not create_origin and not add_to_origin:
logger.warning(
"The metadata file provided should contain "
'"<swh:create_origin>" or "<swh:add_to_origin>" tag',
)
meta_prov_url = parse_swh_metadata_provenance(metadata_tree)
if not meta_prov_url:
logger.warning(
"The metadata file provided should contain "
'"<swh:metadata-provenance>" tag'
)
if replace and not deposit_id:
raise InputError("To update an existing deposit, you must provide its id")
if not collection:
collection = _collection(client)
if metadata and not os.path.exists(metadata):
raise InputError('Software Archive metadata %s must exist!' % (
metadata, ))
return {
"archive": archive,
"username": username,
"metadata": metadata,
"collection": collection,
"slug": slug,
"in_progress": partial,
"url": url,
"deposit_id": deposit_id,
"swhid": swhid,
"replace": replace,
}
if not archive and not metadata:
raise InputError(
'Please provide an actionable command. See --help for more '
'information.')
if replace and not deposit_id:
raise InputError(
'To update an existing deposit, you must provide its id')
client = _client(url, username, password)
if not collection:
collection = _collection(client)
return {
'archive': archive,
'username': username,
'password': password,
'metadata': metadata,
'cleanup_tempfile': cleanup_tempfile,
'collection': collection,
'slug': slug,
'in_progress': partial,
'client': client,
'url': url,
'deposit_id': deposit_id,
'replace': replace,
}
except Exception: # to be clean, cleanup prior to raise
_cleanup_tempfile({
'cleanup_tempfile': cleanup_tempfile,
'metadata': metadata
})
raise
def _subdict(d: Dict[str, Any], keys: Collection[str]) -> Dict[str, Any]:
    "return a dict from d with only given keys"
    selected: Dict[str, Any] = {}
    for key, value in d.items():
        if key in keys:
            selected[key] = value
    return selected
def deposit_create(config, logger):
"""Delegate the actual deposit to the deposit client.
def credentials_decorator(f):
    """Add default --url, --username and --password flag to cli."""
    # Options are applied in this order: --password, --username, then --url.
    option_decorators = (
        click.option(
            "--password", required=True, help="(Mandatory) User's associated password"
        ),
        click.option("--username", required=True, help="(Mandatory) User's name"),
        click.option(
            "--url",
            default="https://deposit.softwareheritage.org",
            help=(
                "(Optional) Deposit server api endpoint. By default, "
                "https://deposit.softwareheritage.org/1"
            ),
        ),
    )
    for add_option in option_decorators:
        f = add_option(f)
    return f
def output_format_decorator(f):
    """Add --format output flag decorator to cli."""
    # The chosen value reaches the command as the ``output_format`` parameter.
    add_format_option = click.option(
        "-f",
        "--format",
        "output_format",
        default="logging",
        type=click.Choice(["logging", "yaml", "json"]),
        help="Output format results.",
    )
    return add_format_option(f)
"""
logger.debug('Create deposit')
client = config['client']
keys = ('collection', 'archive', 'metadata', 'slug', 'in_progress')
return client.deposit_create(
**_subdict(config, keys))
@deposit.command()
@credentials_decorator
@click.option(
    "--archive",
    type=click.Path(exists=True),
    help="(Optional) Software archive to deposit",
)
@click.option(
    "--metadata",
    type=click.Path(exists=True),
    help=(
        "(Optional) Path to xml metadata file. If not provided, "
        "this will use a file named <archive>.metadata.xml"
    ),
)
@click.option(
    "--archive-deposit/--no-archive-deposit",
    default=False,
    help="Deprecated (ignored)",
)
@click.option(
    "--metadata-deposit/--no-metadata-deposit",
    default=False,
    help="Deprecated (ignored)",
)
@click.option(
    "--collection",
    help="(Optional) User's collection. If not provided, this will be fetched.",
)
@click.option(
    "--slug",
    help=(
        "(Deprecated) (Optional) External system information identifier. "
        "If not provided, it will be generated"
    ),
)
@click.option(
    "--create-origin",
    help=(
        "(Optional) Origin url to attach information to. To be used alongside "
        "--name and --author. This will be generated alongside the metadata to "
        "provide to the deposit server."
    ),
)
@click.option(
    "--metadata-provenance-url",
    help=(
        "(Optional) Provenance metadata url to indicate from where the metadata is "
        "coming from."
    ),
)
@click.option(
    "--partial/--no-partial",
    default=False,
    help=(
        "(Optional) The deposit will be partial, other deposits "
        "will have to take place to finalize it."
    ),
)
@click.option(
    "--deposit-id",
    default=None,
    help="(Optional) Update an existing partial deposit with its identifier",
)
@click.option(
    "--swhid",
    default=None,
    help="(Optional) Update existing completed deposit (status done) with new metadata",
)
@click.option(
    "--replace/--no-replace",
    default=False,
    help="(Optional) Update by replacing existing metadata to a deposit",
)
@click.option("--verbose/--no-verbose", default=False, help="Verbose mode")
@click.option("--name", help="Software name")
@click.option(
    "--author",
    multiple=True,
    help="Software author(s), this can be repeated as many times"
    " as there are authors",
)
@output_format_decorator
@click.pass_context
def upload(
    ctx,
    username: str,
    password: str,
    archive: Optional[str],
    metadata: Optional[str],
    archive_deposit: bool,
    metadata_deposit: bool,
    collection: Optional[str],
    slug: Optional[str],
    create_origin: Optional[str],
    metadata_provenance_url: Optional[str],
    partial: bool,
    deposit_id: Optional[int],
    swhid: Optional[str],
    replace: bool,
    url: str,
    verbose: bool,
    name: Optional[str],
    author: List[str],
    output_format: Optional[str],
):
    """Software Heritage Public Deposit Client

    Create/Update deposit through the command line.

    More documentation can be found at
    https://docs.softwareheritage.org/devel/swh-deposit/getting-started.html.

    """
    # Imported here rather than at module level to keep cli startup time
    # under control.
    import tempfile

    from swh.deposit.client import PublicApiDepositClient

    if archive_deposit or metadata_deposit:
        warnings.warn(
            '"archive_deposit" and "metadata_deposit" option arguments are '
            "deprecated and have no effect; simply do not provide the archive "
            "for a metadata-only deposit, and do not provide a metadata for an "
            "archive-only deposit.",
            DeprecationWarning,
        )

    if slug:
        if create_origin and slug != create_origin:
            raise InputError(
                '"--slug" flag has been deprecated in favor of "--create-origin" flag. '
                "You mentioned both with different values, please only "
                'use "--create-origin".'
            )

        warnings.warn(
            '"--slug" flag has been deprecated in favor of "--create-origin" flag. '
            'Please, start using "--create-origin" instead of "--slug"',
            DeprecationWarning,
        )

    url = _url(url)

    client = PublicApiDepositClient(url=url, auth=(username, password))
    # The temporary directory receives the metadata file generated from
    # --name/--author, so the deposit request is sent before it is removed.
    with tempfile.TemporaryDirectory() as temp_dir:
        with trap_and_report_exceptions():
            logger.debug("Parsing cli options")
            config = client_command_parse_input(
                client,
                username,
                archive,
                metadata,
                collection,
                slug,
                create_origin,
                metadata_provenance_url,
                partial,
                deposit_id,
                swhid,
                replace,
                url,
                name,
                author,
                temp_dir,
            )

            if verbose:
                logger.info("Parsed configuration: %s", config)

            keys = [
                "archive",
                "collection",
                "in_progress",
                "metadata",
                "slug",
            ]
            # A deposit id means an existing deposit is updated, otherwise a
            # new one is created.
            if config["deposit_id"]:
                keys += ["deposit_id", "replace", "swhid"]
                data = client.deposit_update(**_subdict(config, keys))
            else:
                data = client.deposit_create(**_subdict(config, keys))

            print_result(data, output_format)
"""
config = {}
try:
logger.debug('Parsing cli options')
config = client_command_parse_input(
username, password, archive, metadata, archive_deposit,
metadata_deposit, collection, slug, partial, deposit_id,
replace, url, name, author)
except InputError as e:
msg = 'Problem during parsing options: %s' % e
r = {
'error': msg,
}
logger.info(r)
return 1
@deposit.command()
@credentials_decorator
@click.option("--deposit-id", default=None, required=True, help="Deposit identifier.")
@output_format_decorator
@click.pass_context
def status(ctx, url, username, password, deposit_id, output_format):
"""Deposit's status"""
from swh.deposit.client import PublicApiDepositClient
url = _url(url)
logger.debug("Status deposit")
with trap_and_report_exceptions():
client = PublicApiDepositClient(url=_url(url), auth=(username, password))
collection = _collection(client)
try:
if verbose:
logger.info("Parsed configuration: %s" % (
config, ))
print_result(
client.deposit_status(collection=collection, deposit_id=deposit_id),
output_format,
)
deposit_id = config['deposit_id']
if deposit_id:
r = deposit_update(config, logger)
else:
r = deposit_create(config, logger)
def print_result(data: Dict[str, Any], output_format: Optional[str]) -> None:
"""Display the result data into a dedicated output format."""
import json
logger.info(r)
import yaml
finally:
_cleanup_tempfile(config)
if output_format == "json":
click.echo(json.dumps(data))
elif output_format == "yaml":
click.echo(yaml.dump(data))
else:
logger.info(data)
@deposit.command()
@click.option('--url', default='https://deposit.softwareheritage.org/1',
help="(Optional) Deposit server api endpoint. By default, "
"https://deposit.softwareheritage.org/1")
@click.option('--username', required=1,
help="(Mandatory) User's name")
@click.option('--password', required=1,
help="(Mandatory) User's associated password")
@click.option('--deposit-id', default=None,
required=1,
help="Deposit identifier.")
@deposit.command("metadata-only")
@credentials_decorator
@click.option(
"--metadata",
"metadata_path",
type=click.Path(exists=True),
required=True,
help="Path to xml metadata file",
)
@output_format_decorator
@click.pass_context
def status(ctx, url, username, password, deposit_id):
"""Deposit's status
"""
logger.debug('Status deposit')
try:
client = _client(url, username, password)
def metadata_only(ctx, url, username, password, metadata_path, output_format):
"""Deposit metadata only upload"""
from xml.etree import ElementTree
from swh.deposit.client import PublicApiDepositClient
from swh.deposit.utils import parse_swh_metadata_provenance, parse_swh_reference
# Parse to check for a swhid presence within the metadata file
with open(metadata_path, "r") as f:
raw_metadata = f.read()
metadata_tree = ElementTree.fromstring(raw_metadata)
actual_swhid = parse_swh_reference(metadata_tree)
if not actual_swhid:
raise InputError("A SWHID must be provided for a metadata-only deposit")
meta_prov_url = parse_swh_metadata_provenance(metadata_tree)
if not meta_prov_url:
logger.warning(
"A '<swh:metadata-provenance>' should be provided for a metadata-only "
"deposit"
)
with trap_and_report_exceptions():
client = PublicApiDepositClient(url=_url(url), auth=(username, password))
collection = _collection(client)
except InputError as e:
msg = 'Problem during parsing options: %s' % e
r = {
'error': msg,
}
logger.info(r)
return 1
r = client.deposit_status(
collection=collection, deposit_id=deposit_id)
logger.info(r)
result = client.deposit_metadata_only(collection, metadata_path)
print_result(result, output_format)
@deposit.command("list")
@credentials_decorator
@output_format_decorator
@click.option(
    "--page",
    default=1,
    help="Page number when requesting more information",
)
@click.option(
    "--page-size",
    default=100,
    # The original help text was a copy-paste of the "--page" one
    # ("Page number ..."), which mislabelled this option in ``--help``.
    help="Page size (number of results per page) when requesting more information",
)
@click.pass_context
def deposit_list(ctx, url, username, password, output_format, page, page_size):
    """Client deposit listing"""
    # Imported lazily, inside the command, as the sibling commands do.
    from swh.deposit.client import PublicApiDepositClient

    # NOTE(review): ``_url`` ends up applied twice to ``url`` (here and when
    # building the client below). Kept as-is to preserve behavior; presumably
    # ``_url`` is idempotent -- confirm, then drop one of the two calls.
    url = _url(url)
    logger.debug("List deposits for user %s", username)
    # Every remote interaction happens inside the trap so that failures are
    # handled by ``trap_and_report_exceptions``.
    with trap_and_report_exceptions():
        client = PublicApiDepositClient(url=_url(url), auth=(username, password))
        collection = _collection(client)
        result = client.deposit_list(collection, page=page, page_size=page_size)
        print_result(result, output_format)