Skip to content

Commit dd50847

Browse files
authored
KAFKA-3117: handle metadata updates during consumer rebalance (#766 / #701)
1 parent 31a29ec commit dd50847

File tree

2 files changed

+25
-11
lines changed

2 files changed

+25
-11
lines changed

kafka/coordinator/consumer.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ def __init__(self, client, subscription, metrics, metric_group_prefix,
8686
assert self.config['assignors'], 'Coordinator requires assignors'
8787

8888
self._subscription = subscription
89-
self._partitions_per_topic = {}
89+
self._metadata_snapshot = {}
90+
self._assignment_snapshot = None
9091
self._cluster = client.cluster
9192
self._cluster.request_update()
9293
self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
@@ -150,7 +151,7 @@ def _handle_metadata_update(self, cluster):
150151

151152
# check if there are any changes to the metadata which should trigger
152153
# a rebalance
153-
if self._subscription_metadata_changed():
154+
if self._subscription_metadata_changed(cluster):
154155

155156
if (self.config['api_version'] >= (0, 9)
156157
and self.config['group_id'] is not None):
@@ -163,20 +164,20 @@ def _handle_metadata_update(self, cluster):
163164
self._subscription.assign_from_subscribed([
164165
TopicPartition(topic, partition)
165166
for topic in self._subscription.subscription
166-
for partition in self._partitions_per_topic[topic]
167+
for partition in self._metadata_snapshot[topic]
167168
])
168169

169-
def _subscription_metadata_changed(self):
170+
def _subscription_metadata_changed(self, cluster):
170171
if not self._subscription.partitions_auto_assigned():
171172
return False
172173

173-
old_partitions_per_topic = self._partitions_per_topic
174-
self._partitions_per_topic = {}
174+
metadata_snapshot = {}
175175
for topic in self._subscription.group_subscription():
176-
partitions = self._cluster.partitions_for_topic(topic) or []
177-
self._partitions_per_topic[topic] = set(partitions)
176+
partitions = cluster.partitions_for_topic(topic) or []
177+
metadata_snapshot[topic] = set(partitions)
178178

179-
if self._partitions_per_topic != old_partitions_per_topic:
179+
if self._metadata_snapshot != metadata_snapshot:
180+
self._metadata_snapshot = metadata_snapshot
180181
return True
181182
return False
182183

@@ -188,8 +189,15 @@ def _lookup_assignor(self, name):
188189

189190
def _on_join_complete(self, generation, member_id, protocol,
190191
member_assignment_bytes):
192+
# if we were the assignor, then we need to make sure that there have
193+
# been no metadata updates since the rebalance begin. Otherwise, we
194+
# won't rebalance again until the next metadata change
195+
if self._assignment_snapshot and self._assignment_snapshot != self._metadata_snapshot:
196+
self._subscription.mark_for_reassignment()
197+
return
198+
191199
assignor = self._lookup_assignor(protocol)
192-
assert assignor, 'invalid assignment protocol: %s' % protocol
200+
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
193201

194202
assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
195203

@@ -239,6 +247,11 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
239247
self._subscription.group_subscribe(all_subscribed_topics)
240248
self._client.set_topics(self._subscription.group_subscription())
241249

250+
# keep track of the metadata used for assignment so that we can check
251+
# after rebalance completion whether anything has changed
252+
self._cluster.request_update()
253+
self._assignment_snapshot = self._metadata_snapshot
254+
242255
log.debug("Performing assignment for group %s using strategy %s"
243256
" with subscriptions %s", self.group_id, assignor.name,
244257
member_metadata)
@@ -268,6 +281,7 @@ def _on_join_prepare(self, generation, member_id):
268281
" for group %s failed on_partitions_revoked",
269282
self._subscription.listener, self.group_id)
270283

284+
self._assignment_snapshot = None
271285
self._subscription.mark_for_reassignment()
272286

273287
def need_rejoin(self):

test/test_coordinator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def test_pattern_subscription(coordinator, api_version):
8585
coordinator.config['api_version'] = api_version
8686
coordinator._subscription.subscribe(pattern='foo')
8787
assert coordinator._subscription.subscription == set([])
88-
assert coordinator._subscription_metadata_changed() is False
88+
assert coordinator._subscription_metadata_changed({}) is False
8989
assert coordinator._subscription.needs_partition_assignment is False
9090

9191
cluster = coordinator._client.cluster

0 commit comments

Comments
 (0)