Skip to content

KAFKA-2832: Add a consumer config option to exclude internal topics #765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ class KafkaConsumer(six.Iterator):
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
exclude_internal_topics (bool): Whether records from internal topics
(such as offsets) should be exposed to the consumer. If set to True
the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+ Default: True

Note:
Configuration parameters are described in more detail at
Expand Down Expand Up @@ -222,6 +226,7 @@ class KafkaConsumer(six.Iterator):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
}

def __init__(self, *topics, **configs):
Expand Down
12 changes: 8 additions & 4 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ConsumerCoordinator(BaseCoordinator):
'heartbeat_interval_ms': 3000,
'retry_backoff_ms': 100,
'api_version': (0, 9),
'exclude_internal_topics': True,
}

def __init__(self, client, subscription, metrics, metric_group_prefix,
Expand Down Expand Up @@ -70,6 +71,10 @@ def __init__(self, client, subscription, metrics, metric_group_prefix,
using Kafka's group managementment facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
exclude_internal_topics (bool): Whether records from internal topics
(such as offsets) should be exposed to the consumer. If set to
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
"""
super(ConsumerCoordinator, self).__init__(client, **configs)
self.config = copy.copy(self.DEFAULT_CONFIG)
Expand Down Expand Up @@ -131,13 +136,12 @@ def group_protocols(self):

def _handle_metadata_update(self, cluster):
# if we encounter any unauthorized topics, raise an exception
# TODO
#if self._cluster.unauthorized_topics:
# raise TopicAuthorizationError(self._cluster.unauthorized_topics)
if cluster.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(cluster.unauthorized_topics)

if self._subscription.subscribed_pattern:
topics = []
for topic in cluster.topics():
for topic in cluster.topics(self.config['exclude_internal_topics']):
if self._subscription.subscribed_pattern.match(topic):
topics.append(topic)

Expand Down