Skip to content

Commit dd073d8

Browse files
committed
Fixes racing condition when message is sent to broker before topic logs are created
1 parent 6605597 commit dd073d8

File tree

1 file changed

+17
-1
lines changed

1 file changed

+17
-1
lines changed

test/testutil.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
from kafka import SimpleClient, create_message
1212
from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
13-
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
13+
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \
14+
NotLeaderForPartitionError, UnknownTopicOrPartitionError, \
15+
FailedPayloadsError
1416
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
1517

1618
def kafka_versions(*versions):
@@ -128,6 +130,20 @@ def setUp(self):
128130
else:
129131
raise KafkaTimeoutError('Timeout loading topic metadata!')
130132

133+
134+
# Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors
135+
# TODO: It might be a good idea to move this to self.client.ensure_topic_exists
136+
for partition in self.client.get_partition_ids_for_topic(self.topic):
137+
while True:
138+
try:
139+
req = OffsetRequestPayload(self.topic, partition, -1, 100)
140+
self.client.send_offset_request([req])
141+
break
142+
except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e:
143+
if time.time() > timeout:
144+
raise KafkaTimeoutError('Timeout loading topic metadata!')
145+
time.sleep(.1)
146+
131147
self._messages = {}
132148

133149
def tearDown(self):

0 commit comments

Comments
 (0)