Description
Describe the bug
We noticed that occasionally threads get stuck forever, with a stack that points to the RabbitMQ channel class.
The stack trace is below. There are several notes about this:
- The RPC timeout is applied only to the IO part of the processing, not to the enqueueAsyncRpc part.
  This makes it possible for a thread to wait forever for _activeRpc to clear, which in this case never happens.
  I.e. the code below can loop forever, disregarding the RPC timeout:
      while (this._activeRpc != null) {
          try {
              this._channelLockCondition.await();
          } catch (InterruptedException var7) {
              var2 = true;
          }
      }
  I guess the same RPC timeout note goes for any "lock" call:
      this._channelLock.lock()
  A thread can also get stuck there forever.
- The code was recently refactored to use locks instead of synchronized sections.
  It's not clear how that should work in a multi-threaded environment (Java memory model) with members like _activeRpc.
  They are not declared as volatile/atomic. How would a memory barrier be enforced for multi-threaded access to the member?
- Also, the semantics of ChannelN.asyncCompletableRpc are async, so it should never block,
  but in fact it can and does block.
========================== Threads get stuck in this state indefinitely ================
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base/Native Method)
- parking to wait for <0x00000007a5c699d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(java.base/LockSupport.java:341)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(java.base/AbstractQueuedSynchronizer.java:506)
at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base/ForkJoinPool.java:3465)
at java.util.concurrent.ForkJoinPool.managedBlock(java.base/ForkJoinPool.java:3436)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base/AbstractQueuedSynchronizer.java:1623)
at com.rabbitmq.client.impl.AMQChannel.doEnqueueRpc
at com.rabbitmq.client.impl.AMQChannel.enqueueAsyncRpc
at com.rabbitmq.client.impl.AMQChannel.quiescingAsyncRpc
at com.rabbitmq.client.impl.AMQChannel.asyncRpc
at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc
at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc
at com.rabbitmq.client.impl.ChannelN.asyncCompletableRpc
at reactor.rabbitmq.Sender.lambda$declareExchange$16
...
Reproduction steps
It's not clear what is triggering the condition.
Expected behavior
Any invocation should respect the rpc timeout.
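To illustrate what I mean, here is a rough sketch of a wait that is bounded by the RPC timeout. It is not the client's code: BoundedRpcSlot, enqueue, complete and the field names are all made up for the example, and it assumes the timeout is passed in as milliseconds. The idea is just to use the timed variants (tryLock with a timeout and Condition.awaitNanos) and to share one deadline between the two waits.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the channel's single "active RPC" slot.
// This is NOT the actual AMQChannel implementation.
class BoundedRpcSlot {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition slotFree = lock.newCondition();
    private Object activeRpc; // only read and written while holding lock

    // Claims the slot for rpc, or throws TimeoutException once timeoutMillis has elapsed.
    void enqueue(Object rpc, long timeoutMillis) throws TimeoutException, InterruptedException {
        long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        long deadline = System.nanoTime() + remaining;
        // Bound the lock acquisition too, instead of an unconditional lock().
        if (!lock.tryLock(remaining, TimeUnit.NANOSECONDS)) {
            throw new TimeoutException("timed out acquiring the channel lock");
        }
        try {
            remaining = deadline - System.nanoTime();
            while (activeRpc != null) {
                if (remaining <= 0L) {
                    throw new TimeoutException("timed out waiting for the active RPC to clear");
                }
                // awaitNanos returns an estimate of the time left, so spurious
                // wakeups do not extend the overall wait.
                remaining = slotFree.awaitNanos(remaining);
            }
            activeRpc = rpc;
        } finally {
            lock.unlock();
        }
    }

    // Releases the slot and wakes up any waiting callers.
    void complete() {
        lock.lock();
        try {
            activeRpc = null;
            slotFree.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedRpcSlot slot = new BoundedRpcSlot();
        try {
            slot.enqueue("first", 100);  // slot is free, returns immediately
            slot.enqueue("second", 100); // slot is never released, so this call gives up
        } catch (TimeoutException e) {
            System.out.println("gave up: " + e.getMessage());
        }
    }
}
```

With something along these lines the caller would get a TimeoutException instead of parking forever when the active RPC never completes. Whether interruption should be propagated (as here) or swallowed and restored later, as the current loop does, is a separate design choice.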
Additional context
The code is using version 5.20.0 of the Java client.