Skip to content
Snippets Groups Projects
Commit 075b3c30 authored by Antoine Pietri's avatar Antoine Pietri
Browse files

journalprocessor: save final offsets to a text file

parent d2665ef3
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ import collections
import concurrent.futures
from concurrent.futures import FIRST_EXCEPTION, ProcessPoolExecutor
import contextlib
import json
import logging
import multiprocessing
from pathlib import Path
......@@ -285,7 +286,10 @@ class ParallelJournalProcessor:
)
futures.append(pool.submit(self.progress_worker, queue=q))
# Run processes until they all complete, or an error occurs
concurrent.futures.wait(futures, return_when=FIRST_EXCEPTION)
# Propagate potential exceptions
for f in futures:
if f.running():
continue
......@@ -318,6 +322,13 @@ class ParallelJournalProcessor:
)
pbar.update(progress - pbar.n)
# Write final consumer offsets to a save file
(
self.node_sets_path
/ self.obj_type
/ f"offsets-final-{int(time.time())}.json"
).write_text(json.dumps(d))
def export_worker(self, assignment, progress_queue):
worker = JournalProcessorWorker(
self.config,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment