Skip to content

Commit 91a20d4

Browse files
committed
Fix: Async close /w error from server
1 parent b29d654 commit 91a20d4

File tree

2 files changed

+17
-10
lines changed

2 files changed

+17
-10
lines changed

src/main/java/com/swiftmq/amqp/v100/client/Connection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,10 +448,10 @@ private Session mapSessionToLocalChannel(long incomingWindowSize, long outgoingW
448448
/**
449449
* Creates a new Session.
450450
*
451-
* @param incomingWindowSize Incoming Window Size (maxnumber of unsettled incoming transfers)
451+
* @param incomingWindowSize Incoming Window Size (max number of unsettled incoming transfers)
452452
* @param outgoingWindowSize Outgoing Window Size (max number of unsettled outgoing transfers)
453453
* @return Session
454-
* @throws SessionHandshakeException An error occured during handshake
454+
* @throws SessionHandshakeException An error occurred during handshake
455455
* @throws ConnectionClosedException The connection was closed
456456
*/
457457
public Session createSession(long incomingWindowSize, long outgoingWindowSize) throws SessionHandshakeException, ConnectionClosedException {

src/main/java/com/swiftmq/amqp/v100/client/ConnectionDispatcher.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,12 @@ private void checkBothSidesClosed() {
217217
}
218218

219219
private void notifyWaitingPOs(POObject[] po) {
220-
for (int i = 0; i < po.length; i++) {
221-
if (po[i] != null) {
222-
po[i].setSuccess(false);
223-
if (po[i].getException() == null)
224-
po[i].setException("Connection was asynchronously closed");
225-
po[i].getSemaphore().notifySingleWaiter();
220+
for (POObject poObject : po) {
221+
if (poObject != null) {
222+
poObject.setSuccess(false);
223+
if (poObject.getException() == null)
224+
poObject.setException("Connection was asynchronously closed");
225+
poObject.getSemaphore().notifySingleWaiter();
226226
}
227227
}
228228
}
@@ -584,8 +584,15 @@ public void visit(EndFrame frame) {
584584

585585
public void visit(CloseFrame frame) {
586586
if (pTracer.isEnabled()) pTracer.trace(toString(), ", visit=" + frame);
587-
remoteClose = frame;
588-
checkBothSidesClosed();
587+
// Async remote close due to error
588+
if (frame.getError() != null) {
589+
if (myConnection.getExceptionListener() != null)
590+
myConnection.getExceptionListener().onException(new ConnectionClosedException(frame.getError().getDescription().getValue()));
591+
new Thread(() -> myConnection.cancel()).start();
592+
} else {
593+
remoteClose = frame;
594+
checkBothSidesClosed();
595+
}
589596
}
590597

591598
public void visit(SaslMechanismsFrame frame) {

0 commit comments

Comments
 (0)