From ed2314178de00bb9dc183e5926a08e2b74da3681 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz <vlorentz@softwareheritage.org> Date: Tue, 24 Jan 2023 14:30:48 +0100 Subject: [PATCH] Add method RPCServerApp.setup_psycopg2_errorhandlers This imports code from swh.storage.api.server, so it can be reused by other RPC servers for psycopg2 backends (swh-indexer-storage, swh-scheduler) --- swh/core/api/__init__.py | 32 ++++++++++-- swh/core/api/tests/test_rpc_server.py | 71 ++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/swh/core/api/__init__.py b/swh/core/api/__init__.py index 90b57937..1064350d 100644 --- a/swh/core/api/__init__.py +++ b/swh/core/api/__init__.py @@ -30,14 +30,14 @@ from .negotiation import Formatter as FormatterBase from .negotiation import Negotiator as NegotiatorBase from .negotiation import negotiate as _negotiate from .serializers import ( + decode_response, + encode_data_client, exception_to_dict, json_dumps, json_loads, msgpack_dumps, msgpack_loads, ) -from .serializers import decode_response -from .serializers import encode_data_client as encode_data logger = logging.getLogger(__name__) @@ -283,7 +283,7 @@ class RPCClient(metaclass=MetaRPCClient): return self._decode_response(response) def _encode_data(self, data): - return encode_data(data, extra_encoders=self.extra_type_encoders) + return encode_data_client(data, extra_encoders=self.extra_type_encoders) _post_stream = _post @@ -512,3 +512,29 @@ class RPCServerApp(Flask): f = decorator(f) self.route("/" + meth._endpoint_path, methods=["POST"])(f) + + def setup_psycopg2_errorhandlers(self) -> None: + """Configures error handlers for exceptions raised by psycopg2 to return 503 + exceptions and skip Sentry reports for most exceptions derived from + :exc:`psycopg2.errors.OperationalError`.""" + from psycopg2.errors import OperationalError, QueryCanceled + + @self.errorhandler(OperationalError) + def operationalerror_exception_handler(exception): + # Same as error_handler(exception, encode_data); but does not log or send + # to Sentry. + # These errors are noisy, and are better logged on the caller's side after + # it retried a few times. + # Additionally, we return 503 instead of 500, telling clients they should + # retry. + response = encode_data_server(exception_to_dict(exception)) + response.status_code = 503 + return response + + @self.errorhandler(QueryCanceled) + def querycancelled_exception_handler(exception): + # Ditto, but 500 instead of 503, because this is usually caused by the query + # size instead of a transient failure + response = encode_data_server(exception_to_dict(exception)) + response.status_code = 500 + return response diff --git a/swh/core/api/tests/test_rpc_server.py b/swh/core/api/tests/test_rpc_server.py index 9bf319c7..26ecffb0 100644 --- a/swh/core/api/tests/test_rpc_server.py +++ b/swh/core/api/tests/test_rpc_server.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018-2022 The Software Heritage developers +# Copyright (C) 2018-2023 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -54,6 +54,18 @@ class TestStorage: def custom_crashy(self, data, db=None, cur=None): raise MyCustomException("try again later!") + @remote_api_endpoint("crashy/adminshutdown") + def adminshutdown_crash(self, data, db=None, cur=None): + from psycopg2.errors import AdminShutdown + + raise AdminShutdown("cluster is shutting down") + + @remote_api_endpoint("crashy/querycancelled") + def querycancelled_crash(self, data, db=None, cur=None): + from psycopg2.errors import QueryCanceled + + raise QueryCanceled("too big!") + @pytest.fixture def app(): @@ -63,6 +75,13 @@ def app(): def custom_error_handler(exception): return error_handler(exception, encode_data_server, status_code=503) + try: + import psycopg2 # noqa + except ImportError: + pass + else: + app.setup_psycopg2_errorhandlers() + @app.errorhandler(Exception) def default_error_handler(exception): return error_handler(exception, encode_data_server) @@ -184,6 +203,56 @@ def test_rpc_server_custom_exception(flask_app_client): ), data +def test_rpc_server_psycopg2_adminshutdown(flask_app_client): + pytest.importorskip("psycopg2") + + res = flask_app_client.post( + url_for("adminshutdown_crash"), + headers=[ + ("Content-Type", "application/x-msgpack"), + ("Accept", "application/x-msgpack"), + ], + data=msgpack.dumps({"data": "toto"}), + ) + + assert res.status_code == 503 + assert res.mimetype == "application/x-msgpack", res.data + data = msgpack.loads(res.data) + assert ( + data.items() + >= { + "type": "AdminShutdown", + "module": "psycopg2.errors", + "args": ["cluster is shutting down"], + }.items() + ), data + + +def test_rpc_server_psycopg2_querycancelled(flask_app_client): + pytest.importorskip("psycopg2") + + res = flask_app_client.post( + url_for("querycancelled_crash"), + headers=[ + ("Content-Type", "application/x-msgpack"), + ("Accept", "application/x-msgpack"), + ], + data=msgpack.dumps({"data": "toto"}), + ) + + assert res.status_code == 500 + assert res.mimetype == "application/x-msgpack", res.data + data = msgpack.loads(res.data) + assert ( + data.items() + >= { + "type": "QueryCanceled", + "module": "psycopg2.errors", + "args": ["too big!"], + }.items() + ), data + + def test_rpc_server_extra_serializers(flask_app_client): res = flask_app_client.post( url_for("serializer_test"), -- GitLab