Skip to content

Commit f460b26

Browse files
committed
Check for wakeup socket errors on read; close and reinit the socket pair to reset
1 parent 3a0d5d6 commit f460b26

File tree

1 file changed

+35
-12
lines changed

1 file changed

+35
-12
lines changed

kafka/client_async.py

+35-12
Original file line number | Diff line number | Diff line change
@@ -204,8 +204,9 @@ def __init__(self, **configs):
204204
# these properties need to be set on top of the initialization pipeline
205205
# because they are used when __del__ method is called
206206
self._closed = False
207-
self._wake_r, self._wake_w = socket.socketpair()
208207
self._selector = self.config['selector']()
208+
self._init_wakeup_socketpair()
209+
self._wake_lock = threading.Lock()
209210

210211
self.cluster = ClusterMetadata(**self.config)
211212
self._topics = set() # empty set will fetch all topic metadata
@@ -217,9 +218,6 @@ def __init__(self, **configs):
217218
self._refresh_on_disconnects = True
218219
self._last_bootstrap = 0
219220
self._bootstrap_fails = 0
220-
self._wake_r.setblocking(False)
221-
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
222-
self._wake_lock = threading.Lock()
223221

224222
self._lock = threading.RLock()
225223

@@ -228,7 +226,6 @@ def __init__(self, **configs):
228226
# lock above.
229227
self._pending_completion = collections.deque()
230228

231-
self._selector.register(self._wake_r, selectors.EVENT_READ)
232229
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
233230
self._sensors = None
234231
if self.config['metrics']:
@@ -243,6 +240,25 @@ def __init__(self, **configs):
243240
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
244241
self.config['api_version'] = self.check_version(timeout=check_timeout)
245242

243+
def _init_wakeup_socketpair(self):
    """Create the wakeup socket pair and register its read end.

    On success ``self._wake_r`` is a non-blocking socket registered with
    ``self._selector`` for ``EVENT_READ``, ``self._wake_w`` is its peer
    with a send timeout of ``config['wakeup_timeout_ms']`` milliseconds,
    and ``self._waking`` is reset to ``False``.

    Raises:
        Whatever ``socket.socketpair()``, the socket configuration calls or
        ``selector.register()`` raise. In that case no socket is left open
        and both ``_wake_r`` and ``_wake_w`` are ``None``.
    """
    # Define the attributes up front so that a failure in socketpair()
    # still leaves the object in a state the close path can handle.
    self._wake_r = None
    self._wake_w = None
    self._wake_r, self._wake_w = socket.socketpair()
    try:
        self._wake_r.setblocking(False)
        self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
        self._waking = False
        self._selector.register(self._wake_r, selectors.EVENT_READ)
    except Exception:
        # Do not leak the two descriptors when setup fails half-way.
        self._wake_r.close()
        self._wake_w.close()
        self._wake_r = None
        self._wake_w = None
        raise
249+
250+
def _close_wakeup_socketpair(self):
    """Unregister and close both ends of the wakeup socket pair.

    Safe to call repeatedly and on a partially initialised object: ends
    that are missing or already ``None`` are skipped. Afterwards both
    ``self._wake_r`` and ``self._wake_w`` are ``None``.
    """
    wake_r = getattr(self, '_wake_r', None)
    wake_w = getattr(self, '_wake_w', None)
    # Reset the attributes first so they are cleared even if a close()
    # call below raises.
    self._wake_r = None
    self._wake_w = None
    try:
        if wake_r is not None:
            try:
                self._selector.unregister(wake_r)
            except (KeyError, ValueError):
                # KeyError: not registered (any more).
                # ValueError: the selector considers the file object invalid.
                pass
            wake_r.close()
    finally:
        # Always release the write end, even if the read end failed.
        if wake_w is not None:
            wake_w.close()
261+
246262
def _can_bootstrap(self):
247263
effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
248264
backoff_factor = 2 ** effective_failures
@@ -416,9 +432,8 @@ def connected(self, node_id):
416432
def _close(self):
    """Release the wakeup socket pair and the selector exactly once.

    The wakeup sockets are torn down first because that step unregisters
    the read end from the selector, which must still be open at that
    point. The selector is closed afterwards, even if the socket teardown
    raises. Later calls are no-ops.
    """
    if self._closed:
        return
    self._closed = True
    try:
        self._close_wakeup_socketpair()
    finally:
        self._selector.close()
422437

423438
def close(self, node_id=None):
424439
"""Close one or all broker connections.
@@ -944,20 +959,28 @@ def check_version(self, node_id=None, timeout=2, strict=False):
944959
raise Errors.NoBrokersAvailable()
945960

946961
def wakeup(self):
    """Wake the polling thread by writing one byte to the wakeup socket.

    Does nothing if a wakeup is already pending (``self._waking``) or the
    wakeup socket pair is not available (``self._wake_w`` is ``None``).

    Raises:
        Errors.KafkaTimeoutError: the write timed out.
        socket.error: any other failure writing to the wakeup socket.
    """
    # Cheap unlocked check: the common case is an already pending wakeup.
    if self._waking or self._wake_w is None:
        return
    with self._wake_lock:
        # Re-check under the lock: another caller may have sent the wakeup
        # byte, or the socket pair may have been torn down, in between.
        wake_w = self._wake_w
        if self._waking or wake_w is None:
            return
        try:
            wake_w.sendall(b'x')
            self._waking = True
        except socket.timeout as e:
            # Must stay ahead of socket.error: on Python 3 socket.timeout
            # is a subclass of it.
            log.warning('Timeout to send to wakeup socket!')
            raise Errors.KafkaTimeoutError(e)
        except socket.error as e:
            log.warning('Unable to send to wakeup socket! %s', e)
            # Bare raise keeps the original traceback.
            raise
955974

956975
def _clear_wake_fd(self):
    """Drain the wakeup socket and clear the pending-wakeup flag.

    Reading from the wake socket should only happen in a single thread.

    The drain and the ``_waking`` reset happen under ``_wake_lock`` so a
    concurrent ``wakeup()`` cannot slip its byte in after the flag was
    cleared but before the read. That byte would be consumed while
    ``_waking`` stayed ``True``, suppressing every later wakeup.

    If the read returns no data (the write end is gone), the socket pair
    is closed and re-created.
    """
    with self._wake_lock:
        try:
            while True:
                if not self._wake_r.recv(1024):
                    # Empty read: reset by rebuilding the socket pair.
                    self._close_wakeup_socketpair()
                    self._init_wakeup_socketpair()
                    break
        except socket.error:
            # Non-blocking read with nothing left to read: fully drained.
            pass
        finally:
            self._waking = False
963986

0 commit comments

Comments
 (0)