Skip to content
Snippets Groups Projects
Commit 0d115993 authored by David Douard's avatar David Douard
Browse files

Remove 'process_timeout' from JournalClient's arguments

it was not used anywhere, so simplify the code a bit.
parent 4e5e0098
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,6 @@
from collections import defaultdict
import logging
import os
import time
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from confluent_kafka import Consumer, KafkaError, KafkaException
......@@ -220,10 +219,16 @@ class JournalClient:
self.subscribe()
self.stop_after_objects = stop_after_objects
self.process_timeout = process_timeout
self.eof_reached: Set[Tuple[str, str]] = set()
self.batch_size = batch_size
if process_timeout is not None:
raise DeprecationWarning(
"'process_timeout' argument is not supported anymore by "
"JournalClient; please remove it from your configuration.",
)
def subscribe(self):
"""Subscribe to topics listed in self.subscription
......@@ -241,26 +246,11 @@ class JournalClient:
the messages as
argument.
"""
start_time = time.monotonic()
total_objects_processed = 0
# timeout for message poll
timeout = 1.0
while True:
# timeout for message poll
timeout = 1.0
elapsed = time.monotonic() - start_time
if self.process_timeout:
# +0.01 to prevent busy-waiting on / spamming consumer.poll.
# consumer.consume() returns shortly before X expired
# (a matter of milliseconds), so after it returns a first
# time, it would then be called with a timeout in the order
# of milliseconds, therefore returning immediately, then be
# called again, etc.
if elapsed + 0.01 >= self.process_timeout:
break
timeout = self.process_timeout - elapsed
batch_size = self.batch_size
if self.stop_after_objects:
if total_objects_processed >= self.stop_after_objects:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment