Delay the unsubscribe to the end of handle_messages
to prevent some possible race condition leading to kafka errors like:
rd_kafka_assignment_partition_stopped: Assertion `rktp->rktp_started' failed
This could occur when, at the time of unsubscribing from a partition, another partition is also depleted. Since the unsubscription consist in resubscribing to all partitions except unsubscribes ones, we could try to subscribe to such a depleted parition, leading to the error message listed above.
Depends on !45 (closed).
Migrated from D7463 (view on Phabricator)