
Commit cfddc6b

KAFKA-4034: Avoid unnecessary consumer coordinator lookup (#1254)
1 parent: f04435c

5 files changed: +78 additions, -26 deletions

kafka/consumer/fetcher.py

Lines changed: 12 additions & 3 deletions
@@ -134,6 +134,18 @@ def send_fetches(self):
         self._clean_done_fetch_futures()
         return futures
 
+    def reset_offsets_if_needed(self, partitions):
+        """Lookup and set offsets for any partitions which are awaiting an
+        explicit reset.
+
+        Arguments:
+            partitions (set of TopicPartitions): the partitions to reset
+        """
+        for tp in partitions:
+            # TODO: If there are several offsets to reset, we could submit offset requests in parallel
+            if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp):
+                self._reset_offset(tp)
+
     def _clean_done_fetch_futures(self):
         while True:
             if not self._fetch_futures:
@@ -168,9 +180,6 @@ def update_fetch_positions(self, partitions):
                           " update", tp)
                 continue
 
-            # TODO: If there are several offsets to reset,
-            # we could submit offset requests in parallel
-            # for now, each call to _reset_offset will block
             if self._subscriptions.is_offset_reset_needed(tp):
                 self._reset_offset(tp)
             elif self._subscriptions.assignment[tp].committed is None:
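
The new Fetcher.reset_offsets_if_needed() is what lets the consumer resolve explicitly reset positions without ever contacting the group coordinator. A minimal usage sketch, assuming a broker at localhost:9092 and a topic named 'my-topic' (both illustrative):

    from kafka import KafkaConsumer, TopicPartition

    # Manual assignment plus an explicit reset: after this commit, resolving
    # the position goes straight to an offset lookup and never requires a
    # group coordinator.
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id=None)
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])
    consumer.seek_to_beginning(tp)   # marks tp as awaiting an offset reset
    # The next poll runs reset_offsets_if_needed() before any committed-offset
    # fetch, so the position is resolved with a plain offset request.
    records = consumer.poll(timeout_ms=1000)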

kafka/consumer/group.py

Lines changed: 19 additions & 10 deletions
@@ -585,12 +585,11 @@ def _poll_once(self, timeout_ms, max_records):
             dict: Map of topic to list of records (may be empty).
         """
         if self._use_consumer_group():
-            self._coordinator.ensure_coordinator_known()
             self._coordinator.ensure_active_group()
 
         # 0.8.2 brokers support kafka-backed offset storage via group coordinator
         elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-            self._coordinator.ensure_coordinator_known()
+            self._coordinator.ensure_coordinator_ready()
 
         # Fetch positions if we have partitions we're subscribed to that we
         # don't know the offset for
@@ -835,6 +834,8 @@ def subscription(self):
         Returns:
             set: {topic, ...}
         """
+        if self._subscription.subscription is None:
+            return None
         return self._subscription.subscription.copy()
 
     def unsubscribe(self):
@@ -988,26 +989,34 @@ def _update_fetch_positions(self, partitions):
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
         """
-        if (self.config['api_version'] >= (0, 8, 1) and
-           self.config['group_id'] is not None):
+        # Lookup any positions for partitions which are awaiting reset (which may be the
+        # case if the user called seekToBeginning or seekToEnd. We do this check first to
+        # avoid an unnecessary lookup of committed offsets (which typically occurs when
+        # the user is manually assigning partitions and managing their own offsets).
+        self._fetcher.reset_offsets_if_needed(partitions)
 
-            # Refresh commits for all assigned partitions
-            self._coordinator.refresh_committed_offsets_if_needed()
+        if not self._subscription.has_all_fetch_positions():
+            # if we still don't have offsets for all partitions, then we should either seek
+            # to the last committed position or reset using the auto reset policy
+            if (self.config['api_version'] >= (0, 8, 1) and
+               self.config['group_id'] is not None):
+                # first refresh commits for all assigned partitions
+                self._coordinator.refresh_committed_offsets_if_needed()
 
-        # Then, do any offset lookups in case some positions are not known
-        self._fetcher.update_fetch_positions(partitions)
+            # Then, do any offset lookups in case some positions are not known
+            self._fetcher.update_fetch_positions(partitions)
 
     def _message_generator(self):
         assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
         while time.time() < self._consumer_timeout:
 
             if self._use_consumer_group():
-                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_coordinator_ready()
                 self._coordinator.ensure_active_group()
 
             # 0.8.2 brokers support kafka-backed offset storage via group coordinator
             elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
-                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_coordinator_ready()
 
             # Fetch offsets for any subscribed partitions that we arent tracking yet
             if not self._subscription.has_all_fetch_positions():
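
Two behavioral notes fall out of this file. First, subscription() now guards against an unset subscription instead of raising AttributeError on None.copy(). A quick sketch (assumes a reachable broker at localhost:9092; the broker address and topic name are illustrative):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    assert consumer.subscription() is None      # previously raised AttributeError
    consumer.subscribe(['my-topic'])
    assert consumer.subscription() == {'my-topic'}

Second, _update_fetch_positions() now tries pending resets first and only consults committed offsets when some position is still unknown, which is the heart of the "avoid unnecessary coordinator lookup" fix.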

kafka/coordinator/base.py

Lines changed: 19 additions & 4 deletions
@@ -88,6 +88,7 @@ def __init__(self, client, metrics, **configs):
         self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
         self.group_id = self.config['group_id']
         self.coordinator_id = None
+        self._find_coordinator_future = None
         self.rejoin_needed = True
         self.rejoining = False
         self.heartbeat = Heartbeat(**self.config)
@@ -195,12 +196,11 @@ def coordinator_unknown(self):
 
         return False
 
-    def ensure_coordinator_known(self):
+    def ensure_coordinator_ready(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
         """
         while self.coordinator_unknown():
-
             # Prior to 0.8.2 there was no group coordinator
             # so we will just pick a node at random and treat
             # it as the "coordinator"
@@ -210,7 +210,7 @@ def ensure_coordinator_known(self):
                 self._client.ready(self.coordinator_id)
                 continue
 
-            future = self._send_group_coordinator_request()
+            future = self.lookup_coordinator()
             self._client.poll(future=future)
 
@@ -224,6 +224,16 @@
             else:
                 raise future.exception # pylint: disable-msg=raising-bad-type
 
+    def _reset_find_coordinator_future(self, result):
+        self._find_coordinator_future = None
+
+    def lookup_coordinator(self):
+        if self._find_coordinator_future is None:
+            self._find_coordinator_future = self._send_group_coordinator_request()
+
+            self._find_coordinator_future.add_both(self._reset_find_coordinator_future)
+        return self._find_coordinator_future
+
     def need_rejoin(self):
         """Check whether the group should be rejoined (e.g. if metadata changes)
@@ -234,6 +244,11 @@
 
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
+        # always ensure that the coordinator is ready because we may have been
+        # disconnected when sending heartbeats and does not necessarily require
+        # us to rejoin the group.
+        self.ensure_coordinator_ready()
+
         if not self.need_rejoin():
             return
@@ -242,7 +257,7 @@
         self.rejoining = True
 
         while self.need_rejoin():
-            self.ensure_coordinator_known()
+            self.ensure_coordinator_ready()
 
             # ensure that there are no pending requests to the coordinator.
             # This is important in particular to avoid resending a pending
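
The memoized lookup future is the core mechanism here: concurrent callers share one in-flight group coordinator request, and the cached slot is cleared when the future resolves so a later attempt can retry. A standalone sketch of the pattern, using kafka-python's own Future (add_both and success both appear in this commit; the LookupMemoizer name is illustrative, not library code):

    from kafka.future import Future

    class LookupMemoizer(object):
        # illustrative reduction of BaseCoordinator.lookup_coordinator()
        def __init__(self, send_request):
            self._send_request = send_request   # callable returning a Future
            self._pending = None

        def _reset(self, _result):
            # fires on success or failure, so the next lookup starts fresh
            self._pending = None

        def lookup(self):
            if self._pending is None:
                self._pending = self._send_request()
                self._pending.add_both(self._reset)
            return self._pending

    # concurrent callers piggyback on the same in-flight request
    memo = LookupMemoizer(Future)
    f1, f2 = memo.lookup(), memo.lookup()
    assert f1 is f2
    f1.success('coordinator-node')
    assert memo.lookup() is not f1   # slot was cleared on completion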

kafka/coordinator/consumer.py

Lines changed: 24 additions & 4 deletions
@@ -315,7 +315,7 @@ def fetch_committed_offsets(self, partitions):
             return {}
 
         while True:
-            self.ensure_coordinator_known()
+            self.ensure_coordinator_ready()
 
             # contact coordinator to fetch committed offsets
             future = self._send_offset_fetch_request(partitions)
@@ -353,9 +353,29 @@ def commit_offsets_async(self, offsets, callback=None):
                 response will be either an Exception or a OffsetCommitResponse
                 struct. This callback can be used to trigger custom actions when
                 a commit request completes.
-        Returns:
-            Future: indicating whether the commit was successful or not
         """
+        if not self.coordinator_unknown():
+            self._do_commit_offsets_async(offsets, callback)
+        else:
+            # we don't know the current coordinator, so try to find it and then
+            # send the commit or fail (we don't want recursive retries which can
+            # cause offset commits to arrive out of order). Note that there may
+            # be multiple offset commits chained to the same coordinator lookup
+            # request. This is fine because the listeners will be invoked in the
+            # same order that they were added. Note also that BaseCoordinator
+            # prevents multiple concurrent coordinator lookup requests.
+            future = self.lookup_coordinator()
+            future.add_callback(self._do_commit_offsets_async, offsets, callback)
+            if callback:
+                future.add_errback(callback)
+
+        # ensure the commit has a chance to be transmitted (without blocking on
+        # its completion). Note that commits are treated as heartbeats by the
+        # coordinator, so there is no need to explicitly allow heartbeats
+        # through delayed task execution.
+        self._client.poll()  # no wakeup if we add that feature
+
+    def _do_commit_offsets_async(self, offsets, callback=None):
         assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
         assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
         assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
@@ -386,7 +406,7 @@ def commit_offsets_sync(self, offsets):
             return
 
         while True:
-            self.ensure_coordinator_known()
+            self.ensure_coordinator_ready()
 
             future = self._send_offset_commit_request(offsets)
             self._client.poll(future=future)
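
With this rework, commit_offsets_async() no longer returns a Future; completion is reported through the optional callback while _client.poll() gives the request a chance to be transmitted without blocking. A hedged usage sketch (the coordinator variable, topic name, and offset value are illustrative; kafka.structs is assumed for the tuple types):

    from kafka.structs import TopicPartition, OffsetAndMetadata

    def on_commit(*args):
        # success path receives (offsets, response); the coordinator-lookup
        # failure path passes only the exception, hence the loose signature
        print('commit finished:', args[-1])

    coordinator.commit_offsets_async(
        {TopicPartition('my-topic', 0): OffsetAndMetadata(42, '')},
        callback=on_commit)

Note the asymmetry visible in the diff: future.add_errback(callback) invokes the callback with just the exception, so callbacks written against the old Future-returning API may need adjusting.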

test/test_coordinator.py

Lines changed: 4 additions & 5 deletions
@@ -234,7 +234,7 @@ def test_fetch_committed_offsets(mocker, coordinator):
     assert coordinator._client.poll.call_count == 0
 
     # general case -- send offset fetch request, get successful future
-    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
     mocker.patch.object(coordinator, '_send_offset_fetch_request',
                         return_value=Future().success('foobar'))
     partitions = [TopicPartition('foobar', 0)]
@@ -295,16 +295,15 @@ def offsets():
 
 def test_commit_offsets_async(mocker, coordinator, offsets):
     mocker.patch.object(coordinator._client, 'poll')
-    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
     mocker.patch.object(coordinator, '_send_offset_commit_request',
                         return_value=Future().success('fizzbuzz'))
-    ret = coordinator.commit_offsets_async(offsets)
-    assert isinstance(ret, Future)
+    coordinator.commit_offsets_async(offsets)
     assert coordinator._send_offset_commit_request.call_count == 1
 
 
 def test_commit_offsets_sync(mocker, coordinator, offsets):
-    mocker.patch.object(coordinator, 'ensure_coordinator_known')
+    mocker.patch.object(coordinator, 'ensure_coordinator_ready')
     mocker.patch.object(coordinator, '_send_offset_commit_request',
                         return_value=Future().success('fizzbuzz'))
     cli = coordinator._client
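
A companion test one might add in the same pytest/mocker style (hypothetical, not part of this commit) to exercise the coordinator-unknown branch of commit_offsets_async:

    def test_commit_offsets_async_unknown_coordinator(mocker, coordinator, offsets):
        mocker.patch.object(coordinator._client, 'poll')
        mocker.patch.object(coordinator, 'coordinator_unknown', return_value=True)
        lookup_future = Future()
        mocker.patch.object(coordinator, 'lookup_coordinator',
                            return_value=lookup_future)
        mocker.patch.object(coordinator, '_do_commit_offsets_async')
        coordinator.commit_offsets_async(offsets)
        # the commit is deferred until the coordinator lookup completes
        assert coordinator._do_commit_offsets_async.call_count == 0
        lookup_future.success('node-1')
        assert coordinator._do_commit_offsets_async.call_count == 1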
