Skip to content

Do not acquire KafkaClient lock for read-only operations #1730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 7, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 50 additions & 50 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ def ready(self, node_id, metadata_priority=True):

def connected(self, node_id):
"""Return True iff the node_id is connected."""
with self._lock:
if node_id not in self._conns:
return False
return self._conns[node_id].connected()
conn = self._conns.get(node_id)
if conn is None:
return False
return conn.connected()

def _close(self):
if not self._closed:
Expand Down Expand Up @@ -448,10 +448,10 @@ def is_disconnected(self, node_id):
Returns:
bool: True iff the node exists and is disconnected
"""
with self._lock:
if node_id not in self._conns:
return False
return self._conns[node_id].disconnected()
conn = self._conns.get(node_id)
if conn is None:
return False
return conn.disconnected()

def connection_delay(self, node_id):
"""
Expand All @@ -467,10 +467,10 @@ def connection_delay(self, node_id):
Returns:
int: The number of milliseconds to wait.
"""
with self._lock:
if node_id not in self._conns:
return 0
return self._conns[node_id].connection_delay()
conn = self._conns.get(node_id)
if conn is None:
return 0
return conn.connection_delay()

def is_ready(self, node_id, metadata_priority=True):
"""Check whether a node is ready to send more requests.
Expand Down Expand Up @@ -656,13 +656,14 @@ def in_flight_request_count(self, node_id=None):
Returns:
int: pending in-flight requests for the node, or all nodes if None
"""
with self._lock:
if node_id is not None:
if node_id not in self._conns:
return 0
return len(self._conns[node_id].in_flight_requests)
else:
return sum([len(conn.in_flight_requests) for conn in self._conns.values()])
if node_id is not None:
conn = self._conns.get(node_id)
if conn is None:
return 0
return len(conn.in_flight_requests)
else:
return sum([len(conn.in_flight_requests)
for conn in list(self._conns.values())])

def _fire_pending_completed_requests(self):
responses = []
Expand All @@ -689,38 +690,37 @@ def least_loaded_node(self):
Returns:
node_id or None if no suitable node was found
"""
with self._lock:
nodes = [broker.nodeId for broker in self.cluster.brokers()]
random.shuffle(nodes)

inflight = float('inf')
found = None
for node_id in nodes:
conn = self._conns.get(node_id)
connected = conn is not None and conn.connected()
blacked_out = conn is not None and conn.blacked_out()
curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
if connected and curr_inflight == 0:
# if we find an established connection
# with no in-flight requests, we can stop right away
return node_id
elif not blacked_out and curr_inflight < inflight:
# otherwise if this is the best we have found so far, record that
inflight = curr_inflight
found = node_id

if found is not None:
return found

# some broker versions return an empty list of broker metadata
# if there are no topics created yet. the bootstrap process
# should detect this and keep a 'bootstrap' node alive until
# a non-bootstrap node is connected and non-empty broker
# metadata is available
elif 'bootstrap' in self._conns:
return 'bootstrap'
nodes = [broker.nodeId for broker in self.cluster.brokers()]
random.shuffle(nodes)

return None
inflight = float('inf')
found = None
for node_id in nodes:
conn = self._conns.get(node_id)
connected = conn is not None and conn.connected()
blacked_out = conn is not None and conn.blacked_out()
curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
if connected and curr_inflight == 0:
# if we find an established connection
# with no in-flight requests, we can stop right away
return node_id
elif not blacked_out and curr_inflight < inflight:
# otherwise if this is the best we have found so far, record that
inflight = curr_inflight
found = node_id

if found is not None:
return found

# some broker versions return an empty list of broker metadata
# if there are no topics created yet. the bootstrap process
# should detect this and keep a 'bootstrap' node alive until
# a non-bootstrap node is connected and non-empty broker
# metadata is available
elif 'bootstrap' in self._conns:
return 'bootstrap'

return None

def set_topics(self, topics):
"""Set specific topics to track for metadata.
Expand Down