Skip to content

Commit d214321

Browse files
authored
Move ensure_valid_topic_name to kafka.util; use in client and producer (#2561)
1 parent cd4830a commit d214321

File tree

5 files changed

+40
-27
lines changed

5 files changed

+40
-27
lines changed

kafka/client_async.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from kafka.metrics.stats.rate import TimeUnit
2828
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
2929
from kafka.protocol.metadata import MetadataRequest
30-
from kafka.util import Dict, WeakMethod
30+
from kafka.util import Dict, WeakMethod, ensure_valid_topic_name
3131
# Although this looks unused, it actually monkey-patches socket.socketpair()
3232
# and should be left in as long as we're using socket.socketpair() in this file
3333
from kafka.vendor import socketpair # noqa: F401
@@ -909,7 +909,13 @@ def add_topic(self, topic):
909909
910910
Returns:
911911
Future: resolves after metadata request/response
912+
913+
Raises:
914+
TypeError: if topic is not a string
915+
ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length
912916
"""
917+
ensure_valid_topic_name(topic)
918+
913919
if topic in self._topics:
914920
return Future().success(set(self._topics))
915921

kafka/consumer/subscription_state.py

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from kafka.errors import IllegalStateError
1010
from kafka.protocol.list_offsets import OffsetResetStrategy
1111
from kafka.structs import OffsetAndMetadata
12+
from kafka.util import ensure_valid_topic_name
1213

1314
log = logging.getLogger(__name__)
1415

@@ -43,10 +44,6 @@ class SubscriptionState(object):
4344
" (2) subscribe to topics matching a regex pattern,"
4445
" (3) assign itself specific topic-partitions.")
4546

46-
# Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
47-
_MAX_NAME_LENGTH = 249
48-
_TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$')
49-
5047
def __init__(self, offset_reset_strategy='earliest'):
5148
"""Initialize a SubscriptionState instance
5249
@@ -123,24 +120,6 @@ def subscribe(self, topics=(), pattern=None, listener=None):
123120
raise TypeError('listener must be a ConsumerRebalanceListener')
124121
self.listener = listener
125122

126-
def _ensure_valid_topic_name(self, topic):
127-
""" Ensures that the topic name is valid according to the kafka source. """
128-
129-
# See Kafka Source:
130-
# https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
131-
if topic is None:
132-
raise TypeError('All topics must not be None')
133-
if not isinstance(topic, six.string_types):
134-
raise TypeError('All topics must be strings')
135-
if len(topic) == 0:
136-
raise ValueError('All topics must be non-empty strings')
137-
if topic == '.' or topic == '..':
138-
raise ValueError('Topic name cannot be "." or ".."')
139-
if len(topic) > self._MAX_NAME_LENGTH:
140-
raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic))
141-
if not self._TOPIC_LEGAL_CHARS.match(topic):
142-
raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic))
143-
144123
def change_subscription(self, topics):
145124
"""Change the topic subscription.
146125
@@ -166,7 +145,7 @@ def change_subscription(self, topics):
166145
return
167146

168147
for t in topics:
169-
self._ensure_valid_topic_name(t)
148+
ensure_valid_topic_name(t)
170149

171150
log.info('Updating subscribed topics to: %s', topics)
172151
self.subscription = set(topics)

kafka/producer/kafka.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from kafka.record.legacy_records import LegacyRecordBatchBuilder
2323
from kafka.serializer import Serializer
2424
from kafka.structs import TopicPartition
25+
from kafka.util import ensure_valid_topic_name
2526

2627

2728
log = logging.getLogger(__name__)
@@ -593,11 +594,15 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
593594
Raises:
594595
KafkaTimeoutError: if unable to fetch topic metadata, or unable
595596
to obtain memory buffer prior to configured max_block_ms
597+
TypeError: if topic is not a string
598+
ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length
599+
AssertionError: if KafkaProducer is closed, or key and value are both None
596600
"""
597601
assert not self._closed, 'KafkaProducer already closed!'
598602
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
599603
'Null messages require kafka >= 0.8.1')
600604
assert not (value is None and key is None), 'Need at least one: key or value'
605+
ensure_valid_topic_name(topic)
601606
key_bytes = value_bytes = None
602607
try:
603608
assigned_partition = None

kafka/util.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22

33
import binascii
4+
import re
45
import time
56
import weakref
67

@@ -43,6 +44,29 @@ def inner_timeout_ms(fallback=None):
4344
return inner_timeout_ms
4445

4546

47+
# Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29
48+
TOPIC_MAX_LENGTH = 249
49+
TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$')
50+
51+
def ensure_valid_topic_name(topic):
52+
""" Ensures that the topic name is valid according to the kafka source. """
53+
54+
# See Kafka Source:
55+
# https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java
56+
if topic is None:
57+
raise TypeError('All topics must not be None')
58+
if not isinstance(topic, six.string_types):
59+
raise TypeError('All topics must be strings')
60+
if len(topic) == 0:
61+
raise ValueError('All topics must be non-empty strings')
62+
if topic == '.' or topic == '..':
63+
raise ValueError('Topic name cannot be "." or ".."')
64+
if len(topic) > TOPIC_MAX_LENGTH:
65+
raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(TOPIC_MAX_LENGTH, topic))
66+
if not TOPIC_LEGAL_CHARS.match(topic):
67+
raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic))
68+
69+
4670
class WeakMethod(object):
4771
"""
4872
Callable that weakly references a method and the object it is bound to. It

test/test_subscription_state.py renamed to test/test_util.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import pytest
55

6-
from kafka.consumer.subscription_state import SubscriptionState
6+
from kafka.util import ensure_valid_topic_name
77

88
@pytest.mark.parametrize(('topic_name', 'expectation'), [
99
(0, pytest.raises(TypeError)),
@@ -20,6 +20,5 @@
2020
('name+with+plus', pytest.raises(ValueError)),
2121
])
2222
def test_topic_name_validation(topic_name, expectation):
23-
state = SubscriptionState()
2423
with expectation:
25-
state._ensure_valid_topic_name(topic_name)
24+
ensure_valid_topic_name(topic_name)

0 commit comments

Comments
 (0)