Skip to content
Snippets Groups Projects
Commit 2760e322 authored by David Douard's avatar David Douard
Browse files

Simplify the lo/high partition offset computation

The computation of lo and high offsets used to be done in 2 steps:
- first get the watermak offsets (thus the absolute min and max offsets
  of the whole partition)
- then, as a "hook" in `process()`, retrieve the last committed offset
  for the partition and "push" these current offsets in the progress
  queue.

Instead, this simplifies a bit this process by quering the committed
offsets while computing the hi/low offsets.
parent e47a3db1
No related branches found
No related tags found
No related merge requests found
......@@ -71,22 +71,11 @@ class JournalClientOffsetRanges(JournalClient):
def process(self, worker_fn):
self.count = 0
try:
self.handle_committed_offsets()
if self.assignment:
super().process(worker_fn)
finally:
self.progress_queue.put(None)
def handle_committed_offsets(self,):
"""
Handle already committed partition offsets before starting processing.
"""
committed = self.consumer.committed(
[TopicPartition(self.topic_name, pid) for pid in self.assignment]
)
for tp in committed:
self.handle_offset(tp.partition, tp.offset)
def handle_offset(self, partition_id, offset):
"""
Check whether the client has reached the end of the current
......@@ -153,8 +142,12 @@ class ParallelJournalProcessor:
def get_offsets(self):
"""
First pass to fetch all the current low and high offsets of each
Compute (lo, high) offset boundaries for all partitions.
First pass to fetch all the current low and high watermark offsets of each
partition to define the consumption boundaries.
If available, use committed offsets as lo offset boundaries.
"""
if self.offsets is None:
client = JournalClient(
......@@ -168,10 +161,27 @@ class ParallelJournalProcessor:
self.offsets = {}
# LOW watermark offset: The offset of the earliest message in the
# topic/partition. If no messages have been written to the topic,
# the low watermark offset is set to 0. The low watermark will also
# be 0 if one message has been written to the partition (with
# offset 0).
# HIGH watermark offset: the offset of the latest message in the
# topic/partition available for consumption + 1
def fetch_insert_partition_id(partition_id):
tp = TopicPartition(topic_name, partition_id)
(lo, hi) = client.consumer.get_watermark_offsets(tp)
if lo != hi:
if hi > lo:
# hi == low means there is nothing in the partition to consume.
# If the partition is not empty, retrieve the committed offset,
# if any, to use it at lo offset.
committed = client.consumer.committed([tp])[0]
lo = max(lo, committed.offset)
if hi > lo:
# do only process the partition is there are actually new
# messages to process (partition not empty and committed
# offset is behind the high watermark).
self.offsets[partition_id] = (lo, hi)
with concurrent.futures.ThreadPoolExecutor(
......@@ -192,7 +202,9 @@ class ParallelJournalProcessor:
"""
offsets = self.get_offsets()
to_assign = list(offsets.keys())
if not to_assign:
print(" - Journal export ({self.obj_type}): skipped (nothing to export)")
return
manager = multiprocessing.Manager()
q = manager.Queue()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment