Skip to content
Snippets Groups Projects
Commit 7f12aa27 authored by Pierre-Yves David's avatar Pierre-Yves David
Browse files

Migration to psycopg3

parent 00d2d9bb
No related branches found
No related tags found
1 merge request!537Migration to psycopg3
Pipeline #14223 passed
swh.core[db,http] >= 3.6.1
swh.core[db,http] >= 4.0.0
swh.model >= 6.13.0
swh.objstorage >= 2.3.1
swh.storage >= 2.0.0
swh.storage >= 3.0.0
swh.journal >= 0.1.0
......@@ -9,8 +9,8 @@ from typing import Dict, Iterable, List, Optional, Tuple, Union
import warnings
import attr
import psycopg2
import psycopg2.pool
import psycopg
import psycopg_pool
from swh.core.db.common import db_transaction
from swh.indexer.storage.interface import IndexerStorageInterface
......@@ -129,22 +129,24 @@ class IndexerStorage:
def __init__(self, db, min_pool_conns=1, max_pool_conns=10, journal_writer=None):
"""
Args:
db: either a libpq connection string, or a psycopg2 connection
db: either a libpq connection string, or a psycopg connection
journal_writer: configuration passed to
`swh.journal.writer.get_journal_writer`
"""
self.journal_writer = JournalWriter(journal_writer)
try:
if isinstance(db, psycopg2.extensions.connection):
self._pool = None
self._db = Db(db)
else:
self._pool = psycopg2.pool.ThreadedConnectionPool(
min_pool_conns, max_pool_conns, db
if isinstance(db, str):
self._pool = psycopg_pool.ConnectionPool(
conninfo=db,
min_size=min_pool_conns,
max_size=max_pool_conns,
)
self._db = None
except psycopg2.OperationalError as e:
else:
self._pool = None
self._db = Db(db)
except psycopg.OperationalError as e:
raise StorageDBError(e)
def get_db(self):
......
......@@ -43,9 +43,6 @@ def my_error_handler(exception):
return error_handler(exception, encode_data)
app.setup_psycopg2_errorhandlers()
@app.errorhandler(IndexerStorageArgumentException)
def argument_error_handler(exception):
return error_handler(exception, encode_data, status_code=400)
......
......@@ -3,14 +3,27 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Dict, Iterable, Iterator, List
from typing import Any, Dict, Iterable, Iterator, List
from psycopg import Cursor
from swh.core.db import BaseDb
from swh.core.db.db_utils import execute_values_generator, stored_procedure
from swh.core.db.db_utils import stored_procedure
from .interface import Sha1
def execute_values_generator(
cur: Cursor, query: str, values: Iterable[Any]
) -> Iterator[Any]:
cur.executemany(query, values, returning=True)
if cur.pgresult is None:
return
yield from cur.fetchall()
while cur.nextset():
yield from cur.fetchall()
class Db(BaseDb):
"""Proxy to the SWH Indexer DB, with wrappers around stored procedures"""
......@@ -32,17 +45,18 @@ class Db(BaseDb):
"""
cur = self._cursor(cur)
keys = ", ".join(hash_keys)
values_place_holder = ", ".join(["%s"] * len(hash_keys))
equality = " AND ".join(("t.%s = c.%s" % (key, key)) for key in hash_keys)
yield from execute_values_generator(
cur,
"""
select %s from (values %%s) as t(%s)
select %s from (values (%s)) as t(%s)
where not exists (
select 1 from %s c
where %s
)
"""
% (keys, keys, table, equality),
% (keys, values_place_holder, keys, table, equality),
(tuple(m[k] for k in hash_keys) for m in data),
)
......@@ -114,7 +128,7 @@ class Db(BaseDb):
keys = map(self._convert_key, cols)
query = """
select {keys}
from (values %s) as t(id)
from (values (%s)) as t(id)
inner join {table} c
on c.{id_col}=t.id
inner join indexer_configuration i
......@@ -206,7 +220,7 @@ class Db(BaseDb):
cur,
"""
select %s
from (values %%s) as t(id)
from (values (%%s)) as t(id)
inner join content_fossology_license c on t.id=c.id
inner join indexer_configuration i
on i.id=c.indexer_configuration_id
......
......@@ -3,7 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import psycopg2
import psycopg
import pytest
from swh.core.api import RemoteException, TransientRemoteException
......@@ -79,7 +79,7 @@ def test_operationalerror_exception(app_server, swh_indexer_storage, mocker):
mocker.patch.object(
app_server.storage,
"content_mimetype_get",
side_effect=psycopg2.errors.AdminShutdown("cluster is shutting down"),
side_effect=psycopg.errors.AdminShutdown("cluster is shutting down"),
)
with pytest.raises(RemoteException) as excinfo:
swh_indexer_storage.content_mimetype_get([b"\x01" * 20])
......@@ -94,7 +94,7 @@ def test_querycancelled_exception(app_server, swh_indexer_storage, mocker):
mocker.patch.object(
app_server.storage,
"content_mimetype_get",
side_effect=psycopg2.errors.QueryCanceled("too big!"),
side_effect=psycopg.errors.QueryCanceled("too big!"),
)
with pytest.raises(RemoteException) as excinfo:
swh_indexer_storage.content_mimetype_get([b"\x01" * 20])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment