Kafka clients crash easily in docker-compose environment
While testing a complete loading then indexing chain in the docker-compose environment, the following errors are often reported :
swh-indexer-journal-client_1 | 2019-03-08 15:17:08,360 1 INFO Starting indexer journal client with config {'content_size_limit': 104857600, 'log_db': 'dbname=softwareheritage-log', 'brokers': ['kafka'], 'max_messages': 1, 'scheduler': {'cls': 'remote', 'args': {'url': 'http://swh-scheduler-api:5008/'}}, 'topic_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.client', 'object_types': ['origin_visit'], 'auto_offset_reset': 'earliest', 'origin_visit_tasks': [{'type': 'indexer_origin_metadata', 'kwargs': {'policy_update': 'update-dups', 'parse_ids': False}}]}
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,063 1 INFO Group coordinator for swh.journal.client is BrokerMetadata(nodeId=1001, host='kafka', port=9092, rack=None)
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,063 1 INFO Discovered coordinator 1001 for group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,066 1 INFO <BrokerConnection node_id=1001 host=kafka:9092 <connecting> [IPv4 ('172.19.0.16', 9092)]>: connecting to kafka:9092 [('172.19.0.16', 9092) IPv4]
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,066 1 INFO Starting new heartbeat thread
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,067 1 INFO Revoking previously assigned partitions set() for group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,068 1 INFO <BrokerConnection node_id=1001 host=kafka:9092 <connecting> [IPv4 ('172.19.0.16', 9092)]>: Connection complete.
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,074 1 INFO <BrokerConnection node_id=bootstrap host=kafka:9092 <connected> [IPv4 ('172.19.0.16', 9092)]>: Closing connection.
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,075 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,190 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,294 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,419 1 INFO Elected group leader -- performing partition assignments using range
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,420 1 WARNING No partition metadata for topic swh.journal.objects.origin_visit
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,420 1 WARNING No partition metadata for topic swh.journal.objects.origin
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,644 1 INFO Successfully joined group swh.journal.client with generation 1
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,644 1 INFO Updated partition assignment: []
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,645 1 INFO Setting newly assigned partitions set() for group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,652 1 INFO Revoking previously assigned partitions set() for group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:11,653 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:17:14,679 1 INFO Elected group leader -- performing partition assignments using range
swh-indexer-journal-client_1 | 2019-03-08 15:17:14,689 1 INFO Successfully joined group swh.journal.client with generation 2
swh-indexer-journal-client_1 | 2019-03-08 15:17:14,690 1 INFO Updated partition assignment: [TopicPartition(topic='swh.journal.objects.origin_visit', partition=0)]
swh-indexer-journal-client_1 | 2019-03-08 15:17:14,690 1 INFO Setting newly assigned partitions {TopicPartition(topic='swh.journal.objects.origin_visit', partition=0)} for group swh.journal.client
swh-indexer-journal-client_1 | 2019-03-08 15:22:14,892 1 WARNING Heartbeat session expired, marking coordinator dead
swh-indexer-journal-client_1 | 2019-03-08 15:22:14,892 1 WARNING Marking the coordinator dead (node 1001) for group swh.journal.client: Heartbeat session expired.
swh-indexer-journal-client_1 | 2019-03-08 15:22:14,895 1 INFO Group coordinator for swh.journal.client is BrokerMetadata(nodeId=1001, host='kafka', port=9092, rack=None)
swh-indexer-journal-client_1 | 2019-03-08 15:22:14,895 1 INFO Discovered coordinator 1001 for group swh.journal.client
swh-indexer-journal-client_1 | Traceback (most recent call last):
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/runpy.py", line 193, in _run_module_as_main
swh-indexer-journal-client_1 | "__main__", mod_spec)
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/runpy.py", line 85, in _run_code
swh-indexer-journal-client_1 | exec(code, run_globals)
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/swh/indexer/journal_client.py", line 88, in <module>
swh-indexer-journal-client_1 | main()
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 764, in __call__
swh-indexer-journal-client_1 | return self.main(*args, **kwargs)
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 717, in main
swh-indexer-journal-client_1 | rv = self.invoke(ctx)
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 956, in invoke
swh-indexer-journal-client_1 | return ctx.invoke(self.callback, **ctx.params)
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 555, in invoke
swh-indexer-journal-client_1 | return callback(*args, **kwargs)
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/swh/indexer/journal_client.py", line 86, in main
swh-indexer-journal-client_1 | IndexerJournalClient().process()
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/swh/journal/client.py", line 121, in process
swh-indexer-journal-client_1 | self.consumer.commit()
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 515, in commit
swh-indexer-journal-client_1 | self._coordinator.commit_offsets_sync(offsets)
swh-indexer-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 513, in commit_offsets_sync
swh-indexer-journal-client_1 | raise future.exception # pylint: disable-msg=raising-bad-type
swh-indexer-journal-client_1 | kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
swh-indexer-journal-client_1 | rebalanced and assigned the partitions to another member.
swh-indexer-journal-client_1 | This means that the time between subsequent calls to poll()
swh-indexer-journal-client_1 | was longer than the configured max_poll_interval_ms, which
swh-indexer-journal-client_1 | typically implies that the poll loop is spending too much
swh-indexer-journal-client_1 | time message processing. You can address this either by
swh-indexer-journal-client_1 | increasing the rebalance timeout with max_poll_interval_ms,
swh-indexer-journal-client_1 | or by reducing the maximum size of batches returned in poll()
swh-indexer-journal-client_1 | with max_poll_records.
swh-journal-client_1 | 2019-03-08 15:22:51,599 1 INFO client received the following messages: defaultdict(<class 'list'>, {'origin_visit': [{b'date': b'2019-03-08 15:19:37.500108+00:00', b'metadata': None, b'origin': 1, b'snapshot': None, b'status': b'ongoing', b'visit': 1}]})
swh-journal-client_1 | Traceback (most recent call last):
swh-journal-client_1 | File "/usr/local/lib/python3.6/runpy.py", line 193, in _run_module_as_main
swh-journal-client_1 | "__main__", mod_spec)
swh-journal-client_1 | File "/usr/local/lib/python3.6/runpy.py", line 85, in _run_code
swh-journal-client_1 | exec(code, run_globals)
swh-journal-client_1 | File "/client.py", line 45, in <module>
swh-journal-client_1 | main()
swh-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 764, in __call__
swh-journal-client_1 | return self.main(*args, **kwargs)
swh-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 717, in main
swh-journal-client_1 | rv = self.invoke(ctx)
swh-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 956, in invoke
swh-journal-client_1 | return ctx.invoke(self.callback, **ctx.params)
swh-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/click/core.py", line 555, in invoke
swh-journal-client_1 | return callback(*args, **kwargs)
swh-journal-client_1 | File "/client.py", line 43, in main
swh-journal-client_1 | JournalClientLogger().process()
swh-journal-client_1 | File "/src/swh-journal/swh/journal/client.py", line 121, in process
swh-journal-client_1 | self.consumer.commit()
swh-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 515, in commit
swh-journal-client_1 | self._coordinator.commit_offsets_sync(offsets)
swh-journal-client_1 | File "/usr/local/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 513, in commit_offsets_sync
swh-journal-client_1 | raise future.exception # pylint: disable-msg=raising-bad-type
swh-journal-client_1 | kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
swh-journal-client_1 | rebalanced and assigned the partitions to another member.
swh-journal-client_1 | This means that the time between subsequent calls to poll()
swh-journal-client_1 | was longer than the configured max_poll_interval_ms, which
swh-journal-client_1 | typically implies that the poll loop is spending too much
swh-journal-client_1 | time message processing. You can address this either by
swh-journal-client_1 | increasing the rebalance timeout with max_poll_interval_ms,
swh-journal-client_1 | or by reducing the maximum size of batches returned in poll()
swh-journal-client_1 | with max_poll_records.
swh-journal-client_1 |
swh-journal-client_1 | 2019-03-08 15:22:51,642 1 INFO Stopping heartbeat thread
Looks like some additional Kafka configuration is required to avoid such behaviors.
Migrated from T1575 (view on Phabricator)