Skip to content

QueryBatcher awaitCompletion with thread count = 1 hangs #920

Closed
@georgeajit

Description

@georgeajit

Version of MarkLogic Java Client API : develop branch (Post 4.0.4)
Version of MarkLogic Server: 9.0 nightly build (20180510) on 3 node cluster
Java version : 1.8
OS and version : All platforms

Input: To reproduced

@Test
  public void massDeleteSingleThread() throws Exception {
    Set<String> uriSet = new HashSet<>();

    Assert.assertTrue(uriSet.isEmpty());
    Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 2000);

    QueryBatcher queryBatcher = dmManager.newQueryBatcher(
        new StructuredQueryBuilder().collection("DeleteListener"))
        .withBatchSize(11)
        .withThreadCount(1)
        .onUrisReady(batch -> {
          for (String s : batch.getItems()) {
            uriSet.add(s);

          }
        })
        .onQueryFailure(throwable -> {
          System.out.println("Exceptions thrown from callback onQueryFailure");
          throwable.printStackTrace();

        });

    JobTicket ticket = dmManager.startJob(queryBatcher);
    queryBatcher.awaitCompletion();
    dmManager.stopJob(ticket);

    Thread.currentThread().sleep(2000L);
    Assert.assertTrue(uriSet.size() == 2000);

    AtomicInteger successDocs = new AtomicInteger();
    HashSet<String> uris2 = new HashSet<>();
    StringBuffer failures2 = new StringBuffer();

    QueryBatcher deleteBatcher = dmManager.newQueryBatcher(uriSet.iterator())
        .withBatchSize(23)
        .withThreadCount(1)
        .onUrisReady(new DeleteListener())
        .withConsistentSnapshot()
        .onUrisReady(batch -> successDocs.addAndGet(batch.getItems().length))
        .onUrisReady(batch -> uris2.addAll(Arrays.asList(batch.getItems())))
        .onQueryFailure(throwable -> {
          throwable.printStackTrace();
          failures2.append("ERROR:[" + throwable + "]\n");
        });

    JobTicket delTicket = dmManager.startJob(deleteBatcher);
    deleteBatcher.awaitCompletion();
    dmManager.stopJob(delTicket);

    if (failures2.length() > 0)
      fail(failures2.toString());
    Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue() == 0);
  }

Actual output: The test hangs. Jenkins Job and nightly regression test suite systems timeout. Here is the jStack output.

2018-05-11 16:34:08
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.162-b12 mixed mode):

"Attach Listener" #75 daemon prio=9 os_prio=0 tid=0x00007f3290007800 nid=0x5e4d waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"pool-9-thread-1" #74 prio=5 os_prio=0 tid=0x00007f3278522000 nid=0x5d6b in Object.wait() [0x00007f32950cd000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at com.marklogic.client.datamovement.impl.QueryBatcherImpl$BlockingRunsPolicy.rejectedExecution(QueryBatcherImpl.java:892)
	- locked <0x00000000da10ccf0> (a java.lang.Object)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.marklogic.client.datamovement.impl.QueryBatcherImpl$2.run(QueryBatcherImpl.java:772)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"OkHttp ConnectionPool" #14 daemon prio=5 os_prio=0 tid=0x00007f3278061000 nid=0x5c90 in Object.wait() [0x00007f32acf1c000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:460)
	at okhttp3.ConnectionPool$1.run(ConnectionPool.java:67)
	- locked <0x00000000e6a1c3b0> (a okhttp3.ConnectionPool)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"/0:0:0:0:0:0:0:1:44673 to /0:0:0:0:0:0:0:1:29902 workers Thread 3" #12 prio=5 os_prio=0 tid=0x00007f32c87a6800 nid=0x59d5 runnable [0x00007f32ae538000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	- locked <0x0000000085db9c00> (a sun.nio.ch.Util$3)
	- locked <0x0000000085db9bf0> (a java.util.Collections$UnmodifiableSet)
	- locked <0x0000000085db99e8> (a sun.nio.ch.EPollSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
	at org.gradle.internal.remote.internal.inet.SocketConnection$SocketInputStream.read(SocketConnection.java:178)
	at com.esotericsoftware.kryo.io.Input.fill(Input.java:139)
	at com.esotericsoftware.kryo.io.Input.require(Input.java:159)
	at com.esotericsoftware.kryo.io.Input.readByte(Input.java:255)
	at org.gradle.internal.serialize.kryo.KryoBackedDecoder.readByte(KryoBackedDecoder.java:80)
	at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:63)
	at org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52)
	at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:79)
	at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:263)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)

"/0:0:0:0:0:0:0:1:44673 to /0:0:0:0:0:0:0:1:29902 workers Thread 2" #11 prio=5 os_prio=0 tid=0x00007f32c87a6000 nid=0x59d4 waiting on condition [0x00007f32ae63a000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000085dc9738> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at org.gradle.internal.remote.internal.hub.queue.EndPointQueue.take(EndPointQueue.java:48)
	at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionDispatch.run(MessageHub.java:314)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)

"Test worker" #10 prio=5 os_prio=0 tid=0x00007f32c8775000 nid=0x59d3 waiting on condition [0x00007f32ae738000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000da117078> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
	at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryThreadPoolExecutor.awaitTermination(QueryBatcherImpl.java:914)
	at com.marklogic.client.datamovement.impl.QueryBatcherImpl.awaitCompletion(QueryBatcherImpl.java:277)
	at com.marklogic.client.datamovement.impl.QueryBatcherImpl.awaitCompletion(QueryBatcherImpl.java:283)
	at com.marklogic.client.datamovement.functionaltests.DeleteListenerTest.massDeleteSingleThread(DeleteListenerTest.java:219)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
	at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:146)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:128)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007f32c82a4000 nid=0x59cd runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread1" #7 daemon prio=9 os_prio=0 tid=0x00007f32c8269800 nid=0x59cc waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #6 daemon prio=9 os_prio=0 tid=0x00007f32c8266800 nid=0x59cb waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f32c8198800 nid=0x59ca runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f32c8109000 nid=0x59c9 in Object.wait() [0x00007f32af6f5000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000085d0c150> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
	- locked <0x0000000085d0c150> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
	at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:212)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f32c8104800 nid=0x59c8 in Object.wait() [0x00007f32af7f6000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000000860a5770> (a java.lang.ref.Reference$Lock)
	at java.lang.Object.wait(Object.java:502)
	at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
	- locked <0x00000000860a5770> (a java.lang.ref.Reference$Lock)
	at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=0 tid=0x00007f32c800b800 nid=0x59bf waiting on condition [0x00007f32cf9ac000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000085df8530> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:69)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:44)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:83)
	at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:35)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:119)
	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:64)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:62)
	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:67)

"VM Thread" os_prio=0 tid=0x00007f32c80fc800 nid=0x59c7 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f32c8021000 nid=0x59c4 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f32c8022800 nid=0x59c5 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f32c82a9800 nid=0x59ce waiting on condition 

JNI global references: 274


Expected output: This test uses DeleteListener, hence all documents should have been deleted.

Alternatives: None

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions