
Commit b9f26c4

Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
1 parent 8f21180 commit b9f26c4

21 files changed (+1148, -365 lines)

kafka/consumer/fetcher.py

Lines changed: 33 additions & 82 deletions
@@ -13,10 +13,10 @@
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.protocol.fetch import FetchRequest
-from kafka.protocol.message import PartialMessage
 from kafka.protocol.offset import (
     OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
 )
+from kafka.record import MemoryRecords
 from kafka.serializer import Deserializer
 from kafka.structs import TopicPartition, OffsetAndTimestamp
 
@@ -355,7 +355,7 @@ def fetched_records(self, max_records=None):
 
         Raises:
             OffsetOutOfRangeError: if no subscription offset_reset_strategy
-            InvalidMessageError: if message crc validation fails (check_crcs
+            CorruptRecordException: if message crc validation fails (check_crcs
                 must be set to True)
             RecordTooLargeError: if a message is larger than the currently
                 configured max_partition_fetch_bytes
@@ -523,77 +523,26 @@ def _message_generator(self):
                           " the current position is %d", tp, part.fetch_offset,
                           position)
 
-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, records):
         try:
-            for offset, size, msg in messages:
-                if self.config['check_crcs'] and not msg.validate_crc():
-                    raise Errors.InvalidMessageError(msg)
-                elif msg.is_compressed():
-                    # If relative offset is used, we need to decompress the entire message first to compute
-                    # the absolute offset.
-                    inner_mset = msg.decompress()
-
-                    # There should only ever be a single layer of compression
-                    if inner_mset[0][-1].is_compressed():
-                        log.warning('MessageSet at %s offset %d appears '
-                                    ' double-compressed. This should not'
-                                    ' happen -- check your producers!',
-                                    tp, offset)
-                        if self.config['skip_double_compressed_messages']:
-                            log.warning('Skipping double-compressed message at'
-                                        ' %s %d', tp, offset)
-                            continue
-
-                    if msg.magic > 0:
-                        last_offset, _, _ = inner_mset[-1]
-                        absolute_base_offset = offset - last_offset
-                    else:
-                        absolute_base_offset = -1
-
-                    for inner_offset, inner_size, inner_msg in inner_mset:
-                        if msg.magic > 0:
-                            # When magic value is greater than 0, the timestamp
-                            # of a compressed message depends on the
-                            # typestamp type of the wrapper message:
-
-                            if msg.timestamp_type == 0: # CREATE_TIME (0)
-                                inner_timestamp = inner_msg.timestamp
 
-                            elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
-                                inner_timestamp = msg.timestamp
-
-                            else:
-                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
-                        else:
-                            inner_timestamp = msg.timestamp
-
-                        if absolute_base_offset >= 0:
-                            inner_offset += absolute_base_offset
-
-                        key = self._deserialize(
-                            self.config['key_deserializer'],
-                            tp.topic, inner_msg.key)
-                        value = self._deserialize(
-                            self.config['value_deserializer'],
-                            tp.topic, inner_msg.value)
-                        yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
-                                             inner_timestamp, msg.timestamp_type,
-                                             key, value, inner_msg.crc,
-                                             len(inner_msg.key) if inner_msg.key is not None else -1,
-                                             len(inner_msg.value) if inner_msg.value is not None else -1)
-
-                else:
+            batch = records.next_batch()
+            while batch is not None:
+                for record in batch:
+                    key_size = len(record.key) if record.key is not None else -1
+                    value_size = len(record.value) if record.value is not None else -1
                     key = self._deserialize(
                         self.config['key_deserializer'],
-                        tp.topic, msg.key)
+                        tp.topic, record.key)
                     value = self._deserialize(
                         self.config['value_deserializer'],
-                        tp.topic, msg.value)
-                    yield ConsumerRecord(tp.topic, tp.partition, offset,
-                                         msg.timestamp, msg.timestamp_type,
-                                         key, value, msg.crc,
-                                         len(msg.key) if msg.key is not None else -1,
-                                         len(msg.value) if msg.value is not None else -1)
+                        tp.topic, record.value)
+                    yield ConsumerRecord(
+                        tp.topic, tp.partition, record.offset, record.timestamp,
+                        record.timestamp_type, key, value, record.checksum,
+                        key_size, value_size)
+
+                batch = records.next_batch()
 
             # If unpacking raises StopIteration, it is erroneously
             # caught by the generator. We want all exceptions to be raised
@@ -848,7 +797,8 @@ def _handle_fetch_response(self, request, send_time, response):
         random.shuffle(response.topics)
         for topic, partitions in response.topics:
             random.shuffle(partitions)
-            for partition, error_code, highwater, messages in partitions:
+            for partition_data in partitions:
+                partition, error_code, highwater = partition_data[:3]
                 tp = TopicPartition(topic, partition)
                 error_type = Errors.for_code(error_code)
                 if not self._subscriptions.is_fetchable(tp):
@@ -859,6 +809,7 @@ def _handle_fetch_response(self, request, send_time, response):
 
                 elif error_type is Errors.NoError:
                     self._subscriptions.assignment[tp].highwater = highwater
+                    records = MemoryRecords(partition_data[-1])
 
                     # we are interested in this fetch only if the beginning
                     # offset (of the *request*) matches the current consumed position
@@ -873,29 +824,29 @@ def _handle_fetch_response(self, request, send_time, response):
                                   position)
                         continue
 
+                    if not records.has_next() and records.size_in_bytes() > 0:
+                        # we did not read a single message from a non-empty
+                        # buffer because that message's size is larger than
+                        # fetch size, in this case record this exception
+                        self._record_too_large_partitions[tp] = fetch_offset
+
                     num_bytes = 0
-                    partial = None
-                    if messages and isinstance(messages[-1][-1], PartialMessage):
-                        partial = messages.pop()
+                    message_count = 0
 
-                    if messages:
+                    if records.has_next():
                         log.debug("Adding fetched record for partition %s with"
                                   " offset %d to buffered record list", tp,
                                   position)
-                        unpacked = list(self._unpack_message_set(tp, messages))
+                        unpacked = list(self._unpack_message_set(tp, records))
                         self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
-                        last_offset, _, _ = messages[-1]
+                        last_offset = unpacked[-1].offset
                         self._sensors.records_fetch_lag.record(highwater - last_offset)
-                        num_bytes = sum(msg[1] for msg in messages)
-                    elif partial:
-                        # we did not read a single message from a non-empty
-                        # buffer because that message's size is larger than
-                        # fetch size, in this case record this exception
-                        self._record_too_large_partitions[tp] = fetch_offset
+                        num_bytes = records.valid_bytes()
+                        message_count = len(unpacked)
 
-                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, len(messages))
+                    self._sensors.record_topic_fetch_metrics(topic, num_bytes, message_count)
                     total_bytes += num_bytes
-                    total_count += len(messages)
+                    total_count += message_count
                 elif error_type in (Errors.NotLeaderForPartitionError,
                                     Errors.UnknownTopicOrPartitionError):
                     self._client.cluster.request_update()
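
The rewritten fetcher consumes the fetched bytes through the new kafka.record.MemoryRecords API instead of iterating (offset, size, message) tuples. A minimal sketch of that consumption pattern, based only on the calls visible in this diff (the raw_bytes argument and the iterate_records helper are illustrative names, not part of the commit):

import kafka.errors as Errors
from kafka.record import MemoryRecords

def iterate_records(raw_bytes):
    # raw_bytes: the record data returned for a single partition in a FetchResponse
    records = MemoryRecords(raw_bytes)

    # A non-empty buffer with no readable batch means the first record did not
    # fit in the fetch size -- the case the fetcher reports as "record too large".
    if not records.has_next() and records.size_in_bytes() > 0:
        raise Errors.RecordTooLargeError(
            'first record is larger than the configured fetch size')

    batch = records.next_batch()
    while batch is not None:
        for record in batch:
            # Each record exposes offset, timestamp, timestamp_type, key, value
            # and checksum; decompression and relative-offset handling happen
            # inside the batch implementation rather than in the fetcher.
            yield (record.offset, record.timestamp, record.key, record.value)
        batch = records.next_batch()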

kafka/errors.py

Lines changed: 5 additions & 2 deletions
@@ -97,12 +97,15 @@ class OffsetOutOfRangeError(BrokerResponseError):
                    ' maintained by the server for the given topic/partition.')
 
 
-class InvalidMessageError(BrokerResponseError):
+class CorruptRecordException(BrokerResponseError):
     errno = 2
-    message = 'INVALID_MESSAGE'
+    message = 'CORRUPT_MESSAGE'
     description = ('This message has failed its CRC checksum, exceeds the'
                    ' valid size, or is otherwise corrupt.')
 
+# Backward compatibility
+InvalidMessageError = CorruptRecordException
+
 
 class UnknownTopicOrPartitionError(BrokerResponseError):
     errno = 3
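
Since the old name is kept as a module-level alias, code that still catches InvalidMessageError keeps working unchanged; both names refer to the same class object. A quick illustration (not part of the commit):

import kafka.errors as Errors

# The alias points at the very same class, so except clauses behave identically.
assert Errors.InvalidMessageError is Errors.CorruptRecordException

try:
    raise Errors.CorruptRecordException('crc mismatch')
except Errors.InvalidMessageError:
    pass  # still caught under the legacy name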

kafka/producer/buffer.py

Lines changed: 1 addition & 125 deletions
@@ -5,133 +5,9 @@
 import threading
 import time
 
-from ..codec import (has_gzip, has_snappy, has_lz4,
-                     gzip_encode, snappy_encode,
-                     lz4_encode, lz4_encode_old_kafka)
-from .. import errors as Errors
 from ..metrics.stats import Rate
-from ..protocol.types import Int32, Int64
-from ..protocol.message import MessageSet, Message
 
-
-
-class MessageSetBuffer(object):
-    """Wrap a buffer for writing MessageSet batches.
-
-    Arguments:
-        buf (IO stream): a buffer for writing data. Typically BytesIO.
-        batch_size (int): maximum number of bytes to write to the buffer.
-
-    Keyword Arguments:
-        compression_type ('gzip', 'snappy', None): compress messages before
-            publishing. Default: None.
-    """
-    _COMPRESSORS = {
-        'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
-        'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
-        'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
-        'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
-    }
-    def __init__(self, buf, batch_size, compression_type=None, message_version=0):
-        if compression_type is not None:
-            assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
-
-            # Kafka 0.8/0.9 had a quirky lz4...
-            if compression_type == 'lz4' and message_version == 0:
-                compression_type = 'lz4-old-kafka'
-
-            checker, encoder, attributes = self._COMPRESSORS[compression_type]
-            assert checker(), 'Compression Libraries Not Found'
-            self._compressor = encoder
-            self._compression_attributes = attributes
-        else:
-            self._compressor = None
-            self._compression_attributes = None
-
-        self._message_version = message_version
-        self._buffer = buf
-        # Init MessageSetSize to 0 -- update on close
-        self._buffer.seek(0)
-        self._buffer.write(Int32.encode(0))
-        self._batch_size = batch_size
-        self._closed = False
-        self._messages = 0
-        self._bytes_written = 4 # Int32 header is 4 bytes
-        self._final_size = None
-
-    def append(self, offset, message):
-        """Append a Message to the MessageSet.
-
-        Arguments:
-            offset (int): offset of the message
-            message (Message or bytes): message struct or encoded bytes
-
-        Returns: bytes written
-        """
-        if isinstance(message, Message):
-            encoded = message.encode()
-        else:
-            encoded = bytes(message)
-        msg = Int64.encode(offset) + Int32.encode(len(encoded)) + encoded
-        self._buffer.write(msg)
-        self._messages += 1
-        self._bytes_written += len(msg)
-        return len(msg)
-
-    def has_room_for(self, key, value):
-        if self._closed:
-            return False
-        if not self._messages:
-            return True
-        needed_bytes = MessageSet.HEADER_SIZE + Message.HEADER_SIZE
-        if key is not None:
-            needed_bytes += len(key)
-        if value is not None:
-            needed_bytes += len(value)
-        return self._buffer.tell() + needed_bytes < self._batch_size
-
-    def is_full(self):
-        if self._closed:
-            return True
-        return self._buffer.tell() >= self._batch_size
-
-    def close(self):
-        # This method may be called multiple times on the same batch
-        # i.e., on retries
-        # we need to make sure we only close it out once
-        # otherwise compressed messages may be double-compressed
-        # see Issue 718
-        if not self._closed:
-            if self._compressor:
-                # TODO: avoid copies with bytearray / memoryview
-                uncompressed_size = self._buffer.tell()
-                self._buffer.seek(4)
-                msg = Message(self._compressor(self._buffer.read(uncompressed_size - 4)),
-                              attributes=self._compression_attributes,
-                              magic=self._message_version)
-                encoded = msg.encode()
-                self._buffer.seek(4)
-                self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
-                self._buffer.write(Int32.encode(len(encoded)))
-                self._buffer.write(encoded)
-
-            # Update the message set size (less the 4 byte header),
-            # and return with buffer ready for full read()
-            self._final_size = self._buffer.tell()
-            self._buffer.seek(0)
-            self._buffer.write(Int32.encode(self._final_size - 4))
-
-            self._buffer.seek(0)
-            self._closed = True
-
-    def size_in_bytes(self):
-        return self._final_size or self._buffer.tell()
-
-    def compression_rate(self):
-        return self.size_in_bytes() / self._bytes_written
-
-    def buffer(self):
-        return self._buffer
+import kafka.errors as Errors
 
 
 class SimpleBufferPool(object):
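
The deleted MessageSetBuffer hand-rolled the v0/v1 MessageSet framing -- a 4-byte size prefix backfilled on close(), and an 8-byte offset plus 4-byte length before each encoded message -- with optional compression wrapping; that responsibility now moves out of the producer buffer module. A rough standalone sketch of the uncompressed framing the removed append()/close() pair produced, using struct in place of the removed Int32/Int64 helpers (illustrative only, not an API added by this commit):

import io
import struct

def frame_message_set(encoded_messages):
    # Layout written by the removed code:
    # [Int32 MessageSetSize][Int64 offset][Int32 message size][message bytes]...
    buf = io.BytesIO()
    buf.write(struct.pack('>i', 0))  # size placeholder, rewritten on close
    for offset, encoded in enumerate(encoded_messages):
        buf.write(struct.pack('>q', offset))        # Int64 offset
        buf.write(struct.pack('>i', len(encoded)))  # Int32 message size
        buf.write(encoded)
    total = buf.tell()
    buf.seek(0)
    buf.write(struct.pack('>i', total - 4))  # backfill size, excluding the 4-byte header
    buf.seek(0)
    return buf.read()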
