Skip to content

Commit 916c257

Browse files
authored
Merge pull request #757 from dpkp/double_compression
Fix bug causing KafkaProducer to double-compress message batches
2 parents 1eb7e05 + ca9d2fa commit 916c257

File tree

2 files changed

+93
-16
lines changed

2 files changed

+93
-16
lines changed

kafka/producer/buffer.py

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,29 @@ def is_full(self):
8989
return self._buffer.tell() >= self._batch_size
9090

9191
def close(self):
92-
if self._compressor:
93-
# TODO: avoid copies with bytearray / memoryview
94-
self._buffer.seek(4)
95-
msg = Message(self._compressor(self._buffer.read()),
96-
attributes=self._compression_attributes,
97-
magic=self._message_version)
98-
encoded = msg.encode()
99-
self._buffer.seek(4)
100-
self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
101-
self._buffer.write(Int32.encode(len(encoded)))
102-
self._buffer.write(encoded)
103-
104-
# Update the message set size, and return ready for full read()
105-
size = self._buffer.tell() - 4
106-
self._buffer.seek(0)
107-
self._buffer.write(Int32.encode(size))
92+
# This method may be called multiple times on the same batch
93+
# i.e., on retries
94+
# we need to make sure we only close it out once
95+
# otherwise compressed messages may be double-compressed
96+
# see Issue 718
97+
if not self._closed:
98+
if self._compressor:
99+
# TODO: avoid copies with bytearray / memoryview
100+
self._buffer.seek(4)
101+
msg = Message(self._compressor(self._buffer.read()),
102+
attributes=self._compression_attributes,
103+
magic=self._message_version)
104+
encoded = msg.encode()
105+
self._buffer.seek(4)
106+
self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
107+
self._buffer.write(Int32.encode(len(encoded)))
108+
self._buffer.write(encoded)
109+
110+
# Update the message set size, and return ready for full read()
111+
size = self._buffer.tell() - 4
112+
self._buffer.seek(0)
113+
self._buffer.write(Int32.encode(size))
114+
108115
self._buffer.seek(0)
109116
self._closed = True
110117

test/test_buffer.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# pylint: skip-file
2+
from __future__ import absolute_import
3+
4+
import io
5+
6+
import pytest
7+
8+
from kafka.producer.buffer import MessageSetBuffer
9+
from kafka.protocol.message import Message, MessageSet
10+
11+
12+
def test_buffer_close():
13+
records = MessageSetBuffer(io.BytesIO(), 100000)
14+
orig_msg = Message(b'foobar')
15+
records.append(1234, orig_msg)
16+
records.close()
17+
18+
msgset = MessageSet.decode(records.buffer())
19+
assert len(msgset) == 1
20+
(offset, size, msg) = msgset[0]
21+
assert offset == 1234
22+
assert msg == orig_msg
23+
24+
# Closing again should work fine
25+
records.close()
26+
27+
msgset = MessageSet.decode(records.buffer())
28+
assert len(msgset) == 1
29+
(offset, size, msg) = msgset[0]
30+
assert offset == 1234
31+
assert msg == orig_msg
32+
33+
34+
@pytest.mark.parametrize('compression', [
35+
'gzip',
36+
'snappy',
37+
pytest.mark.skipif("sys.version_info < (2,7)")('lz4'), # lz4tools does not work on py26
38+
])
39+
def test_compressed_buffer_close(compression):
40+
records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
41+
orig_msg = Message(b'foobar')
42+
records.append(1234, orig_msg)
43+
records.close()
44+
45+
msgset = MessageSet.decode(records.buffer())
46+
assert len(msgset) == 1
47+
(offset, size, msg) = msgset[0]
48+
assert offset == 0
49+
assert msg.is_compressed()
50+
51+
msgset = msg.decompress()
52+
(offset, size, msg) = msgset[0]
53+
assert not msg.is_compressed()
54+
assert offset == 1234
55+
assert msg == orig_msg
56+
57+
# Closing again should work fine
58+
records.close()
59+
60+
msgset = MessageSet.decode(records.buffer())
61+
assert len(msgset) == 1
62+
(offset, size, msg) = msgset[0]
63+
assert offset == 0
64+
assert msg.is_compressed()
65+
66+
msgset = msg.decompress()
67+
(offset, size, msg) = msgset[0]
68+
assert not msg.is_compressed()
69+
assert offset == 1234
70+
assert msg == orig_msg

0 commit comments

Comments
 (0)