Skip to content

Config parameter 'coordinator_not_ready_retry_timeout_ms' #1209

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
4 changes: 4 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ class KafkaConsumer(six.Iterator):
metrics. Default: 2
metrics_sample_window_ms (int): The maximum age in milliseconds of
samples used to compute metrics. Default: 30000
coordinator_not_ready_retry_timeout_ms (int): The timeout used to detect
that the Kafka coordinator is not available. If 'None', the default
behavior of polling indefinitely would be kept. Default: None
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
Expand Down Expand Up @@ -316,6 +319,7 @@ class KafkaConsumer(six.Iterator):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'metric_group_prefix': 'consumer',
'coordinator_not_ready_retry_timeout_ms': None,
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
Expand Down
19 changes: 17 additions & 2 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class BaseCoordinator(object):
'group_id': 'kafka-python-default-group',
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'coordinator_not_ready_retry_timeout_ms': None,
'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
'api_version': (0, 10, 1),
Expand All @@ -103,6 +104,10 @@ def __init__(self, client, metrics, **configs):
should be set no higher than 1/3 of that value. It can be
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
coordinator_not_ready_retry_timeout_ms (int): The timeout used to
detect that the Kafka coordinator is not available. If 'None',
the default behavior of polling indefinitely would be kept.
Default: None
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
"""
Expand Down Expand Up @@ -234,10 +239,12 @@ def coordinator(self):
else:
return self.coordinator_id

def ensure_coordinator_ready(self):
def ensure_coordinator_ready(self, timeout_ms=None):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
retry_timeout_ms = timeout_ms or self.config['coordinator_not_ready_retry_timeout_ms']
retry_start_time_in_secs = time.time()
with self._client._lock, self._lock:
while self.coordinator_unknown():

Expand All @@ -255,7 +262,15 @@ def ensure_coordinator_ready(self):

if future.failed():
if future.retriable():
if getattr(future.exception, 'invalid_metadata', False):
if retry_timeout_ms is not None and isinstance(
future.exception, (Errors.NodeNotReadyError, Errors.NoBrokersAvailable)):
remaining_retry_timeout_ms = retry_timeout_ms - (
time.time() - retry_start_time_in_secs) * 1000
if remaining_retry_timeout_ms <= 0:
raise future.exception # pylint: disable-msg=raising-bad-type
self._client.poll(timeout_ms=min(
self.config['retry_backoff_ms'], remaining_retry_timeout_ms))
elif getattr(future.exception, 'invalid_metadata', False):
log.debug('Requesting metadata for group coordinator request: %s', future.exception)
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update)
Expand Down
7 changes: 6 additions & 1 deletion kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class ConsumerCoordinator(BaseCoordinator):
'retry_backoff_ms': 100,
'api_version': (0, 10, 1),
'exclude_internal_topics': True,
'metric_group_prefix': 'consumer'
'metric_group_prefix': 'consumer',
'coordinator_not_ready_retry_timeout_ms': None
}

def __init__(self, client, subscription, metrics, **configs):
Expand Down Expand Up @@ -77,6 +78,10 @@ def __init__(self, client, subscription, metrics, **configs):
(such as offsets) should be exposed to the consumer. If set to
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
coordinator_not_ready_retry_timeout_ms (int): The timeout used to
detect that the Kafka coordinator is not available. If 'None',
the default behavior of polling indefinitely would be kept.
Default: None.
"""
super(ConsumerCoordinator, self).__init__(client, metrics, **configs)

Expand Down