Skip to content
Snippets Groups Projects
Commit b53d7d7c authored by Antoine Pietri's avatar Antoine Pietri
Browse files

client: move subscription to a separate function

This allows overloading the subscribe() function, e.g for manual
assignment of partitions.
parent 7ff372a0
No related branches found
No related tags found
1 merge request!162client: move subscription to a separate function
......@@ -174,12 +174,10 @@ class JournalClient:
self.consumer = Consumer(consumer_settings)
topics = ["%s.%s" % (prefix, object_type) for object_type in object_types]
logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10))
logger.debug("Subscribing to: %s", topics)
self.consumer.subscribe(topics=topics)
self.subscription = [
"%s.%s" % (prefix, object_type) for object_type in object_types
]
self.subscribe()
self.stop_after_objects = stop_after_objects
self.process_timeout = process_timeout
......@@ -188,6 +186,12 @@ class JournalClient:
self._object_types = object_types
def subscribe(self):
logger.debug("Upstream topics: %s", self.consumer.list_topics(timeout=10))
logger.debug("Subscribing to: %s", self.subscription)
self.consumer.subscribe(topics=self.subscription)
def process(self, worker_fn):
"""Polls Kafka for a batch of messages, and calls the worker_fn
with these messages.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment