Skip to content

Commit 65bc3f2

Browse files
committed
Always encode size with MessageSet
1 parent 68c8a4a commit 65bc3f2

File tree

2 files changed

+3
-5
lines changed

2 files changed

+3
-5
lines changed

kafka/protocol/message.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class MessageSet(AbstractType):
143143
HEADER_SIZE = 12 # offset + message_size
144144

145145
@classmethod
146-
def encode(cls, items, size=True, recalc_message_size=True):
146+
def encode(cls, items):
147147
# RecordAccumulator encodes messagesets internally
148148
if isinstance(items, io.BytesIO):
149149
size = Int32.decode(items)
@@ -156,8 +156,6 @@ def encode(cls, items, size=True, recalc_message_size=True):
156156
encoded_values.append(Int64.encode(offset))
157157
encoded_values.append(Bytes.encode(message))
158158
encoded = b''.join(encoded_values)
159-
if not size:
160-
return encoded
161159
return Bytes.encode(encoded)
162160

163161
@classmethod

test/test_protocol.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ def test_encode_message_set():
7272
Message(b'v2', key=b'k2')
7373
]
7474
encoded = MessageSet.encode([(0, msg.encode())
75-
for msg in messages],
76-
size=False)
75+
for msg in messages])
7776
expect = b''.join([
7877
struct.pack('>q', 0), # MsgSet Offset
7978
struct.pack('>i', 18), # Msg Size
@@ -93,6 +92,7 @@ def test_encode_message_set():
9392
struct.pack('>i', 2), # Length of value
9493
b'v2', # Value
9594
])
95+
expect = struct.pack('>i', len(expect)) + expect
9696
assert encoded == expect
9797

9898

0 commit comments

Comments
 (0)