Skip to content

GH-8778: Fix KafkaMessageSource deadlock #8780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -120,13 +121,13 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
*/
public static final String REMAINING_RECORDS = KafkaHeaders.PREFIX + "remainingRecords";

private final Lock lock = new ReentrantLock();

private final ConsumerFactory<K, V> consumerFactory;

private final KafkaAckCallbackFactory<K, V> ackCallbackFactory;

private final Lock consumerMonitor = new ReentrantLock();
private final Lock receiveLock = new ReentrantLock();

private final Lock consumerLock = new ReentrantLock();

private final Map<TopicPartition, Set<KafkaAckInfo<K, V>>> inflightRecords = new ConcurrentHashMap<>();

Expand All @@ -142,14 +143,20 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>

private final Duration pollTimeout;

private final AtomicBoolean running = new AtomicBoolean();

private final AtomicBoolean pausing = new AtomicBoolean();

private final AtomicBoolean paused = new AtomicBoolean();

private final AtomicBoolean stopped = new AtomicBoolean();

private RecordMessageConverter messageConverter = new MessagingMessageConverter();

private Class<?> payloadType;

private boolean rawMessageHeader;

private boolean running;

private Duration closeTimeout = Duration.ofSeconds(DEFAULT_CLOSE_TIMEOUT);

private boolean checkNullKeyForExceptions;
Expand All @@ -158,14 +165,8 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>

private volatile Consumer<K, V> consumer;

private volatile boolean pausing;

private volatile boolean paused;

private volatile Iterator<ConsumerRecord<K, V>> recordsIterator;

private volatile boolean stopped;

public volatile boolean newAssignment; // NOSONAR - direct access from inner

private ClassLoader classLoader;
Expand Down Expand Up @@ -409,104 +410,74 @@ private boolean maxPollStringGtr1(Object maxPoll) {

@Override
public boolean isRunning() {
this.lock.lock();
try {
return this.running;
}
finally {
this.lock.unlock();
}
return this.running.get();
}

@Override
public void start() {
this.lock.lock();
try {
this.running = true;
this.stopped = false;
}
finally {
this.lock.unlock();
if (this.running.compareAndSet(false, true)) {
this.stopped.set(false);
}
}

@Override
public void stop() {
this.lock.lock();
try {
if (this.running.compareAndSet(true, false)) {
stopConsumer();
this.running = false;
this.stopped = true;
}
finally {
this.lock.unlock();
this.stopped.set(true);
}
}

@Override
public void pause() {
this.lock.lock();
try {
this.pausing = true;
}
finally {
this.lock.unlock();
}
this.pausing.set(true);
}

@Override
public void resume() {
this.lock.lock();
try {
this.pausing = false;
}
finally {
this.lock.unlock();
}
this.pausing.set(false);
}

@Override
public boolean isPaused() {
return this.paused;
return this.paused.get();
}

@Override // NOSONAR - not so complex
protected Object doReceive() {
this.lock.lock();
this.receiveLock.lock();
try {

if (this.stopped) {
if (this.stopped.get()) {
this.logger.debug("Message source is stopped; no records will be returned");
return null;
}
if (this.consumer == null) {
createConsumer();
this.running = true;
}
if (this.pausing && !this.paused && !this.assignedPartitions.isEmpty()) {
if (this.pausing.get() && !this.paused.get() && !this.assignedPartitions.isEmpty()) {
this.consumer.pause(this.assignedPartitions);
this.paused = true;
this.paused.set(true);
}
else if (this.paused && !this.pausing) {
else if (this.paused.get() && !this.pausing.get()) {
this.consumer.resume(this.assignedPartitions);
this.paused = false;
this.paused.set(false);
}
if (this.paused && this.recordsIterator == null) {
if (this.paused.get() && this.recordsIterator == null) {
this.logger.debug("Consumer is paused; no records will be returned");
}
ConsumerRecord<K, V> record = pollRecord();

return record != null
? recordToMessage(record)
: null;
}
finally {
this.lock.unlock();
this.receiveLock.unlock();
}

ConsumerRecord<K, V> record = pollRecord();

return record != null ? recordToMessage(record) : null;
}

protected void createConsumer() {
this.consumerMonitor.lock();
this.consumerLock.lock();
try {
this.consumer = this.consumerFactory.createConsumer(this.consumerProperties.getGroupId(),
this.consumerProperties.getClientId(), null, this.consumerProperties.getKafkaConsumerProperties());
Expand All @@ -528,7 +499,7 @@ else if (partitions != null) {
}
}
finally {
this.consumerMonitor.unlock();
this.consumerLock.unlock();
}
}

Expand Down Expand Up @@ -586,7 +557,7 @@ private ConsumerRecord<K, V> pollRecord() {
return nextRecord();
}
else {
this.consumerMonitor.lock();
this.consumerLock.lock();
try {
try {
ConsumerRecords<K, V> records = this.consumer
Expand All @@ -611,7 +582,7 @@ private ConsumerRecord<K, V> pollRecord() {
}
}
finally {
this.consumerMonitor.unlock();
this.consumerLock.unlock();
}
}
}
Expand Down Expand Up @@ -673,26 +644,27 @@ private void checkDeserializationException(ConsumerRecord<K, V> record, String h

@Override
public void destroy() {
this.lock.lock();
this.receiveLock.lock();
try {
stopConsumer();
}
finally {
this.lock.unlock();
this.receiveLock.unlock();
}
}

private void stopConsumer() {
this.consumerMonitor.lock();
this.consumerLock.lock();
try {
if (this.consumer != null) {
this.consumer.wakeup();
this.consumer.close(this.closeTimeout);
this.consumer = null;
this.assignedPartitions.clear();
}
}
finally {
this.consumerMonitor.unlock();
this.consumerLock.unlock();
}
}

Expand Down Expand Up @@ -747,7 +719,7 @@ public void onPartitionsLost(Collection<TopicPartition> partitions) {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
KafkaMessageSource.this.assignedPartitions.addAll(partitions);
if (KafkaMessageSource.this.paused) {
if (KafkaMessageSource.this.paused.get()) {
KafkaMessageSource.this.consumer.pause(KafkaMessageSource.this.assignedPartitions);
KafkaMessageSource.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
+ "consumer paused again, so the initial poll() will never return any records");
Expand Down Expand Up @@ -985,7 +957,7 @@ public class KafkaAckInfoImpl implements KafkaAckInfo<K, V> { // NOSONAR - no eq

@Override
public Object getConsumerMonitor() {
return KafkaMessageSource.this.consumerMonitor;
return KafkaMessageSource.this.consumerLock;
}

@Override
Expand Down