diff --git a/swh/storage/storage.py b/swh/storage/storage.py index 63932e11bfa02cc7550751ea4981572cc88b5824..e54e45b7e6f4416433439d2847e215e071ae305d 100644 --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -5,9 +5,11 @@ from collections import defaultdict +import concurrent.futures import datetime import itertools import json + import dateutil.parser import psycopg2 import psycopg2.pool @@ -132,35 +134,46 @@ class Storage(): in self.skipped_content_missing( content_without_data)) + def add_to_objstorage(): + data = { + cont['sha1']: cont['data'] + for cont in content_with_data + if cont['sha1'] in missing_content + } + self.objstorage.add_batch(data) + with db.transaction() as cur: - if missing_content: - # create temporary table for metadata injection - db.mktemp('content', cur) + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + added_to_objstorage = executor.submit(add_to_objstorage) + if missing_content: + # create temporary table for metadata injection + db.mktemp('content', cur) - def add_to_objstorage(cont): - self.objstorage.add(cont['data'], - obj_id=cont['sha1']) + content_filtered = (cont for cont in content_with_data + if cont['sha1'] in missing_content) - content_filtered = (cont for cont in content_with_data - if cont['sha1'] in missing_content) + db.copy_to(content_filtered, 'tmp_content', + db.content_get_metadata_keys, cur) - db.copy_to(content_filtered, 'tmp_content', - db.content_get_metadata_keys, - cur, item_cb=add_to_objstorage) + # move metadata in place + db.content_add_from_temp(cur) - # move metadata in place - db.content_add_from_temp(cur) + if missing_skipped: + missing_filtered = ( + cont for cont in content_without_data + if _unique_key(cont) in missing_skipped + ) - if missing_skipped: - missing_filtered = (cont for cont in content_without_data - if _unique_key(cont) in missing_skipped) + db.mktemp('skipped_content', cur) + db.copy_to(missing_filtered, 'tmp_skipped_content', + db.skipped_content_keys, cur) - db.mktemp('skipped_content', cur) - db.copy_to(missing_filtered, 'tmp_skipped_content', - db.skipped_content_keys, cur) + # move metadata in place + db.skipped_content_add_from_temp(cur) - # move metadata in place - db.skipped_content_add_from_temp(cur) + # Wait for objstorage addition before returning from the + # transaction, bubbling up any exception + _ = added_to_objstorage.result() @db_transaction def content_update(self, content, keys=[], db=None, cur=None):