pytest_plugin: Prevent possible hang in consumer fixture destruction

Since the release of confluent-kafka v1.6.0, some swh-storage tests have become flaky: they can hang during consumer fixture destruction, when the wrapped Kafka consumer is closed. See the gdb stack trace below:

#0  futex_wait_cancelable (private=0, expected=0, futex_word=0x29372c0) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x2937270, cond=0x2937298) at pthread_cond_wait.c:502
#2  __pthread_cond_wait (cond=0x2937298, mutex=0x2937270) at pthread_cond_wait.c:655
#3  0x00007ffff7f5c1a9 in cnd_wait (cond=<optimized out>, mutex=<optimized out>) at cnd_wait.c:24
#4  0x00007ffff3edaff5 in cnd_timedwait_abs (cnd=cnd@entry=0x2937298, mtx=mtx@entry=0x2937270, tspec=tspec@entry=0x7fffffff77b0) at /home/anlambert/dev/librdkafka/src/tinycthread_extra.c:99
#5  0x00007ffff3e5e025 in rd_kafka_q_pop_serve (rkq=rkq@entry=0x2937270, timeout_us=<optimized out>, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, callback=callback@entry=0x0, 
    opaque=opaque@entry=0x0) at /home/anlambert/dev/librdkafka/src/rdkafka_queue.c:404
#6  0x00007ffff3e5e140 in rd_kafka_q_pop (rkq=rkq@entry=0x2937270, timeout_us=<optimized out>, version=version@entry=0) at /home/anlambert/dev/librdkafka/src/rdkafka_queue.c:428
#7  0x00007ffff3e4c18c in rd_kafka_op_req0 (destq=destq@entry=0x26d9ec0, recvq=recvq@entry=0x2937270, rko=rko@entry=0x1997b80, timeout_ms=timeout_ms@entry=-1) at /home/anlambert/dev/librdkafka/src/rdtime.h:146
#8  0x00007ffff3e4cadf in rd_kafka_op_req (destq=0x26d9ec0, rko=rko@entry=0x1997b80, timeout_ms=timeout_ms@entry=-1) at /home/anlambert/dev/librdkafka/src/rdkafka_op.c:631
#9  0x00007ffff3e821b0 in rd_kafka_assign0 (rk=<optimized out>, assign_method=RD_KAFKA_ASSIGN_METHOD_ASSIGN, partitions=0x7ffeb400bdb0) at /home/anlambert/dev/librdkafka/src/rdkafka_subscription.c:123
#10 0x00007ffff3e82202 in rd_kafka_assign (rk=rk@entry=0x29523f0, partitions=partitions@entry=0x7ffeb400bdb0) at /home/anlambert/dev/librdkafka/src/rdkafka_subscription.c:134
#11 0x00007ffff53722f6 in Consumer_rebalance_cb (rk=0x29523f0, err=RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, c_parts=0x7ffeb400bdb0, opaque=0x7ffe0d30d0d0)
    at /home/anlambert/dev/confluent-kafka-python/src/confluent_kafka/src/Consumer.c:1365
#12 0x00007ffff3de7f1f in rd_kafka_poll_cb (rk=rk@entry=0x29523f0, rkq=rkq@entry=0x1b48300, rko=rko@entry=0x7ffeb400bd30, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN, opaque=opaque@entry=0x0)
    at /home/anlambert/dev/librdkafka/src/rdkafka.c:3647
#13 0x00007ffff3de8303 in rd_kafka_consumer_close (rk=0x29523f0) at /home/anlambert/dev/librdkafka/src/rdkafka.c:3186
#14 0x00007ffff5373714 in Consumer_close (self=0x7ffe0d30d0d0, ignore=<optimized out>) at /home/anlambert/dev/confluent-kafka-python/src/confluent_kafka/src/Consumer.c:981

Explicitly performing the commit operation on the consumer before closing it removes the hang.
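
As a rough illustration of that teardown order, here is a minimal sketch. It is not the actual diff: the function name `yield_then_commit_and_close` is hypothetical, and the consumer is assumed to be any object exposing `commit()` and `close()`, as `confluent_kafka.Consumer` does. In a real plugin the generator would be registered with `@pytest.fixture`.

```python
from typing import Iterator, Protocol


class CommittableConsumer(Protocol):
    """Hypothetical minimal interface: only the two methods used at teardown."""

    def commit(self) -> object: ...

    def close(self) -> None: ...


def yield_then_commit_and_close(
    consumer: CommittableConsumer,
) -> Iterator[CommittableConsumer]:
    """Fixture-style generator: hand the consumer to the test, then tear down.

    Hypothetical helper, not the plugin's real fixture.
    """
    # Everything before the yield is setup; the test runs while we are suspended.
    yield consumer

    # Teardown: commit explicitly first, then close the consumer.
    consumer.commit()
    consumer.close()
```

The only point of the sketch is the ordering after the `yield`: the commit is issued explicitly, and `close()` is called afterwards.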


Migrated from D5048 (view on Phabricator)