Skip to content

Commit df227a6

Browse files
committed
BrokerConnection.receive_bytes(data) -> response events
1 parent f13ce1d commit df227a6

File tree

3 files changed

+91
-93
lines changed

3 files changed

+91
-93
lines changed

kafka/client_async.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -605,25 +605,14 @@ def _poll(self, timeout, sleep=True):
605605
continue
606606

607607
self._idle_expiry_manager.update(conn.node_id)
608-
609-
# Accumulate as many responses as the connection has pending
610-
while conn.in_flight_requests:
611-
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
612-
613-
# Incomplete responses are buffered internally
614-
# while conn.in_flight_requests retains the request
615-
if not response:
616-
break
617-
responses.append(response)
608+
responses.extend(conn.recv()) # Note: conn.recv runs callbacks / errbacks
618609

619610
# Check for additional pending SSL bytes
620611
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
621612
# TODO: optimize
622613
for conn in self._conns.values():
623614
if conn not in processed and conn.connected() and conn._sock.pending():
624-
response = conn.recv()
625-
if response:
626-
responses.append(response)
615+
responses.extend(conn.recv())
627616

628617
for conn in six.itervalues(self._conns):
629618
if conn.requests_timed_out():
@@ -635,6 +624,7 @@ def _poll(self, timeout, sleep=True):
635624

636625
if self._sensors:
637626
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
627+
638628
self._maybe_close_oldest_connection()
639629
return responses
640630

kafka/conn.py

Lines changed: 84 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import copy
55
import errno
66
import logging
7-
import io
87
from random import shuffle, uniform
98
import socket
109
import time
@@ -18,6 +17,7 @@
1817
from kafka.protocol.api import RequestHeader
1918
from kafka.protocol.admin import SaslHandShakeRequest
2019
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
20+
from kafka.protocol.frame import KafkaBytes
2121
from kafka.protocol.metadata import MetadataRequest
2222
from kafka.protocol.fetch import FetchRequest
2323
from kafka.protocol.types import Int32
@@ -234,9 +234,9 @@ def __init__(self, host, port, afi, **configs):
234234
if self.config['ssl_context'] is not None:
235235
self._ssl_context = self.config['ssl_context']
236236
self._sasl_auth_future = None
237-
self._rbuffer = io.BytesIO()
237+
self._header = KafkaBytes(4)
238+
self._rbuffer = None
238239
self._receiving = False
239-
self._next_payload_bytes = 0
240240
self.last_attempt = 0
241241
self._processing = False
242242
self._correlation_id = 0
@@ -629,17 +629,19 @@ def close(self, error=None):
629629
self.state = ConnectionStates.DISCONNECTED
630630
self.last_attempt = time.time()
631631
self._sasl_auth_future = None
632-
self._receiving = False
633-
self._next_payload_bytes = 0
634-
self._rbuffer.seek(0)
635-
self._rbuffer.truncate()
632+
self._reset_buffer()
636633
if error is None:
637634
error = Errors.Cancelled(str(self))
638635
while self.in_flight_requests:
639636
ifr = self.in_flight_requests.popleft()
640637
ifr.future.failure(error)
641638
self.config['state_change_callback'](self)
642639

640+
def _reset_buffer(self):
641+
self._receiving = False
642+
self._header.seek(0)
643+
self._rbuffer = None
644+
643645
def send(self, request):
644646
"""send request, return Future()
645647
@@ -713,11 +715,11 @@ def recv(self):
713715
# fail all the pending request futures
714716
if self.in_flight_requests:
715717
self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
716-
return None
718+
return ()
717719

718720
elif not self.in_flight_requests:
719721
log.warning('%s: No in-flight-requests to recv', self)
720-
return None
722+
return ()
721723

722724
response = self._recv()
723725
if not response and self.requests_timed_out():
@@ -726,103 +728,108 @@ def recv(self):
726728
self.close(error=Errors.RequestTimedOutError(
727729
'Request timed out after %s ms' %
728730
self.config['request_timeout_ms']))
729-
return None
731+
return ()
730732
return response
731733

732734
def _recv(self):
733-
# Not receiving is the state of reading the payload header
734-
if not self._receiving:
735+
responses = []
736+
SOCK_CHUNK_BYTES = 4096
737+
while True:
735738
try:
736-
bytes_to_read = 4 - self._rbuffer.tell()
737-
data = self._sock.recv(bytes_to_read)
739+
data = self._sock.recv(SOCK_CHUNK_BYTES)
738740
# We expect socket.recv to raise an exception if there is not
739741
# enough data to read the full bytes_to_read
740742
# but if the socket is disconnected, we will get empty data
741743
# without an exception raised
742744
if not data:
743745
log.error('%s: socket disconnected', self)
744746
self.close(error=Errors.ConnectionError('socket disconnected'))
745-
return None
746-
self._rbuffer.write(data)
747+
break
748+
else:
749+
responses.extend(self.receive_bytes(data))
750+
if len(data) < SOCK_CHUNK_BYTES:
751+
break
747752
except SSLWantReadError:
748-
return None
753+
break
749754
except ConnectionError as e:
750755
if six.PY2 and e.errno == errno.EWOULDBLOCK:
751-
return None
752-
log.exception('%s: Error receiving 4-byte payload header -'
756+
break
757+
log.exception('%s: Error receiving network data'
753758
' closing socket', self)
754759
self.close(error=Errors.ConnectionError(e))
755-
return None
756-
except BlockingIOError:
757-
if six.PY3:
758-
return None
759-
raise
760-
761-
if self._rbuffer.tell() == 4:
762-
self._rbuffer.seek(0)
763-
self._next_payload_bytes = Int32.decode(self._rbuffer)
764-
# reset buffer and switch state to receiving payload bytes
765-
self._rbuffer.seek(0)
766-
self._rbuffer.truncate()
767-
self._receiving = True
768-
elif self._rbuffer.tell() > 4:
769-
raise Errors.KafkaError('this should not happen - are you threading?')
770-
771-
if self._receiving:
772-
staged_bytes = self._rbuffer.tell()
773-
try:
774-
bytes_to_read = self._next_payload_bytes - staged_bytes
775-
data = self._sock.recv(bytes_to_read)
776-
# We expect socket.recv to raise an exception if there is not
777-
# enough data to read the full bytes_to_read
778-
# but if the socket is disconnected, we will get empty data
779-
# without an exception raised
780-
if bytes_to_read and not data:
781-
log.error('%s: socket disconnected', self)
782-
self.close(error=Errors.ConnectionError('socket disconnected'))
783-
return None
784-
self._rbuffer.write(data)
785-
except SSLWantReadError:
786-
return None
787-
except ConnectionError as e:
788-
# Extremely small chance that we have exactly 4 bytes for a
789-
# header, but nothing to read in the body yet
790-
if six.PY2 and e.errno == errno.EWOULDBLOCK:
791-
return None
792-
log.exception('%s: Error in recv', self)
793-
self.close(error=Errors.ConnectionError(e))
794-
return None
760+
break
795761
except BlockingIOError:
796762
if six.PY3:
797-
return None
763+
break
798764
raise
765+
return responses
799766

800-
staged_bytes = self._rbuffer.tell()
801-
if staged_bytes > self._next_payload_bytes:
802-
self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
803-
804-
if staged_bytes != self._next_payload_bytes:
805-
return None
767+
def receive_bytes(self, data):
768+
i = 0
769+
n = len(data)
770+
responses = []
771+
if self._sensors:
772+
self._sensors.bytes_received.record(n)
773+
while i < n:
774+
775+
# Not receiving is the state of reading the payload header
776+
if not self._receiving:
777+
bytes_to_read = min(4 - self._header.tell(), n - i)
778+
self._header.write(data[i:i+bytes_to_read])
779+
i += bytes_to_read
780+
781+
if self._header.tell() == 4:
782+
self._header.seek(0)
783+
nbytes = Int32.decode(self._header)
784+
# reset buffer and switch state to receiving payload bytes
785+
self._rbuffer = KafkaBytes(nbytes)
786+
self._receiving = True
787+
elif self._header.tell() > 4:
788+
raise Errors.KafkaError('this should not happen - are you threading?')
789+
790+
791+
if self._receiving:
792+
total_bytes = len(self._rbuffer)
793+
staged_bytes = self._rbuffer.tell()
794+
bytes_to_read = min(total_bytes - staged_bytes, n - i)
795+
self._rbuffer.write(data[i:i+bytes_to_read])
796+
i += bytes_to_read
797+
798+
staged_bytes = self._rbuffer.tell()
799+
if staged_bytes > total_bytes:
800+
self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?'))
801+
802+
if staged_bytes != total_bytes:
803+
break
806804

807-
self._receiving = False
808-
self._next_payload_bytes = 0
809-
if self._sensors:
810-
self._sensors.bytes_received.record(4 + self._rbuffer.tell())
811-
self._rbuffer.seek(0)
812-
response = self._process_response(self._rbuffer)
813-
self._rbuffer.seek(0)
814-
self._rbuffer.truncate()
815-
return response
805+
self._receiving = False
806+
self._rbuffer.seek(0)
807+
resp = self._process_response(self._rbuffer)
808+
if resp is not None:
809+
responses.append(resp)
810+
self._reset_buffer()
811+
return responses
816812

817813
def _process_response(self, read_buffer):
818814
assert not self._processing, 'Recursion not supported'
819815
self._processing = True
820-
ifr = self.in_flight_requests.popleft()
816+
recv_correlation_id = Int32.decode(read_buffer)
817+
818+
if not self.in_flight_requests:
819+
error = Errors.CorrelationIdError(
820+
'%s: No in-flight-request found for server response'
821+
' with correlation ID %d'
822+
% (self, recv_correlation_id))
823+
self.close(error)
824+
self._processing = False
825+
return None
826+
else:
827+
ifr = self.in_flight_requests.popleft()
828+
821829
if self._sensors:
822830
self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000)
823831

824832
# verify send/recv correlation ids match
825-
recv_correlation_id = Int32.decode(read_buffer)
826833

827834
# 0.8.2 quirk
828835
if (self.config['api_version'] == (0, 8, 2) and

kafka/protocol/message.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from ..codec import (has_gzip, has_snappy, has_lz4,
77
gzip_decode, snappy_decode,
88
lz4_decode, lz4_decode_old_kafka)
9+
from .frame import KafkaBytes
910
from .struct import Struct
1011
from .types import (
1112
Int8, Int32, Int64, Bytes, Schema, AbstractType
@@ -155,10 +156,10 @@ class MessageSet(AbstractType):
155156
@classmethod
156157
def encode(cls, items):
157158
# RecordAccumulator encodes messagesets internally
158-
if isinstance(items, io.BytesIO):
159+
if isinstance(items, (io.BytesIO, KafkaBytes)):
159160
size = Int32.decode(items)
160161
# rewind and return all the bytes
161-
items.seek(-4, 1)
162+
items.seek(items.tell() - 4)
162163
return items.read(size + 4)
163164

164165
encoded_values = []
@@ -198,7 +199,7 @@ def decode(cls, data, bytes_to_read=None):
198199

199200
@classmethod
200201
def repr(cls, messages):
201-
if isinstance(messages, io.BytesIO):
202+
if isinstance(messages, (KafkaBytes, io.BytesIO)):
202203
offset = messages.tell()
203204
decoded = cls.decode(messages)
204205
messages.seek(offset)

0 commit comments

Comments (0)