
Kafka clients crash frequently in the docker-compose environment

While testing a complete loading-then-indexing chain in the docker-compose environment, the following errors are frequently reported:

swh-indexer-journal-client_1  | 2019-03-08 15:17:08,360 1 INFO Starting indexer journal client with config {'content_size_limit': 104857600, 'log_db': 'dbname=softwareheritage-log', 'brokers': ['kafka'], 'max_messages': 1, 'scheduler': {'cls': 'remote', 'args': {'url': 'http://swh-scheduler-api:5008/'}}, 'topic_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.client', 'object_types': ['origin_visit'], 'auto_offset_reset': 'earliest', 'origin_visit_tasks': [{'type': 'indexer_origin_metadata', 'kwargs': {'policy_update': 'update-dups', 'parse_ids': False}}]}
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,063 1 INFO Group coordinator for swh.journal.client is BrokerMetadata(nodeId=1001, host='kafka', port=9092, rack=None)
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,063 1 INFO Discovered coordinator 1001 for group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,066 1 INFO <BrokerConnection node_id=1001 host=kafka:9092 <connecting> [IPv4 ('172.19.0.16', 9092)]>: connecting to kafka:9092 [('172.19.0.16', 9092) IPv4]
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,066 1 INFO Starting new heartbeat thread
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,067 1 INFO Revoking previously assigned partitions set() for group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,068 1 INFO <BrokerConnection node_id=1001 host=kafka:9092 <connecting> [IPv4 ('172.19.0.16', 9092)]>: Connection complete.
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,074 1 INFO <BrokerConnection node_id=bootstrap host=kafka:9092 <connected> [IPv4 ('172.19.0.16', 9092)]>: Closing connection. 
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,075 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,190 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,294 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,419 1 INFO Elected group leader -- performing partition assignments using range
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,420 1 WARNING No partition metadata for topic swh.journal.objects.origin_visit
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,420 1 WARNING No partition metadata for topic swh.journal.objects.origin
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,644 1 INFO Successfully joined group swh.journal.client with generation 1
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,644 1 INFO Updated partition assignment: []
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,645 1 INFO Setting newly assigned partitions set() for group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,652 1 INFO Revoking previously assigned partitions set() for group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:11,653 1 INFO (Re-)joining group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:17:14,679 1 INFO Elected group leader -- performing partition assignments using range
swh-indexer-journal-client_1  | 2019-03-08 15:17:14,689 1 INFO Successfully joined group swh.journal.client with generation 2
swh-indexer-journal-client_1  | 2019-03-08 15:17:14,690 1 INFO Updated partition assignment: [TopicPartition(topic='swh.journal.objects.origin_visit', partition=0)]
swh-indexer-journal-client_1  | 2019-03-08 15:17:14,690 1 INFO Setting newly assigned partitions {TopicPartition(topic='swh.journal.objects.origin_visit', partition=0)} for group swh.journal.client
swh-indexer-journal-client_1  | 2019-03-08 15:22:14,892 1 WARNING Heartbeat session expired, marking coordinator dead
swh-indexer-journal-client_1  | 2019-03-08 15:22:14,892 1 WARNING Marking the coordinator dead (node 1001) for group swh.journal.client: Heartbeat session expired.
swh-indexer-journal-client_1  | 2019-03-08 15:22:14,895 1 INFO Group coordinator for swh.journal.client is BrokerMetadata(nodeId=1001, host='kafka', port=9092, rack=None)
swh-indexer-journal-client_1  | 2019-03-08 15:22:14,895 1 INFO Discovered coordinator 1001 for group swh.journal.client
swh-indexer-journal-client_1  | Traceback (most recent call last):
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/runpy.py", line 193, in _run_module_as_main
swh-indexer-journal-client_1  |     "__main__", mod_spec)
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/runpy.py", line 85, in _run_code
swh-indexer-journal-client_1  |     exec(code, run_globals)
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/swh/indexer/journal_client.py", line 88, in <module>
swh-indexer-journal-client_1  |     main()
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 764, in __call__
swh-indexer-journal-client_1  |     return self.main(*args, **kwargs)
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 717, in main
swh-indexer-journal-client_1  |     rv = self.invoke(ctx)
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 956, in invoke
swh-indexer-journal-client_1  |     return ctx.invoke(self.callback, **ctx.params)
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 555, in invoke
swh-indexer-journal-client_1  |     return callback(*args, **kwargs)
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/swh/indexer/journal_client.py", line 86, in main
swh-indexer-journal-client_1  |     IndexerJournalClient().process()
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/swh/journal/client.py", line 121, in process
swh-indexer-journal-client_1  |     self.consumer.commit()
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 515, in commit
swh-indexer-journal-client_1  |     self._coordinator.commit_offsets_sync(offsets)
swh-indexer-journal-client_1  |   File "/usr/local/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 513, in commit_offsets_sync
swh-indexer-journal-client_1  |     raise future.exception # pylint: disable-msg=raising-bad-type
swh-indexer-journal-client_1  | kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
swh-indexer-journal-client_1  |             rebalanced and assigned the partitions to another member.
swh-indexer-journal-client_1  |             This means that the time between subsequent calls to poll()
swh-indexer-journal-client_1  |             was longer than the configured max_poll_interval_ms, which
swh-indexer-journal-client_1  |             typically implies that the poll loop is spending too much
swh-indexer-journal-client_1  |             time message processing. You can address this either by
swh-indexer-journal-client_1  |             increasing the rebalance timeout with max_poll_interval_ms,
swh-indexer-journal-client_1  |             or by reducing the maximum size of batches returned in poll()
swh-indexer-journal-client_1  |             with max_poll_records.


swh-journal-client_1          | 2019-03-08 15:22:51,599 1 INFO client received the following messages: defaultdict(<class 'list'>, {'origin_visit': [{b'date': b'2019-03-08 15:19:37.500108+00:00', b'metadata': None, b'origin': 1, b'snapshot': None, b'status': b'ongoing', b'visit': 1}]})
swh-journal-client_1          | Traceback (most recent call last):
swh-journal-client_1          |   File "/usr/local/lib/python3.6/runpy.py", line 193, in _run_module_as_main
swh-journal-client_1          |     "__main__", mod_spec)
swh-journal-client_1          |   File "/usr/local/lib/python3.6/runpy.py", line 85, in _run_code
swh-journal-client_1          |     exec(code, run_globals)
swh-journal-client_1          |   File "/client.py", line 45, in <module>
swh-journal-client_1          |     main()
swh-journal-client_1          |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 764, in __call__
swh-journal-client_1          |     return self.main(*args, **kwargs)
swh-journal-client_1          |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 717, in main
swh-journal-client_1          |     rv = self.invoke(ctx)
swh-journal-client_1          |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 956, in invoke
swh-journal-client_1          |     return ctx.invoke(self.callback, **ctx.params)
swh-journal-client_1          |   File "/usr/local/lib/python3.6/site-packages/click/core.py", line 555, in invoke
swh-journal-client_1          |     return callback(*args, **kwargs)
swh-journal-client_1          |   File "/client.py", line 43, in main
swh-journal-client_1          |     JournalClientLogger().process()
swh-journal-client_1          |   File "/src/swh-journal/swh/journal/client.py", line 121, in process
swh-journal-client_1          |     self.consumer.commit()
swh-journal-client_1          |   File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 515, in commit
swh-journal-client_1          |     self._coordinator.commit_offsets_sync(offsets)
swh-journal-client_1          |   File "/usr/local/lib/python3.6/site-packages/kafka/coordinator/consumer.py", line 513, in commit_offsets_sync
swh-journal-client_1          |     raise future.exception # pylint: disable-msg=raising-bad-type
swh-journal-client_1          | kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
swh-journal-client_1          |             rebalanced and assigned the partitions to another member.
swh-journal-client_1          |             This means that the time between subsequent calls to poll()
swh-journal-client_1          |             was longer than the configured max_poll_interval_ms, which
swh-journal-client_1          |             typically implies that the poll loop is spending too much
swh-journal-client_1          |             time message processing. You can address this either by
swh-journal-client_1          |             increasing the rebalance timeout with max_poll_interval_ms,
swh-journal-client_1          |             or by reducing the maximum size of batches returned in poll()
swh-journal-client_1          |             with max_poll_records.
swh-journal-client_1          |             
swh-journal-client_1          | 2019-03-08 15:22:51,642 1 INFO Stopping heartbeat thread
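The two timestamps that bracket the failure in the swh-indexer-journal-client log can be compared directly. The sketch below uses a small stdlib-only helper, `elapsed_ms`, written just for this illustration. It measures the gap between the partition assignment at 15:17:14,690 and the "Heartbeat session expired" warning at 15:22:14,892, then compares that gap with kafka-python's default `max_poll_interval_ms` of 300000 ms. A gap this close to the default is consistent with the explanation in the `CommitFailedError` message, but it does not prove it on its own.

```python
from datetime import datetime

# Timestamp format of the swh-indexer-journal-client log lines
# (a comma separates the seconds from the milliseconds).
LOG_TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"

# kafka-python's documented default for KafkaConsumer(max_poll_interval_ms=...).
DEFAULT_MAX_POLL_INTERVAL_MS = 300000


def elapsed_ms(start, end):
    """Return the number of milliseconds between two log timestamps."""
    t0 = datetime.strptime(start, LOG_TS_FORMAT)
    t1 = datetime.strptime(end, LOG_TS_FORMAT)
    return (t1 - t0).total_seconds() * 1000


# Partition assignment vs. "Heartbeat session expired" in the indexer client log.
gap = elapsed_ms("2019-03-08 15:17:14,690", "2019-03-08 15:22:14,892")
print("{:.0f} ms elapsed, default max_poll_interval_ms = {}".format(
    gap, DEFAULT_MAX_POLL_INTERVAL_MS))
print("exceeds default:", gap > DEFAULT_MAX_POLL_INTERVAL_MS)
```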

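It looks like the Kafka consumers need additional configuration to avoid this behavior. The settings named in the error message, `max_poll_interval_ms` and `max_poll_records`, are likely candidates.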
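The sketch below shows the kind of consumer settings involved. It assumes the consumer is a kafka-python `KafkaConsumer`, as the tracebacks show. `journal_consumer_config` is a hypothetical helper, and every value in it is an illustrative assumption rather than a tested recommendation. The logs above do not show whether or how swh.journal forwards such options to its consumer.

```python
def journal_consumer_config(group_id, brokers):
    """Build keyword arguments for kafka.KafkaConsumer (hypothetical helper).

    All numeric values are illustrative assumptions, not tested recommendations.
    """
    max_poll_interval_ms = 600000  # assumption: double kafka-python's 300000 default
    session_timeout_ms = 30000     # assumption
    return {
        "bootstrap_servers": brokers,
        "group_id": group_id,
        "auto_offset_reset": "earliest",
        # The tracebacks show the client calling consumer.commit() explicitly,
        # so auto-commit is switched off here (assumption).
        "enable_auto_commit": False,
        # Fewer records per poll() means less processing between poll() calls.
        "max_poll_records": 1,
        # Longer allowed gap between poll() calls before the member is evicted.
        "max_poll_interval_ms": max_poll_interval_ms,
        # Assumption: keep request_timeout_ms above max_poll_interval_ms,
        # mirroring kafka-python's 305000 / 300000 defaults.
        "request_timeout_ms": max_poll_interval_ms + 5000,
        "session_timeout_ms": session_timeout_ms,
        # kafka-python docs: lower than session_timeout_ms, typically <= 1/3 of it.
        "heartbeat_interval_ms": session_timeout_ms // 3,
    }
```

Usage would look like `KafkaConsumer("swh.journal.objects.origin_visit", **journal_consumer_config("swh.journal.client", ["kafka:9092"]))`. The topic, group id, and broker address are taken from the logs above.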


Migrated from T1575 (view on Phabricator)
