Skip to content
Snippets Groups Projects
Commit 852fbec9 authored by vlorentz's avatar vlorentz
Browse files

impl origins

parent c2be0e8d
No related branches found
No related tags found
No related merge requests found
Pipeline #13150 failed
......@@ -19,6 +19,7 @@ import itertools
import logging
from typing import Iterator, List
from swh.core.api.classes import stream_results
from swh.model.model import ModelObjectType
from swh.storage import get_storage
from swh.storage.algos.directory import directory_get_many
......@@ -68,6 +69,16 @@ class JournalBackfiller:
journal's reading topic.
"""
if start_partition_id < 0:
raise ValueError("start_partition_id must be a positive number")
if start_partition_id >= end_partition_id:
raise ValueError(
"start_partition_id must be strictly less than end_partition_id"
)
if end_partition_id >= nb_partitions:
raise ValueError(
"end_partition_id must be strictly less than nb_partitions"
)
for partition_id in range(start_partition_id, end_partition_id):
self._backfill_partition(object_type, partition_id, nb_partitions, dry_run)
......@@ -94,12 +105,50 @@ class JournalBackfiller:
)
directory_ids = page.results
objects = list(directory_get_many(self.storage, directory_ids))
elif object_type == ModelObjectType.EXTID:
raise NotImplementedError("extids")
elif object_type == ModelObjectType.ORIGIN:
raise NotImplementedError("origins")
if partition_id != 0:
# partitioning is not yet supported for origins
break
page = self.storage.origin_list(page_token=page_token)
objects = page.results
elif object_type == ModelObjectType.ORIGIN_VISIT:
raise NotImplementedError("origin visits")
if partition_id != 0:
# partitioning is not yet supported for origins
break
page = self.storage.origin_list(page_token=page_token)
origins = page.results
objects = [
visit
for origin in origins
for visit in stream_results(
self.storage.origin_visit_get, origin=origin.url, limit=1000
)
]
elif object_type == ModelObjectType.ORIGIN_VISIT_STATUS:
raise NotImplementedError("origin visit statuses")
if partition_id != 0:
# partitioning is not yet supported for origins
break
page = self.storage.origin_list(page_token=page_token)
origins = page.results
objects = [
visit_status
for origin in origins
for visit in stream_results(
self.storage.origin_visit_get, origin=origin.url
)
for visit_status in stream_results(
self.storage.origin_visit_status_get,
origin=origin.url,
visit=visit.visit,
limit=1000,
)
]
elif object_type == ModelObjectType.METADATA_AUTHORITY:
raise NotImplementedError("metadata authority")
elif object_type == ModelObjectType.METADATA_FETCHER:
raise NotImplementedError("metadata fetcher")
elif object_type == ModelObjectType.RAW_EXTRINSIC_METADATA:
raise NotImplementedError("raw extrinsic metadata")
elif object_type == ModelObjectType.RELEASE:
......
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment