Skip to content
Snippets Groups Projects

Split kafka writer tests into a consumer loop and an object check

Compare and
1 file
+ 23
7
Compare changes
  • Side-by-side
  • Inline
@@ -25,8 +25,10 @@ from .conftest import OBJECT_TYPE_KEYS
MODEL_OBJECTS = {v: k for (k, v) in OBJECT_TYPES.items()}
def assert_written(consumer, kafka_prefix, expected_messages):
consumed_objects = defaultdict(list)
def consume_messages(consumer, kafka_prefix, expected_messages):
"""Consume expected_messages from the consumer;
Sort them all into a consumed_objects dict"""
consumed_messages = defaultdict(list)
fetched_messages = 0
retries_left = 1000
@@ -49,13 +51,21 @@ def assert_written(consumer, kafka_prefix, expected_messages):
continue
fetched_messages += 1
consumed_objects[msg.topic()].append(
topic = msg.topic()
assert topic.startswith(kafka_prefix + '.'), "Unexpected topic"
object_type = topic[len(kafka_prefix + '.'):]
consumed_messages[object_type].append(
(kafka_to_key(msg.key()), kafka_to_value(msg.value()))
)
return consumed_messages
def assert_all_objects_consumed(consumed_messages):
"""Check whether all objects from OBJECT_TYPE_KEYS have been consumed"""
for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
topic = kafka_prefix + '.' + object_type
(keys, values) = zip(*consumed_objects[topic])
(keys, values) = zip(*consumed_messages[object_type])
if key_name:
assert list(keys) == [object_[key_name] for object_ in objects]
else:
@@ -99,7 +109,10 @@ def test_kafka_writer(
writer.write_addition(object_type, object_)
expected_messages += 1
assert_written(consumer, kafka_prefix, expected_messages)
consumed_messages = consume_messages(
consumer, kafka_prefix, expected_messages
)
assert_all_objects_consumed(consumed_messages)
def test_storage_direct_writer(
@@ -159,4 +172,7 @@ def test_storage_direct_writer(
else:
assert False, object_type
assert_written(consumer, kafka_prefix, expected_messages)
consumed_messages = consume_messages(
consumer, kafka_prefix, expected_messages
)
assert_all_objects_consumed(consumed_messages)
Loading