Description
I am running into this issue right now and have worked around it by closing the channel and creating a new one, which clears the unconfirmedSet and resolves the issue for a while. (By the way, is there any way to access that set for debugging purposes?)
Imagine several threads going through a common object to send messages. The only way I've found so far to know which message failed to send is to call waitForConfirms after every publish. We pass it a timeout so we don't hang forever. The problem is that waitForConfirms times out quite frequently, always under high traffic. When I looked at the implementation of that method, it appears to wait for the entire set to be empty before returning. You can probably see the problem: if multiple threads are publishing and some of them are waiting for their own message to be confirmed, there is always something in the set, so the wait never completes. I had read that publishing to the same channel from multiple threads, although not recommended, was okay because access to the channel is serialized.
I don't know whether this is a bug, a design flaw, or user error. How can I check that a confirm was received for the message that was just sent, rather than for all messages? Or maybe my understanding of what waitForConfirms is supposed to be used for is wrong. I've included an example of how I am sending and then checking whether the message was sent. Forgive me, my code is in Scala, but it's close enough to Java that I think you'll be okay.
val basicPublishTimeout: Long = 400
rabbitClient().basicPublish(exchange.getOrElse("amq.topic"), routingKey.getOrElse(""), props, message.get.getBytes("UTF-8"))
// lets us know if the message was successfully sent
if (!waitForConfirms(basicPublishTimeout)) {
  throw new IOException("Message was nack'd by broker and will be resent.")
}

/**
 * Waits for all sent messages to be acknowledged up to timeout. Throws TimeoutException if times out.
 * Returns true if all messages were acknowledged and false if any were explicitly not acknowledged.
 * @inheritdoc
 */
def waitForConfirms(timeout: Long = 0): Boolean = try
  if (timeout == 0) channel map { _.waitForConfirms() } get
  else channel map { _.waitForConfirms(timeout) } get
catch {
  case e: lang.IllegalStateException =>
    logger.error("The channel is not set up to confirm messages.")
    false
}
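
To make the "confirm for just this message" question concrete, here is a minimal sketch of what per-message tracking could look like. It is only an illustration under assumptions, not something the client provides: ConfirmTracker and its methods are hypothetical names, and the class is deliberately independent of the RabbitMQ library. The idea is to record the publish sequence number (the Java client exposes it through Channel.getNextPublishSeqNo()) before each basicPublish, and to forward the handleAck/handleNack callbacks of a ConfirmListener registered with Channel.addConfirmListener to the tracker.

```scala
import java.util.concurrent.ConcurrentSkipListMap
import scala.concurrent.{Future, Promise}

/** Hypothetical helper: one Promise per publish sequence number. */
class ConfirmTracker {
  // Sorted by sequence number so "multiple" confirms can resolve a whole prefix.
  private val pending = new ConcurrentSkipListMap[java.lang.Long, Promise[Boolean]]()

  /** Register a message before publishing it; the Future yields true on ack, false on nack. */
  def register(seqNo: Long): Future[Boolean] = {
    val p = Promise[Boolean]()
    pending.put(Long.box(seqNo), p)
    p.future
  }

  /** Intended to be called from a confirm listener's ack callback. */
  def handleAck(deliveryTag: Long, multiple: Boolean): Unit =
    complete(deliveryTag, multiple, acked = true)

  /** Intended to be called from a confirm listener's nack callback. */
  def handleNack(deliveryTag: Long, multiple: Boolean): Unit =
    complete(deliveryTag, multiple, acked = false)

  private def complete(tag: Long, multiple: Boolean, acked: Boolean): Unit =
    if (multiple) {
      // "multiple" means every outstanding message up to and including tag.
      val upTo = pending.headMap(Long.box(tag), true)
      var entry = upTo.pollFirstEntry()
      while (entry != null) {
        entry.getValue.trySuccess(acked)
        entry = upTo.pollFirstEntry()
      }
    } else {
      val p = pending.remove(Long.box(tag))
      if (p != null) p.trySuccess(acked)
    }
}
```

Each publishing thread would then wait only on its own Future (for example with Await.result and a timeout) instead of on the whole unconfirmed set. Reading the sequence number, registering it, and publishing would have to happen as one step (for instance inside a synchronized block), otherwise two threads could register the same number.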