Skip to content
Snippets Groups Projects
Commit 07bcf167 authored by David Douard's avatar David Douard
Browse files

Add a --margin option to the `swh dataset graph export` command

this option allows to restart consuming kafka a bit earlier than last
committed offsets.
This can be useful to test and debug.
parent 4154d43a
No related branches found
No related tags found
No related merge requests found
......@@ -83,9 +83,18 @@ AVAILABLE_EXPORTERS = {
type=click.STRING,
help="Comma-separated list of objects types to export",
)
@click.option(
"--margin",
type=click.FloatRange(0, 1),
help=(
"Offset margin to start consuming from. E.g. is set to '0.95', "
"consumers will start at 95% of the last committed offset; "
"in other words, start earlier than last committed position."
),
)
@click.pass_context
def export_graph(
ctx, export_path, export_id, formats, exclude, object_types, processes
ctx, export_path, export_id, formats, exclude, object_types, processes, margin
):
"""Export the Software Heritage graph as an edge dataset."""
from importlib import import_module
......@@ -143,6 +152,7 @@ def export_graph(
obj_type,
node_sets_path=pathlib.Path(export_path) / ".node_sets" / obj_type,
processes=processes,
offset_margin=margin,
)
print("Exporting {}:".format(obj_type))
parallel_exporter.run()
......
......@@ -69,7 +69,6 @@ class JournalClientOffsetRanges(JournalClient):
self.assignment = assignment
self._messages_to_commit: List[Message] = []
self._partitions_to_unsubscribe: Set[int] = set()
self.count = None
self.topic_name: Optional[str] = None
kwargs["stop_on_eof"] = True # Stop when the assignment is empty
......@@ -80,12 +79,20 @@ class JournalClientOffsetRanges(JournalClient):
time.sleep(0.1) # https://github.com/edenhill/librdkafka/issues/1983
logger.debug("Changing assignment to %s", str(self.assignment))
self.consumer.assign(
[TopicPartition(self.topic_name, pid) for pid in self.assignment]
[
TopicPartition(
self.topic_name,
pid,
self.offset_ranges[pid][0],
)
for pid in self.assignment
]
)
def unsubscribe(self, partitions: Container[int]):
assert self.assignment is not None
self.assignment = [pid for pid in self.assignment if pid not in partitions]
logger.debug("Changing assignment to %s", str(self.assignment))
self.consumer.assign(
[TopicPartition(self.topic_name, pid) for pid in self.assignment]
)
......@@ -177,6 +184,7 @@ class ParallelJournalProcessor:
obj_type: str,
node_sets_path: Path,
processes: int = 1,
offset_margin: Optional[float] = None,
):
"""
Args:
......@@ -197,6 +205,7 @@ class ParallelJournalProcessor:
self.processes = processes
self.node_sets_path = node_sets_path
self.offsets = None
self.offset_margin = offset_margin
def get_offsets(self):
"""
......@@ -230,16 +239,30 @@ class ParallelJournalProcessor:
logger.debug("Fetching offset for partition %s", partition_id)
tp = TopicPartition(topic_name, partition_id)
(lo, hi) = client.consumer.get_watermark_offsets(tp)
logger.debug("[%s] (lo,hi)=(%s, %s)", partition_id, lo, hi)
logger.debug(
"[%s] watermark offset (lo,hi)=(%s, %s)", partition_id, lo, hi
)
if hi > lo:
# hi == low means there is nothing in the partition to consume.
# If the partition is not empty, retrieve the committed offset,
# if any, to use it at lo offset.
logger.debug(
"Fetching committed offset for partition %s", partition_id
)
committed = client.consumer.committed([tp])[0]
logger.debug(
"[%s] committed offset: %s", partition_id, committed.offset
)
lo = max(lo, committed.offset)
if self.offset_margin:
# Using min() in case of precision loss when self.offset_margin
# is close to 1.0 and lo is very large
newlo = min(lo, int(self.offset_margin * lo))
logger.debug(
"Apply offset margin: reducing lo from %s to %s", lo, newlo
)
lo = newlo
if hi > lo:
# do only process the partition is there are actually new
# messages to process (partition not empty and committed
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment