@@ -304,18 +304,21 @@ def _conn_state_change(self, node_id, conn):
304
304
# SSL connections can enter this state 2x (second during Handshake)
305
305
if node_id not in self ._connecting :
306
306
self ._connecting .add (node_id )
307
+ try :
307
308
self ._selector .register (conn ._sock , selectors .EVENT_WRITE )
309
+ except KeyError :
310
+ self ._selector .modify (conn ._sock , selectors .EVENT_WRITE )
308
311
309
312
elif conn .connected ():
310
313
log .debug ("Node %s connected" , node_id )
311
314
if node_id in self ._connecting :
312
315
self ._connecting .remove (node_id )
313
316
314
317
try :
315
- self ._selector .unregister (conn ._sock )
318
+ self ._selector .modify (conn ._sock , selectors . EVENT_READ , conn )
316
319
except KeyError :
317
- pass
318
- self . _selector . register ( conn . _sock , selectors . EVENT_READ , conn )
320
+ self . _selector . register ( conn . _sock , selectors . EVENT_READ , conn )
321
+
319
322
if self ._sensors :
320
323
self ._sensors .connection_created .record ()
321
324
@@ -336,6 +339,7 @@ def _conn_state_change(self, node_id, conn):
336
339
self ._selector .unregister (conn ._sock )
337
340
except KeyError :
338
341
pass
342
+
339
343
if self ._sensors :
340
344
self ._sensors .connection_closed .record ()
341
345
@@ -348,6 +352,17 @@ def _conn_state_change(self, node_id, conn):
348
352
log .warning ("Node %s connection failed -- refreshing metadata" , node_id )
349
353
self .cluster .request_update ()
350
354
355
+ def maybe_connect (self , node_id ):
356
+ """Queues a node for asynchronous connection during the next .poll()"""
357
+ if self ._can_connect (node_id ):
358
+ self ._connecting .add (node_id )
359
+ # Wakeup signal is useful in case another thread is
360
+ # blocked waiting for incoming network traffic while holding
361
+ # the client lock in poll().
362
+ self .wakeup ()
363
+ return True
364
+ return False
365
+
351
366
def _maybe_connect (self , node_id ):
352
367
"""Idempotent non-blocking connection attempt to the given node id."""
353
368
with self ._lock :
@@ -397,7 +412,7 @@ def ready(self, node_id, metadata_priority=True):
397
412
Returns:
398
413
bool: True if we are ready to send to the given node
399
414
"""
400
- self ._maybe_connect (node_id )
415
+ self .maybe_connect (node_id )
401
416
return self .is_ready (node_id , metadata_priority = metadata_priority )
402
417
403
418
def connected (self , node_id ):
@@ -520,8 +535,8 @@ def send(self, node_id, request):
520
535
Future: resolves to Response struct or Error
521
536
"""
522
537
if not self ._can_send_request (node_id ):
523
- if not self ._maybe_connect (node_id ):
524
- return Future ().failure (Errors .NodeNotReadyError (node_id ))
538
+ self .maybe_connect (node_id )
539
+ return Future ().failure (Errors .NodeNotReadyError (node_id ))
525
540
526
541
# conn.send will queue the request internally
527
542
# we will need to call send_pending_requests()
@@ -814,9 +829,8 @@ def refresh_done(val_or_error):
814
829
# have such application level configuration, using request timeout instead.
815
830
return self .config ['request_timeout_ms' ]
816
831
817
- if self ._can_connect (node_id ):
832
+ if self .maybe_connect (node_id ):
818
833
log .debug ("Initializing connection to node %s for metadata request" , node_id )
819
- self ._maybe_connect (node_id )
820
834
return self .config ['reconnect_backoff_ms' ]
821
835
822
836
# connected but can't send more, OR connecting
0 commit comments