Skip to content

Commit 680f98e

Browse files
author
Matthew Sackman
committed
Merging bug 22331 into default
2 parents ea57628 + 04b4bf5 commit 680f98e

File tree

7 files changed

+104
-32
lines changed

7 files changed

+104
-32
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,35 +77,35 @@ public interface Channel extends ShutdownNotifier {
7777
/**
7878
* Close this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
7979
* and message 'OK'.
80-
*
80+
*
8181
* @throws java.io.IOException if an error is encountered
8282
*/
8383
void close() throws IOException;
84-
84+
8585
/**
8686
* Close this channel.
87-
*
87+
*
8888
* @param closeCode the close code (See under "Reply Codes" in the AMQP specification)
8989
* @param closeMessage a message indicating the reason for closing the connection
9090
* @throws java.io.IOException if an error is encountered
9191
*/
9292
void close(int closeCode, String closeMessage) throws IOException;
93-
93+
9494
/**
9595
* Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code
9696
* and message 'OK'.
97-
*
97+
*
9898
* Forces the channel to close and waits for the close operation to complete.
9999
* Any encountered exceptions in the close operation are silently discarded.
100100
*/
101101
void abort() throws IOException;
102-
102+
103103
/**
104104
* Abort this channel.
105-
*
105+
*
106106
* Forces the channel to close and waits for the close operation to complete.
107107
* Any encountered exceptions in the close operation are silently discarded.
108-
*/
108+
*/
109109
void abort(int closeCode, String closeMessage) throws IOException;
110110

111111
/**
@@ -254,7 +254,7 @@ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean passive
254254
* @throws java.io.IOException if an error is encountered
255255
*/
256256
Queue.DeclareOk queueDeclare(String queue) throws IOException;
257-
257+
258258
/**
259259
* Actively declare a non-exclusive, non-autodelete queue
260260
* The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result.
@@ -329,7 +329,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean passive, boolean durable, boo
329329
* @throws java.io.IOException if an error is encountered
330330
*/
331331
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
332-
332+
333333
/**
334334
* Unbinds a queue from an exchange, with no extra arguments.
335335
* @see com.rabbitmq.client.AMQP.Queue.Unbind
@@ -370,7 +370,7 @@ Queue.DeclareOk queueDeclare(String queue, boolean passive, boolean durable, boo
370370
* @see com.rabbitmq.client.AMQP.Queue.Purge
371371
* @see com.rabbitmq.client.AMQP.Queue.PurgeOk
372372
* @param queue the name of the queue
373-
* @param nowait whether to await completion of the purge
373+
* @param nowait whether to await completion of the purge
374374
* @return a purge-confirm method if the purge was executed succesfully
375375
* @throws java.io.IOException if an error is encountered
376376
*/

src/com/rabbitmq/tools/Tracer.java

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,11 @@
3131

3232
package com.rabbitmq.tools;
3333

34-
import java.io.DataInputStream;
35-
import java.io.DataOutputStream;
36-
import java.io.EOFException;
37-
import java.io.IOException;
34+
import java.io.*;
3835
import java.net.ServerSocket;
3936
import java.net.Socket;
4037
import java.util.HashMap;
38+
import java.util.concurrent.*;
4139

4240
import com.rabbitmq.client.AMQP;
4341
import com.rabbitmq.client.impl.AMQCommand;
@@ -46,6 +44,7 @@
4644
import com.rabbitmq.client.impl.Frame;
4745
import com.rabbitmq.utility.BlockingCell;
4846

47+
4948
/**
5049
* AMQP Protocol Analyzer program. Listens on a configurable port and when a
5150
* connection arrives, makes an outbound connection to a configurable host and
@@ -64,6 +63,64 @@ public class Tracer implements Runnable {
6463
public static final boolean SUPPRESS_COMMAND_BODIES =
6564
Boolean.parseBoolean(System.getProperty("com.rabbitmq.tools.Tracer.SUPPRESS_COMMAND_BODIES"));
6665

66+
public static final boolean SILENT_MODE =
67+
Boolean.parseBoolean(System.getProperty("com.rabbitmq.tools.Tracer.SILENT_MODE"));
68+
69+
final static int LOG_QUEUE_SIZE = 1024 * 1024;
70+
final static int BUFFER_SIZE = 10 * 1024 * 1024;
71+
final static int MAX_TIME_BETWEEN_FLUSHES = 1000;
72+
final static Object FLUSH = new Object();
73+
74+
private static class AsyncLogger extends Thread{
75+
final PrintStream ps;
76+
final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(LOG_QUEUE_SIZE, true);
77+
AsyncLogger(PrintStream ps){
78+
this.ps = new PrintStream(new BufferedOutputStream(ps, BUFFER_SIZE), false);
79+
start();
80+
81+
new Thread(){
82+
@Override public void run(){
83+
while(true){
84+
try {
85+
Thread.sleep(MAX_TIME_BETWEEN_FLUSHES);
86+
queue.add(FLUSH);
87+
} catch(InterruptedException e) { }
88+
}
89+
90+
}
91+
}.start();
92+
}
93+
94+
void printMessage(Object message){
95+
if(message instanceof Throwable){
96+
((Throwable)message).printStackTrace(ps);
97+
} else if (message instanceof String){
98+
ps.println(message);
99+
} else {
100+
throw new RuntimeException("Unrecognised object " + message);
101+
}
102+
}
103+
104+
@Override public void run(){
105+
try {
106+
while(true){
107+
Object message = queue.take();
108+
if(message == FLUSH) ps.flush();
109+
else printMessage(message);
110+
}
111+
} catch (InterruptedException interrupt){
112+
}
113+
}
114+
115+
void log(Object message){
116+
try {
117+
queue.put(message);
118+
} catch(InterruptedException ex){
119+
throw new RuntimeException(ex);
120+
}
121+
}
122+
}
123+
67124
public static void main(String[] args) {
68125
int listenPort = args.length > 0 ? Integer.parseInt(args[0]) : 5673;
69126
String connectHost = args.length > 1 ? args[1] : "localhost";
@@ -85,9 +142,10 @@ public static void main(String[] args) {
85142
try {
86143
ServerSocket server = new ServerSocket(listenPort);
87144
int counter = 0;
145+
AsyncLogger logger = new AsyncLogger(System.out);
88146
while (true) {
89147
Socket conn = server.accept();
90-
new Tracer(conn, counter++, connectHost, connectPort);
148+
new Tracer(conn, counter++, connectHost, connectPort, logger);
91149
}
92150
} catch (IOException ioe) {
93151
ioe.printStackTrace();
@@ -109,7 +167,9 @@ public static void main(String[] args) {
109167

110168
public DataOutputStream oos;
111169

112-
public Tracer(Socket sock, int id, String host, int port) throws IOException {
170+
public AsyncLogger logger;
171+
172+
public Tracer(Socket sock, int id, String host, int port, AsyncLogger logger) throws IOException {
113173
this.inSock = sock;
114174
this.outSock = new Socket(host, port);
115175
this.id = id;
@@ -118,6 +178,7 @@ public Tracer(Socket sock, int id, String host, int port) throws IOException {
118178
this.ios = new DataOutputStream(inSock.getOutputStream());
119179
this.ois = new DataInputStream(outSock.getInputStream());
120180
this.oos = new DataOutputStream(outSock.getOutputStream());
181+
this.logger = logger;
121182

122183
new Thread(this).start();
123184
}
@@ -135,18 +196,18 @@ public void run() {
135196
new Thread(outHandler).start();
136197
Object result = w.uninterruptibleGet();
137198
if (result instanceof Exception) {
138-
((Exception) result).printStackTrace();
199+
logger.log(result);
139200
}
140201
} catch (EOFException eofe) {
141-
eofe.printStackTrace();
202+
logger.log(eofe);
142203
} catch (IOException ioe) {
143-
ioe.printStackTrace();
204+
logger.log(ioe);
144205
} finally {
145206
try {
146207
inSock.close();
147208
outSock.close();
148209
} catch (IOException ioe2) {
149-
ioe2.printStackTrace();
210+
logger.log(ioe2);
150211
}
151212
}
152213
}
@@ -174,7 +235,7 @@ public Frame readFrame() throws IOException {
174235
}
175236

176237
public void report(int channel, Object object) {
177-
System.out.println("" + System.currentTimeMillis() + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- ") + object);
238+
logger.log("" + System.currentTimeMillis() + ": conn#" + id + " ch#" + channel + (inBound ? " -> " : " <- ") + object);
178239
}
179240

180241
public void reportFrame(Frame f)
@@ -202,7 +263,13 @@ public void reportFrame(Frame f)
202263

203264
public void doFrame() throws IOException {
204265
Frame f = readFrame();
266+
205267
if (f != null) {
268+
269+
if(SILENT_MODE){
270+
f.writeTo(o);
271+
return;
272+
}
206273
if (f.type == AMQP.FRAME_HEARTBEAT) {
207274
if ((inBound && !WITHHOLD_INBOUND_HEARTBEATS) ||
208275
(!inBound && !WITHHOLD_OUTBOUND_HEARTBEATS))

src/com/rabbitmq/utility/BlockingValueOrException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void setException(E e) {
4646
public V uninterruptibleGetValue() throws E {
4747
return uninterruptibleGet().getValue();
4848
}
49-
49+
5050
public V uninterruptibleGetValue(int timeout) throws E, TimeoutException {
5151
return uninterruptibleGet(timeout).getValue();
5252
}

src/com/rabbitmq/utility/Utility.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
7676
throwable.fillInStackTrace();
7777
// We want to remove fixStackTrace from the trace.
7878
StackTraceElement[] existing = throwable.getStackTrace();
79-
StackTraceElement[] newTrace = new StackTraceElement[existing.length - 1];
79+
StackTraceElement[] newTrace = new StackTraceElement[existing.length - 1];
8080
System.arraycopy(existing, 1, newTrace, 0, newTrace.length);
8181
throwable.setStackTrace(newTrace);
8282
return throwable;
@@ -92,7 +92,7 @@ public static String makeStackTrace(Throwable throwable) {
9292
try {
9393
outputStream.close();
9494
} catch (IOException ex) {
95-
// Closing the output stream won't generate an error, and in
95+
// Closing the output stream won't generate an error, and in
9696
// fact does nothing - just being tidy
9797
ex.printStackTrace();
9898
}

src/com/rabbitmq/utility/ValueOrException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class ValueOrException<V, E extends Throwable & SensibleClone<E>> {
3434
private final boolean _useValue;
3535
private final V _value;
3636
private final E _exception;
37-
37+
3838
/**
3939
* dual-purpose private constructor: one will be null, and the flag tells which to use
4040
* @param value the value to wrap, if applicable
@@ -69,7 +69,7 @@ public static <V, E extends Throwable & SensibleClone<E>> ValueOrException<V, E>
6969
public static <V, E extends Throwable & SensibleClone<E>> ValueOrException<V, E> makeException(E exception) {
7070
return new ValueOrException<V, E>(null, exception, false);
7171
}
72-
72+
7373
/** Retrieve value or throw exception
7474
* @return the wrapped value, if it's a value
7575
* @throws E the wrapped exception, if it's an exception

test/src/com/rabbitmq/client/test/ValueOrExceptionTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838

3939

4040
public class ValueOrExceptionTest extends TestCase {
41-
public static class InsufficientMagicException extends Exception implements SensibleClone<InsufficientMagicException> {
41+
public static class InsufficientMagicException extends Exception
42+
implements SensibleClone<InsufficientMagicException> {
4243
public InsufficientMagicException(String message) {
4344
super(message);
4445
}
@@ -58,15 +59,19 @@ public static TestSuite suite()
5859

5960
public void testStoresValue() throws InsufficientMagicException {
6061
Integer value = new Integer(3);
61-
ValueOrException<Integer, InsufficientMagicException> valueOrEx = ValueOrException.<Integer, InsufficientMagicException>makeValue(value);
62+
63+
ValueOrException<Integer, InsufficientMagicException> valueOrEx =
64+
ValueOrException.<Integer, InsufficientMagicException>makeValue(value);
6265

6366
Integer returnedValue = valueOrEx.getValue();
6467
assertTrue(returnedValue == value);
6568
}
6669

6770
public void testClonesException() {
68-
InsufficientMagicException exception = new InsufficientMagicException("dummy message");
69-
ValueOrException<Integer, InsufficientMagicException> valueOrEx = ValueOrException.<Integer, InsufficientMagicException>makeException(exception);
71+
InsufficientMagicException exception =
72+
new InsufficientMagicException("dummy message");
73+
ValueOrException<Integer, InsufficientMagicException> valueOrEx
74+
= ValueOrException.makeException(exception);
7075

7176
try {
7277
valueOrEx.getValue();

test/src/com/rabbitmq/client/test/performance/ScalabilityTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public void print(final int base, final String prefix)
144144
s.close();
145145
s = open(prefix, "deletion");
146146
print(s, base, deletionTimes);
147-
s.close();
147+
s.close();
148148
s = open(prefix, "routing");
149149
print(s, base, transpose(routingTimes));
150150
s.close();

0 commit comments

Comments
 (0)