Skip to content

Commit 5d2c792

Browse files
committed
Drop recursion in _unpack_message_set
1 parent 1eb7e05 commit 5d2c792

File tree

1 file changed

+36
-17
lines changed

1 file changed

+36
-17
lines changed

kafka/consumer/fetcher.py

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -352,33 +352,52 @@ def fetched_records(self):
352352
position)
353353
return dict(drained)
354354

355-
def _unpack_message_set(self, tp, messages, relative_offset=0):
355+
def _unpack_message_set(self, tp, messages):
356356
try:
357357
for offset, size, msg in messages:
358358
if self.config['check_crcs'] and not msg.validate_crc():
359359
raise Errors.InvalidMessageError(msg)
360360
elif msg.is_compressed():
361-
mset = msg.decompress()
362-
# new format uses relative offsets for compressed messages
361+
# If relative offset is used, we need to decompress the entire message first to compute
362+
# the absolute offset.
363+
inner_mset = msg.decompress()
363364
if msg.magic > 0:
364-
last_offset, _, _ = mset[-1]
365-
relative = offset - last_offset
365+
last_offset, _, _ = inner_mset[-1]
366+
absolute_base_offset = offset - last_offset
366367
else:
367-
relative = 0
368-
for record in self._unpack_message_set(tp, mset, relative):
369-
yield record
368+
absolute_base_offset = -1
369+
370+
for inner_offset, inner_size, inner_msg in inner_mset:
371+
if msg.magic > 0:
372+
# When magic value is greater than 0, the timestamp
373+
# of a compressed message depends on the
374+
# typestamp type of the wrapper message:
375+
376+
if msg.timestamp_type == 0: # CREATE_TIME (0)
377+
inner_timestamp = inner_msg.timestamp
378+
379+
elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
380+
inner_timestamp = msg.timestamp
381+
382+
else:
383+
raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type))
384+
else:
385+
inner_timestamp = msg.timestamp
386+
387+
if absolute_base_offset >= 0:
388+
inner_offset += absolute_base_offset
389+
390+
key, value = self._deserialize(inner_msg)
391+
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
392+
inner_timestamp, msg.timestamp_type,
393+
key, value)
394+
370395
else:
371-
# Message v1 adds timestamp
372-
if msg.magic > 0:
373-
timestamp = msg.timestamp
374-
timestamp_type = msg.timestamp_type
375-
else:
376-
timestamp = timestamp_type = None
377396
key, value = self._deserialize(msg)
378-
yield ConsumerRecord(tp.topic, tp.partition,
379-
offset + relative_offset,
380-
timestamp, timestamp_type,
397+
yield ConsumerRecord(tp.topic, tp.partition, offset,
398+
msg.timestamp, msg.timestamp_type,
381399
key, value)
400+
382401
# If unpacking raises StopIteration, it is erroneously
383402
# caught by the generator. We want all exceptions to be raised
384403
# back to the user. See Issue 545

0 commit comments

Comments
 (0)