Skip to content

Commit 19300f5

Browse files
committed
KIP-98: Add offsets support to transactional KafkaProducer
1 parent 369478a commit 19300f5

File tree

4 files changed

+306
-10
lines changed

4 files changed

+306
-10
lines changed

kafka/producer/kafka.py

+30
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,36 @@ def begin_transaction(self):
686686
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
687687
self._transaction_manager.begin_transaction()
688688

689+
def send_offsets_to_transaction(self, offsets, consumer_group_id):
690+
"""
691+
Sends a list of consumed offsets to the consumer group coordinator, and also marks
692+
those offsets as part of the current transaction. These offsets will be considered
693+
consumed only if the transaction is committed successfully.
694+
695+
This method should be used when you need to batch consumed and produced messages
696+
together, typically in a consume-transform-produce pattern.
697+
698+
Arguments:
699+
offsets ({TopicPartition: OffsetAndMetadata}): map of topic-partition -> offsets to commit
700+
as part of current transaction.
701+
consumer_group_id (str): Name of consumer group for offsets commit.
702+
703+
Raises:
704+
IllegalStateError: if no transactional_id, or transaction has not been started.
705+
ProducerFencedError: fatal error indicating another producer with the same transactional_id is active.
706+
UnsupportedVersionError: fatal error indicating the broker does not support transactions (i.e. if < 0.11).
707+
UnsupportedForMessageFormatError: fatal error indicating the message format used for the offsets
708+
topic on the broker does not support transactions.
709+
AuthorizationError: fatal error indicating that the configured transactional_id is not authorized.
710+
KafkaErro:r if the producer has encountered a previous fatal or abortable error, or for any
711+
other unexpected error
712+
"""
713+
if not self._transaction_manager:
714+
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
715+
result = self._transaction_manager.send_offsets_to_transaction(offsets, consumer_group_id)
716+
self._sender.wakeup()
717+
result.wait()
718+
689719
def commit_transaction(self):
690720
""" Commits the ongoing transaction.
691721

kafka/producer/transaction_manager.py

+169-8
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
from kafka.vendor.enum34 import IntEnum
1717

1818
import kafka.errors as Errors
19+
from kafka.protocol.add_offsets_to_txn import AddOffsetsToTxnRequest
1920
from kafka.protocol.add_partitions_to_txn import AddPartitionsToTxnRequest
2021
from kafka.protocol.end_txn import EndTxnRequest
2122
from kafka.protocol.find_coordinator import FindCoordinatorRequest
2223
from kafka.protocol.init_producer_id import InitProducerIdRequest
24+
from kafka.protocol.txn_offset_commit import TxnOffsetCommitRequest
2325
from kafka.structs import TopicPartition
2426

2527

@@ -115,6 +117,7 @@ def __init__(self, transactional_id=None, transaction_timeout_ms=0, retry_backof
115117
self._new_partitions_in_transaction = set()
116118
self._pending_partitions_in_transaction = set()
117119
self._partitions_in_transaction = set()
120+
self._pending_txn_offset_commits = dict()
118121

119122
self._current_state = TransactionState.UNINITIALIZED
120123
self._last_error = None
@@ -169,10 +172,22 @@ def begin_abort(self):
169172
def _begin_completing_transaction(self, committed):
170173
if self._new_partitions_in_transaction:
171174
self._enqueue_request(self._add_partitions_to_transaction_handler())
172-
handler = EndTxnHandler(self, self.transactional_id, self.producer_id_and_epoch.producer_id, self.producer_id_and_epoch.epoch, committed)
175+
handler = EndTxnHandler(self, self.transactional_id, self.producer_id_and_epoch, committed)
173176
self._enqueue_request(handler)
174177
return handler.result
175178

179+
def send_offsets_to_transaction(self, offsets, consumer_group_id):
180+
with self._lock:
181+
self._ensure_transactional()
182+
self._maybe_fail_with_error()
183+
if self._current_state != TransactionState.IN_TRANSACTION:
184+
raise Errors.KafkaError("Cannot send offsets to transaction because the producer is not in an active transaction")
185+
186+
log.debug("Begin adding offsets %s for consumer group %s to transaction", offsets, consumer_group_id)
187+
handler = AddOffsetsToTxnHandler(self, self.transactional_id, self.producer_id_and_epoch, consumer_group_id, offsets)
188+
self._enqueue_request(handler)
189+
return handler.result
190+
176191
def maybe_add_partition_to_transaction(self, topic_partition):
177192
with self._lock:
178193
self._fail_if_not_ready_for_send()
@@ -389,6 +404,10 @@ def _test_transaction_contains_partition(self, tp):
389404
with self._lock:
390405
return tp in self._partitions_in_transaction
391406

407+
# visible for testing
408+
def _test_has_pending_offset_commits(self):
409+
return bool(self._pending_txn_offset_commits)
410+
392411
# visible for testing
393412
def _test_has_ongoing_transaction(self):
394413
with self._lock:
@@ -473,7 +492,7 @@ def _add_partitions_to_transaction_handler(self):
473492
with self._lock:
474493
self._pending_partitions_in_transaction.update(self._new_partitions_in_transaction)
475494
self._new_partitions_in_transaction.clear()
476-
return AddPartitionsToTxnHandler(self, self.transactional_id, self.producer_id_and_epoch.producer_id, self.producer_id_and_epoch.epoch, self._pending_partitions_in_transaction)
495+
return AddPartitionsToTxnHandler(self, self.transactional_id, self.producer_id_and_epoch, self._pending_partitions_in_transaction)
477496

478497

479498
class TransactionalRequestResult(object):
@@ -619,7 +638,7 @@ def handle_response(self, response):
619638
self.fatal_error(Errors.KafkaError("Unexpected error in InitProducerIdResponse: %s" % (error())))
620639

621640
class AddPartitionsToTxnHandler(TxnRequestHandler):
622-
def __init__(self, transaction_manager, transactional_id, producer_id, producer_epoch, topic_partitions):
641+
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, topic_partitions):
623642
super(AddPartitionsToTxnHandler, self).__init__(transaction_manager)
624643

625644
self.transactional_id = transactional_id
@@ -634,8 +653,8 @@ def __init__(self, transaction_manager, transactional_id, producer_id, producer_
634653
topic_data[tp.topic].append(tp.partition)
635654
self.request = AddPartitionsToTxnRequest[version](
636655
transactional_id=transactional_id,
637-
producer_id=producer_id,
638-
producer_epoch=producer_epoch,
656+
producer_id=producer_id_and_epoch.producer_id,
657+
producer_epoch=producer_id_and_epoch.epoch,
639658
topics=list(topic_data.items()))
640659

641660
@property
@@ -771,7 +790,7 @@ def handle_response(self, response):
771790

772791

773792
class EndTxnHandler(TxnRequestHandler):
774-
def __init__(self, transaction_manager, transactional_id, producer_id, producer_epoch, committed):
793+
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, committed):
775794
super(EndTxnHandler, self).__init__(transaction_manager)
776795

777796
self.transactional_id = transactional_id
@@ -783,8 +802,8 @@ def __init__(self, transaction_manager, transactional_id, producer_id, producer_
783802
version = 0
784803
self.request = EndTxnRequest[version](
785804
transactional_id=transactional_id,
786-
producer_id=producer_id,
787-
producer_epoch=producer_epoch,
805+
producer_id=producer_id_and_epoch.producer_id,
806+
producer_epoch=producer_id_and_epoch.epoch,
788807
committed=committed)
789808

790809
@property
@@ -810,3 +829,145 @@ def handle_response(self, response):
810829
self.fatal_error(error())
811830
else:
812831
self.fatal_error(Errors.KafkaError("Unhandled error in EndTxnResponse: %s" % (error())))
832+
833+
834+
class AddOffsetsToTxnHandler(TxnRequestHandler):
835+
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, consumer_group_id, offsets):
836+
super(AddOffsetsToTxnHandler, self).__init__(transaction_manager)
837+
838+
self.transactional_id = transactional_id
839+
self.producer_id_and_epoch = producer_id_and_epoch
840+
self.consumer_group_id = consumer_group_id
841+
self.offsets = offsets
842+
if self.transaction_manager._api_version >= (2, 7):
843+
version = 2
844+
elif self.transaction_manager._api_version >= (2, 0):
845+
version = 1
846+
else:
847+
version = 0
848+
self.request = AddOffsetsToTxnRequest[version](
849+
transactional_id=transactional_id,
850+
producer_id=producer_id_and_epoch.producer_id,
851+
producer_epoch=producer_id_and_epoch.epoch,
852+
group_id=consumer_group_id)
853+
854+
@property
855+
def priority(self):
856+
return Priority.ADD_PARTITIONS_OR_OFFSETS
857+
858+
def handle_response(self, response):
859+
error = Errors.for_code(response.error_code)
860+
861+
if error is Errors.NoError:
862+
log.debug("Successfully added partition for consumer group %s to transaction", self.consumer_group_id)
863+
864+
# note the result is not completed until the TxnOffsetCommit returns
865+
for tp, offset in six.iteritems(self.offsets):
866+
self.transaction_manager._pending_txn_offset_commits[tp] = offset
867+
handler = TxnOffsetCommitHandler(self.transaction_manager, self.transactional_id, self.producer_id_and_epoch,
868+
self.consumer_group_id, self.transaction_manager._pending_txn_offset_commits, self._result)
869+
self.transaction_manager._enqueue_request(handler)
870+
self.transaction_manager._transaction_started = True
871+
elif error in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
872+
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
873+
self.reenqueue()
874+
elif error in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
875+
self.reenqueue()
876+
elif error is Errors.InvalidProducerEpochError:
877+
self.fatal_error(error())
878+
elif error is Errors.TransactionalIdAuthorizationFailedError:
879+
self.fatal_error(error())
880+
elif error is Errors.GroupAuthorizationFailedError:
881+
self.abortable_error(Errors.GroupAuthorizationError(self.consumer_group_id))
882+
else:
883+
self.fatal_error(Errors.KafkaError("Unexpected error in AddOffsetsToTxnResponse: %s" % (error())))
884+
885+
886+
class TxnOffsetCommitHandler(TxnRequestHandler):
887+
def __init__(self, transaction_manager, transactional_id, producer_id_and_epoch, consumer_group_id, offsets, result):
888+
super(TxnOffsetCommitHandler, self).__init__(transaction_manager, result=result)
889+
890+
self.transactional_id = transactional_id
891+
self.consumer_group_id = consumer_group_id
892+
self.producer_id_and_epoch = producer_id_and_epoch
893+
self.offsets = offsets
894+
self.request = self._build_request()
895+
896+
def _build_request(self):
897+
if self.transaction_manager._api_version >= (2, 1):
898+
version = 2
899+
elif self.transaction_manager._api_version >= (2, 0):
900+
version = 1
901+
else:
902+
version = 0
903+
904+
topic_data = collections.defaultdict(list)
905+
for tp, offset in six.iteritems(self.offsets):
906+
if version >= 2:
907+
partition_data = (tp.partition, offset.offset, offset.leader_epoch, offset.metadata)
908+
else:
909+
partition_data = (tp.partition, offset.offset, offset.metadata)
910+
topic_data[tp.topic].append(partition_data)
911+
912+
return TxnOffsetCommitRequest[version](
913+
transactional_id=self.transactional_id,
914+
group_id=self.consumer_group_id,
915+
producer_id=self.producer_id_and_epoch.producer_id,
916+
producer_epoch=self.producer_id_and_epoch.epoch,
917+
topics=list(topic_data.items()))
918+
919+
@property
920+
def priority(self):
921+
return Priority.ADD_PARTITIONS_OR_OFFSETS
922+
923+
@property
924+
def coordinator_type(self):
925+
return 'group'
926+
927+
@property
928+
def coordinator_key(self):
929+
return self.consumer_group_id
930+
931+
def handle_response(self, response):
932+
lookup_coordinator = False
933+
retriable_failure = False
934+
935+
errors = {TopicPartition(topic, partition): Errors.for_code(error_code)
936+
for topic, partition_data in response.topics
937+
for partition, error_code in partition_data}
938+
939+
for tp, error in six.iteritems(errors):
940+
if error is Errors.NoError:
941+
log.debug("Successfully added offsets for %s from consumer group %s to transaction.",
942+
tp, self.consumer_group_id)
943+
del self.transaction_manager._pending_txn_offset_commits[tp]
944+
elif error in (errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError, Errors.RequestTimedOutError):
945+
retriable_failure = True
946+
lookup_coordinator = True
947+
elif error is Errors.UnknownTopicOrPartitionError:
948+
retriable_failure = True
949+
elif error is Errors.GroupAuthorizationFailedError:
950+
self.abortable_error(Errors.GroupAuthorizationError(self.consumer_group_id))
951+
return
952+
elif error in (Errors.TransactionalIdAuthorizationFailedError,
953+
Errors.InvalidProducerEpochError,
954+
Errors.UnsupportedForMessageFormatError):
955+
self.fatal_error(error())
956+
return
957+
else:
958+
self.fatal_error(Errors.KafkaError("Unexpected error in TxnOffsetCommitResponse: %s" % (error())))
959+
return
960+
961+
if lookup_coordinator:
962+
self.transaction_manager._lookup_coordinator('group', self.consumer_group_id)
963+
964+
if not retriable_failure:
965+
# all attempted partitions were either successful, or there was a fatal failure.
966+
# either way, we are not retrying, so complete the request.
967+
self.result.done()
968+
969+
# retry the commits which failed with a retriable error.
970+
elif self.transaction_manager._pending_txn_offset_commits:
971+
self.offsets = self.transaction_manager._pending_txn_offset_commits
972+
self.request = self._build_request()
973+
self.reenqueue()

kafka/protocol/txn_offset_commit.py

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.protocol.api import Request, Response
4+
from kafka.protocol.types import Array, Int16, Int32, Int64, Schema, String
5+
6+
7+
class TxnOffsetCommitResponse_v0(Response):
8+
API_KEY = 28
9+
API_VERSION = 0
10+
SCHEMA = Schema(
11+
('throttle_time_ms', Int32),
12+
('topics', Array(
13+
('topic', String('utf-8')),
14+
('partitions', Array(
15+
('partition', Int32),
16+
('error_code', Int16))))))
17+
18+
19+
class TxnOffsetCommitResponse_v1(Response):
20+
API_KEY = 28
21+
API_VERSION = 1
22+
SCHEMA = TxnOffsetCommitResponse_v0.SCHEMA
23+
24+
25+
class TxnOffsetCommitResponse_v2(Response):
26+
API_KEY = 28
27+
API_VERSION = 2
28+
SCHEMA = TxnOffsetCommitResponse_v1.SCHEMA
29+
30+
31+
class TxnOffsetCommitRequest_v0(Request):
32+
API_KEY = 28
33+
API_VERSION = 0
34+
RESPONSE_TYPE = TxnOffsetCommitResponse_v0
35+
SCHEMA = Schema(
36+
('transactional_id', String('utf-8')),
37+
('group_id', String('utf-8')),
38+
('producer_id', Int64),
39+
('producer_epoch', Int16),
40+
('topics', Array(
41+
('topic', String('utf-8')),
42+
('partitions', Array(
43+
('partition', Int32),
44+
('offset', Int64),
45+
('metadata', String('utf-8')))))))
46+
47+
48+
class TxnOffsetCommitRequest_v1(Request):
49+
API_KEY = 28
50+
API_VERSION = 1
51+
RESPONSE_TYPE = TxnOffsetCommitResponse_v1
52+
SCHEMA = TxnOffsetCommitRequest_v0.SCHEMA
53+
54+
55+
class TxnOffsetCommitRequest_v2(Request):
56+
API_KEY = 28
57+
API_VERSION = 2
58+
RESPONSE_TYPE = TxnOffsetCommitResponse_v2
59+
SCHEMA = Schema(
60+
('transactional_id', String('utf-8')),
61+
('group_id', String('utf-8')),
62+
('producer_id', Int64),
63+
('producer_epoch', Int16),
64+
('topics', Array(
65+
('topic', String('utf-8')),
66+
('partitions', Array(
67+
('partition', Int32),
68+
('offset', Int64),
69+
('leader_epoch', Int32),
70+
('metadata', String('utf-8')))))))
71+
72+
73+
TxnOffsetCommitRequest = [
74+
TxnOffsetCommitRequest_v0, TxnOffsetCommitRequest_v1, TxnOffsetCommitRequest_v2,
75+
]
76+
TxnOffsetCommitResponse = [
77+
TxnOffsetCommitResponse_v0, TxnOffsetCommitResponse_v1, TxnOffsetCommitResponse_v2,
78+
]

0 commit comments

Comments
 (0)