Skip to content

Commit 17afaca

Browse files
committed
Pump process inputs before checking completion
Process completion methods can hang because internal buffers are full. This commit "pumps" the inputs before calling the completion method, which should help to finish properly the command call.
1 parent 6b2251f commit 17afaca

File tree

1 file changed

+58
-11
lines changed

1 file changed

+58
-11
lines changed

src/test/java/com/rabbitmq/tools/Host.java

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,25 +54,68 @@ public static String capture(InputStream is)
5454
return buff.toString();
5555
}
5656

57-
public static Process executeCommand(String command) throws IOException
57+
public static ProcessState executeCommand(String command) throws IOException
5858
{
5959
Process pr = executeCommandProcess(command);
60+
InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
61+
InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());
6062

61-
int ev = waitForExitValue(pr);
63+
int ev = waitForExitValue(pr, inputState, errorState);
64+
inputState.pump();
65+
errorState.pump();
6266
if (ev != 0) {
63-
String stdout = capture(pr.getInputStream());
64-
String stderr = capture(pr.getErrorStream());
6567
throw new IOException("unexpected command exit value: " + ev +
6668
"\ncommand: " + command + "\n" +
67-
"\nstdout:\n" + stdout +
68-
"\nstderr:\n" + stderr + "\n");
69+
"\nstdout:\n" + inputState.buffer.toString() +
70+
"\nstderr:\n" + errorState.buffer.toString() + "\n");
6971
}
70-
return pr;
72+
return new ProcessState(pr, inputState, errorState);
73+
}
74+
75+
static class ProcessState {
76+
77+
private final Process process;
78+
private final InputStreamPumpState inputState;
79+
private final InputStreamPumpState errorState;
80+
81+
ProcessState(Process process, InputStreamPumpState inputState,
82+
InputStreamPumpState errorState) {
83+
this.process = process;
84+
this.inputState = inputState;
85+
this.errorState = errorState;
86+
}
87+
88+
private String output() {
89+
return inputState.buffer.toString();
90+
}
91+
92+
}
93+
94+
private static class InputStreamPumpState {
95+
96+
private final BufferedReader reader;
97+
private final StringBuilder buffer;
98+
99+
private InputStreamPumpState(InputStream in) {
100+
this.reader = new BufferedReader(new InputStreamReader(in));
101+
this.buffer = new StringBuilder();
102+
}
103+
104+
void pump() throws IOException {
105+
String line;
106+
while ((line = reader.readLine()) != null) {
107+
buffer.append(line).append("\n");
108+
}
109+
}
110+
71111
}
72112

73-
private static int waitForExitValue(Process pr) {
113+
private static int waitForExitValue(Process pr, InputStreamPumpState inputState,
114+
InputStreamPumpState errorState) throws IOException {
74115
while(true) {
75116
try {
117+
inputState.pump();
118+
errorState.pump();
76119
pr.waitFor();
77120
break;
78121
} catch (InterruptedException ignored) {}
@@ -83,6 +126,10 @@ private static int waitForExitValue(Process pr) {
83126
public static Process executeCommandIgnoringErrors(String command) throws IOException
84127
{
85128
Process pr = executeCommandProcess(command);
129+
InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
130+
InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());
131+
inputState.pump();
132+
errorState.pump();
86133
boolean exited = false;
87134
try {
88135
exited = pr.waitFor(30, TimeUnit.SECONDS);
@@ -118,7 +165,7 @@ public static boolean isRabbitMqCtlCommandAvailable(String command) throws IOExc
118165
return exitValue == 0;
119166
}
120167

121-
public static Process rabbitmqctl(String command) throws IOException {
168+
public static ProcessState rabbitmqctl(String command) throws IOException {
122169
return executeCommand(rabbitmqctlCommand() +
123170
rabbitmqctlNodenameArgument() +
124171
" " + command);
@@ -142,7 +189,7 @@ public static void clearResourceAlarm(String source) throws IOException {
142189
rabbitmqctl("eval 'rabbit_alarm:clear_alarm({resource_limit, " + source + ", node()}).'");
143190
}
144191

145-
public static Process invokeMakeTarget(String command) throws IOException {
192+
public static ProcessState invokeMakeTarget(String command) throws IOException {
146193
File rabbitmqctl = new File(rabbitmqctlCommand());
147194
return executeCommand(makeCommand() +
148195
" -C \'" + rabbitmqDir() + "\'" +
@@ -307,7 +354,7 @@ public String toString() {
307354
}
308355

309356
public static List<ConnectionInfo> listConnections() throws IOException {
310-
String output = capture(rabbitmqctl("list_connections -q pid peer_port client_properties").getInputStream());
357+
String output = rabbitmqctl("list_connections -q pid peer_port client_properties").output();
311358
// output (header line presence depends on broker version):
312359
// pid peer_port
313360
// <[email protected]> 58713

0 commit comments

Comments
 (0)