Commit dc58fd68 authored by Antoine R. Dumont

elasticsearch/origin_update: Manage circuit breaker exception

When a bulk request sends too much data at once, Elasticsearch's circuit breaker
raises a BulkIndexError instead of letting the node go OOM. This change handles
that case client side by splitting the bulk update into smaller batches.

https://www.elastic.co/guide/en/elasticsearch/guide/current/_limiting_memory_usage.html#circuit-breaker

Refs. swh/infra/sysadm-environment#5514
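
As a hedged illustration of the approach (a minimal, self-contained sketch, not
the committed code): try the whole bulk request once, and on BulkIndexError
retry in fixed-size batches. Here `chunked` is a hypothetical stand-in for
swh.core.utils.grouper, and `bulk_with_fallback`, the client, and the index
name are placeholders for illustration.

import itertools
import logging
from typing import Any, Dict, Iterator, List, Tuple

from elasticsearch import Elasticsearch, helpers
from elasticsearch.helpers.errors import BulkIndexError

logger = logging.getLogger(__name__)


def chunked(items: List[Any], n: int) -> Iterator[List[Any]]:
    # Hypothetical stand-in for swh.core.utils.grouper: yield lists of <= n items.
    it = iter(items)
    while batch := list(itertools.islice(it, n)):
        yield batch


def bulk_with_fallback(
    client: Elasticsearch,
    actions: List[Dict[str, Any]],
    index: str,
    batch_size: int = 10,
) -> Tuple[int, List[Any]]:
    try:
        # Try the whole stream at once; this works most of the time.
        return helpers.bulk(client, actions, index=index)
    except BulkIndexError as e:
        # The server's circuit breaker rejected the oversized request;
        # retry in smaller batches instead of giving up.
        logger.warning("Bulk rejected (%s); retrying in batches of %d", e, batch_size)
        indexed_count, errors = 0, []
        for batch in chunked(actions, batch_size):
            # A batch can still be rejected; let that exception propagate,
            # since it means the server-side breaker limits need adapting.
            count, errs = helpers.bulk(client, batch, index=index)
            indexed_count += count
            errors.extend(errs)
        return indexed_count, errors

The batch size of 10 mirrors the grouper(actions, n=10) call in the change
below; a production value would depend on document sizes and the server's
circuit-breaker limits.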
-# Copyright (C) 2019-2022 The Software Heritage developers
+# Copyright (C) 2019-2025 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -11,8 +11,10 @@ from textwrap import dedent
 from typing import Any, Dict, Iterable, List, Optional, cast
 
 from elasticsearch import Elasticsearch, NotFoundError, helpers
+from elasticsearch.helpers.errors import BulkIndexError
 import msgpack
 
+from swh.core.utils import grouper
 from swh.indexer import codemeta
 from swh.model import model
 from swh.model.hashutil import hash_to_hex
@@ -375,13 +377,37 @@ class ElasticSearch:
             for (sha1, document) in documents_with_sha1
         ]
-        indexed_count, errors = helpers.bulk(self._backend, actions, index=write_index)
-        assert isinstance(errors, List)  # Make mypy happy
-        send_metric("document:index", count=indexed_count, method_name="origin_update")
-        send_metric(
-            "document:index_error", count=len(errors), method_name="origin_update"
-        )
+        try:
+            # Bulk update the full stream of documents; this should work most
+            # of the time (but it can raise when too much data is sent in one round)
+            indexed_count, errors = helpers.bulk(
+                self._backend, actions, index=write_index
+            )
+        except BulkIndexError as e:
+            indexed_count, errors = 0, list()
+            # Elasticsearch raises a circuit breaker exception instead of going OOM:
+            # https://www.elastic.co/guide/en/elasticsearch/guide/current/_limiting_memory_usage.html#circuit-breaker
+            # The data is too large to bulk-index in one round, so split the
+            # bulk update into smaller lists of documents (grouped with `grouper`).
+            logger.warning("Exception: %s. Bulk update documents in smaller lists.", e)
+            for docs in grouper(actions, n=10):
+                # As with the main call, this can raise too. In that case, let
+                # the exception propagate: some server configuration adaptation
+                # will be needed.
+                _indexed_count, _errors = helpers.bulk(
+                    self._backend, docs, index=write_index
+                )
+                indexed_count += _indexed_count
+                # extend, not append: count individual errors, not batches
+                errors.extend(_errors)
+        finally:
+            assert isinstance(errors, List)  # Make mypy happy
+            send_metric(
+                "document:index", count=indexed_count, method_name="origin_update"
+            )
+            send_metric(
+                "document:index_error", count=len(errors), method_name="origin_update"
+            )
 
     def origin_get(self, url: str) -> Optional[Dict[str, Any]]:
         origin_id = hash_to_hex(model.Origin(url=url).id)
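
The inline comment above notes that if even the smaller batches trip the
circuit breaker, the server configuration has to be adapted. As a rough,
hedged sketch (assuming an elasticsearch-py 8.x client; the endpoint and the
75% value are illustrative placeholders, not recommendations), the parent
breaker limit is a dynamic cluster setting:

from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")  # placeholder endpoint

# indices.breaker.total.limit caps the memory tracked across all breakers;
# raising it trades some OOM protection for fewer rejected bulk requests.
client.cluster.put_settings(transient={"indices.breaker.total.limit": "75%"})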