Skip to content
Snippets Groups Projects
Commit ed231417 authored by vlorentz's avatar vlorentz
Browse files

Add method RPCServerApp.setup_psycopg2_errorhandlers

This imports code from swh.storage.api.server, so it can be reused
by other RPC servers for psycopg2 backends (swh-indexer-storage, swh-scheduler)
parent 7d104d1b
Branches master
Tags v2.20.0
No related merge requests found
......@@ -30,14 +30,14 @@ from .negotiation import Formatter as FormatterBase
from .negotiation import Negotiator as NegotiatorBase
from .negotiation import negotiate as _negotiate
from .serializers import (
decode_response,
encode_data_client,
exception_to_dict,
json_dumps,
json_loads,
msgpack_dumps,
msgpack_loads,
)
from .serializers import decode_response
from .serializers import encode_data_client as encode_data
logger = logging.getLogger(__name__)
......@@ -283,7 +283,7 @@ class RPCClient(metaclass=MetaRPCClient):
return self._decode_response(response)
def _encode_data(self, data):
return encode_data(data, extra_encoders=self.extra_type_encoders)
return encode_data_client(data, extra_encoders=self.extra_type_encoders)
_post_stream = _post
......@@ -512,3 +512,29 @@ class RPCServerApp(Flask):
f = decorator(f)
self.route("/" + meth._endpoint_path, methods=["POST"])(f)
def setup_psycopg2_errorhandlers(self) -> None:
"""Configures error handlers for exceptions raised by psycopg2 to return 503
exceptions and skip Sentry reports for most exceptions derived from
:exc:`psycopg2.errors.OperationalError`."""
from psycopg2.errors import OperationalError, QueryCanceled
@self.errorhandler(OperationalError)
def operationalerror_exception_handler(exception):
# Same as error_handler(exception, encode_data); but does not log or send
# to Sentry.
# These errors are noisy, and are better logged on the caller's side after
# it retried a few times.
# Additionally, we return 503 instead of 500, telling clients they should
# retry.
response = encode_data_server(exception_to_dict(exception))
response.status_code = 503
return response
@self.errorhandler(QueryCanceled)
def querycancelled_exception_handler(exception):
# Ditto, but 500 instead of 503, because this is usually caused by the query
# size instead of a transient failure
response = encode_data_server(exception_to_dict(exception))
response.status_code = 500
return response
# Copyright (C) 2018-2022 The Software Heritage developers
# Copyright (C) 2018-2023 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
......@@ -54,6 +54,18 @@ class TestStorage:
def custom_crashy(self, data, db=None, cur=None):
raise MyCustomException("try again later!")
@remote_api_endpoint("crashy/adminshutdown")
def adminshutdown_crash(self, data, db=None, cur=None):
from psycopg2.errors import AdminShutdown
raise AdminShutdown("cluster is shutting down")
@remote_api_endpoint("crashy/querycancelled")
def querycancelled_crash(self, data, db=None, cur=None):
from psycopg2.errors import QueryCanceled
raise QueryCanceled("too big!")
@pytest.fixture
def app():
......@@ -63,6 +75,13 @@ def app():
def custom_error_handler(exception):
return error_handler(exception, encode_data_server, status_code=503)
try:
import psycopg2 # noqa
except ImportError:
pass
else:
app.setup_psycopg2_errorhandlers()
@app.errorhandler(Exception)
def default_error_handler(exception):
return error_handler(exception, encode_data_server)
......@@ -184,6 +203,56 @@ def test_rpc_server_custom_exception(flask_app_client):
), data
def test_rpc_server_psycopg2_adminshutdown(flask_app_client):
pytest.importorskip("psycopg2")
res = flask_app_client.post(
url_for("adminshutdown_crash"),
headers=[
("Content-Type", "application/x-msgpack"),
("Accept", "application/x-msgpack"),
],
data=msgpack.dumps({"data": "toto"}),
)
assert res.status_code == 503
assert res.mimetype == "application/x-msgpack", res.data
data = msgpack.loads(res.data)
assert (
data.items()
>= {
"type": "AdminShutdown",
"module": "psycopg2.errors",
"args": ["cluster is shutting down"],
}.items()
), data
def test_rpc_server_psycopg2_querycancelled(flask_app_client):
pytest.importorskip("psycopg2")
res = flask_app_client.post(
url_for("querycancelled_crash"),
headers=[
("Content-Type", "application/x-msgpack"),
("Accept", "application/x-msgpack"),
],
data=msgpack.dumps({"data": "toto"}),
)
assert res.status_code == 500
assert res.mimetype == "application/x-msgpack", res.data
data = msgpack.loads(res.data)
assert (
data.items()
>= {
"type": "QueryCanceled",
"module": "psycopg2.errors",
"args": ["too big!"],
}.items()
), data
def test_rpc_server_extra_serializers(flask_app_client):
res = flask_app_client.post(
url_for("serializer_test"),
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment