From 77e69ecf984105fff4fce5a8f4de8327be7387a0 Mon Sep 17 00:00:00 2001 From: Nicolas Dandrimont <nicolas@dandrimont.eu> Date: Sat, 12 May 2018 17:49:44 +0200 Subject: [PATCH] Use a concurrent.future to parallelize objstorage and storage addition --- swh/storage/storage.py | 55 ++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/swh/storage/storage.py b/swh/storage/storage.py index 63932e11b..e54e45b7e 100644 --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -5,9 +5,11 @@ from collections import defaultdict +import concurrent.futures import datetime import itertools import json + import dateutil.parser import psycopg2 import psycopg2.pool @@ -132,35 +134,46 @@ class Storage(): in self.skipped_content_missing( content_without_data)) + def add_to_objstorage(): + data = { + cont['sha1']: cont['data'] + for cont in content_with_data + if cont['sha1'] in missing_content + } + self.objstorage.add_batch(data) + with db.transaction() as cur: - if missing_content: - # create temporary table for metadata injection - db.mktemp('content', cur) + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + added_to_objstorage = executor.submit(add_to_objstorage) + if missing_content: + # create temporary table for metadata injection + db.mktemp('content', cur) - def add_to_objstorage(cont): - self.objstorage.add(cont['data'], - obj_id=cont['sha1']) + content_filtered = (cont for cont in content_with_data + if cont['sha1'] in missing_content) - content_filtered = (cont for cont in content_with_data - if cont['sha1'] in missing_content) + db.copy_to(content_filtered, 'tmp_content', + db.content_get_metadata_keys, cur) - db.copy_to(content_filtered, 'tmp_content', - db.content_get_metadata_keys, - cur, item_cb=add_to_objstorage) + # move metadata in place + db.content_add_from_temp(cur) - # move metadata in place - db.content_add_from_temp(cur) + if missing_skipped: + missing_filtered = ( + cont for cont in content_without_data + if _unique_key(cont) in missing_skipped + ) - if missing_skipped: - missing_filtered = (cont for cont in content_without_data - if _unique_key(cont) in missing_skipped) + db.mktemp('skipped_content', cur) + db.copy_to(missing_filtered, 'tmp_skipped_content', + db.skipped_content_keys, cur) - db.mktemp('skipped_content', cur) - db.copy_to(missing_filtered, 'tmp_skipped_content', - db.skipped_content_keys, cur) + # move metadata in place + db.skipped_content_add_from_temp(cur) - # move metadata in place - db.skipped_content_add_from_temp(cur) + # Wait for objstorage addition before returning from the + # transaction, bubbling up any exception + _ = added_to_objstorage.result() @db_transaction def content_update(self, content, keys=[], db=None, cur=None): -- GitLab