Skip to content

Commit 7a99013

Browse files
authored
Do not require client lock for read-only operations (#1730)
In an effort to reduce the surface area of lock coordination, and thereby hopefully reduce lock contention, I think we can remove locking from the read-only KafkaClient methods: connected, is_disconnected, in_flight_request_count, and least_loaded_node . Given that the read data could change after the lock is released but before the caller uses it, the value of acquiring a lock here does not seem high to me.
1 parent 37699be commit 7a99013

File tree

1 file changed

+50
-50
lines changed

1 file changed

+50
-50
lines changed

kafka/client_async.py

+50-50
Original file line numberDiff line numberDiff line change
@@ -402,10 +402,10 @@ def ready(self, node_id, metadata_priority=True):
402402

403403
def connected(self, node_id):
404404
"""Return True iff the node_id is connected."""
405-
with self._lock:
406-
if node_id not in self._conns:
407-
return False
408-
return self._conns[node_id].connected()
405+
conn = self._conns.get(node_id)
406+
if conn is None:
407+
return False
408+
return conn.connected()
409409

410410
def _close(self):
411411
if not self._closed:
@@ -448,10 +448,10 @@ def is_disconnected(self, node_id):
448448
Returns:
449449
bool: True iff the node exists and is disconnected
450450
"""
451-
with self._lock:
452-
if node_id not in self._conns:
453-
return False
454-
return self._conns[node_id].disconnected()
451+
conn = self._conns.get(node_id)
452+
if conn is None:
453+
return False
454+
return conn.disconnected()
455455

456456
def connection_delay(self, node_id):
457457
"""
@@ -467,10 +467,10 @@ def connection_delay(self, node_id):
467467
Returns:
468468
int: The number of milliseconds to wait.
469469
"""
470-
with self._lock:
471-
if node_id not in self._conns:
472-
return 0
473-
return self._conns[node_id].connection_delay()
470+
conn = self._conns.get(node_id)
471+
if conn is None:
472+
return 0
473+
return conn.connection_delay()
474474

475475
def is_ready(self, node_id, metadata_priority=True):
476476
"""Check whether a node is ready to send more requests.
@@ -656,13 +656,14 @@ def in_flight_request_count(self, node_id=None):
656656
Returns:
657657
int: pending in-flight requests for the node, or all nodes if None
658658
"""
659-
with self._lock:
660-
if node_id is not None:
661-
if node_id not in self._conns:
662-
return 0
663-
return len(self._conns[node_id].in_flight_requests)
664-
else:
665-
return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
659+
if node_id is not None:
660+
conn = self._conns.get(node_id)
661+
if conn is None:
662+
return 0
663+
return len(conn.in_flight_requests)
664+
else:
665+
return sum([len(conn.in_flight_requests)
666+
for conn in list(self._conns.values())])
666667

667668
def _fire_pending_completed_requests(self):
668669
responses = []
@@ -689,38 +690,37 @@ def least_loaded_node(self):
689690
Returns:
690691
node_id or None if no suitable node was found
691692
"""
692-
with self._lock:
693-
nodes = [broker.nodeId for broker in self.cluster.brokers()]
694-
random.shuffle(nodes)
695-
696-
inflight = float('inf')
697-
found = None
698-
for node_id in nodes:
699-
conn = self._conns.get(node_id)
700-
connected = conn is not None and conn.connected()
701-
blacked_out = conn is not None and conn.blacked_out()
702-
curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
703-
if connected and curr_inflight == 0:
704-
# if we find an established connection
705-
# with no in-flight requests, we can stop right away
706-
return node_id
707-
elif not blacked_out and curr_inflight < inflight:
708-
# otherwise if this is the best we have found so far, record that
709-
inflight = curr_inflight
710-
found = node_id
711-
712-
if found is not None:
713-
return found
714-
715-
# some broker versions return an empty list of broker metadata
716-
# if there are no topics created yet. the bootstrap process
717-
# should detect this and keep a 'bootstrap' node alive until
718-
# a non-bootstrap node is connected and non-empty broker
719-
# metadata is available
720-
elif 'bootstrap' in self._conns:
721-
return 'bootstrap'
693+
nodes = [broker.nodeId for broker in self.cluster.brokers()]
694+
random.shuffle(nodes)
722695

723-
return None
696+
inflight = float('inf')
697+
found = None
698+
for node_id in nodes:
699+
conn = self._conns.get(node_id)
700+
connected = conn is not None and conn.connected()
701+
blacked_out = conn is not None and conn.blacked_out()
702+
curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
703+
if connected and curr_inflight == 0:
704+
# if we find an established connection
705+
# with no in-flight requests, we can stop right away
706+
return node_id
707+
elif not blacked_out and curr_inflight < inflight:
708+
# otherwise if this is the best we have found so far, record that
709+
inflight = curr_inflight
710+
found = node_id
711+
712+
if found is not None:
713+
return found
714+
715+
# some broker versions return an empty list of broker metadata
716+
# if there are no topics created yet. the bootstrap process
717+
# should detect this and keep a 'bootstrap' node alive until
718+
# a non-bootstrap node is connected and non-empty broker
719+
# metadata is available
720+
elif 'bootstrap' in self._conns:
721+
return 'bootstrap'
722+
723+
return None
724724

725725
def set_topics(self, topics):
726726
"""Set specific topics to track for metadata.

0 commit comments

Comments
 (0)