Skip to content

WriteBatcher hangs in awaitCompletion() after forest failover #744

Closed
@srinathgit

Description

@srinathgit

The following test, run via Gradle against a 3-node cluster, exercises the forest failover scenario: one node is shut down during the run and its forest fails over to another node. All the documents are written successfully, but after they have all been written, WriteBatcher hangs in awaitCompletion(). The thread dump is attached below.

/**
 * Forest-failover test for WriteBatcher, intended to run against a 3-node cluster.
 *
 * <p>Steps:
 * <ul>
 *   <li>Queues 20000 documents, using batch size 10 and 10 threads.</li>
 *   <li>Once the job report shows more than 2000 success events, stops the last host in
 *       {@code hostNames} (only once) and pauses the producer loop for 40 seconds.</li>
 *   <li>Appends a {@code HostAvailabilityListener} to the batcher's existing failure
 *       listeners, configured with a 15 s suspend time for an unavailable host and a
 *       minimum of 2 hosts.</li>
 *   <li>After {@code flushAndWait()}, asserts that the database contains exactly 20000
 *       documents.</li>
 * </ul>
 *
 * <p>Exceptions are propagated to JUnit, so setup, server or interruption errors fail the
 * test instead of being printed and ignored. The job is stopped in a {@code finally}
 * block so the batcher is released even when the assertion fails.
 *
 * <p>NOTE(review): {@code fn:count(fn:doc())} counts every document in the database. This
 * assumes the database starts empty; confirm against the test's setup.
 */
@Test
public void testStopOneNode() throws Exception {
	final String query1 = "fn:count(fn:doc())";

	final AtomicInteger successCount = new AtomicInteger(0);
	final AtomicBoolean failState = new AtomicBoolean(false);
	final AtomicInteger failCount = new AtomicInteger(0);

	WriteBatcher ihb2 = dmManager.newWriteBatcher();
	ihb2.withBatchSize(10);
	ihb2.withThreadCount(10);

	// Keep whatever failure listeners the batcher already has and add
	// host-availability handling on top of them.
	List<WriteFailureListener> failure =
			new ArrayList<WriteFailureListener>(Arrays.asList(ihb2.getBatchFailureListeners()));
	failure.add(new HostAvailabilityListener(dmManager)
			.withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
			.withMinHosts(2));
	ihb2.setBatchFailureListeners(failure.toArray(new WriteFailureListener[failure.size()]));

	ihb2.onBatchSuccess(
			batch -> {
				successCount.addAndGet(batch.getItems().length);
				System.out.println("Success Host: " + batch.getClient().getHost());
				System.out.println("Success batch number: " + batch.getJobBatchNumber());
				System.out.println("Success Job writes so far: " + batch.getJobWritesSoFar());
			})
		.onBatchFailure(
			(batch, throwable) -> {
				System.out.println("Failed batch number: " + batch.getJobBatchNumber());
				throwable.printStackTrace();
				failState.set(true);
				failCount.addAndGet(batch.getItems().length);
			});

	boolean jobStarted = false;
	try {
		writeTicket = dmManager.startJob(ihb2);
		jobStarted = true;

		boolean nodeStopped = false;
		for (int j = 0; j < 20000; j++) {
			String uri = "/local/ABC-" + j;
			ihb2.add(uri, stringHandle);
			// Stop one node exactly once, after the job has made some progress.
			// Checking the flag first avoids polling the job report once the node is down.
			if (!nodeStopped && dmManager.getJobReport(writeTicket).getSuccessEventsCount() > 2000) {
				nodeStopped = true;
				serverStartStop(hostNames[hostNames.length - 1], "stop");
				// Pause the producer while the stopped node's forest fails over.
				// Thread.sleep is static; it always sleeps the current thread.
				Thread.sleep(40000L);
			}
		}

		ihb2.flushAndWait();

		// Evaluate the count once so the printed value and the asserted value agree.
		int count = dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue();

		System.out.println("Fail : " + failCount.intValue());
		System.out.println("Success : " + successCount.intValue());
		System.out.println("Count : " + count);

		Assert.assertEquals(20000, count);
	} finally {
		// Release the batcher even if writing or the assertion fails.
		if (jobStarted) {
			dmManager.stopJob(writeTicket);
		}
	}
}
	

Thread dump of the hung test thread:

   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x00000007213cbd78> (a java.util.concurrent.FutureTask)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl$CompletableThreadPoolExecutor.awaitCompletion(WriteBatcherImpl.java:1185)
        - locked <0x00000007213cbd78> (a java.util.concurrent.FutureTask)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.awaitCompletion(WriteBatcherImpl.java:654)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.awaitCompletion(WriteBatcherImpl.java:660)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flush(WriteBatcherImpl.java:547)
        at com.marklogic.client.datamovement.impl.WriteBatcherImpl.flushAndWait(WriteBatcherImpl.java:518)
        at com.marklogic.client.datamovement.functionaltests.WBFailoverTest.testStopOneNode(WBFailoverTest.java:234)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
        at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
        at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
        at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions