Skip to content

Commit 4122c1f

Browse files
authored
KIP-98: Transactional Producer (#2587)
1 parent 614b059 commit 4122c1f

12 files changed

+1318
-217
lines changed

kafka/conn.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -934,7 +934,8 @@ def close(self, error=None):
934934
if self.state is ConnectionStates.DISCONNECTED:
935935
return
936936
log.log(logging.ERROR if error else logging.INFO, '%s: Closing connection. %s', self, error or '')
937-
self._update_reconnect_backoff()
937+
if error:
938+
self._update_reconnect_backoff()
938939
self._api_versions_future = None
939940
self._sasl_auth_future = None
940941
self._init_sasl_mechanism()

kafka/producer/kafka.py

+110-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
2020
from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
2121
from kafka.producer.sender import Sender
22-
from kafka.producer.transaction_state import TransactionState
22+
from kafka.producer.transaction_manager import TransactionManager
2323
from kafka.record.default_records import DefaultRecordBatchBuilder
2424
from kafka.record.legacy_records import LegacyRecordBatchBuilder
2525
from kafka.serializer import Serializer
@@ -318,6 +318,8 @@ class KafkaProducer(object):
318318
'key_serializer': None,
319319
'value_serializer': None,
320320
'enable_idempotence': False,
321+
'transactional_id': None,
322+
'transaction_timeout_ms': 60000,
321323
'acks': 1,
322324
'bootstrap_topics_filter': set(),
323325
'compression_type': None,
@@ -444,9 +446,30 @@ def __init__(self, **configs):
444446
assert checker(), "Libraries for {} compression codec not found".format(ct)
445447
self.config['compression_attrs'] = compression_attrs
446448

447-
self._transaction_state = None
449+
self._metadata = client.cluster
450+
self._transaction_manager = None
451+
self._init_transactions_result = None
452+
if 'enable_idempotence' in user_provided_configs and not self.config['enable_idempotence'] and self.config['transactional_id']:
453+
raise Errors.KafkaConfigurationError("Cannot set transactional_id without enable_idempotence.")
454+
455+
if self.config['transactional_id']:
456+
self.config['enable_idempotence'] = True
457+
448458
if self.config['enable_idempotence']:
449-
self._transaction_state = TransactionState()
459+
assert self.config['api_version'] >= (0, 11), "Transactional/Idempotent producer requires >= Kafka 0.11 Brokers"
460+
461+
self._transaction_manager = TransactionManager(
462+
transactional_id=self.config['transactional_id'],
463+
transaction_timeout_ms=self.config['transaction_timeout_ms'],
464+
retry_backoff_ms=self.config['retry_backoff_ms'],
465+
api_version=self.config['api_version'],
466+
metadata=self._metadata,
467+
)
468+
if self._transaction_manager.is_transactional():
469+
log.info("Instantiated a transactional producer.")
470+
else:
471+
log.info("Instantiated an idempotent producer.")
472+
450473
if 'retries' not in user_provided_configs:
451474
log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
452475
self.config['retries'] = 3
@@ -470,15 +493,14 @@ def __init__(self, **configs):
470493

471494
message_version = self.max_usable_produce_magic(self.config['api_version'])
472495
self._accumulator = RecordAccumulator(
473-
transaction_state=self._transaction_state,
496+
transaction_manager=self._transaction_manager,
474497
message_version=message_version,
475498
**self.config)
476-
self._metadata = client.cluster
477499
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
478500
self._sender = Sender(client, self._metadata,
479501
self._accumulator,
480502
metrics=self._metrics,
481-
transaction_state=self._transaction_state,
503+
transaction_manager=self._transaction_manager,
482504
guarantee_message_order=guarantee_message_order,
483505
**self.config)
484506
self._sender.daemon = True
@@ -610,6 +632,84 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
610632
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
611633
magic, self.config['compression_type'], key, value)
612634

635+
def init_transactions(self):
636+
"""
637+
Needs to be called before any other methods when the transactional.id is set in the configuration.
638+
639+
This method does the following:
640+
1. Ensures any transactions initiated by previous instances of the producer with the same
641+
transactional_id are completed. If the previous instance had failed with a transaction in
642+
progress, it will be aborted. If the last transaction had begun completion,
643+
but not yet finished, this method awaits its completion.
644+
2. Gets the internal producer id and epoch, used in all future transactional
645+
messages issued by the producer.
646+
647+
Note that this method will raise KafkaTimeoutError if the transactional state cannot
648+
be initialized before expiration of `max_block_ms`.
649+
650+
Retrying after a KafkaTimeoutError will continue to wait for the prior request to succeed or fail.
651+
Retrying after any other exception will start a new initialization attempt.
652+
Retrying after a successful initialization will do nothing.
653+
654+
Raises:
655+
IllegalStateError: if no transactional_id has been configured
656+
AuthorizationError: fatal error indicating that the configured
657+
transactional_id is not authorized.
658+
KafkaError: if the producer has encountered a previous fatal error or for any other unexpected error
659+
KafkaTimeoutError: if the time taken for initialize the transaction has surpassed `max.block.ms`.
660+
"""
661+
if not self._transaction_manager:
662+
raise Errors.IllegalStateError("Cannot call init_transactions without setting a transactional_id.")
663+
if self._init_transactions_result is None:
664+
self._init_transactions_result = self._transaction_manager.initialize_transactions()
665+
self._sender.wakeup()
666+
667+
try:
668+
if not self._init_transactions_result.wait(timeout_ms=self.config['max_block_ms']):
669+
raise Errors.KafkaTimeoutError("Timeout expired while initializing transactional state in %s ms." % (self.config['max_block_ms'],))
670+
finally:
671+
if self._init_transactions_result.failed:
672+
self._init_transactions_result = None
673+
674+
def begin_transaction(self):
675+
""" Should be called before the start of each new transaction.
676+
677+
Note that prior to the first invocation of this method,
678+
you must invoke `init_transactions()` exactly one time.
679+
680+
Raises:
681+
ProducerFencedError if another producer is with the same
682+
transactional_id is active.
683+
"""
684+
# Set the transactional bit in the producer.
685+
if not self._transaction_manager:
686+
raise Errors.IllegalStateError("Cannot use transactional methods without enabling transactions")
687+
self._transaction_manager.begin_transaction()
688+
689+
def commit_transaction(self):
690+
""" Commits the ongoing transaction.
691+
692+
Raises: ProducerFencedError if another producer with the same
693+
transactional_id is active.
694+
"""
695+
if not self._transaction_manager:
696+
raise Errors.IllegalStateError("Cannot commit transaction since transactions are not enabled")
697+
result = self._transaction_manager.begin_commit()
698+
self._sender.wakeup()
699+
result.wait()
700+
701+
def abort_transaction(self):
702+
""" Aborts the ongoing transaction.
703+
704+
Raises: ProducerFencedError if another producer with the same
705+
transactional_id is active.
706+
"""
707+
if not self._transaction_manager:
708+
raise Errors.IllegalStateError("Cannot abort transaction since transactions are not enabled.")
709+
result = self._transaction_manager.begin_abort()
710+
self._sender.wakeup()
711+
result.wait()
712+
613713
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
614714
"""Publish a message to a topic.
615715
@@ -687,6 +787,10 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
687787

688788
tp = TopicPartition(topic, partition)
689789
log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
790+
791+
if self._transaction_manager and self._transaction_manager.is_transactional():
792+
self._transaction_manager.maybe_add_partition_to_transaction(tp)
793+
690794
result = self._accumulator.append(tp, timestamp_ms,
691795
key_bytes, value_bytes, headers)
692796
future, batch_is_full, new_batch_created = result

kafka/producer/record_accumulator.py

+52-13
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ def record_count(self):
5656
def producer_id(self):
5757
return self.records.producer_id if self.records else None
5858

59+
@property
60+
def producer_epoch(self):
61+
return self.records.producer_epoch if self.records else None
62+
63+
@property
64+
def has_sequence(self):
65+
return self.records.has_sequence if self.records else False
66+
5967
def try_append(self, timestamp_ms, key, value, headers, now=None):
6068
metadata = self.records.append(timestamp_ms, key, value, headers)
6169
if metadata is None:
@@ -170,7 +178,7 @@ class RecordAccumulator(object):
170178
'compression_attrs': 0,
171179
'linger_ms': 0,
172180
'retry_backoff_ms': 100,
173-
'transaction_state': None,
181+
'transaction_manager': None,
174182
'message_version': 0,
175183
}
176184

@@ -181,7 +189,7 @@ def __init__(self, **configs):
181189
self.config[key] = configs.pop(key)
182190

183191
self._closed = False
184-
self._transaction_state = self.config['transaction_state']
192+
self._transaction_manager = self.config['transaction_manager']
185193
self._flushes_in_progress = AtomicInteger()
186194
self._appends_in_progress = AtomicInteger()
187195
self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
@@ -244,7 +252,7 @@ def append(self, tp, timestamp_ms, key, value, headers):
244252
batch_is_full = len(dq) > 1 or last.records.is_full()
245253
return future, batch_is_full, False
246254

247-
if self._transaction_state and self.config['message_version'] < 2:
255+
if self._transaction_manager and self.config['message_version'] < 2:
248256
raise Errors.UnsupportedVersionError("Attempting to use idempotence with a broker which"
249257
" does not support the required message format (v2)."
250258
" The broker must be version 0.11 or later.")
@@ -418,8 +426,8 @@ def ready(self, cluster, now=None):
418426

419427
return ready_nodes, next_ready_check, unknown_leaders_exist
420428

421-
def has_unsent(self):
422-
"""Return whether there is any unsent record in the accumulator."""
429+
def has_undrained(self):
430+
"""Check whether there are any batches which haven't been drained"""
423431
for tp in list(self._batches.keys()):
424432
with self._tp_locks[tp]:
425433
dq = self._batches[tp]
@@ -479,8 +487,10 @@ def drain(self, cluster, nodes, max_size, now=None):
479487
break
480488
else:
481489
producer_id_and_epoch = None
482-
if self._transaction_state:
483-
producer_id_and_epoch = self._transaction_state.producer_id_and_epoch
490+
if self._transaction_manager:
491+
if not self._transaction_manager.is_send_to_partition_allowed(tp):
492+
break
493+
producer_id_and_epoch = self._transaction_manager.producer_id_and_epoch
484494
if not producer_id_and_epoch.is_valid:
485495
# we cannot send the batch until we have refreshed the PID
486496
log.debug("Waiting to send ready batches because transaction producer id is not valid")
@@ -493,11 +503,16 @@ def drain(self, cluster, nodes, max_size, now=None):
493503
# the previous attempt may actually have been accepted, and if we change
494504
# the pid and sequence here, this attempt will also be accepted, causing
495505
# a duplicate.
496-
sequence_number = self._transaction_state.sequence_number(batch.topic_partition)
506+
sequence_number = self._transaction_manager.sequence_number(batch.topic_partition)
497507
log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s",
498508
node_id, batch.topic_partition, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch,
499509
sequence_number)
500-
batch.records.set_producer_state(producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, sequence_number)
510+
batch.records.set_producer_state(
511+
producer_id_and_epoch.producer_id,
512+
producer_id_and_epoch.epoch,
513+
sequence_number,
514+
self._transaction_manager.is_transactional()
515+
)
501516
batch.records.close()
502517
size += batch.records.size_in_bytes()
503518
ready.append(batch)
@@ -544,6 +559,10 @@ def await_flush_completion(self, timeout=None):
544559
finally:
545560
self._flushes_in_progress.decrement()
546561

562+
@property
563+
def has_incomplete(self):
564+
return bool(self._incomplete)
565+
547566
def abort_incomplete_batches(self):
548567
"""
549568
This function is only called when sender is closed forcefully. It will fail all the
@@ -553,27 +572,41 @@ def abort_incomplete_batches(self):
553572
# 1. Avoid losing batches.
554573
# 2. Free up memory in case appending threads are blocked on buffer full.
555574
# This is a tight loop but should be able to get through very quickly.
575+
error = Errors.IllegalStateError("Producer is closed forcefully.")
556576
while True:
557-
self._abort_batches()
577+
self._abort_batches(error)
558578
if not self._appends_in_progress.get():
559579
break
560580
# After this point, no thread will append any messages because they will see the close
561581
# flag set. We need to do the last abort after no thread was appending in case the there was a new
562582
# batch appended by the last appending thread.
563-
self._abort_batches()
583+
self._abort_batches(error)
564584
self._batches.clear()
565585

566-
def _abort_batches(self):
586+
def _abort_batches(self, error):
567587
"""Go through incomplete batches and abort them."""
568-
error = Errors.IllegalStateError("Producer is closed forcefully.")
569588
for batch in self._incomplete.all():
570589
tp = batch.topic_partition
571590
# Close the batch before aborting
572591
with self._tp_locks[tp]:
573592
batch.records.close()
593+
self._batches[tp].remove(batch)
574594
batch.done(exception=error)
575595
self.deallocate(batch)
576596

597+
def abort_undrained_batches(self, error):
598+
for batch in self._incomplete.all():
599+
tp = batch.topic_partition
600+
with self._tp_locks[tp]:
601+
aborted = False
602+
if not batch.is_done:
603+
aborted = True
604+
batch.records.close()
605+
self._batches[tp].remove(batch)
606+
if aborted:
607+
batch.done(exception=error)
608+
self.deallocate(batch)
609+
577610
def close(self):
578611
"""Close this accumulator and force all the record buffers to be drained."""
579612
self._closed = True
@@ -600,3 +633,9 @@ def remove(self, batch):
600633
def all(self):
601634
with self._lock:
602635
return list(self._incomplete)
636+
637+
def __bool__(self):
638+
return bool(self._incomplete)
639+
640+
641+
__nonzero__ = __bool__

0 commit comments

Comments
 (0)