Skip to content

Commit c03dd33

Browse files
authored
Improve test_consumer_group::test_group logging before group stabilized (#2534)
1 parent 17abc60 commit c03dd33

File tree

1 file changed

+33
-34
lines changed

1 file changed

+33
-34
lines changed

test/test_consumer_group.py

+33-34
Original file line numberDiff line numberDiff line change
@@ -75,43 +75,42 @@ def consumer_thread(i):
7575
try:
7676
timeout = time.time() + 15
7777
while True:
78-
for c in range(num_consumers):
79-
80-
# Verify all consumers have been created
81-
if c not in consumers:
82-
logging.info('%s not in consumers list yet...', c)
83-
break
84-
85-
# Verify all consumers have an assignment
86-
elif not consumers[c].assignment():
87-
logging.info('Consumer %s does not have assignment yet...', c)
88-
break
78+
assert time.time() < timeout, "timeout waiting for assignments"
79+
# Verify all consumers have been created
80+
missing_consumers = set(consumers.keys()) - set(range(num_consumers))
81+
if missing_consumers:
82+
logging.info('Waiting on consumer threads: %s', missing_consumers)
83+
time.sleep(1)
84+
continue
85+
86+
unassigned_consumers = {c for c, consumer in six.iteritems(consumers) if not consumer.assignment()}
87+
if unassigned_consumers:
88+
logging.info('Waiting for consumer assignments: %s', unassigned_consumers)
89+
time.sleep(1)
90+
continue
8991

9092
# If all consumers exist and have an assignment
93+
logging.info('All consumers have assignment... checking for stable group')
94+
# Verify all consumers are in the same generation
95+
# then log state and break while loop
96+
generations = set([consumer._coordinator._generation.generation_id
97+
for consumer in six.itervalues(consumers)])
98+
99+
# New generation assignment is not complete until
100+
# coordinator.rejoining = False
101+
rejoining = set([c for c, consumer in six.iteritems(consumers) if consumer._coordinator.rejoining])
102+
103+
if not rejoining and len(generations) == 1:
104+
for c, consumer in six.iteritems(consumers):
105+
logging.info("[%s] %s %s: %s", c,
106+
consumer._coordinator._generation.generation_id,
107+
consumer._coordinator._generation.member_id,
108+
consumer.assignment())
109+
break
91110
else:
92-
93-
logging.info('All consumers have assignment... checking for stable group')
94-
# Verify all consumers are in the same generation
95-
# then log state and break while loop
96-
generations = set([consumer._coordinator._generation.generation_id
97-
for consumer in list(consumers.values())])
98-
99-
# New generation assignment is not complete until
100-
# coordinator.rejoining = False
101-
rejoining = set([c for c, consumer in list(consumers.items()) if consumer._coordinator.rejoining])
102-
103-
if not rejoining and len(generations) == 1:
104-
for c, consumer in list(consumers.items()):
105-
logging.info("[%s] %s %s: %s", c,
106-
consumer._coordinator._generation.generation_id,
107-
consumer._coordinator._generation.member_id,
108-
consumer.assignment())
109-
break
110-
else:
111-
logging.info('Rejoining: %s, generations: %s', rejoining, generations)
112-
time.sleep(1)
113-
assert time.time() < timeout, "timeout waiting for assignments"
114-
time.sleep(1)
111+
logging.info('Rejoining: %s, generations: %s', rejoining, generations)
112+
time.sleep(1)
113+
continue
115114

116115
logging.info('Group stabilized; verifying assignment')
117116
group_assignment = set()

0 commit comments

Comments
 (0)