@@ -38,12 +38,14 @@ def __init__(self, produce_future, relative_offset, timestamp_ms):
38
38
produce_future .add_errback (self .failure )
39
39
40
40
def _produce_success (self , offset_and_timestamp ):
41
- base_offset , timestamp_ms = offset_and_timestamp
41
+ offset , timestamp_ms = offset_and_timestamp
42
42
if timestamp_ms is None :
43
43
timestamp_ms = self .timestamp_ms
44
- self .success (RecordMetadata (self ._produce_future .topic_partition ,
45
- base_offset , timestamp_ms ,
46
- self .relative_offset ))
44
+ if offset != - 1 and self .relative_offset is not None :
45
+ offset += self .relative_offset
46
+ tp = self ._produce_future .topic_partition
47
+ metadata = RecordMetadata (tp [0 ], tp [1 ], tp , offset , timestamp_ms )
48
+ self .success (metadata )
47
49
48
50
def get (self , timeout = None ):
49
51
if not self .is_done and not self ._produce_future .wait (timeout ):
@@ -55,18 +57,4 @@ def get(self, timeout=None):
55
57
return self .value
56
58
57
59
58
- class RecordMetadata (collections .namedtuple (
59
- 'RecordMetadata' , 'topic partition topic_partition offset timestamp' )):
60
- def __new__ (cls , tp , base_offset , timestamp , relative_offset = None ):
61
- offset = base_offset
62
- if relative_offset is not None and base_offset != - 1 :
63
- offset += relative_offset
64
- return super (RecordMetadata , cls ).__new__ (cls , tp .topic , tp .partition ,
65
- tp , offset , timestamp )
66
-
67
- def __str__ (self ):
68
- return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % (
69
- self .topic , self .partition , self .offset )
70
-
71
- def __repr__ (self ):
72
- return str (self )
60
+ RecordMetadata = collections .namedtuple ('RecordMetadata' , 'topic partition topic_partition offset timestamp' )
0 commit comments