Skip to content

Commit a6ebf0c

Browse files
author
Simon MacMullen
committed
Merged bug 22101 into default
2 parents 71ea893 + 41f5a80 commit a6ebf0c

File tree

11 files changed

+663
-53
lines changed

11 files changed

+663
-53
lines changed

Makefile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
VERSION=0.0.0
2-
32
PACKAGE_NAME=rabbitmq-java-client
4-
53
JAVADOC_ARCHIVE=$(PACKAGE_NAME)-javadoc-$(VERSION)
64
SRC_ARCHIVE=$(PACKAGE_NAME)-$(VERSION)
75
SIGNING_KEY=056E8E56

src/com/rabbitmq/client/Channel.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,56 @@ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable
301301
*/
302302
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
303303

304+
/**
305+
* Bind an exchange to an exchange, with no extra arguments.
306+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
307+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
308+
* @param destination: the name of the exchange to which messages flow across the binding
309+
* @param source: the name of the exchange from which messages flow across the binding
310+
* @param routingKey: the routine key to use for the binding
311+
* @return a binding-confirm method if the binding was successfully created
312+
* @throws java.io.IOException if an error is encountered
313+
*/
314+
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
315+
316+
/**
317+
* Bind an exchange to an exchange.
318+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
319+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
320+
* @param destination: the name of the exchange to which messages flow across the binding
321+
* @param source: the name of the exchange from which messages flow across the binding
322+
* @param routingKey: the routine key to use for the binding
323+
* @param arguments: other properties (binding parameters)
324+
* @return a binding-confirm method if the binding was successfully created
325+
* @throws java.io.IOException if an error is encountered
326+
*/
327+
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
328+
329+
/**
330+
* Unbind an exchange from an exchange, with no extra arguments.
331+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
332+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
333+
* @param destination: the name of the exchange to which messages flow across the binding
334+
* @param source: the name of the exchange from which messages flow across the binding
335+
* @param routingKey: the routine key to use for the binding
336+
* @return a binding-confirm method if the binding was successfully created
337+
* @throws java.io.IOException if an error is encountered
338+
*/
339+
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
340+
341+
/**
342+
* Unbind an exchange from an exchange.
343+
* @see com.rabbitmq.client.AMQP.Exchange.Bind
344+
* @see com.rabbitmq.client.AMQP.Exchange.BindOk
345+
* @param destination: the name of the exchange to which messages flow across the binding
346+
* @param source: the name of the exchange from which messages flow across the binding
347+
* @param routingKey: the routine key to use for the binding
348+
* @param arguments: other properties (binding parameters)
349+
* @return a binding-confirm method if the binding was successfully created
350+
* @throws java.io.IOException if an error is encountered
351+
*/
352+
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
353+
304354
/**
305355
* Actively declare a server-named exclusive, autodelete, non-durable queue.
306356
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
import java.util.Map;
3838
import java.util.HashMap;
3939
import java.util.concurrent.TimeoutException;
40+
import java.util.concurrent.ScheduledExecutorService;
41+
import java.util.concurrent.Executors;
42+
import java.util.concurrent.TimeUnit;
4043

4144
import com.rabbitmq.client.AMQP;
4245
import com.rabbitmq.client.Address;
@@ -89,7 +92,7 @@ public static Map<String, Object> defaultClientProperties() {
8992
new Version(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR);
9093

9194
/** Initialization parameters */
92-
private final ConnectionFactory factory;
95+
private final ConnectionFactory _factory;
9396

9497
/** The special channel 0 */
9598
private final AMQChannel _channel0 = new AMQChannel(this, 0) {
@@ -122,6 +125,9 @@ public static Map<String, Object> defaultClientProperties() {
122125
/** Flag indicating whether the client received Connection.Close message from the broker */
123126
private boolean _brokerInitiatedShutdown = false;
124127

128+
/** Manages heartbeat sending for this connection */
129+
private final HeartbeatSender _heartbeatSender;
130+
125131
/**
126132
* Protected API - respond, in the driver thread, to a ShutdownSignal.
127133
* @param channel the channel to disconnect
@@ -138,12 +144,6 @@ public void ensureIsOpen()
138144
}
139145
}
140146

141-
/**
142-
* Timestamp of last time we wrote a frame - used for deciding when to
143-
* send a heartbeat
144-
*/
145-
private volatile long _lastActivityTime = Long.MAX_VALUE;
146-
147147
/**
148148
* Count of socket-timeouts that have happened without any incoming frames
149149
*/
@@ -210,7 +210,8 @@ public AMQConnection(ConnectionFactory factory,
210210
_requestedHeartbeat = factory.getRequestedHeartbeat();
211211
_clientProperties = new HashMap<String, Object>(factory.getClientProperties());
212212

213-
this.factory = factory;
213+
_factory = factory;
214+
_heartbeatSender = new HeartbeatSender(frameHandler);
214215
_frameHandler = frameHandler;
215216
_running = true;
216217
_frameMax = 0;
@@ -288,17 +289,17 @@ public void start()
288289
}
289290

290291
int channelMax =
291-
negotiatedMaxValue(factory.getRequestedChannelMax(),
292+
negotiatedMaxValue(_factory.getRequestedChannelMax(),
292293
connTune.getChannelMax());
293294
_channelManager = new ChannelManager(channelMax);
294295

295296
int frameMax =
296-
negotiatedMaxValue(factory.getRequestedFrameMax(),
297+
negotiatedMaxValue(_factory.getRequestedFrameMax(),
297298
connTune.getFrameMax());
298299
setFrameMax(frameMax);
299300

300301
int heartbeat =
301-
negotiatedMaxValue(factory.getRequestedHeartbeat(),
302+
negotiatedMaxValue(_factory.getRequestedHeartbeat(),
302303
connTune.getHeartbeat());
303304
setHeartbeat(heartbeat);
304305

@@ -349,10 +350,12 @@ public int getHeartbeat() {
349350
*/
350351
public void setHeartbeat(int heartbeat) {
351352
try {
353+
_heartbeatSender.setHeartbeat(heartbeat);
354+
_heartbeat = heartbeat;
355+
352356
// Divide by four to make the maximum unwanted delay in
353357
// sending a timeout be less than a quarter of the
354358
// timeout setting.
355-
_heartbeat = heartbeat;
356359
_frameHandler.setTimeout(heartbeat * 1000 / 4);
357360
} catch (SocketException se) {
358361
// should do more here?
@@ -395,7 +398,7 @@ public Frame readFrame() throws IOException {
395398
*/
396399
public void writeFrame(Frame f) throws IOException {
397400
_frameHandler.writeFrame(f);
398-
_lastActivityTime = System.nanoTime();
401+
_heartbeatSender.signalActivity();
399402
}
400403

401404
private static int negotiatedMaxValue(int clientValue, int serverValue) {
@@ -416,7 +419,7 @@ private class MainLoop extends Thread {
416419
try {
417420
while (_running) {
418421
Frame frame = readFrame();
419-
maybeSendHeartbeat();
422+
420423
if (frame != null) {
421424
_missedHeartbeats = 0;
422425
if (frame.type == AMQP.FRAME_HEARTBEAT) {
@@ -459,25 +462,6 @@ private class MainLoop extends Thread {
459462
}
460463
}
461464

462-
private static final long NANOS_IN_SECOND = 1000 * 1000 * 1000;
463-
464-
/**
465-
* Private API - Checks lastActivityTime and heartbeat, sending a
466-
* heartbeat frame if conditions are right.
467-
*/
468-
public void maybeSendHeartbeat() throws IOException {
469-
if (_heartbeat == 0) {
470-
// No heartbeating.
471-
return;
472-
}
473-
474-
long now = System.nanoTime();
475-
if (now > (_lastActivityTime + (_heartbeat * NANOS_IN_SECOND))) {
476-
_lastActivityTime = now;
477-
writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
478-
}
479-
}
480-
481465
/**
482466
* Private API - Called when a frame-read operation times out. Checks to
483467
* see if too many heartbeats have been missed, and if so, throws
@@ -557,7 +541,7 @@ public void handleConnectionClose(Command closeCommand) {
557541
} catch (IOException ioe) {
558542
Utility.emptyStatement();
559543
}
560-
_heartbeat = 0; // Do not try to send heartbeats after CloseOk
544+
_heartbeatSender.shutdown(); // Do not try to send heartbeats after CloseOk
561545
_brokerInitiatedShutdown = true;
562546
Thread scw = new SocketCloseWait(sse);
563547
scw.setName("AMQP Connection Closing Monitor " +
@@ -607,6 +591,10 @@ public ShutdownSignalException shutdown(Object reason,
607591
if (isOpen())
608592
_shutdownCause = sse;
609593
}
594+
595+
// stop any heartbeating
596+
_heartbeatSender.shutdown();
597+
610598
_channel0.processShutdownSignal(sse, !initiatedByApplication, notifyRpc);
611599
_channelManager.handleSignal(sse);
612600
return sse;

src/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,32 +92,43 @@ public void handleSignal(ShutdownSignalException signal) {
9292
}
9393
}
9494

95-
public synchronized ChannelN createChannel(AMQConnection connection) throws IOException {
96-
int channelNumber = channelNumberAllocator.allocate();
97-
if (channelNumber == -1) {
98-
return null;
95+
public ChannelN createChannel(AMQConnection connection) throws IOException {
96+
int channelNumber;
97+
synchronized (this) {
98+
channelNumber = channelNumberAllocator.allocate();
99+
if (channelNumber == -1) {
100+
return null;
101+
}
99102
}
100103
return createChannelInternal(connection, channelNumber);
101104
}
102105

103-
public synchronized ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
104-
if(channelNumberAllocator.reserve(channelNumber))
106+
public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException {
107+
boolean reserved;
108+
synchronized (this) {
109+
reserved = channelNumberAllocator.reserve(channelNumber);
110+
}
111+
if(reserved)
105112
return createChannelInternal(connection, channelNumber);
106113
else
107114
return null;
108115
}
109116

110-
private synchronized ChannelN createChannelInternal(AMQConnection connection, int channelNumber) throws IOException {
111-
if (_channelMap.containsKey(channelNumber)) {
112-
// That number's already allocated! Can't do it
113-
// This should never happen unless something has gone
114-
// badly wrong with our implementation.
115-
throw new IllegalStateException("We have attempted to "
116-
+ "create a channel with a number that is already in "
117-
+ "use. This should never happen. Please report this as a bug.");
117+
private ChannelN createChannelInternal(AMQConnection connection, int channelNumber) throws IOException {
118+
ChannelN ch;
119+
synchronized (this) {
120+
if (_channelMap.containsKey(channelNumber)) {
121+
// That number's already allocated! Can't do it
122+
// This should never happen unless something has gone
123+
// badly wrong with our implementation.
124+
throw new IllegalStateException("We have attempted to "
125+
+ "create a channel with a number that is already in "
126+
+ "use. This should never happen. "
127+
+ "Please report this as a bug.");
128+
}
129+
ch = new ChannelN(connection, channelNumber);
130+
addChannel(ch);
118131
}
119-
ChannelN ch = new ChannelN(connection, channelNumber);
120-
addChannel(ch);
121132
ch.open(); // now that it's been added to our internal tables
122133
return ch;
123134
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,36 @@ public Exchange.DeleteOk exchangeDelete(String exchange)
524524
return exchangeDelete(exchange, false);
525525
}
526526

527+
/** Public API - {@inheritDoc} */
528+
public Exchange.BindOk exchangeBind(String destination, String source,
529+
String routingKey, Map<String, Object> arguments)
530+
throws IOException {
531+
return (Exchange.BindOk) exnWrappingRpc(
532+
new Exchange.Bind(TICKET, destination, source, routingKey,
533+
false, arguments)).getMethod();
534+
}
535+
536+
/** Public API - {@inheritDoc} */
537+
public Exchange.BindOk exchangeBind(String destination, String source,
538+
String routingKey) throws IOException {
539+
return exchangeBind(destination, source, routingKey, null);
540+
}
541+
542+
/** Public API - {@inheritDoc} */
543+
public Exchange.UnbindOk exchangeUnbind(String destination, String source,
544+
String routingKey, Map<String, Object> arguments)
545+
throws IOException {
546+
return (Exchange.UnbindOk) exnWrappingRpc(
547+
new Exchange.Unbind(TICKET, destination, source, routingKey,
548+
false, arguments)).getMethod();
549+
}
550+
551+
/** Public API - {@inheritDoc} */
552+
public Exchange.UnbindOk exchangeUnbind(String destination, String source,
553+
String routingKey) throws IOException {
554+
return exchangeUnbind(destination, source, routingKey, null);
555+
}
556+
527557
/** Public API - {@inheritDoc} */
528558
public Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
529559
boolean autoDelete, Map<String, Object> arguments)

0 commit comments

Comments
 (0)