File tree Expand file tree Collapse file tree 1 file changed +12
-2
lines changed Expand file tree Collapse file tree 1 file changed +12
-2
lines changed Original file line number Diff line number Diff line change @@ -233,17 +233,27 @@ def lz4_encode_old_kafka(payload):
233
233
flg = data [4 ]
234
234
else :
235
235
flg = ord (data [4 ])
236
+
236
237
content_size_bit = ((flg >> 3 ) & 1 )
237
238
if content_size_bit :
238
- header_size += 8
239
+ # Old kafka does not accept the content-size field
240
+ # so we need to discard it and reset the header flag
241
+ flg -= 8
242
+ if isinstance (data [4 ], int ):
243
+ data [4 ] = flg
244
+ else :
245
+ data = data [0 :4 ] + chr (flg ) + data [5 :]
246
+ payload = data [header_size + 8 :]
247
+ else :
248
+ payload = data [header_size :]
239
249
240
250
# This is the incorrect hc
241
251
hc = xxhash .xxh32 (data [0 :header_size - 1 ]).digest ()[- 2 :- 1 ] # pylint: disable-msg=no-member
242
252
243
253
return b'' .join ([
244
254
data [0 :header_size - 1 ],
245
255
hc ,
246
- data [ header_size :]
256
+ payload
247
257
])
248
258
249
259
You can’t perform that action at this time.
0 commit comments