Skip to content
Snippets Groups Projects

Fix journal client's statsd probing

Merged David Douard requested to merge douardda/swh-journal:fix-client-statsd into master
2 files
+ 76
1
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -10,6 +10,7 @@ from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
from swh.core.pytest_plugin import FakeSocket
from swh.journal.client import EofBehavior, JournalClient
from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
from swh.model.model import Content, Revision
@@ -82,6 +83,80 @@ def test_client(
worker_fn.assert_called_once_with({"revision": [REV]})
def test_client_statsd(
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: str,
):
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"acks": "all",
}
)
# Fill Kafka
producer.produce(
topic=kafka_prefix + ".revision",
key=REV["id"],
value=value_to_kafka(REV),
)
producer.flush()
client = JournalClient(
brokers=[kafka_server],
group_id=kafka_consumer_group,
prefix=kafka_prefix,
on_eof=EofBehavior.STOP,
)
client.statsd._socket = FakeSocket()
worker_fn = MagicMock()
client.process(worker_fn)
worker_fn.assert_called_once_with({"revision": [REV]})
assert client.statsd.namespace == "swh_journal_client"
# check at least initialization of the status gauge is ok
assert client.statsd.socket.recv() == "swh_journal_client.status:0|g|#status:idle"
assert (
client.statsd.socket.recv()
== "swh_journal_client.status:0|g|#status:processing"
)
assert (
client.statsd.socket.recv() == "swh_journal_client.status:0|g|#status:waiting"
)
# processing the batch with only one message
assert client.statsd.socket.recv() == "swh_journal_client.status:1|g|#status:idle"
assert client.statsd.socket.recv() == "swh_journal_client.status:0|g|#status:idle"
assert (
client.statsd.socket.recv() == "swh_journal_client.status:1|g|#status:waiting"
)
assert (
client.statsd.socket.recv() == "swh_journal_client.status:0|g|#status:waiting"
)
assert (
client.statsd.socket.recv()
== "swh_journal_client.status:1|g|#status:processing"
)
assert (
client.statsd.socket.recv()
== "swh_journal_client.status:0|g|#status:processing"
)
assert client.statsd.socket.recv() == "swh_journal_client.status:1|g|#status:idle"
# from there, status messages can be mixed with a few other ones...
assert (
b"swh_journal_client.handle_message_total:1|c" in client.statsd.socket.payloads
)
assert b"swh_journal_client.stop_total:1|c" in client.statsd.socket.payloads
# and some waiting/idle forth and back may have happened, so only check the
# last 3 messages resetting the status gauges)
assert list(client.statsd.socket.payloads)[-3:] == [
b"swh_journal_client.status:0|g|#status:idle",
b"swh_journal_client.status:0|g|#status:processing",
b"swh_journal_client.status:0|g|#status:waiting",
]
@pytest.mark.parametrize(
"count,legacy_eof", [(1, True), (2, True), (1, False), (2, False)]
)
Loading