@@ -87,7 +87,7 @@ def __init__(self, client, subscription, metrics, **configs):
87
87
assert self .config ['assignors' ], 'Coordinator requires assignors'
88
88
89
89
self ._subscription = subscription
90
- self ._metadata_snapshot = {}
90
+ self ._metadata_snapshot = self . _build_metadata_snapshot ( subscription , client . cluster )
91
91
self ._assignment_snapshot = None
92
92
self ._cluster = client .cluster
93
93
self ._cluster .request_update ()
@@ -162,15 +162,18 @@ def _handle_metadata_update(self, cluster):
162
162
for partition in self ._metadata_snapshot [topic ]
163
163
])
164
164
165
- def _subscription_metadata_changed (self , cluster ):
166
- if not self ._subscription .partitions_auto_assigned ():
167
- return False
168
-
165
+ def _build_metadata_snapshot (self , subscription , cluster ):
169
166
metadata_snapshot = {}
170
- for topic in self . _subscription .group_subscription ():
167
+ for topic in subscription .group_subscription ():
171
168
partitions = cluster .partitions_for_topic (topic ) or []
172
169
metadata_snapshot [topic ] = set (partitions )
170
+ return metadata_snapshot
171
+
172
+ def _subscription_metadata_changed (self , cluster ):
173
+ if not self ._subscription .partitions_auto_assigned ():
174
+ return False
173
175
176
+ metadata_snapshot = self ._build_metadata_snapshot (self ._subscription , cluster )
174
177
if self ._metadata_snapshot != metadata_snapshot :
175
178
self ._metadata_snapshot = metadata_snapshot
176
179
return True
0 commit comments