Skip to content

Improve test_consumer_group::test_group logging before group stabilized #2534

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 33 additions & 34 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,43 +75,42 @@ def consumer_thread(i):
try:
timeout = time.time() + 15
while True:
for c in range(num_consumers):

# Verify all consumers have been created
if c not in consumers:
logging.info('%s not in consumers list yet...', c)
break

# Verify all consumers have an assignment
elif not consumers[c].assignment():
logging.info('Consumer %s does not have assignment yet...', c)
break
assert time.time() < timeout, "timeout waiting for assignments"
# Verify all consumers have been created
missing_consumers = set(consumers.keys()) - set(range(num_consumers))
if missing_consumers:
logging.info('Waiting on consumer threads: %s', missing_consumers)
time.sleep(1)
continue

unassigned_consumers = {c for c, consumer in six.iteritems(consumers) if not consumer.assignment()}
if unassigned_consumers:
logging.info('Waiting for consumer assignments: %s', unassigned_consumers)
time.sleep(1)
continue

# If all consumers exist and have an assignment
logging.info('All consumers have assignment... checking for stable group')
# Verify all consumers are in the same generation
# then log state and break while loop
generations = set([consumer._coordinator._generation.generation_id
for consumer in six.itervalues(consumers)])

# New generation assignment is not complete until
# coordinator.rejoining = False
rejoining = set([c for c, consumer in six.iteritems(consumers) if consumer._coordinator.rejoining])

if not rejoining and len(generations) == 1:
for c, consumer in six.iteritems(consumers):
logging.info("[%s] %s %s: %s", c,
consumer._coordinator._generation.generation_id,
consumer._coordinator._generation.member_id,
consumer.assignment())
break
else:

logging.info('All consumers have assignment... checking for stable group')
# Verify all consumers are in the same generation
# then log state and break while loop
generations = set([consumer._coordinator._generation.generation_id
for consumer in list(consumers.values())])

# New generation assignment is not complete until
# coordinator.rejoining = False
rejoining = set([c for c, consumer in list(consumers.items()) if consumer._coordinator.rejoining])

if not rejoining and len(generations) == 1:
for c, consumer in list(consumers.items()):
logging.info("[%s] %s %s: %s", c,
consumer._coordinator._generation.generation_id,
consumer._coordinator._generation.member_id,
consumer.assignment())
break
else:
logging.info('Rejoining: %s, generations: %s', rejoining, generations)
time.sleep(1)
assert time.time() < timeout, "timeout waiting for assignments"
time.sleep(1)
logging.info('Rejoining: %s, generations: %s', rejoining, generations)
time.sleep(1)
continue

logging.info('Group stabilized; verifying assignment')
group_assignment = set()
Expand Down
Loading