Description
Hi, I am using kafka-python version 2.0.2 on Ubuntu 22.04 with Python 3.10.
My application uses multithreading, and occasionally I see this exception:
Exception ignored in: <function ConsumerCoordinator.__del__ at 0x7fc077562ef0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/consumer.py", line 132, in __del__
    super(ConsumerCoordinator, self).__del__()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 756, in __del__
    self._close_heartbeat_thread()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 750, in _close_heartbeat_thread
    self._heartbeat_thread.close()
  File "/usr/local/lib/python3.10/dist-packages/kafka/coordinator/base.py", line 927, in close
    self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
  File "/usr/lib/python3.10/threading.py", line 1093, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread
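The last frame of the traceback is the standard library refusing a self-join. This minimal sketch uses plain threading with no Kafka involved (the function name self_join_error is hypothetical) and reproduces the same RuntimeError by having a thread call join() on itself:

```python
import threading


def self_join_error():
    """Return the message raised when a thread tries to join itself."""
    result = {}

    def target():
        try:
            # Same call shape as HeartbeatThread.close(): join with a timeout,
            # but issued from the very thread being joined.
            threading.current_thread().join(0.1)
        except RuntimeError as exc:
            result["error"] = str(exc)

    t = threading.Thread(target=target)
    t.start()
    t.join()  # joining from the main thread is fine
    return result.get("error")


if __name__ == "__main__":
    print(self_join_error())  # cannot join current thread
```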
I found that someone else has also hit this problem: #663 (comment)
To debug this error, I added some logging just before line 927 in base.py:
log.info(f'Current thread id: {threading.get_ident()}')
log.info(f'HeartbeatThread thread id: {self.ident}')
When the exception happens, the identifier of the current thread is the same as the identifier of the HeartbeatThread object. At other times, when the exception does not happen, the log prints two different identifiers.
So I think the error happens because, at some point, the garbage collector runs on the very thread managed by the HeartbeatThread object. When the garbage collector calls the __del__ method of ConsumerCoordinator, this eventually leads to calling self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000) inside the close() method of HeartbeatThread, which raises the exception because a thread cannot join itself.
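The scenario described above can be simulated without Kafka. In this sketch, Owner and Worker are hypothetical stand-ins for ConsumerCoordinator and HeartbeatThread (not kafka-python code): the worker keeps a back-reference to its owner, and the owner's __del__ joins the worker. In CPython, when the worker drops the last strong reference to the owner, the owner is finalized immediately on the worker thread, so the join fails. sys.unraisablehook (Python 3.8+) is used to capture what would otherwise be printed as "Exception ignored in: ...".

```python
import sys
import threading


class Worker(threading.Thread):
    """Hypothetical stand-in for HeartbeatThread."""

    def __init__(self, owner):
        super().__init__(daemon=True)
        self.owner = owner  # back-reference, like HeartbeatThread.coordinator
        self.release = threading.Event()

    def run(self):
        self.release.wait()
        # Dropping the last strong reference here makes CPython finalize
        # the Owner right away, so Owner.__del__ runs on THIS thread.
        self.owner = None


class Owner:
    """Hypothetical stand-in for ConsumerCoordinator."""

    def __init__(self):
        self.worker = Worker(self)
        self.worker.start()

    def __del__(self):
        # Mirrors __del__ -> _close_heartbeat_thread -> close -> join.
        self.worker.join(1.0)


def demo():
    """Return the exceptions that would be reported as 'Exception ignored in'."""
    captured = []
    old_hook = sys.unraisablehook
    sys.unraisablehook = lambda unraisable: captured.append(unraisable.exc_value)
    try:
        owner = Owner()
        worker = owner.worker
        del owner             # now only the worker references the Owner
        worker.release.set()  # let the worker drop that last reference
        worker.join()
    finally:
        sys.unraisablehook = old_hook
    return captured


if __name__ == "__main__":
    for exc in demo():
        print(type(exc).__name__, exc)
```

The same thing can happen with the cyclic garbage collector: a collection runs on whichever thread happens to trigger it, so __del__ gives no guarantee about which thread it executes on.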
To avoid this error, maybe we should avoid calling self.join from within the HeartbeatThread itself? We could do this by wrapping HeartbeatThread inside another class, or something similar.
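A lighter-weight alternative to wrapping the class might be to skip the join only when close() is called from the heartbeat thread itself. The sketch below uses a hypothetical HeartbeatLikeThread (not kafka-python's actual HeartbeatThread) and an assumed interval_s parameter standing in for heartbeat_interval_ms / 1000:

```python
import threading


class HeartbeatLikeThread(threading.Thread):
    """Hypothetical stand-in for HeartbeatThread; not kafka-python code."""

    def __init__(self, interval_s=0.1):
        super().__init__(daemon=True)
        self.interval_s = interval_s  # assumed: heartbeat_interval_ms / 1000
        self.closed = threading.Event()

    def run(self):
        # wait() returns True once close() has set the event, ending the loop.
        while not self.closed.wait(self.interval_s):
            pass  # a real implementation would send a heartbeat here

    def close(self):
        self.closed.set()
        # Only join when close() is called from some other thread:
        # a thread cannot join itself (e.g. when __del__ runs on it).
        if threading.current_thread() is not self:
            self.join(self.interval_s)
```

The trade-off is that a self-initiated close() returns without waiting; the thread simply leaves its loop once run() regains control and sees the event set.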