Commit f92d4acf authored by David Douard

Pass the object_type to JournalClient.value_deserializer()

and make this function an (optional) constructor argument.
If not given, it defaults to `kafka_to_value`.

If the returned value is None, it is ignored (not passed to the
`worker_fn` function).

This is needed to make it possible for the JournalClient to use a special
value_deserializer implementation that needs the object_type, for example
one that directly instantiates BaseModel objects.

This will be used by an upcoming refactoring of the storage replayer
that will make sure any BaseModel object coming from the journal is valid,
and will log invalid kafka objects when it is not.

Related to T3693.
parent 88054da8
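For illustration, here is a minimal sketch of the kind of `value_deserializer` this change enables; the `MODEL_CLASSES` mapping and the `model_deserializer` name are hypothetical, and the actual replayer refactoring may differ:

    import logging

    from swh.journal.serializers import kafka_to_value
    from swh.model.model import Release, Revision

    # hypothetical mapping of journal object types to model classes
    MODEL_CLASSES = {"revision": Revision, "release": Release}

    def model_deserializer(object_type, kafka_msg):
        """Turn a raw kafka message into a BaseModel object, or None if invalid."""
        value = kafka_to_value(kafka_msg)
        cls = MODEL_CLASSES.get(object_type)
        if cls is None:
            return None  # unhandled object type: the message is skipped
        try:
            return cls.from_dict(value)
        except (ValueError, TypeError) as exc:
            # invalid objects are logged and dropped, not passed to worker_fn
            logging.warning("invalid %s object: %s", object_type, exc)
            return None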
@@ -6,12 +6,16 @@ Software Heritage Journal clients
Journal clients are processes that read data from the |swh| Journal,
in order to efficiently process all existing objects, and process new objects
as they come.
Some journal clients, such as :ref:`swh-dataset <swh-dataset>`, only read
existing objects and stop when they are done.
They can run in parallel, and the :mod:`swh.journal.client` module
provides an abstraction handling all the setup, so actual clients are actually
a single function that takes :mod:`model objects <swh.model.model>` as parameters.
Other journal clients, such as the :ref:`mirror <swh-storage>`, are expected to
read constantly from the journal.
They can run in parallel, and the :mod:`swh.journal.client` module provides an
abstraction handling all the setup, so actual clients consist only of a single
function that takes :mod:`model objects <swh.model.model>` as parameters.
For example, a very simple journal client that prints all revisions and releases
to the console can be implemented like this:
......
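The example itself is collapsed in this diff view. As a rough sketch (assuming a local Kafka broker and the default `swh.journal.objects` topic prefix; the consumer group id is hypothetical), such a client could look like:

    from swh.journal.client import JournalClient

    def print_objects(all_objects):
        # all_objects maps an object type ("revision", "release", ...) to the
        # list of objects received in the current batch
        for object_type, objects in all_objects.items():
            for obj in objects:
                print(object_type, obj)

    client = JournalClient(
        brokers=["localhost:9092"],            # assumed local broker
        group_id="my-journal-client",          # hypothetical consumer group
        prefix="swh.journal.objects",
        object_types=["revision", "release"],
    )
    client.process(print_objects)

With the default `value_deserializer`, the objects received by `print_objects` are plain dicts produced by `kafka_to_value`.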
@@ -7,7 +7,7 @@ from collections import defaultdict
import logging
import os
import time
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from confluent_kafka import Consumer, KafkaError, KafkaException
@@ -73,6 +73,16 @@ class JournalClient:
Messages are processed by the `worker_fn` callback passed to the `process`
method, in batches of maximum `batch_size` messages (defaults to 200).
The objects passed to the `worker_fn` callback are the result of the kafka
message converted by the `value_deserializer` function. By default (if this
argument is not given), it will produce dicts (using the `kafka_to_value`
function). The signature of this function is:

`value_deserializer(object_type: str, kafka_msg: bytes) -> Any`

If the value returned by `value_deserializer` is None, it is ignored and
not passed to the `worker_fn` function.
If set, the processing stops after processing `stop_after_objects` messages
in total.
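A minimal deserializer matching this signature could, for instance, drop every message of a given object type; this is a hypothetical example, and passing no `value_deserializer` at all is equivalent to passing `lambda _, value: kafka_to_value(value)`:

    from typing import Any, Optional

    from swh.journal.serializers import kafka_to_value

    def skip_origins(object_type: str, kafka_msg: bytes) -> Optional[Any]:
        # returning None makes the client drop the message instead of
        # passing it on to worker_fn
        if object_type == "origin":
            return None
        return kafka_to_value(kafka_msg)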
@@ -99,6 +109,7 @@ class JournalClient:
process_timeout: Optional[float] = None,
auto_offset_reset: str = "earliest",
stop_on_eof: bool = False,
value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
**kwargs,
):
if prefix is None:
@@ -111,8 +122,10 @@ class JournalClient:
if batch_size <= 0:
    raise ValueError("Option 'batch_size' needs to be positive")
self.value_deserializer = kafka_to_value
if value_deserializer:
    self.value_deserializer = value_deserializer
else:
    self.value_deserializer = lambda _, value: kafka_to_value(value)
if isinstance(brokers, str):
    brokers = [brokers]
@@ -286,7 +299,11 @@ class JournalClient:
        continue
    nb_processed += 1
    object_type = message.topic().split(".")[-1]
    objects[object_type].append(self.deserialize_message(message))
    deserialized_object = self.deserialize_message(
        message, object_type=object_type
    )
    if deserialized_object is not None:
        objects[object_type].append(deserialized_object)
if objects:
    worker_fn(dict(objects))
@@ -299,8 +316,8 @@ class JournalClient:
    return nb_processed, at_eof

def deserialize_message(self, message):
    return self.value_deserializer(message.value())

def deserialize_message(self, message, object_type=None):
    return self.value_deserializer(object_type, message.value())

def close(self):
    self.consumer.close()
@@ -3,15 +3,16 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Dict, List
from typing import Dict, List, cast
from unittest.mock import MagicMock
from confluent_kafka import Producer
import pytest
from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.model.model import Content
from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
from swh.model.model import Content, Revision
from swh.model.tests.swh_model_data import TEST_OBJECTS
REV = {
    "message": b"something cool",
@@ -327,3 +328,45 @@ def test_client_subscriptions_without_anonymized_topics(
    # we also only subscribed to the standard prefix, since there is no
    # privileged prefix on the kafka broker
    assert client.subscription == [kafka_prefix + ".revision"]
def test_client_with_deserializer(
    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
):
    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test producer",
            "acks": "all",
        }
    )

    # Fill Kafka
    revisions = cast(List[Revision], TEST_OBJECTS["revision"])
    for rev in revisions:
        producer.produce(
            topic=kafka_prefix + ".revision",
            key=rev.id,
            value=value_to_kafka(rev.to_dict()),
        )
    producer.flush()

    def custom_deserializer(object_type, msg):
        assert object_type == "revision"
        obj = kafka_to_value(msg)
        # filter the first revision
        if obj["id"] == revisions[0].id:
            return None
        return Revision.from_dict(obj)

    client = JournalClient(
        brokers=[kafka_server],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=1,
        value_deserializer=custom_deserializer,
    )
    worker_fn = MagicMock()
    client.process(worker_fn)

    # check that the first Revision has not been passed to worker_fn
    worker_fn.assert_called_once_with({"revision": revisions[1:]})