Skip to content

Commit 2077b71

Browse files
committed
use TxnRequestHandler properties for transaction manager pass throughs
1 parent 19300f5 commit 2077b71

File tree

1 file changed

+35
-30
lines changed

1 file changed

+35
-30
lines changed

kafka/producer/transaction_manager.py

+35-30
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def initialize_transactions(self):
141141
self._transition_to(TransactionState.INITIALIZING)
142142
self.set_producer_id_and_epoch(ProducerIdAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH))
143143
self._sequence_numbers.clear()
144-
handler = InitProducerIdHandler(self, self.transactional_id, self.transaction_timeout_ms)
144+
handler = InitProducerIdHandler(self, self.transaction_timeout_ms)
145145
self._enqueue_request(handler)
146146
return handler.result
147147

@@ -172,7 +172,7 @@ def begin_abort(self):
172172
def _begin_completing_transaction(self, committed):
173173
if self._new_partitions_in_transaction:
174174
self._enqueue_request(self._add_partitions_to_transaction_handler())
175-
handler = EndTxnHandler(self, self.transactional_id, self.producer_id_and_epoch, committed)
175+
handler = EndTxnHandler(self, committed)
176176
self._enqueue_request(handler)
177177
return handler.result
178178

@@ -184,7 +184,7 @@ def send_offsets_to_transaction(self, offsets, consumer_group_id):
184184
raise Errors.KafkaError("Cannot send offsets to transaction because the producer is not in an active transaction")
185185

186186
log.debug("Begin adding offsets %s for consumer group %s to transaction", offsets, consumer_group_id)
187-
handler = AddOffsetsToTxnHandler(self, self.transactional_id, self.producer_id_and_epoch, consumer_group_id, offsets)
187+
handler = AddOffsetsToTxnHandler(self, consumer_group_id, offsets)
188188
self._enqueue_request(handler)
189189
return handler.result
190190

@@ -492,7 +492,7 @@ def _add_partitions_to_transaction_handler(self):
492492
with self._lock:
493493
self._pending_partitions_in_transaction.update(self._new_partitions_in_transaction)
494494
self._new_partitions_in_transaction.clear()
495-
return AddPartitionsToTxnHandler(self, self.transactional_id, self.producer_id_and_epoch, self._pending_partitions_in_transaction)
495+
return AddPartitionsToTxnHandler(self, self._pending_partitions_in_transaction)
496496

497497

498498
class TransactionalRequestResult(object):
@@ -537,6 +537,18 @@ def __init__(self, transaction_manager, result=None):
537537
self._result = result or TransactionalRequestResult()
538538
self._is_retry = False
539539

540+
@property
541+
def transactional_id(self):
542+
return self.transaction_manager.transactional_id
543+
544+
@property
545+
def producer_id(self):
546+
return self.transaction_manager.producer_id_and_epoch.producer_id
547+
548+
@property
549+
def producer_epoch(self):
550+
return self.transaction_manager.producer_id_and_epoch.epoch
551+
540552
def fatal_error(self, exc):
541553
self.transaction_manager._transition_to_fatal_error(exc)
542554
self._result.done(error=exc)
@@ -604,16 +616,15 @@ def priority(self):
604616

605617

606618
class InitProducerIdHandler(TxnRequestHandler):
607-
def __init__(self, transaction_manager, transactional_id, transaction_timeout_ms):
619+
def __init__(self, transaction_manager, transaction_timeout_ms):
608620
super(InitProducerIdHandler, self).__init__(transaction_manager)
609621

610-
self.transactional_id = transactional_id
611622
if transaction_manager._api_version >= (2, 0):
612623
version = 1
613624
else:
614625
version = 0
615626
self.request = InitProducerIdRequest[version](
616-
transactional_id=transactional_id,
627+
transactional_id=self.transactional_id,
617628
transaction_timeout_ms=transaction_timeout_ms)
618629

619630
@property
@@ -638,10 +649,9 @@ def handle_response(self, response):
638649
self.fatal_error(Errors.KafkaError("Unexpected error in InitProducerIdResponse: %s" % (error())))
639650

640651
class AddPartitionsToTxnHandler(TxnRequestHandler):
641-
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, topic_partitions):
652+
def __init__(self, transaction_manager, topic_partitions):
642653
super(AddPartitionsToTxnHandler, self).__init__(transaction_manager)
643654

644-
self.transactional_id = transactional_id
645655
if transaction_manager._api_version >= (2, 7):
646656
version = 2
647657
elif transaction_manager._api_version >= (2, 0):
@@ -652,9 +662,9 @@ def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch,
652662
for tp in topic_partitions:
653663
topic_data[tp.topic].append(tp.partition)
654664
self.request = AddPartitionsToTxnRequest[version](
655-
transactional_id=transactional_id,
656-
producer_id=producer_id_and_epoch.producer_id,
657-
producer_epoch=producer_id_and_epoch.epoch,
665+
transactional_id=self.transactional_id,
666+
producer_id=self.producer_id,
667+
producer_epoch=self.producer_epoch,
658668
topics=list(topic_data.items()))
659669

660670
@property
@@ -790,20 +800,19 @@ def handle_response(self, response):
790800

791801

792802
class EndTxnHandler(TxnRequestHandler):
793-
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, committed):
803+
def __init__(self, transaction_manager, committed):
794804
super(EndTxnHandler, self).__init__(transaction_manager)
795805

796-
self.transactional_id = transactional_id
797806
if self.transaction_manager._api_version >= (2, 7):
798807
version = 2
799808
elif self.transaction_manager._api_version >= (2, 0):
800809
version = 1
801810
else:
802811
version = 0
803812
self.request = EndTxnRequest[version](
804-
transactional_id=transactional_id,
805-
producer_id=producer_id_and_epoch.producer_id,
806-
producer_epoch=producer_id_and_epoch.epoch,
813+
transactional_id=self.transactional_id,
814+
producer_id=self.producer_id,
815+
producer_epoch=self.producer_epoch,
807816
committed=committed)
808817

809818
@property
@@ -832,11 +841,9 @@ def handle_response(self, response):
832841

833842

834843
class AddOffsetsToTxnHandler(TxnRequestHandler):
835-
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, consumer_group_id, offsets):
844+
def __init__(self, transaction_manager, consumer_group_id, offsets):
836845
super(AddOffsetsToTxnHandler, self).__init__(transaction_manager)
837846

838-
self.transactional_id = transactional_id
839-
self.producer_id_and_epoch = producer_id_and_epoch
840847
self.consumer_group_id = consumer_group_id
841848
self.offsets = offsets
842849
if self.transaction_manager._api_version >= (2, 7):
@@ -846,9 +853,9 @@ def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch,
846853
else:
847854
version = 0
848855
self.request = AddOffsetsToTxnRequest[version](
849-
transactional_id=transactional_id,
850-
producer_id=producer_id_and_epoch.producer_id,
851-
producer_epoch=producer_id_and_epoch.epoch,
856+
transactional_id=self.transactional_id,
857+
producer_id=self.producer_id,
858+
producer_epoch=self.producer_epoch,
852859
group_id=consumer_group_id)
853860

854861
@property
@@ -864,8 +871,8 @@ def handle_response(self, response):
864871
# note the result is not completed until the TxnOffsetCommit returns
865872
for tp, offset in six.iteritems(self.offsets):
866873
self.transaction_manager._pending_txn_offset_commits[tp] = offset
867-
handler = TxnOffsetCommitHandler(self.transaction_manager, self.transactional_id, self.producer_id_and_epoch,
868-
self.consumer_group_id, self.transaction_manager._pending_txn_offset_commits, self._result)
874+
handler = TxnOffsetCommitHandler(self.transaction_manager, self.consumer_group_id,
875+
self.transaction_manager._pending_txn_offset_commits, self._result)
869876
self.transaction_manager._enqueue_request(handler)
870877
self.transaction_manager._transaction_started = True
871878
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
@@ -884,12 +891,10 @@ def handle_response(self, response):
884891

885892

886893
class TxnOffsetCommitHandler(TxnRequestHandler):
887-
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, consumer_group_id, offsets, result):
894+
def __init__(self, transaction_manager, consumer_group_id, offsets, result):
888895
super(TxnOffsetCommitHandler, self).__init__(transaction_manager, result=result)
889896

890-
self.transactional_id = transactional_id
891897
self.consumer_group_id = consumer_group_id
892-
self.producer_id_and_epoch = producer_id_and_epoch
893898
self.offsets = offsets
894899
self.request = self._build_request()
895900

@@ -912,8 +917,8 @@ def _build_request(self):
912917
return TxnOffsetCommitRequest[version](
913918
transactional_id=self.transactional_id,
914919
group_id=self.consumer_group_id,
915-
producer_id=self.producer_id_and_epoch.producer_id,
916-
producer_epoch=self.producer_id_and_epoch.epoch,
920+
producer_id=self.producer_id,
921+
producer_epoch=self.producer_epoch,
917922
topics=list(topic_data.items()))
918923

919924
@property

0 commit comments

Comments
 (0)