@@ -733,47 +733,58 @@ def close(self, error=None):
733
733
future .failure (error )
734
734
self .config ['state_change_callback' ](self )
735
735
736
- def send (self , request ):
737
- """send request, return Future()
738
-
739
- Can block on network if request is larger than send_buffer_bytes
740
- """
736
+ def send (self , request , blocking = True ):
737
+ """Queue request for async network send, return Future()"""
741
738
future = Future ()
742
739
if self .connecting ():
743
740
return future .failure (Errors .NodeNotReadyError (str (self )))
744
741
elif not self .connected ():
745
742
return future .failure (Errors .KafkaConnectionError (str (self )))
746
743
elif not self .can_send_more ():
747
744
return future .failure (Errors .TooManyInFlightRequests (str (self )))
748
- return self ._send (request )
745
+ return self ._send (request , blocking = blocking )
749
746
750
- def _send (self , request ):
747
+ def _send (self , request , blocking = True ):
751
748
assert self .state in (ConnectionStates .AUTHENTICATING , ConnectionStates .CONNECTED )
752
749
future = Future ()
753
750
correlation_id = self ._protocol .send_request (request )
751
+
752
+ # Attempt to replicate behavior from prior to introduction of
753
+ # send_pending_requests() / async sends
754
+ if blocking :
755
+ error = self .send_pending_requests ()
756
+ if isinstance (error , Exception ):
757
+ future .failure (error )
758
+ return future
759
+
760
+ log .debug ('%s Request %d: %s' , self , correlation_id , request )
761
+ if request .expect_response ():
762
+ sent_time = time .time ()
763
+ ifr = (correlation_id , future , sent_time )
764
+ self .in_flight_requests .append (ifr )
765
+ else :
766
+ future .success (None )
767
+ return future
768
+
769
+ def send_pending_requests (self ):
770
+ """Can block on network if request is larger than send_buffer_bytes"""
771
+ if self .state not in (ConnectionStates .AUTHENTICATING ,
772
+ ConnectionStates .CONNECTED ):
773
+ return Errors .NodeNotReadyError (str (self ))
754
774
data = self ._protocol .send_bytes ()
755
775
try :
756
776
# In the future we might manage an internal write buffer
757
777
# and send bytes asynchronously. For now, just block
758
778
# sending each request payload
759
- sent_time = time .time ()
760
779
total_bytes = self ._send_bytes_blocking (data )
761
780
if self ._sensors :
762
781
self ._sensors .bytes_sent .record (total_bytes )
782
+ return total_bytes
763
783
except ConnectionError as e :
764
- log .exception ("Error sending %s to %s" , request , self )
784
+ log .exception ("Error sending request data to %s" , self )
765
785
error = Errors .KafkaConnectionError ("%s: %s" % (self , e ))
766
786
self .close (error = error )
767
- return future .failure (error )
768
- log .debug ('%s Request %d: %s' , self , correlation_id , request )
769
-
770
- if request .expect_response ():
771
- ifr = (correlation_id , future , sent_time )
772
- self .in_flight_requests .append (ifr )
773
- else :
774
- future .success (None )
775
-
776
- return future
787
+ return error
777
788
778
789
def can_send_more (self ):
779
790
"""Return True unless there are max_in_flight_requests_per_connection."""
0 commit comments