Skip to content
Snippets Groups Projects
Commit 5b17c50d authored by David Douard's avatar David Douard
Browse files

Add support for the rdkafka 'stats_cb' config option in get_journal_client

this options allows to define a callback which will be called once every
'statistics.interval.ms' ms by rdkafka with a bunch of statistics (as a
json string).

See https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md

This allows to define this callback as a string of the form:

  "path.to.module:function"
parent 0d115993
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
from collections import defaultdict
from importlib import import_module
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
......@@ -34,6 +35,30 @@ def get_journal_client(cls: str, **kwargs: Any):
Currently, only the "kafka" journal client is supported.
"""
if cls == "kafka":
if "stats_cb" in kwargs:
stats_cb = kwargs["stats_cb"]
if isinstance(stats_cb, str):
try:
module_path, func_name = stats_cb.split(":")
except ValueError:
raise ValueError(
"Invalid stats_cb configuration option: "
"it should be a string like 'path.to.module:function'"
)
try:
module = import_module(module_path, package=__package__)
except ModuleNotFoundError:
raise ValueError(
"Invalid stats_cb configuration option: "
f"module {module_path} not found"
)
try:
kwargs["stats_cb"] = getattr(module, func_name)
except AttributeError:
raise ValueError(
"Invalid stats_cb configuration option: "
f"function {func_name} not found in module {module_path}"
)
return JournalClient(**kwargs)
raise ValueError("Unknown journal client class `%s`" % cls)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment