Skip to content

Commit 72155d2

Browse files
committed
Add optional timeout_ms kwarg to consumer.close()
1 parent 7c369db commit 72155d2

File tree

2 files changed

+6
-4
lines changed

2 files changed

+6
-4
lines changed

kafka/consumer/group.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -470,19 +470,21 @@ def assignment(self):
470470
"""
471471
return self._subscription.assigned_partitions()
472472

473-
def close(self, autocommit=True):
473+
def close(self, autocommit=True, timeout_ms=None):
474474
"""Close the consumer, waiting indefinitely for any needed cleanup.
475475
476476
Keyword Arguments:
477477
autocommit (bool): If auto-commit is configured for this consumer,
478478
this optional flag causes the consumer to attempt to commit any
479479
pending consumed offsets prior to close. Default: True
480+
timeout_ms (num, optional): Milliseconds to wait for auto-commit.
481+
Default: None
480482
"""
481483
if self._closed:
482484
return
483485
log.debug("Closing the KafkaConsumer.")
484486
self._closed = True
485-
self._coordinator.close(autocommit=autocommit)
487+
self._coordinator.close(autocommit=autocommit, timeout_ms=timeout_ms)
486488
self._metrics.close()
487489
self._client.close()
488490
try:

test/test_consumer_group.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def consumer_thread(i):
6262
while not stop[i].is_set():
6363
for tp, records in six.iteritems(consumers[i].poll(timeout_ms=200)):
6464
messages[i][tp].extend(records)
65-
consumers[i].close()
65+
consumers[i].close(timeout_ms=500)
6666
consumers[i] = None
6767
stop[i] = None
6868

@@ -179,4 +179,4 @@ def test_heartbeat_thread(kafka_broker, topic):
179179
assert consumer._coordinator.heartbeat.last_poll == last_poll
180180
consumer.poll(timeout_ms=100)
181181
assert consumer._coordinator.heartbeat.last_poll > last_poll
182-
consumer.close()
182+
consumer.close(timeout_ms=100)

0 commit comments

Comments
 (0)