Skip to content

Commit c9d50ef

Browse files
committed
Add a mechanism for reading multiple semgrex requests in one process - will save on the startup time for anyone using this tool
1 parent 0a63396 commit c9d50ef

File tree

2 files changed

+140
-4
lines changed

2 files changed

+140
-4
lines changed

src/edu/stanford/nlp/semgraph/semgrex/ProcessSemgrexRequest.java

+58-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77

88
package edu.stanford.nlp.semgraph.semgrex;
99

10+
import java.io.ByteArrayInputStream;
11+
import java.io.ByteArrayOutputStream;
12+
import java.io.DataInputStream;
13+
import java.io.DataOutputStream;
14+
import java.io.EOFException;
1015
import java.io.InputStream;
1116
import java.io.IOException;
1217
import java.io.OutputStream;
@@ -69,14 +74,65 @@ public static CoreNLPProtos.SemgrexResponse processRequest(CoreNLPProtos.Semgrex
6974
return responseBuilder.build();
7075
}
7176

77+
/**
78+
* Reads a single request from the InputStream, then writes back a single response.
79+
*/
7280
public static void processInputStream(InputStream in, OutputStream out) throws IOException {
73-
// TODO: it would be nice to allow multiple reads from the same stream
7481
CoreNLPProtos.SemgrexRequest request = CoreNLPProtos.SemgrexRequest.parseFrom(in);
7582
CoreNLPProtos.SemgrexResponse response = processRequest(request);
7683
response.writeTo(out);
7784
}
7885

86+
/**
87+
* Processes multiple requests from the same stream.
88+
*<br>
89+
* As per the google suggestion for streaming multiple messages,
90+
* this reads the length of the buffer, then reads exactly that many
91+
* bytes and decodes it. It repeats until either 0 is read for the
92+
* length or until EOF.
93+
*<br>
94+
* https://developers.google.com/protocol-buffers/docs/techniques#streamimg
95+
*/
96+
public static void processMultipleInputs(InputStream in, OutputStream out) throws IOException {
97+
DataInputStream din = new DataInputStream(in);
98+
DataOutputStream dout = new DataOutputStream(out);
99+
int size = 0;
100+
do {
101+
try {
102+
size = din.readInt();
103+
} catch (EOFException e) {
104+
// If the stream ends without a closing 0, we consider that okay too
105+
size = 0;
106+
}
107+
108+
// stream is done if there's a closing 0 or if the stream ends
109+
if (size == 0) {
110+
dout.writeInt(0);
111+
break;
112+
}
113+
114+
byte[] inputArray = new byte[size];
115+
din.read(inputArray, 0, size);
116+
ByteArrayInputStream bin = new ByteArrayInputStream(inputArray);
117+
ByteArrayOutputStream result = new ByteArrayOutputStream();
118+
processInputStream(bin, result);
119+
byte[] outputArray = result.toByteArray();
120+
dout.writeInt(outputArray.length);
121+
dout.write(outputArray);
122+
} while (size > 0);
123+
}
124+
125+
/**
126+
* Command line tool for processing a semgrex request.
127+
* <br>
128+
* If -multiple is specified, will process multiple requests.
129+
*/
79130
public static void main(String[] args) throws IOException {
80-
processInputStream(System.in, System.out);
131+
if (args.length > 0 &&
132+
(args[0].equalsIgnoreCase("-multiple") || args[0].equalsIgnoreCase("--multiple"))) {
133+
processMultipleInputs(System.in, System.out);
134+
} else {
135+
processInputStream(System.in, System.out);
136+
}
81137
}
82138
}

test/src/edu/stanford/nlp/semgraph/semgrex/ProcessSemgrexRequestTest.java

+82-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@
55

66
import edu.stanford.nlp.pipeline.CoreNLPProtos;
77

8+
import java.io.ByteArrayInputStream;
9+
import java.io.ByteArrayOutputStream;
10+
import java.io.DataInputStream;
11+
import java.io.DataOutputStream;
12+
import java.io.IOException;
13+
814
public class ProcessSemgrexRequestTest {
915
/**
1016
* Build a fake request. The same query will be repeated N times
@@ -85,7 +91,7 @@ public void testSimpleRequest() {
8591
}
8692

8793
@Test
88-
public void testMultiSemgrex() {
94+
public void testTwoSemgrex() {
8995
CoreNLPProtos.SemgrexRequest request = buildFakeRequest(1, 2);
9096
CoreNLPProtos.SemgrexResponse response = ProcessSemgrexRequest.processRequest(request);
9197

@@ -123,12 +129,86 @@ public void testEmptyRequest() {
123129
}
124130

125131
@Test
126-
public void testMultiRequest() {
132+
public void testTwoGraphs() {
127133
CoreNLPProtos.SemgrexRequest request = buildFakeRequest(2, 1);
128134
CoreNLPProtos.SemgrexResponse response = ProcessSemgrexRequest.processRequest(request);
129135

130136
Assert.assertEquals("Expected exactly 2 replies", 2, response.getResultList().size());
131137
checkResult(response.getResultList().get(0), 1);
132138
checkResult(response.getResultList().get(1), 1);
133139
}
140+
141+
public byte[] buildRepeatedRequest(int count, boolean closingLength) throws IOException {
142+
ByteArrayOutputStream singleBout = new ByteArrayOutputStream();
143+
CoreNLPProtos.SemgrexRequest singleRequest = buildFakeRequest(1, 1);
144+
singleRequest.writeTo(singleBout);
145+
byte[] singleBytes = singleBout.toByteArray();
146+
147+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
148+
DataOutputStream dout = new DataOutputStream(bout);
149+
for (int i = 0; i < count; ++i) {
150+
dout.writeInt(singleBytes.length);
151+
dout.write(singleBytes, 0, singleBytes.length);
152+
}
153+
if (closingLength) {
154+
dout.writeInt(0);
155+
}
156+
dout.close();
157+
158+
return bout.toByteArray();
159+
}
160+
161+
public void checkRepeatedResults(byte[] arr, int count) throws IOException {
162+
ByteArrayInputStream bin = new ByteArrayInputStream(arr);
163+
DataInputStream din = new DataInputStream(bin);
164+
for (int i = 0; i < count; ++i) {
165+
int len = din.readInt();
166+
byte[] responseBytes = new byte[len];
167+
din.read(responseBytes, 0, len);
168+
CoreNLPProtos.SemgrexResponse response = CoreNLPProtos.SemgrexResponse.parseFrom(responseBytes);
169+
checkResult(response.getResultList().get(0), 1);
170+
}
171+
int len = din.readInt();
172+
Assert.assertEquals("Repeated results should be over", 0, len);
173+
}
174+
175+
/**
176+
* Test that the multiple request pathway works with 1 request
177+
*/
178+
@Test
179+
public void testSingleMultiRequest() throws IOException {
180+
byte[] request = buildRepeatedRequest(1, true);
181+
ByteArrayInputStream bin = new ByteArrayInputStream(request);
182+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
183+
184+
ProcessSemgrexRequest.processMultipleInputs(bin, bout);
185+
checkRepeatedResults(bout.toByteArray(), 1);
186+
}
187+
188+
/**
189+
* Test that the multiple request pathway works with 2 requests
190+
*/
191+
@Test
192+
public void testDoubleMultiRequest() throws IOException {
193+
byte[] request = buildRepeatedRequest(2, true);
194+
ByteArrayInputStream bin = new ByteArrayInputStream(request);
195+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
196+
197+
ProcessSemgrexRequest.processMultipleInputs(bin, bout);
198+
checkRepeatedResults(bout.toByteArray(), 2);
199+
}
200+
201+
/**
202+
* Test that the multiple request pathway works even when the
203+
* input stream hits EOF
204+
*/
205+
@Test
206+
public void testUnclosedMultiRequest() throws IOException {
207+
byte[] request = buildRepeatedRequest(1, false);
208+
ByteArrayInputStream bin = new ByteArrayInputStream(request);
209+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
210+
211+
ProcessSemgrexRequest.processMultipleInputs(bin, bout);
212+
checkRepeatedResults(bout.toByteArray(), 1);
213+
}
134214
}

0 commit comments

Comments
 (0)