
Commit 618c505

KAFKA-3949: Avoid race condition when subscription changes during rebalance (#1364)
1 parent 08a7fb7 commit 618c505
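
The race this commit removes: a consumer that changes its subscription while a rebalance is already in flight could have the change swallowed, because rebalance state was tracked with a single needs_partition_assignment flag that the completing rebalance would clear. The fix drops the flag and instead compares explicit state: the subscription the member joined with (_joined_subscription) against its current subscription, and the leader's assignment-time metadata snapshot (_assignment_snapshot) against the current metadata snapshot. A minimal sketch of the triggering pattern, assuming a local broker and hypothetical topic/group names:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='example-group')
    consumer.subscribe(topics=['topic-a'])
    consumer.poll(timeout_ms=500)  # may start a rebalance for topic-a

    # changing the subscription while that rebalance is still settling is
    # the race: after this commit the consumer notices its subscription no
    # longer matches the one it joined with, and rejoins the group
    consumer.subscribe(topics=['topic-a', 'topic-b'])
    consumer.poll(timeout_ms=500)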

File tree

7 files changed: +128 additions, −109 deletions

kafka/cluster.py

Lines changed: 7 additions & 0 deletions
@@ -291,6 +291,13 @@ def update_metadata(self, metadata):
         for listener in self._listeners:
             listener(self)

+        if self.need_all_topic_metadata:
+            # the listener may change the interested topics,
+            # which could cause another metadata refresh.
+            # If we have already fetched all topics, however,
+            # another fetch should be unnecessary.
+            self._need_update = False
+
     def add_listener(self, listener):
         """Add a callback function to be called on each metadata update"""
         self._listeners.add(listener)
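
Context for the guard above: ClusterMetadata.update_metadata() invokes the registered listeners, and a listener may change the set of topics the client is interested in, which would normally schedule another refresh; when the client already fetches metadata for all topics, that extra refresh adds nothing. A sketch of a listener registered through add_listener, assuming a ClusterMetadata instance named cluster:

    def on_metadata_update(cluster_metadata):
        # invoked with the ClusterMetadata instance on every refresh;
        # if a listener like this changes the subscribed topics, the new
        # guard suppresses the redundant follow-up refresh whenever all
        # topic metadata has already been fetched
        print('known topics:', cluster_metadata.topics())

    cluster.add_listener(on_metadata_update)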

kafka/consumer/fetcher.py

Lines changed: 0 additions & 6 deletions
@@ -326,9 +326,6 @@ def fetched_records(self, max_records=None):
             max_records = self.config['max_poll_records']
         assert max_records > 0

-        if self._subscriptions.needs_partition_assignment:
-            return {}, False
-
         drained = collections.defaultdict(list)
         records_remaining = max_records

@@ -397,9 +394,6 @@ def _append(self, drained, part, max_records):

     def _message_generator(self):
         """Iterate over fetched_records"""
-        if self._subscriptions.needs_partition_assignment:
-            raise StopIteration('Subscription needs partition assignment')
-
         while self._next_partition_records or self._completed_fetches:

             if not self._next_partition_records:

kafka/consumer/group.py

Lines changed: 10 additions & 0 deletions
@@ -644,6 +644,11 @@ def _poll_once(self, timeout_ms, max_records):

         timeout_ms = min(timeout_ms, self._coordinator.time_to_next_poll())
         self._client.poll(timeout_ms=timeout_ms)
+        # after the long poll, we should check whether the group needs to rebalance
+        # prior to returning data so that the group can stabilize faster
+        if self._coordinator.need_rejoin():
+            return {}
+
         records, _ = self._fetcher.fetched_records(max_records)
         return records

@@ -1055,6 +1060,11 @@ def _message_generator(self):
                 poll_ms = 0
             self._client.poll(timeout_ms=poll_ms)

+            # after the long poll, we should check whether the group needs to rebalance
+            # prior to returning data so that the group can stabilize faster
+            if self._coordinator.need_rejoin():
+                continue
+
             # We need to make sure we at least keep up with scheduled tasks,
             # like heartbeats, auto-commits, and metadata refreshes
             timeout_at = self._next_timeout()
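
Together with the fetcher change above, the needs-rebalance check now lives in exactly one place: the poll path, after the network poll and before data is handed back. A simplified sketch of the resulting control flow (not the actual method body; names follow the diff):

    def poll_once(coordinator, client, fetcher, timeout_ms, max_records):
        # simplified shape of KafkaConsumer._poll_once after this commit
        client.poll(timeout_ms=timeout_ms)       # long network poll
        if coordinator.need_rejoin():
            return {}                            # rebalance before returning data
        records, _ = fetcher.fetched_records(max_records)
        return records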

kafka/consumer/subscription_state.py

Lines changed: 14 additions & 16 deletions
@@ -68,7 +68,6 @@ def __init__(self, offset_reset_strategy='earliest'):
         self._group_subscription = set()
         self._user_assignment = set()
         self.assignment = dict()
-        self.needs_partition_assignment = False
         self.listener = None

         # initialize to true for the consumers to fetch offset upon starting up
@@ -172,7 +171,6 @@ def change_subscription(self, topics):
         log.info('Updating subscribed topics to: %s', topics)
         self.subscription = set(topics)
         self._group_subscription.update(topics)
-        self.needs_partition_assignment = True

         # Remove any assigned partitions which are no longer subscribed to
         for tp in set(self.assignment.keys()):
@@ -192,12 +190,12 @@ def group_subscribe(self, topics):
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
         self._group_subscription.update(topics)

-    def mark_for_reassignment(self):
+    def reset_group_subscription(self):
+        """Reset the group's subscription to only contain topics subscribed by this consumer."""
         if self._user_assignment:
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
         assert self.subscription is not None, 'Subscription required'
         self._group_subscription.intersection_update(self.subscription)
-        self.needs_partition_assignment = True

     def assign_from_user(self, partitions):
         """Manually assign a list of TopicPartitions to this consumer.
@@ -220,18 +218,17 @@ def assign_from_user(self, partitions):
         if self.subscription is not None:
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)

-        self._user_assignment.clear()
-        self._user_assignment.update(partitions)
+        if self._user_assignment != set(partitions):
+            self._user_assignment = set(partitions)

-        for partition in partitions:
-            if partition not in self.assignment:
-                self._add_assigned_partition(partition)
+            for partition in partitions:
+                if partition not in self.assignment:
+                    self._add_assigned_partition(partition)

-        for tp in set(self.assignment.keys()) - self._user_assignment:
-            del self.assignment[tp]
+            for tp in set(self.assignment.keys()) - self._user_assignment:
+                del self.assignment[tp]

-        self.needs_partition_assignment = False
-        self.needs_fetch_committed_offsets = True
+            self.needs_fetch_committed_offsets = True

     def assign_from_subscribed(self, assignments):
         """Update the assignment to the specified partitions
@@ -245,24 +242,25 @@ def assign_from_subscribed(self, assignments):
         assignments (list of TopicPartition): partitions to assign to this
             consumer instance.
         """
-        if self.subscription is None:
+        if not self.partitions_auto_assigned():
             raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)

         for tp in assignments:
             if tp.topic not in self.subscription:
                 raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
+
+        # after rebalancing, we always reinitialize the assignment state
         self.assignment.clear()
         for tp in assignments:
             self._add_assigned_partition(tp)
-        self.needs_partition_assignment = False
+        self.needs_fetch_committed_offsets = True
         log.info("Updated partition assignment: %s", assignments)

     def unsubscribe(self):
         """Clear all topic subscriptions and partition assignments"""
         self.subscription = None
         self._user_assignment.clear()
         self.assignment.clear()
-        self.needs_partition_assignment = True
         self.subscribed_pattern = None

     def group_subscription(self):
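
A side effect of removing needs_partition_assignment is that assign_from_user() is now idempotent: re-assigning an equal partition set skips the rebuild entirely, so existing assignment state and fetch positions survive. A usage sketch with hypothetical topic names:

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    partitions = [TopicPartition('topic-a', 0), TopicPartition('topic-a', 1)]
    consumer.assign(partitions)
    # a second call with an equal partition set is now an internal no-op in
    # SubscriptionState: nothing is cleared or re-added
    consumer.assign(partitions)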

kafka/coordinator/base.py

Lines changed: 13 additions & 11 deletions
@@ -344,23 +344,25 @@ def _handle_join_failure(self, _):
     def ensure_active_group(self):
         """Ensure that the group is active (i.e. joined and synced)"""
         with self._lock:
-            if not self.need_rejoin():
-                return
-
-            # call on_join_prepare if needed. We set a flag to make sure that
-            # we do not call it a second time if the client is woken up before
-            # a pending rebalance completes.
-            if not self.rejoining:
-                self._on_join_prepare(self._generation.generation_id,
-                                      self._generation.member_id)
-                self.rejoining = True
-
             if self._heartbeat_thread is None:
                 self._start_heartbeat_thread()

             while self.need_rejoin():
                 self.ensure_coordinator_ready()

+                # call on_join_prepare if needed. We set a flag
+                # to make sure that we do not call it a second
+                # time if the client is woken up before a pending
+                # rebalance completes. This must be called on each
+                # iteration of the loop because an event requiring
+                # a rebalance (such as a metadata refresh which
+                # changes the matched subscription set) can occur
+                # while another rebalance is still in progress.
+                if not self.rejoining:
+                    self._on_join_prepare(self._generation.generation_id,
+                                          self._generation.member_id)
+                    self.rejoining = True
+
                 # ensure that there are no pending requests to the coordinator.
                 # This is important in particular to avoid resending a pending
                 # JoinGroup request.
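
Stripped of error handling, the new shape of ensure_active_group() is easier to see: the prepare step (which fires on_partitions_revoked) moved inside the retry loop, so a rebalance trigger that appears mid-join is handled on the next iteration instead of being lost. A schematic sketch, not the real method; _join_group is a hypothetical stand-in for the join/sync steps:

    def ensure_active_group(coordinator):
        # schematic of the loop shape after this commit
        with coordinator._lock:
            if coordinator._heartbeat_thread is None:
                coordinator._start_heartbeat_thread()
            while coordinator.need_rejoin():
                coordinator.ensure_coordinator_ready()
                # prepare at most once per rebalance, but re-check every
                # iteration in case a new trigger fired during the last join
                if not coordinator.rejoining:
                    coordinator._on_join_prepare(
                        coordinator._generation.generation_id,
                        coordinator._generation.member_id)
                    coordinator.rejoining = True
                coordinator._join_group()  # hypothetical stand-in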

kafka/coordinator/consumer.py

Lines changed: 63 additions & 39 deletions
@@ -84,6 +84,8 @@ def __init__(self, client, subscription, metrics, **configs):
                 self.config[key] = configs[key]

         self._subscription = subscription
+        self._is_leader = False
+        self._joined_subscription = set()
         self._metadata_snapshot = self._build_metadata_snapshot(subscription, client.cluster)
         self._assignment_snapshot = None
         self._cluster = client.cluster
@@ -132,11 +134,22 @@ def protocol_type(self):

     def group_protocols(self):
         """Returns list of preferred (protocols, metadata)"""
-        topics = self._subscription.subscription
-        assert topics is not None, 'Consumer has not subscribed to topics'
+        if self._subscription.subscription is None:
+            raise Errors.IllegalStateError('Consumer has not subscribed to topics')
+        # dpkp note: I really dislike this.
+        # why? because we are using this strange method group_protocols,
+        # which is seemingly innocuous, to set internal state (_joined_subscription)
+        # that is later used to check whether metadata has changed since we joined a group
+        # but there is no guarantee that this method, group_protocols, will get called
+        # in the correct sequence or that it will only be called when we want it to be.
+        # So this really should be moved elsewhere, but I don't have the energy to
+        # work that out right now. If you read this at some later date after the mutable
+        # state has bitten you... I'm sorry! It mimics the java client, and that's the
+        # best I've got for now.
+        self._joined_subscription = set(self._subscription.subscription)
         metadata_list = []
         for assignor in self.config['assignors']:
-            metadata = assignor.metadata(topics)
+            metadata = assignor.metadata(self._joined_subscription)
             group_protocol = (assignor.name, metadata)
             metadata_list.append(group_protocol)
         return metadata_list
@@ -158,21 +171,29 @@ def _handle_metadata_update(self, cluster):

         # check if there are any changes to the metadata which should trigger
         # a rebalance
-        if self._subscription_metadata_changed(cluster):
-
-            if (self.config['api_version'] >= (0, 9)
-                and self.config['group_id'] is not None):
-
-                self._subscription.mark_for_reassignment()
-
-            # If we haven't got group coordinator support,
-            # just assign all partitions locally
-            else:
-                self._subscription.assign_from_subscribed([
-                    TopicPartition(topic, partition)
-                    for topic in self._subscription.subscription
-                    for partition in self._metadata_snapshot[topic]
-                ])
+        if self._subscription.partitions_auto_assigned():
+            metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
+            if self._metadata_snapshot != metadata_snapshot:
+                self._metadata_snapshot = metadata_snapshot
+
+                # If we haven't got group coordinator support,
+                # just assign all partitions locally
+                if self._auto_assign_all_partitions():
+                    self._subscription.assign_from_subscribed([
+                        TopicPartition(topic, partition)
+                        for topic in self._subscription.subscription
+                        for partition in self._metadata_snapshot[topic]
+                    ])
+
+    def _auto_assign_all_partitions(self):
+        # For users that use "subscribe" without group support,
+        # we will simply assign all partitions to this consumer
+        if self.config['api_version'] < (0, 9):
+            return True
+        elif self.config['group_id'] is None:
+            return True
+        else:
+            return False

     def _build_metadata_snapshot(self, subscription, cluster):
         metadata_snapshot = {}
@@ -181,16 +202,6 @@ def _build_metadata_snapshot(self, subscription, cluster):
             metadata_snapshot[topic] = set(partitions)
         return metadata_snapshot

-    def _subscription_metadata_changed(self, cluster):
-        if not self._subscription.partitions_auto_assigned():
-            return False
-
-        metadata_snapshot = self._build_metadata_snapshot(self._subscription, cluster)
-        if self._metadata_snapshot != metadata_snapshot:
-            self._metadata_snapshot = metadata_snapshot
-            return True
-        return False
-
     def _lookup_assignor(self, name):
         for assignor in self.config['assignors']:
             if assignor.name == name:
@@ -199,12 +210,10 @@ def _lookup_assignor(self, name):

     def _on_join_complete(self, generation, member_id, protocol,
                           member_assignment_bytes):
-        # if we were the assignor, then we need to make sure that there have
-        # been no metadata updates since the rebalance begin. Otherwise, we
-        # won't rebalance again until the next metadata change
-        if self._assignment_snapshot is not None and self._assignment_snapshot != self._metadata_snapshot:
-            self._subscription.mark_for_reassignment()
-            return
+        # only the leader is responsible for monitoring for metadata changes
+        # (i.e. partition changes)
+        if not self._is_leader:
+            self._assignment_snapshot = None

         assignor = self._lookup_assignor(protocol)
         assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
@@ -307,6 +316,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
         # keep track of the metadata used for assignment so that we can check
         # after rebalance completion whether anything has changed
         self._cluster.request_update()
+        self._is_leader = True
         self._assignment_snapshot = self._metadata_snapshot

         log.debug("Performing assignment for group %s using strategy %s"
@@ -338,18 +348,32 @@ def _on_join_prepare(self, generation, member_id):
                           " for group %s failed on_partitions_revoked",
                           self._subscription.listener, self.group_id)

-        self._assignment_snapshot = None
-        self._subscription.mark_for_reassignment()
+        self._is_leader = False
+        self._subscription.reset_group_subscription()

     def need_rejoin(self):
         """Check whether the group should be rejoined

         Returns:
             bool: True if consumer should rejoin group, False otherwise
         """
-        return (self._subscription.partitions_auto_assigned() and
-                (super(ConsumerCoordinator, self).need_rejoin() or
-                 self._subscription.needs_partition_assignment))
+        if not self._subscription.partitions_auto_assigned():
+            return False
+
+        if self._auto_assign_all_partitions():
+            return False
+
+        # we need to rejoin if we performed the assignment and metadata has changed
+        if (self._assignment_snapshot is not None
+            and self._assignment_snapshot != self._metadata_snapshot):
+            return True
+
+        # we need to join if our subscription has changed since the last join
+        if (self._joined_subscription is not None
+            and self._joined_subscription != self._subscription.subscription):
+            return True
+
+        return super(ConsumerCoordinator, self).need_rejoin()

     def refresh_committed_offsets_if_needed(self):
         """Fetch committed offsets for assigned partitions."""
