Skip to content

Commit 96962a0

Browse files
committed
Remove the check for timestamp None in producer, as it's done in RecordBatch anyway.
Minor abc doc fixes.
1 parent 3fbb979 commit 96962a0

File tree

3 files changed

+7
-22
lines changed

3 files changed

+7
-22
lines changed

kafka/consumer/fetcher.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -551,12 +551,6 @@ def _unpack_message_set(self, tp, records):
551551
log.exception('StopIteration raised unpacking messageset: %s', e)
552552
raise Exception('StopIteration raised unpacking messageset')
553553

554-
# If unpacking raises AssertionError, it means decompression unsupported
555-
# See Issue 1033
556-
except AssertionError as e:
557-
log.exception('AssertionError raised unpacking messageset: %s', e)
558-
raise
559-
560554
def __iter__(self): # pylint: disable=non-iterator-returned
561555
return self
562556

kafka/producer/kafka.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -544,8 +544,6 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
544544
self._ensure_valid_record_size(message_size)
545545

546546
tp = TopicPartition(topic, partition)
547-
if timestamp_ms is None:
548-
timestamp_ms = int(time.time() * 1000)
549547
log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
550548
result = self._accumulator.append(tp, timestamp_ms,
551549
key_bytes, value_bytes,

kafka/record/abc.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,21 @@ def checksum(self):
3636
be the checksum for v0 and v1 and None for v2 and above.
3737
"""
3838

39-
@abc.abstractproperty
40-
def headers(self):
41-
""" If supported by version list of key-value tuples, or empty list if
42-
not supported by format.
43-
"""
44-
4539

4640
class ABCRecordBatchBuilder(object):
4741
__metaclass__ = abc.ABCMeta
4842

4943
@abc.abstractmethod
50-
def append(self, offset, timestamp, key, value, headers):
44+
def append(self, offset, timestamp, key, value):
5145
""" Writes record to internal buffer.
5246
5347
Arguments:
5448
offset (int): Relative offset of record, starting from 0
55-
timestamp (int): Timestamp in milliseconds since beginning of the
56-
epoch (midnight Jan 1, 1970 (UTC))
49+
timestamp (int or None): Timestamp in milliseconds since beginning
50+
of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
51+
set to current time.
5752
key (bytes or None): Key of the record
5853
value (bytes or None): Value of the record
59-
headers (List[Tuple[str, bytes]]): Headers of the record. Header
60-
keys can not be ``None``.
6154
6255
Returns:
6356
(bytes, int): Checksum of the written record (or None for v2 and
@@ -74,10 +67,10 @@ def size_in_bytes(self, offset, timestamp, key, value, headers):
7467
@abc.abstractmethod
7568
def build(self):
7669
""" Close for append, compress if needed, write size and header and
77-
return a ready to send bytes object.
70+
return a ready to send buffer object.
7871
7972
Return:
80-
io.BytesIO: finished batch, ready to send.
73+
bytearray: finished batch, ready to send.
8174
"""
8275

8376

@@ -105,7 +98,7 @@ def __init__(self, buffer):
10598

10699
@abc.abstractmethod
107100
def size_in_bytes(self):
108-
""" Returns the size of buffer.
101+
""" Returns the size of inner buffer.
109102
"""
110103

111104
@abc.abstractmethod

0 commit comments

Comments
 (0)