Skip to content
Snippets Groups Projects
Commit 5d89e09d authored by vlorentz's avatar vlorentz Committed by vlorentz
Browse files

postgresql: Sort rows before upsertion to avoid deadlocks

parent 1245ec5f
No related branches found
No related tags found
No related merge requests found
......@@ -110,15 +110,17 @@ class ProvenanceStoragePostgreSql:
@handle_raise_on_commit
def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool:
    """Upsert content rows, keeping the earliest date seen for each sha1.

    Args:
        cnts: mapping from content sha1 to the date to record for it.

    Returns:
        True when the method completes; an empty mapping issues no query.
    """
    if cnts:
        # Upsert in consistent order to avoid deadlocks
        rows = sorted(cnts.items())
        # On conflict the stored date becomes the smaller of the existing
        # and the incoming one (LEAST), so a row's date only moves earlier.
        sql = """
INSERT INTO content(sha1, date) VALUES %s
ON CONFLICT (sha1) DO
UPDATE SET date=LEAST(EXCLUDED.date,content.date)
"""
        # A falsy self.page_size falls back to sending every row in one page.
        page_size = self.page_size or len(rows)
        with self.transaction() as cursor:
            psycopg2.extras.execute_values(
                cursor, sql, argslist=rows, page_size=page_size
            )
    # NOTE(review): indentation was lost in the reviewed source; the return
    # is placed at function level so an empty mapping also reports success
    # -- confirm against the upstream file.
    return True
......@@ -160,7 +162,8 @@ class ProvenanceStoragePostgreSql:
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "directory_add"})
@handle_raise_on_commit
def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
data = [(sha1, rev.date, rev.flat) for sha1, rev in dirs.items()]
# Upsert in consistent order to avoid deadlocks
data = sorted((sha1, rev.date, rev.flat) for sha1, rev in dirs.items())
if data:
sql = """
INSERT INTO directory(sha1, date, flat) VALUES %s
......@@ -220,7 +223,8 @@ class ProvenanceStoragePostgreSql:
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "location_add"})
@handle_raise_on_commit
def location_add(self, paths: Dict[Sha1Git, bytes]) -> bool:
values = [(path,) for path in paths.values()]
# Upsert in consistent order to avoid deadlocks
values = sorted((path,) for path in paths.values())
if values:
sql = """
INSERT INTO location(path) VALUES %s
......@@ -243,16 +247,18 @@ class ProvenanceStoragePostgreSql:
@handle_raise_on_commit
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
    """Insert origin rows, leaving rows that conflict with existing ones untouched.

    Args:
        orgs: mapping from origin sha1 to the origin URL.

    Returns:
        True when the method completes; an empty mapping issues no query.
    """
    if orgs:
        # Upsert in consistent order to avoid deadlocks
        rows = sorted(orgs.items())
        sql = """
INSERT INTO origin(sha1, url) VALUES %s
ON CONFLICT DO NOTHING
"""
        # A falsy self.page_size falls back to sending every row in one page.
        page_size = self.page_size or len(rows)
        with self.transaction() as cursor:
            psycopg2.extras.execute_values(
                cur=cursor,
                sql=sql,
                argslist=rows,
                page_size=page_size,
            )
    # NOTE(review): indentation was lost in the reviewed source; the return
    # is placed at function level so an empty mapping also reports success
    # -- confirm against the upstream file.
    return True
......@@ -285,7 +291,8 @@ class ProvenanceStoragePostgreSql:
@handle_raise_on_commit
def revision_add(self, revs: Dict[Sha1Git, RevisionData]) -> bool:
if revs:
data = [(sha1, rev.date, rev.origin) for sha1, rev in revs.items()]
# Upsert in consistent order to avoid deadlocks
data = sorted((sha1, rev.date, rev.origin) for sha1, rev in revs.items())
sql = """
INSERT INTO revision(sha1, date, origin)
(SELECT V.rev AS sha1, V.date::timestamptz AS date, O.id AS origin
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment