
Commit a577053

Merge pull request #755 from dpkp/unrecurse_unpack_message_set
Drop recursion in Fetcher _unpack_message_set
2 parents: 916c257 + ad13500

2 files changed: 64 additions & 17 deletions

kafka/consumer/fetcher.py

Lines changed: 56 additions & 17 deletions
@@ -39,6 +39,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1, # undocumented -- interface may change
         'api_version': (0, 8, 0),
     }
@@ -71,6 +72,13 @@ def __init__(self, client, subscriptions, metrics, metric_group_prefix,
                 consumed. This ensures no on-the-wire or on-disk corruption to
                 the messages occurred. This check adds some overhead, so it may
                 be disabled in cases seeking extreme performance. Default: True
+            skip_double_compressed_messages (bool): A bug in KafkaProducer
+                caused some messages to be corrupted via double-compression.
+                By default, the fetcher will return the messages as a compressed
+                blob of bytes with a single offset, i.e. how the message was
+                actually published to the cluster. If you prefer to have the
+                fetcher automatically detect corrupt messages and skip them,
+                set this option to True. Default: False.
         """
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
@@ -352,33 +360,64 @@ def fetched_records(self):
                             position)
         return dict(drained)

-    def _unpack_message_set(self, tp, messages, relative_offset=0):
+    def _unpack_message_set(self, tp, messages):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    mset = msg.decompress()
-                    # new format uses relative offsets for compressed messages
+                    # If relative offset is used, we need to decompress the entire message first to compute
+                    # the absolute offset.
+                    inner_mset = msg.decompress()
+
+                    # There should only ever be a single layer of compression
+                    if inner_mset[0][-1].is_compressed():
+                        log.warning('MessageSet at %s offset %d appears '
+                                    ' double-compressed. This should not'
+                                    ' happen -- check your producers!',
+                                    tp, offset)
+                        if self.config['skip_double_compressed_messages']:
+                            log.warning('Skipping double-compressed message at'
+                                        ' %s %d', tp, offset)
+                            continue
+
                     if msg.magic > 0:
-                        last_offset, _, _ = mset[-1]
-                        relative = offset - last_offset
+                        last_offset, _, _ = inner_mset[-1]
+                        absolute_base_offset = offset - last_offset
                     else:
-                        relative = 0
-                    for record in self._unpack_message_set(tp, mset, relative):
-                        yield record
+                        absolute_base_offset = -1
+
+                    for inner_offset, inner_size, inner_msg in inner_mset:
+                        if msg.magic > 0:
+                            # When magic value is greater than 0, the timestamp
+                            # of a compressed message depends on the
+                            # timestamp type of the wrapper message:
+
+                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
+                                inner_timestamp = inner_msg.timestamp
+
+                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
+                                inner_timestamp = msg.timestamp
+
+                            else:
+                                raise ValueError('Unknown timestamp type: {}'.format(msg.timestamp_type))
+                        else:
+                            inner_timestamp = msg.timestamp
+
+                        if absolute_base_offset >= 0:
+                            inner_offset += absolute_base_offset
+
+                        key, value = self._deserialize(inner_msg)
+                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
+                                             inner_timestamp, msg.timestamp_type,
+                                             key, value)
+
                 else:
-                    # Message v1 adds timestamp
-                    if msg.magic > 0:
-                        timestamp = msg.timestamp
-                        timestamp_type = msg.timestamp_type
-                    else:
-                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition,
-                                         offset + relative_offset,
-                                         timestamp, timestamp_type,
+                    yield ConsumerRecord(tp.topic, tp.partition, offset,
+                                         msg.timestamp, msg.timestamp_type,
                                          key, value)
+
         # If unpacking raises StopIteration, it is erroneously
        # caught by the generator. We want all exceptions to be raised
        # back to the user. See Issue 545
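
The flat loop above does two pieces of bookkeeping per inner message of a compressed wrapper: it converts relative offsets (message format v1) into absolute offsets using the wrapper's offset, and it picks the timestamp source based on the wrapper's timestamp type. The standalone sketch below mirrors that logic outside the Fetcher class; it is not library code. WrapperMsg, InnerMsg, and resolve_inner_records are hypothetical names used only for illustration, and the inner message set is modeled as the list of (offset, size, msg) tuples that msg.decompress() yields.

from collections import namedtuple

# Hypothetical stand-ins for decoded Kafka messages; only the fields the
# offset/timestamp logic touches are modeled here.
WrapperMsg = namedtuple('WrapperMsg', ['magic', 'offset', 'timestamp', 'timestamp_type'])
InnerMsg = namedtuple('InnerMsg', ['timestamp'])

CREATE_TIME = 0      # timestamp assigned by the producer
LOG_APPEND_TIME = 1  # timestamp assigned by the broker


def resolve_inner_records(wrapper, inner_mset):
    """Yield (absolute_offset, timestamp) for each inner message.

    inner_mset is a list of (offset, size, msg) tuples, mirroring what
    msg.decompress() returns inside _unpack_message_set.
    """
    if wrapper.magic > 0:
        # v1 wrappers carry relative inner offsets; the wrapper's own offset
        # is the absolute offset of the *last* inner message.
        last_offset, _, _ = inner_mset[-1]
        absolute_base_offset = wrapper.offset - last_offset
    else:
        # v0 inner messages already carry absolute offsets.
        absolute_base_offset = -1

    for inner_offset, _size, inner_msg in inner_mset:
        if wrapper.magic > 0:
            if wrapper.timestamp_type == CREATE_TIME:
                timestamp = inner_msg.timestamp   # producer-assigned, per message
            elif wrapper.timestamp_type == LOG_APPEND_TIME:
                timestamp = wrapper.timestamp     # broker-assigned, from the wrapper
            else:
                raise ValueError('Unknown timestamp type: {}'.format(wrapper.timestamp_type))
        else:
            timestamp = wrapper.timestamp         # v0: no per-message timestamp

        if absolute_base_offset >= 0:
            inner_offset += absolute_base_offset
        yield inner_offset, timestamp


# A v1 wrapper at absolute offset 102 holding three inner messages with
# relative offsets 0, 1, 2 resolves to absolute offsets 100, 101, 102.
wrapper = WrapperMsg(magic=1, offset=102, timestamp=1468000000000, timestamp_type=CREATE_TIME)
inner = [(i, 0, InnerMsg(timestamp=1467999999000 + i)) for i in range(3)]
assert [off for off, _ in resolve_inner_records(wrapper, inner)] == [100, 101, 102]

Compared with the previous recursive implementation, the single loop computes the same absolute offsets without threading a relative_offset argument back through _unpack_message_set.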

kafka/consumer/group.py

Lines changed: 8 additions & 0 deletions
@@ -123,6 +123,13 @@ class KafkaConsumer(six.Iterator):
         consumer_timeout_ms (int): number of milliseconds to block during
             message iteration before raising StopIteration (i.e., ending the
             iterator). Default -1 (block forever).
+        skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
+            caused some messages to be corrupted via double-compression.
+            By default, the fetcher will return these messages as a compressed
+            blob of bytes with a single offset, i.e. how the message was
+            actually published to the cluster. If you prefer to have the
+            fetcher automatically detect corrupt messages and skip them,
+            set this option to True. Default: False.
         security_protocol (str): Protocol used to communicate with brokers.
             Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
         ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
@@ -189,6 +196,7 @@ class KafkaConsumer(six.Iterator):
         'send_buffer_bytes': None,
         'receive_buffer_bytes': None,
         'consumer_timeout_ms': -1,
+        'skip_double_compressed_messages': False,
         'security_protocol': 'PLAINTEXT',
         'ssl_context': None,
         'ssl_check_hostname': True,
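
For completeness, here is a minimal usage sketch of the new consumer option. The topic name and broker address are placeholders; all other settings are left at the library defaults.

from kafka import KafkaConsumer

# Opt in to skipping double-compressed messages instead of receiving them
# as a single compressed blob with one offset.
consumer = KafkaConsumer(
    'my-topic',                          # placeholder topic
    bootstrap_servers='localhost:9092',  # placeholder broker address
    skip_double_compressed_messages=True,
)

for record in consumer:
    print(record.topic, record.partition, record.offset, record.key, record.value)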
