File tree 1 file changed +2
-1
lines changed 1 file changed +2
-1
lines changed Original file line number Diff line number Diff line change 11
11
from kafka import errors as Errors
12
12
from kafka .metrics .measurable import AnonMeasurable
13
13
from kafka .metrics .stats import Avg , Max , Rate
14
+ from kafka .producer .transaction_manager import ProducerIdAndEpoch
14
15
from kafka .protocol .init_producer_id import InitProducerIdRequest
15
16
from kafka .protocol .produce import ProduceRequest
16
17
from kafka .structs import TopicPartition
@@ -317,7 +318,7 @@ def _maybe_wait_for_producer_id(self):
317
318
response = self ._client .send_and_receive (node_id , request )
318
319
error_type = Errors .for_code (response .error_code )
319
320
if error_type is Errors .NoError :
320
- self ._transaction_manager .set_producer_id_and_epoch (response .producer_id , response .producer_epoch )
321
+ self ._transaction_manager .set_producer_id_and_epoch (ProducerIdAndEpoch ( response .producer_id , response .producer_epoch ) )
321
322
elif getattr (error_type , 'retriable' , False ):
322
323
log .debug ("Retriable error from InitProducerId response: %s" , error_type .__name__ )
323
324
if getattr (error_type , 'invalid_metadata' , False ):
You can’t perform that action at this time.
0 commit comments