Skip to content
Snippets Groups Projects
Verified Commit 71dac8f5 authored by Antoine R. Dumont's avatar Antoine R. Dumont
Browse files

swh.indexer.rehash: Reschedule when error in processing contents

Related T712
parent f7f056f7
No related branches found
No related tags found
No related merge requests found
......@@ -4,15 +4,17 @@
# See top-level LICENSE file for more information
import logging
import itertools
from collections import defaultdict
from swh.model import hashutil
from swh.core import utils
from swh.core.config import SWHConfig
from swh.storage import get_storage
from swh.model import hashutil
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from swh.storage import get_storage
from swh.scheduler.utils import get_task
class RecomputeChecksums(SWHConfig):
......@@ -74,6 +76,7 @@ class RecomputeChecksums(SWHConfig):
self.batch_size_update = self.config[
'batch_size_update']
self.log = logging.getLogger('swh.indexer.rehash')
self.task = get_task('swh.indexer.tasks.SWHRecomputeChecksums')
if not self.compute_checksums:
raise ValueError('Checksums list should not be empty.')
......@@ -89,6 +92,15 @@ class RecomputeChecksums(SWHConfig):
yield h
def _reschedule(self, contents):
    """Send contents back to the task so they get processed again.

    Args:
        contents (iterable of dict): contents to schedule back. The
            callers visible in this module pass a list of
            ``{'sha1': <sha1>}`` dicts.

    """
    # Materialize first: a generator can neither be tested for
    # emptiness nor be reused once it has been handed to the task.
    contents = list(contents)
    if not contents:
        # Nothing to retry; do not queue a task with an empty payload.
        return
    self.task.delay(contents)
def get_new_contents_metadata(self, all_contents):
"""Retrieve raw contents and compute new checksums on the
contents. Unknown or corrupted contents are skipped.
......@@ -102,12 +114,20 @@ class RecomputeChecksums(SWHConfig):
tuple of: content to update, list of checksums computed
"""
for contents in utils.grouper(all_contents,
content_ids = self._read_content_ids(all_contents)
for contents in utils.grouper(content_ids,
self.batch_size_retrieve_content):
contents = self.storage.content_get_metadata(
self._read_content_ids(contents))
for content in contents:
contents_iter = itertools.tee(contents, 2)
try:
content_metadata = self.storage.content_get_metadata(
[s for s in contents_iter[0]])
except:
self.log.error(
'Problem when reading contents metadata. Rescheduling!')
self._reschedule([{'sha1': sha1} for sha1 in contents_iter[1]])
continue
for content in content_metadata:
if self.recompute_checksums: # Recompute checksums provided
# in compute_checksums options
checksums_to_compute = list(self.compute_checksums)
......@@ -156,5 +176,10 @@ class RecomputeChecksums(SWHConfig):
for keys_to_update, contents in groups.items():
keys = keys_to_update.split(',')
self.storage.content_update(contents,
keys=keys)
try:
self.storage.content_update(contents,
keys=keys)
except:
self.log.error('Problem during update. Rescheduling!')
self._reschedule([{'sha1': c['sha1']} for c in contents])
continue
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment