Skip to content

Commit 146b893

Browse files
Merge pull request #1258 from dpkp/pending_completions
Move callback processing from BrokerConnection to KafkaClient
2 parents 0bd5d2a + e3b1ad2 commit 146b893

File tree

4 files changed

+65
-31
lines changed

4 files changed

+65
-31
lines changed

kafka/client.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
175175

176176
# Block
177177
while not future.is_done:
178-
conn.recv()
178+
for r, f in conn.recv():
179+
f.success(r)
179180

180181
if future.failed():
181182
log.error("Request failed: %s", future.exception)
@@ -288,7 +289,8 @@ def failed_payloads(payloads):
288289

289290
if not future.is_done:
290291
conn, _ = connections_by_future[future]
291-
conn.recv()
292+
for r, f in conn.recv():
293+
f.success(r)
292294
continue
293295

294296
_, broker = connections_by_future.pop(future)
@@ -352,8 +354,6 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
352354
try:
353355
host, port, afi = get_ip_port_afi(broker.host)
354356
conn = self._get_conn(host, broker.port, afi)
355-
conn.send(request_id, request)
356-
357357
except ConnectionError as e:
358358
log.warning('ConnectionError attempting to send request %s '
359359
'to server %s: %s', request_id, broker, e)
@@ -365,6 +365,11 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
365365
# No exception, try to get response
366366
else:
367367

368+
future = conn.send(request_id, request)
369+
while not future.is_done:
370+
for r, f in conn.recv():
371+
f.success(r)
372+
368373
# decoder_fn=None signal that the server is expected to not
369374
# send a response. This probably only applies to
370375
# ProduceRequest w/ acks = 0
@@ -376,18 +381,17 @@ def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
376381
responses[topic_partition] = None
377382
return []
378383

379-
try:
380-
response = conn.recv(request_id)
381-
except ConnectionError as e:
382-
log.warning('ConnectionError attempting to receive a '
384+
if future.failed():
385+
log.warning('Error attempting to receive a '
383386
'response to request %s from server %s: %s',
384-
request_id, broker, e)
387+
request_id, broker, future.exception)
385388

386389
for payload in payloads:
387390
topic_partition = (payload.topic, payload.partition)
388391
responses[topic_partition] = FailedPayloadsError(payload)
389392

390393
else:
394+
response = future.value
391395
_resps = []
392396
for payload_response in decoder_fn(response):
393397
topic_partition = (payload_response.topic,

kafka/client_async.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import absolute_import, division
22

3+
import collections
34
import copy
45
import functools
56
import heapq
@@ -204,6 +205,11 @@ def __init__(self, **configs):
204205
self._wake_r, self._wake_w = socket.socketpair()
205206
self._wake_r.setblocking(False)
206207
self._wake_lock = threading.Lock()
208+
209+
# when requests complete, they are transferred to this queue prior to
210+
# invocation.
211+
self._pending_completion = collections.deque()
212+
207213
self._selector.register(self._wake_r, selectors.EVENT_READ)
208214
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
209215
self._closed = False
@@ -254,7 +260,8 @@ def _bootstrap(self, hosts):
254260
future = bootstrap.send(metadata_request)
255261
while not future.is_done:
256262
self._selector.select(1)
257-
bootstrap.recv()
263+
for r, f in bootstrap.recv():
264+
f.success(r)
258265
if future.failed():
259266
bootstrap.close()
260267
continue
@@ -512,7 +519,9 @@ def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
512519
Returns:
513520
list: responses received (can be empty)
514521
"""
515-
if timeout_ms is None:
522+
if future is not None:
523+
timeout_ms = 100
524+
elif timeout_ms is None:
516525
timeout_ms = self.config['request_timeout_ms']
517526

518527
responses = []
@@ -551,7 +560,9 @@ def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
551560
self.config['request_timeout_ms'])
552561
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
553562

554-
responses.extend(self._poll(timeout))
563+
self._poll(timeout)
564+
565+
responses.extend(self._fire_pending_completed_requests())
555566

556567
# If all we had was a timeout (future is None) - only do one poll
557568
# If we do have a future, we keep looping until it is done
@@ -561,7 +572,7 @@ def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
561572
return responses
562573

563574
def _poll(self, timeout):
564-
responses = []
575+
"""Returns list of (response, future) tuples"""
565576
processed = set()
566577

567578
start_select = time.time()
@@ -600,14 +611,14 @@ def _poll(self, timeout):
600611
continue
601612

602613
self._idle_expiry_manager.update(conn.node_id)
603-
responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks
614+
self._pending_completion.extend(conn.recv())
604615

605616
# Check for additional pending SSL bytes
606617
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
607618
# TODO: optimize
608619
for conn in self._conns.values():
609620
if conn not in processed and conn.connected() and conn._sock.pending():
610-
responses.extend(conn.recv())
621+
self._pending_completion.extend(conn.recv())
611622

612623
for conn in six.itervalues(self._conns):
613624
if conn.requests_timed_out():
@@ -621,7 +632,6 @@ def _poll(self, timeout):
621632
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
622633

623634
self._maybe_close_oldest_connection()
624-
return responses
625635

626636
def in_flight_request_count(self, node_id=None):
627637
"""Get the number of in-flight requests for a node or all nodes.
@@ -640,6 +650,14 @@ def in_flight_request_count(self, node_id=None):
640650
else:
641651
return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
642652

653+
def _fire_pending_completed_requests(self):
654+
responses = []
655+
while self._pending_completion:
656+
response, future = self._pending_completion.popleft()
657+
future.success(response)
658+
responses.append(response)
659+
return responses
660+
643661
def least_loaded_node(self):
644662
"""Choose the node with fewest outstanding requests, with fallbacks.
645663

kafka/conn.py

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@
55
import errno
66
import logging
77
from random import shuffle, uniform
8+
9+
# selectors in stdlib as of py3.4
10+
try:
11+
import selectors # pylint: disable=import-error
12+
except ImportError:
13+
# vendored backport module
14+
from .vendor import selectors34 as selectors
15+
816
import socket
917
import struct
1018
import sys
@@ -138,6 +146,9 @@ class BrokerConnection(object):
138146
api_version_auto_timeout_ms (int): number of milliseconds to throw a
139147
timeout exception from the constructor when checking the broker
140148
api version. Only applies if api_version is None
149+
selector (selectors.BaseSelector): Provide a specific selector
150+
implementation to use for I/O multiplexing.
151+
Default: selectors.DefaultSelector
141152
state_change_callback (callable): function to be called when the
142153
connection state changes from CONNECTING to CONNECTED etc.
143154
metrics (kafka.metrics.Metrics): Optionally provide a metrics
@@ -173,6 +184,7 @@ class BrokerConnection(object):
173184
'ssl_crlfile': None,
174185
'ssl_password': None,
175186
'api_version': (0, 8, 2), # default to most restrictive
187+
'selector': selectors.DefaultSelector,
176188
'state_change_callback': lambda conn: True,
177189
'metrics': None,
178190
'metric_group_prefix': '',
@@ -704,7 +716,7 @@ def can_send_more(self):
704716
def recv(self):
705717
"""Non-blocking network receive.
706718
707-
Return response if available
719+
Return list of (response, future)
708720
"""
709721
if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING:
710722
log.warning('%s cannot recv: socket not connected', self)
@@ -727,17 +739,16 @@ def recv(self):
727739
self.config['request_timeout_ms']))
728740
return ()
729741

730-
for response in responses:
742+
# augment respones w/ correlation_id, future, and timestamp
743+
for i in range(len(responses)):
731744
(correlation_id, future, timestamp) = self.in_flight_requests.popleft()
732-
if isinstance(response, Errors.KafkaError):
733-
self.close(response)
734-
break
735-
745+
latency_ms = (time.time() - timestamp) * 1000
736746
if self._sensors:
737-
self._sensors.request_time.record((time.time() - timestamp) * 1000)
747+
self._sensors.request_time.record(latency_ms)
738748

739-
log.debug('%s Response %d: %s', self, correlation_id, response)
740-
future.success(response)
749+
response = responses[i]
750+
log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response)
751+
responses[i] = (response, future)
741752

742753
return responses
743754

@@ -899,12 +910,12 @@ def connect():
899910
# request was unrecognized
900911
mr = self.send(MetadataRequest[0]([]))
901912

902-
if self._sock:
903-
self._sock.setblocking(True)
913+
selector = self.config['selector']()
914+
selector.register(self._sock, selectors.EVENT_READ)
904915
while not (f.is_done and mr.is_done):
905-
self.recv()
906-
if self._sock:
907-
self._sock.setblocking(False)
916+
for response, future in self.recv():
917+
future.success(response)
918+
selector.select(1)
908919

909920
if f.succeeded():
910921
if isinstance(request, ApiVersionRequest[0]):

test/test_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def mock_conn(conn, success=True):
2828
else:
2929
mocked.send.return_value = Future().failure(Exception())
3030
conn.return_value = mocked
31+
conn.recv.return_value = []
3132

3233

3334
class TestSimpleClient(unittest.TestCase):
@@ -94,7 +95,7 @@ def test_send_broker_unaware_request(self):
9495
mock_conn(mocked_conns[('kafka03', 9092)], success=False)
9596
future = Future()
9697
mocked_conns[('kafka02', 9092)].send.return_value = future
97-
mocked_conns[('kafka02', 9092)].recv.side_effect = lambda: future.success('valid response')
98+
mocked_conns[('kafka02', 9092)].recv.return_value = [('valid response', future)]
9899

99100
def mock_get_conn(host, port, afi):
100101
return mocked_conns[(host, port)]

0 commit comments

Comments
 (0)