Skip to content

Commit 965d21b

Browse files
jeffwidmandpkp
authored andcommitted
Error if connections_max_idle_ms not larger than request_timeout_ms (#1688)
1 parent 812de35 commit 965d21b

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

kafka/consumer/group.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -313,11 +313,15 @@ def __init__(self, *topics, **configs):
313313
new_config, self.config['auto_offset_reset'])
314314
self.config['auto_offset_reset'] = new_config
315315

316+
connections_max_idle_ms = self.config['connections_max_idle_ms']
316317
request_timeout_ms = self.config['request_timeout_ms']
317318
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
318-
if request_timeout_ms <= fetch_max_wait_ms:
319-
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
320-
(request_timeout_ms, fetch_max_wait_ms))
319+
if not (fetch_max_wait_ms < request_timeout_ms < connections_max_idle_ms):
320+
raise KafkaConfigurationError(
321+
"connections_max_idle_ms ({}) must be larger than "
322+
"request_timeout_ms ({}) which must be larger than "
323+
"fetch_max_wait_ms ({})."
324+
.format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms))
321325

322326
metrics_tags = {'client-id': self.config['client_id']}
323327
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],

test/test_consumer.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
class TestKafkaConsumer:
1616
def test_session_timeout_larger_than_request_timeout_raises(self):
1717
with pytest.raises(KafkaConfigurationError):
18-
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)
18+
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), group_id='foo', session_timeout_ms=50000, request_timeout_ms=40000)
1919

2020
def test_fetch_max_wait_larger_than_request_timeout_raises(self):
2121
with pytest.raises(KafkaConfigurationError):
22-
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
22+
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=50000, request_timeout_ms=40000)
23+
24+
def test_request_timeout_larger_than_connections_max_idle_ms_raises(self):
25+
with pytest.raises(KafkaConfigurationError):
26+
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 9), request_timeout_ms=50000, connections_max_idle_ms=40000)
2327

2428
def test_subscription_copy(self):
2529
consumer = KafkaConsumer('foo', api_version=(0, 10))

0 commit comments

Comments
 (0)