Description
In the following test, I have a 3-node cluster (rh7v-intel64-90-java-stress-1/2/4.marklogic.com) with one forest on each host; the forests on rh7v-intel64-90-java-stress-2/4.marklogic.com are configured to fail over to rh7v-intel64-90-java-stress-1.marklogic.com. While the QueryBatcher is running, I stop rh7v-intel64-90-java-stress-4.marklogic.com, and forest QBFailover-3 fails over to rh7v-intel64-90-java-stress-1.marklogic.com. Some time later I stop rh7v-intel64-90-java-stress-2.marklogic.com, but before the host timeout (30 seconds by default) elapses, I start rh7v-intel64-90-java-stress-4.marklogic.com again. Once the 30 seconds pass, forest QBFailover-2 on rh7v-intel64-90-java-stress-2.marklogic.com also fails over to rh7v-intel64-90-java-stress-1.marklogic.com. In this scenario, the total number of URIs returned is less than expected. The log is attached:
TEST-com.marklogic.client.datamovement.functionaltests.QBFailover.txt
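The variable query1 used in the assertions below is not shown in this snippet; it is assumed to be a server-side count of the test documents, along the lines of the following (the exact expression is an assumption):

// Assumption: query1 counts the documents written to the "XmlTransform" collection,
// so the assertions below can check for 0 documents before the load and 6000 after it.
String query1 = "fn:count(fn:collection('XmlTransform'))";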
@Test
public void testStopTwoNodes() throws Exception {
    // The database starts out empty.
    Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 0);
    try {
        final AtomicInteger success = new AtomicInteger(0);
        AtomicBoolean isNode3Running = new AtomicBoolean(true);
        AtomicBoolean isNode2Running = new AtomicBoolean(true);

        // Load 6000 documents into the "XmlTransform" collection.
        addDocs();
        Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 6000);

        QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform"))
                .withBatchSize(15)
                .withThreadCount(10);

        // Replace the default HostAvailabilityListener with one that suspends for
        // 15 seconds when a host becomes unavailable and requires at least 2 hosts.
        List<QueryFailureListener> batchListeners =
                new ArrayList<>(Arrays.asList(batcher.getQueryFailureListeners()));
        batchListeners.removeIf(listener -> listener instanceof HostAvailabilityListener);
        batchListeners.add(new HostAvailabilityListener(dmManager)
                .withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
                .withMinHosts(2));
        batcher.setQueryFailureListeners(batchListeners.toArray(new QueryFailureListener[batchListeners.size()]));

        // Count every URI the batcher returns; log any query failures.
        batcher.onUrisReady(batch -> success.addAndGet(batch.getItems().length))
               .onQueryFailure(queryException -> queryException.printStackTrace());

        ticket = dmManager.startJob(batcher);

        while (!batcher.isStopped()) {
            // After roughly 1000 URIs have been retrieved, stop the last host
            // (rh7v-intel64-90-java-stress-4 in the description above).
            if (isNode3Running.get() && dmManager.getJobReport(ticket).getSuccessEventsCount() > 999) {
                isNode3Running.set(false);
                serverStartStop(hostNames[hostNames.length - 1], "stop");
            }
            // After roughly 3000 URIs, stop the second host, then restart the last
            // host five seconds later, i.e. before the 30-second host timeout elapses.
            if (isNode2Running.get() && dmManager.getJobReport(ticket).getSuccessEventsCount() > 2999) {
                isNode2Running.set(false);
                serverStartStop(hostNames[hostNames.length - 2], "stop");
                Thread.sleep(5000L);
                serverStartStop(hostNames[hostNames.length - 1], "start");
            }
        }

        batcher.awaitCompletion();
        dmManager.stopJob(ticket);

        // Expect every loaded document's URI to have been returned exactly once.
        System.out.println("Success " + success.intValue());
        assertEquals("document count", 6000, success.intValue());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
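The serverStartStop helper comes from the functional-test utilities and is not shown here. A minimal sketch of what it is assumed to do, assuming passwordless ssh to the cluster hosts and the standard MarkLogic service name, might look like this:

// Hypothetical sketch: stop or start the MarkLogic service on a remote host over ssh.
// The ssh/sudo/service invocation is an assumption, not the actual helper implementation.
private void serverStartStop(String host, String command) throws Exception {
    // command is "start" or "stop" and is passed straight through to the service manager.
    ProcessBuilder pb = new ProcessBuilder("ssh", host, "sudo", "service", "MarkLogic", command);
    pb.inheritIO();
    Process p = pb.start();
    p.waitFor();
}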
private void addDocs() {
    WriteBatcher ihb2 = dmManager.newWriteBatcher();

    // All documents share the same small XML payload and collection metadata.
    stringTriple = "<abc>xml</abc>";
    stringHandle = new StringHandle(stringTriple);
    stringHandle.setFormat(Format.XML);
    meta2 = new DocumentMetadataHandle().withCollections("XmlTransform");
    meta2.setFormat(Format.XML);

    ihb2.withBatchSize(27).withThreadCount(10);
    ihb2.onBatchSuccess(batch -> { })
        .onBatchFailure((batch, throwable) -> throwable.printStackTrace());

    dmManager.startJob(ihb2);

    // Write 6000 documents into the "XmlTransform" collection.
    for (int j = 0; j < 6000; j++) {
        String uri = "/local/string-" + j;
        ihb2.add(uri, meta2, stringHandle);
    }
    ihb2.flushAndWait();
}