Skip to content

Commit 2a1c3ca

Browse files
committed
Send network requests during KafkaClient.poll() rather than in KafkaClient.send()
1 parent f39e9ee commit 2a1c3ca

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

kafka/client_async.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,17 @@ def send(self, node_id, request):
522522
if not self._maybe_connect(node_id):
523523
return Future().failure(Errors.NodeNotReadyError(node_id))
524524

525-
return self._conns[node_id].send(request)
525+
# conn.send will queue the request internally
526+
# we will need to call send_pending_requests()
527+
# to trigger network I/O
528+
future = self._conns[node_id].send(request, blocking=False)
529+
530+
# Wakeup signal is useful in case another thread is
531+
# blocked waiting for incoming network traffic while holding
532+
# the client lock in poll().
533+
self.wakeup()
534+
535+
return future
526536

527537
def poll(self, timeout_ms=None, future=None):
528538
"""Try to read and write to sockets.
@@ -640,6 +650,8 @@ def _poll(self, timeout):
640650
conn.close(error=Errors.RequestTimedOutError(
641651
'Request timed out after %s ms' %
642652
conn.config['request_timeout_ms']))
653+
else:
654+
conn.send_pending_requests()
643655

644656
if self._sensors:
645657
self._sensors.io_time.record((time.time() - end_select) * 1000000000)

0 commit comments

Comments
 (0)