Skip to content

Commit 8c07925

Browse files
authored
Do network connections and writes in KafkaClient.poll() (#1729)
* Add BrokerConnection.send_pending_requests to support async network sends * Send network requests during KafkaClient.poll() rather than in KafkaClient.send() * Dont acquire lock during KafkaClient.send if node is connected / ready * Move all network connection IO into KafkaClient.poll()
1 parent 7a99013 commit 8c07925

File tree

6 files changed

+84
-57
lines changed

6 files changed

+84
-57
lines changed

kafka/client_async.py

+43-16
Original file line numberDiff line numberDiff line change
@@ -304,18 +304,21 @@ def _conn_state_change(self, node_id, conn):
304304
# SSL connections can enter this state 2x (second during Handshake)
305305
if node_id not in self._connecting:
306306
self._connecting.add(node_id)
307+
try:
307308
self._selector.register(conn._sock, selectors.EVENT_WRITE)
309+
except KeyError:
310+
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
308311

309312
elif conn.connected():
310313
log.debug("Node %s connected", node_id)
311314
if node_id in self._connecting:
312315
self._connecting.remove(node_id)
313316

314317
try:
315-
self._selector.unregister(conn._sock)
318+
self._selector.modify(conn._sock, selectors.EVENT_READ, conn)
316319
except KeyError:
317-
pass
318-
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
320+
self._selector.register(conn._sock, selectors.EVENT_READ, conn)
321+
319322
if self._sensors:
320323
self._sensors.connection_created.record()
321324

@@ -336,6 +339,7 @@ def _conn_state_change(self, node_id, conn):
336339
self._selector.unregister(conn._sock)
337340
except KeyError:
338341
pass
342+
339343
if self._sensors:
340344
self._sensors.connection_closed.record()
341345

@@ -348,6 +352,17 @@ def _conn_state_change(self, node_id, conn):
348352
log.warning("Node %s connection failed -- refreshing metadata", node_id)
349353
self.cluster.request_update()
350354

355+
def maybe_connect(self, node_id):
356+
"""Queues a node for asynchronous connection during the next .poll()"""
357+
if self._can_connect(node_id):
358+
self._connecting.add(node_id)
359+
# Wakeup signal is useful in case another thread is
360+
# blocked waiting for incoming network traffic while holding
361+
# the client lock in poll().
362+
self.wakeup()
363+
return True
364+
return False
365+
351366
def _maybe_connect(self, node_id):
352367
"""Idempotent non-blocking connection attempt to the given node id."""
353368
with self._lock:
@@ -397,7 +412,7 @@ def ready(self, node_id, metadata_priority=True):
397412
Returns:
398413
bool: True if we are ready to send to the given node
399414
"""
400-
self._maybe_connect(node_id)
415+
self.maybe_connect(node_id)
401416
return self.is_ready(node_id, metadata_priority=metadata_priority)
402417

403418
def connected(self, node_id):
@@ -499,14 +514,15 @@ def is_ready(self, node_id, metadata_priority=True):
499514
return True
500515

501516
def _can_send_request(self, node_id):
502-
with self._lock:
503-
if node_id not in self._conns:
504-
return False
505-
conn = self._conns[node_id]
506-
return conn.connected() and conn.can_send_more()
517+
conn = self._conns.get(node_id)
518+
if not conn:
519+
return False
520+
return conn.connected() and conn.can_send_more()
507521

508522
def send(self, node_id, request):
509-
"""Send a request to a specific node.
523+
"""Send a request to a specific node. Bytes are placed on an
524+
internal per-connection send-queue. Actual network I/O will be
525+
triggered in a subsequent call to .poll()
510526
511527
Arguments:
512528
node_id (int): destination node
@@ -518,11 +534,21 @@ def send(self, node_id, request):
518534
Returns:
519535
Future: resolves to Response struct or Error
520536
"""
521-
with self._lock:
522-
if not self._maybe_connect(node_id):
523-
return Future().failure(Errors.NodeNotReadyError(node_id))
537+
if not self._can_send_request(node_id):
538+
self.maybe_connect(node_id)
539+
return Future().failure(Errors.NodeNotReadyError(node_id))
540+
541+
# conn.send will queue the request internally
542+
# we will need to call send_pending_requests()
543+
# to trigger network I/O
544+
future = self._conns[node_id].send(request, blocking=False)
524545

525-
return self._conns[node_id].send(request)
546+
# Wakeup signal is useful in case another thread is
547+
# blocked waiting for incoming network traffic while holding
548+
# the client lock in poll().
549+
self.wakeup()
550+
551+
return future
526552

527553
def poll(self, timeout_ms=None, future=None):
528554
"""Try to read and write to sockets.
@@ -640,6 +666,8 @@ def _poll(self, timeout):
640666
conn.close(error=Errors.RequestTimedOutError(
641667
'Request timed out after %s ms' %
642668
conn.config['request_timeout_ms']))
669+
else:
670+
conn.send_pending_requests()
643671

644672
if self._sensors:
645673
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
@@ -801,9 +829,8 @@ def refresh_done(val_or_error):
801829
# have such application level configuration, using request timeout instead.
802830
return self.config['request_timeout_ms']
803831

804-
if self._can_connect(node_id):
832+
if self.maybe_connect(node_id):
805833
log.debug("Initializing connection to node %s for metadata request", node_id)
806-
self._maybe_connect(node_id)
807834
return self.config['reconnect_backoff_ms']
808835

809836
# connected but can't send more, OR connecting

kafka/conn.py

+30-19
Original file line numberDiff line numberDiff line change
@@ -733,47 +733,58 @@ def close(self, error=None):
733733
future.failure(error)
734734
self.config['state_change_callback'](self)
735735

736-
def send(self, request):
737-
"""send request, return Future()
738-
739-
Can block on network if request is larger than send_buffer_bytes
740-
"""
736+
def send(self, request, blocking=True):
737+
"""Queue request for async network send, return Future()"""
741738
future = Future()
742739
if self.connecting():
743740
return future.failure(Errors.NodeNotReadyError(str(self)))
744741
elif not self.connected():
745742
return future.failure(Errors.KafkaConnectionError(str(self)))
746743
elif not self.can_send_more():
747744
return future.failure(Errors.TooManyInFlightRequests(str(self)))
748-
return self._send(request)
745+
return self._send(request, blocking=blocking)
749746

750-
def _send(self, request):
747+
def _send(self, request, blocking=True):
751748
assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED)
752749
future = Future()
753750
correlation_id = self._protocol.send_request(request)
751+
752+
# Attempt to replicate behavior from prior to introduction of
753+
# send_pending_requests() / async sends
754+
if blocking:
755+
error = self.send_pending_requests()
756+
if isinstance(error, Exception):
757+
future.failure(error)
758+
return future
759+
760+
log.debug('%s Request %d: %s', self, correlation_id, request)
761+
if request.expect_response():
762+
sent_time = time.time()
763+
ifr = (correlation_id, future, sent_time)
764+
self.in_flight_requests.append(ifr)
765+
else:
766+
future.success(None)
767+
return future
768+
769+
def send_pending_requests(self):
770+
"""Can block on network if request is larger than send_buffer_bytes"""
771+
if self.state not in (ConnectionStates.AUTHENTICATING,
772+
ConnectionStates.CONNECTED):
773+
return Errors.NodeNotReadyError(str(self))
754774
data = self._protocol.send_bytes()
755775
try:
756776
# In the future we might manage an internal write buffer
757777
# and send bytes asynchronously. For now, just block
758778
# sending each request payload
759-
sent_time = time.time()
760779
total_bytes = self._send_bytes_blocking(data)
761780
if self._sensors:
762781
self._sensors.bytes_sent.record(total_bytes)
782+
return total_bytes
763783
except ConnectionError as e:
764-
log.exception("Error sending %s to %s", request, self)
784+
log.exception("Error sending request data to %s", self)
765785
error = Errors.KafkaConnectionError("%s: %s" % (self, e))
766786
self.close(error=error)
767-
return future.failure(error)
768-
log.debug('%s Request %d: %s', self, correlation_id, request)
769-
770-
if request.expect_response():
771-
ifr = (correlation_id, future, sent_time)
772-
self.in_flight_requests.append(ifr)
773-
else:
774-
future.success(None)
775-
776-
return future
787+
return error
777788

778789
def can_send_more(self):
779790
"""Return True unless there are max_in_flight_requests_per_connection."""

kafka/consumer/group.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -1070,16 +1070,6 @@ def _message_generator(self):
10701070
# like heartbeats, auto-commits, and metadata refreshes
10711071
timeout_at = self._next_timeout()
10721072

1073-
# Because the consumer client poll does not sleep unless blocking on
1074-
# network IO, we need to explicitly sleep when we know we are idle
1075-
# because we haven't been assigned any partitions to fetch / consume
1076-
if self._use_consumer_group() and not self.assignment():
1077-
sleep_time = max(timeout_at - time.time(), 0)
1078-
if sleep_time > 0 and not self._client.in_flight_request_count():
1079-
log.debug('No partitions assigned; sleeping for %s', sleep_time)
1080-
time.sleep(sleep_time)
1081-
continue
1082-
10831073
# Short-circuit the fetch iterator if we are already timed out
10841074
# to avoid any unintentional interaction with fetcher setup
10851075
if time.time() > timeout_at:
@@ -1090,8 +1080,7 @@ def _message_generator(self):
10901080
if time.time() > timeout_at:
10911081
log.debug("internal iterator timeout - breaking for poll")
10921082
break
1093-
if self._client.in_flight_request_count():
1094-
self._client.poll(timeout_ms=0)
1083+
self._client.poll(timeout_ms=0)
10951084

10961085
# An else block on a for loop only executes if there was no break
10971086
# so this should only be called on a StopIteration from the fetcher

kafka/coordinator/base.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ def ensure_coordinator_ready(self):
252252
if self.config['api_version'] < (0, 8, 2):
253253
self.coordinator_id = self._client.least_loaded_node()
254254
if self.coordinator_id is not None:
255-
self._client.ready(self.coordinator_id)
255+
self._client.maybe_connect(self.coordinator_id)
256256
continue
257257

258258
future = self.lookup_coordinator()
@@ -686,7 +686,7 @@ def _handle_group_coordinator_response(self, future, response):
686686
self.coordinator_id = response.coordinator_id
687687
log.info("Discovered coordinator %s for group %s",
688688
self.coordinator_id, self.group_id)
689-
self._client.ready(self.coordinator_id)
689+
self._client.maybe_connect(self.coordinator_id)
690690
self.heartbeat.reset_timeouts()
691691
future.success(self.coordinator_id)
692692

test/fixtures.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -405,10 +405,11 @@ def _failure(error):
405405
retries = 10
406406
while True:
407407
node_id = self._client.least_loaded_node()
408-
for ready_retry in range(40):
409-
if self._client.ready(node_id, False):
408+
for connect_retry in range(40):
409+
self._client.maybe_connect(node_id)
410+
if self._client.connected(node_id):
410411
break
411-
time.sleep(.1)
412+
self._client.poll(timeout_ms=100)
412413
else:
413414
raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
414415

test/test_client_async.py

+4-5
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ def test_conn_state_change(mocker, cli, conn):
125125
conn.state = ConnectionStates.CONNECTED
126126
cli._conn_state_change(node_id, conn)
127127
assert node_id not in cli._connecting
128-
sel.unregister.assert_called_with(conn._sock)
129-
sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
128+
sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
130129

131130
# Failure to connect should trigger metadata update
132131
assert cli.cluster._need_update is False
@@ -145,7 +144,7 @@ def test_conn_state_change(mocker, cli, conn):
145144

146145

147146
def test_ready(mocker, cli, conn):
148-
maybe_connect = mocker.patch.object(cli, '_maybe_connect')
147+
maybe_connect = mocker.patch.object(cli, 'maybe_connect')
149148
node_id = 1
150149
cli.ready(node_id)
151150
maybe_connect.assert_called_with(node_id)
@@ -362,6 +361,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
362361
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
363362
mocker.patch.object(client, '_can_connect', return_value=True)
364363
mocker.patch.object(client, '_maybe_connect', return_value=True)
364+
mocker.patch.object(client, 'maybe_connect', return_value=True)
365365

366366
now = time.time()
367367
t = mocker.patch('time.time')
@@ -370,8 +370,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
370370
# first poll attempts connection
371371
client.poll(timeout_ms=12345678)
372372
client._poll.assert_called_with(2.222) # reconnect backoff
373-
client._can_connect.assert_called_once_with('foobar')
374-
client._maybe_connect.assert_called_once_with('foobar')
373+
client.maybe_connect.assert_called_once_with('foobar')
375374

376375
# poll while connecting should not attempt a new connection
377376
client._connecting.add('foobar')

0 commit comments

Comments
 (0)