Skip to content
Snippets Groups Projects
Commit 94be817f authored by David Douard's avatar David Douard
Browse files

Commit kafka messages which offset has reach the high limit

this is necessary to ensure these messages are committed in kafka,
otherwise, since the (considered) empty partition is unsubscribed from,
it never gets committed in `JournalClient.handle_messages()` (since this
later only commit assigned partitions).

Ensure offset are committed only after worker_fn is executed without
error.

This requires to overload the `JournalClient.handle_messages()` method in
`JournalClientOffsetRanges` to make sure "pending" messages are
committed after the proper execution of `worker_fn`.

Doing so, we can both unsubscribe from "eof" partitions on the fly (with
"eof" meaning when the partition has been consumed up to the high
watermark offset at the beginning of the export), and commit ALL offsets
that need to be, but only after proper execution of the `worker_fn`
callback.

This should guarantee proper and consistent behavior (famous last
word...).
parent a3c1f390
No related branches found
No related tags found
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment