Skip to content
Snippets Groups Projects

JournalClient: add a stop_at_eof boolean to read the log only once

3 unresolved threads

Migrated from D2718 (view on Phabricator)

Merge request reports

Loading
Loading

Activity

Filter activity
  • Approvals
  • Assignees & reviewers
  • Comments (from bots)
  • Comments (from users)
  • Commits & branches
  • Edits
  • Labels
  • Lock status
  • Mentions
  • Merge request status
  • Tracking
171 176 if not messages:
172 177 continue
173 178
174 # Process messages and add the successfully processed ones to the
175 # message counter.
176 nb_messages += self.handle_messages(messages, worker_fn)
179 nb_processed, at_eof = self.handle_messages(messages, worker_fn)
  • 48 48 client.process(worker_fn)
    49 49
    50 50 worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
    51
    52
    53 def test_client_eof(
    54 kafka_prefix: str,
    55 kafka_server: Tuple[Popen, int]):
    • Most of this is copy pasted from the function above, but I'm not sure if it's worth to factor, and how we should factor it. I can write a fixture that yields a kafka_prefix maybe?

    • Please register or sign in to reply
  • vlorentz
    vlorentz @vlorentz started a thread on the diff
  • 199 210 worker_fn(dict(objects))
    200 211 self.consumer.commit()
    201 212
    202 return nb_processed
    213 at_eof = (self.stop_on_eof and all(
    214 (tp.topic, tp.partition) in self.eof_reached
    215 for tp in self.consumer.assignment()
    • This code is unsound. Your client may reach EOF on some partitions, then get rebalanced on other partitions, reach EOF on some other partitions; but it will count the older partitions on the left side. You should try to do a set equality instead.

    • Please register or sign in to reply
  • Merge request was returned for changes

  • fix reviews

  • Merge request was accepted

  • vlorentz approved this merge request

    approved this merge request

  • Merge request was merged

  • Please register or sign in to reply
    Loading