Skip to content

Commit b6629a0

Browse files
committed
Remove the check for timestamp None in producer, as it's done in RecordBatch anyway.
Minor abc doc fixes.
1 parent db2a898 commit b6629a0

File tree

3 files changed

+7
-22
lines changed

3 files changed

+7
-22
lines changed

kafka/consumer/fetcher.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -467,12 +467,6 @@ def _unpack_message_set(self, tp, records):
467467
log.exception('StopIteration raised unpacking messageset: %s', e)
468468
raise Exception('StopIteration raised unpacking messageset')
469469

470-
# If unpacking raises AssertionError, it means decompression unsupported
471-
# See Issue 1033
472-
except AssertionError as e:
473-
log.exception('AssertionError raised unpacking messageset: %s', e)
474-
raise
475-
476470
def __iter__(self): # pylint: disable=non-iterator-returned
477471
return self
478472

kafka/producer/kafka.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,6 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
546546
self._ensure_valid_record_size(message_size)
547547

548548
tp = TopicPartition(topic, partition)
549-
if timestamp_ms is None:
550-
timestamp_ms = int(time.time() * 1000)
551549
log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
552550
result = self._accumulator.append(tp, timestamp_ms,
553551
key_bytes, value_bytes,

kafka/record/abc.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,28 +36,21 @@ def checksum(self):
3636
be the checksum for v0 and v1 and None for v2 and above.
3737
"""
3838

39-
@abc.abstractproperty
40-
def headers(self):
41-
""" If supported by version list of key-value tuples, or empty list if
42-
not supported by format.
43-
"""
44-
4539

4640
class ABCRecordBatchBuilder(object):
4741
__metaclass__ = abc.ABCMeta
4842

4943
@abc.abstractmethod
50-
def append(self, offset, timestamp, key, value, headers):
44+
def append(self, offset, timestamp, key, value):
5145
""" Writes record to internal buffer.
5246
5347
Arguments:
5448
offset (int): Relative offset of record, starting from 0
55-
timestamp (int): Timestamp in milliseconds since beginning of the
56-
epoch (midnight Jan 1, 1970 (UTC))
49+
timestamp (int or None): Timestamp in milliseconds since beginning
50+
of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
51+
set to current time.
5752
key (bytes or None): Key of the record
5853
value (bytes or None): Value of the record
59-
headers (List[Tuple[str, bytes]]): Headers of the record. Header
60-
keys can not be ``None``.
6154
6255
Returns:
6356
(bytes, int): Checksum of the written record (or None for v2 and
@@ -74,10 +67,10 @@ def size_in_bytes(self, offset, timestamp, key, value, headers):
7467
@abc.abstractmethod
7568
def build(self):
7669
""" Close for append, compress if needed, write size and header and
77-
return a ready to send bytes object.
70+
return a ready to send buffer object.
7871
7972
Return:
80-
io.BytesIO: finished batch, ready to send.
73+
bytearray: finished batch, ready to send.
8174
"""
8275

8376

@@ -105,7 +98,7 @@ def __init__(self, buffer):
10598

10699
@abc.abstractmethod
107100
def size_in_bytes(self):
108-
""" Returns the size of buffer.
101+
""" Returns the size of inner buffer.
109102
"""
110103

111104
@abc.abstractmethod

0 commit comments

Comments
 (0)