Commit 5360d79

Limit producer close timeout to 1sec in __del__; use context managers to close in test_producer
1 parent de17b9f commit 5360d79

2 files changed: +102 -81 lines changed


kafka/producer/kafka.py

+1 -1
@@ -474,7 +474,7 @@ def _unregister_cleanup(self):
         self._cleanup = None
 
     def __del__(self):
-        self.close(null_logger=True)
+        self.close(timeout=1, null_logger=True)
 
     def close(self, timeout=None, null_logger=False):
         """Close this producer.

test/test_producer.py

+101 -80
@@ -1,3 +1,4 @@
+from contextlib import contextmanager
 import gc
 import platform
 import time
@@ -22,6 +23,24 @@ def test_buffer_pool():
     assert buf2.read() == b''
 
 
+@contextmanager
+def producer_factory(**kwargs):
+    producer = KafkaProducer(**kwargs)
+    try:
+        yield producer
+    finally:
+        producer.close(timeout=0)
+
+
+@contextmanager
+def consumer_factory(**kwargs):
+    consumer = KafkaConsumer(**kwargs)
+    try:
+        yield consumer
+    finally:
+        consumer.close()
+
+
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
@@ -35,37 +54,39 @@ def test_end_to_end(kafka_broker, compression):
         pytest.skip('zstd requires kafka 2.1.0 or newer')
 
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
-    producer = KafkaProducer(bootstrap_servers=connect_str,
-                             retries=5,
-                             max_block_ms=30000,
-                             compression_type=compression,
-                             value_serializer=str.encode)
-    consumer = KafkaConsumer(bootstrap_servers=connect_str,
-                             group_id=None,
-                             consumer_timeout_ms=30000,
-                             auto_offset_reset='earliest',
-                             value_deserializer=bytes.decode)
-
-    topic = random_string(5)
-
-    messages = 100
-    futures = []
-    for i in range(messages):
-        futures.append(producer.send(topic, 'msg %d' % i))
-    ret = [f.get(timeout=30) for f in futures]
-    assert len(ret) == messages
-    producer.close()
-
-    consumer.subscribe([topic])
-    msgs = set()
-    for i in range(messages):
-        try:
-            msgs.add(next(consumer).value)
-        except StopIteration:
-            break
-
-    assert msgs == set(['msg %d' % (i,) for i in range(messages)])
-    consumer.close()
+    producer_args = {
+        'bootstrap_servers': connect_str,
+        'retries': 5,
+        'max_block_ms': 30000,
+        'compression_type': compression,
+        'value_serializer': str.encode,
+    }
+    consumer_args = {
+        'bootstrap_servers': connect_str,
+        'group_id': None,
+        'consumer_timeout_ms': 30000,
+        'auto_offset_reset': 'earliest',
+        'value_deserializer': bytes.decode,
+    }
+    with producer_factory(**producer_args) as producer, consumer_factory(**consumer_args) as consumer:
+        topic = random_string(5)
+
+        messages = 100
+        futures = []
+        for i in range(messages):
+            futures.append(producer.send(topic, 'msg %d' % i))
+        ret = [f.get(timeout=30) for f in futures]
+        assert len(ret) == messages
+
+        consumer.subscribe([topic])
+        msgs = set()
+        for i in range(messages):
+            try:
+                msgs.add(next(consumer).value)
+            except StopIteration:
+                break
+
+        assert msgs == set(['msg %d' % (i,) for i in range(messages)])
 
 
 @pytest.mark.skipif(platform.python_implementation() != 'CPython',
@@ -86,52 +107,52 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
         pytest.skip('zstd requires 2.1.0 or more')
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
-    producer = KafkaProducer(bootstrap_servers=connect_str,
-                             retries=5,
-                             max_block_ms=30000,
-                             compression_type=compression)
-    magic = producer._max_usable_produce_magic()
-
-    # record headers are supported in 0.11.0
-    if env_kafka_version() < (0, 11, 0):
-        headers = None
-    else:
-        headers = [("Header Key", b"Header Value")]
-
-    topic = random_string(5)
-    future = producer.send(
-        topic,
-        value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
-        partition=0)
-    record = future.get(timeout=5)
-    assert record is not None
-    assert record.topic == topic
-    assert record.partition == 0
-    assert record.topic_partition == TopicPartition(topic, 0)
-    assert record.offset == 0
-    if magic >= 1:
-        assert record.timestamp == 9999999
-    else:
-        assert record.timestamp == -1  # NO_TIMESTAMP
-
-    if magic >= 2:
-        assert record.checksum is None
-    elif magic == 1:
-        assert record.checksum == 1370034956
-    else:
-        assert record.checksum == 3296137851
-
-    assert record.serialized_key_size == 10
-    assert record.serialized_value_size == 12
-    if headers:
-        assert record.serialized_header_size == 22
-
-    if magic == 0:
-        pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
-    send_time = time.time() * 1000
-    future = producer.send(
-        topic,
-        value=b"Simple value", key=b"Simple key", timestamp_ms=None,
-        partition=0)
-    record = future.get(timeout=5)
-    assert abs(record.timestamp - send_time) <= 1000  # Allow 1s deviation
+    with producer_factory(bootstrap_servers=connect_str,
+                          retries=5,
+                          max_block_ms=30000,
+                          compression_type=compression) as producer:
+        magic = producer._max_usable_produce_magic()
+
+        # record headers are supported in 0.11.0
+        if env_kafka_version() < (0, 11, 0):
+            headers = None
+        else:
+            headers = [("Header Key", b"Header Value")]
+
+        topic = random_string(5)
+        future = producer.send(
+            topic,
+            value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
+            partition=0)
+        record = future.get(timeout=5)
+        assert record is not None
+        assert record.topic == topic
+        assert record.partition == 0
+        assert record.topic_partition == TopicPartition(topic, 0)
+        assert record.offset == 0
+        if magic >= 1:
+            assert record.timestamp == 9999999
+        else:
+            assert record.timestamp == -1  # NO_TIMESTAMP
+
+        if magic >= 2:
+            assert record.checksum is None
+        elif magic == 1:
+            assert record.checksum == 1370034956
+        else:
+            assert record.checksum == 3296137851
+
+        assert record.serialized_key_size == 10
+        assert record.serialized_value_size == 12
+        if headers:
+            assert record.serialized_header_size == 22
+
+        if magic == 0:
+            pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
+        send_time = time.time() * 1000
+        future = producer.send(
+            topic,
+            value=b"Simple value", key=b"Simple key", timestamp_ms=None,
+            partition=0)
+        record = future.get(timeout=5)
+        assert abs(record.timestamp - send_time) <= 1000  # Allow 1s deviation
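For reference, the new producer_factory and consumer_factory helpers lean on contextlib.contextmanager: the code before the yield runs on entry to the with-block, and the finally clause runs on exit even when the test body raises, so close() is always called. A self-contained sketch of the same pattern with a stand-in resource (FakeProducer is illustrative only, not part of this commit):

    from contextlib import contextmanager


    class FakeProducer(object):
        """Stand-in for KafkaProducer, used only to demonstrate the factory."""

        def __init__(self, **config):
            self.config = config
            self.closed = False

        def close(self, timeout=None):
            self.closed = True


    @contextmanager
    def producer_factory(**kwargs):
        producer = FakeProducer(**kwargs)
        try:
            yield producer
        finally:
            # Runs on normal exit and when the with-block raises.
            producer.close(timeout=0)


    with producer_factory(bootstrap_servers='localhost:9092') as producer:
        assert not producer.closed
    assert producer.closed  # closed as soon as the with-block exits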
