Skip to content

Commit 143afab

Browse files
committed
Dont acquire lock during KafkaClient.send if node is connected / ready
1 parent 2a1c3ca commit 143afab

File tree

1 file changed

+8
-7
lines changed

1 file changed

+8
-7
lines changed

kafka/client_async.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -499,14 +499,15 @@ def is_ready(self, node_id, metadata_priority=True):
499499
return True
500500

501501
def _can_send_request(self, node_id):
502-
with self._lock:
503-
if node_id not in self._conns:
504-
return False
505-
conn = self._conns[node_id]
506-
return conn.connected() and conn.can_send_more()
502+
conn = self._conns.get(node_id)
503+
if not conn:
504+
return False
505+
return conn.connected() and conn.can_send_more()
507506

508507
def send(self, node_id, request):
509-
"""Send a request to a specific node.
508+
"""Send a request to a specific node. Bytes are placed on an
509+
internal per-connection send-queue. Actual network I/O will be
510+
triggered in a subsequent call to .poll()
510511
511512
Arguments:
512513
node_id (int): destination node
@@ -518,7 +519,7 @@ def send(self, node_id, request):
518519
Returns:
519520
Future: resolves to Response struct or Error
520521
"""
521-
with self._lock:
522+
if not self._can_send_request(node_id):
522523
if not self._maybe_connect(node_id):
523524
return Future().failure(Errors.NodeNotReadyError(node_id))
524525

0 commit comments

Comments
 (0)