@@ -22,10 +22,11 @@ import java.util.concurrent.TimeUnit
22
22
23
23
import com .codahale .metrics .MetricRegistry
24
24
import com .google .common .annotations .VisibleForTesting
25
+ import org .apache .commons .lang3 .StringUtils
25
26
26
27
import org .apache .kyuubi .{KyuubiException , KyuubiSQLException , Utils }
27
28
import org .apache .kyuubi .config .KyuubiConf
28
- import org .apache .kyuubi .engine .{ApplicationInfo , ApplicationState , KillResponse , ProcBuilder }
29
+ import org .apache .kyuubi .engine .{ApplicationInfo , ApplicationOperation , ApplicationState , KillResponse , ProcBuilder }
29
30
import org .apache .kyuubi .engine .spark .SparkBatchProcessBuilder
30
31
import org .apache .kyuubi .metrics .MetricsConstants .OPERATION_OPEN
31
32
import org .apache .kyuubi .metrics .MetricsSystem
@@ -99,6 +100,8 @@ class BatchJobSubmission(
99
100
getOperationLog)
100
101
}
101
102
103
/**
 * Application operation that `applicationManager` resolves for this batch's
 * `builder.appMgrInfo()`. Declared lazy, so the lookup only runs on first access.
 */
private lazy val appOperation: Option[ApplicationOperation] =
  applicationManager.getApplicationOperation(builder.appMgrInfo())
104
+
102
105
/**
 * Whether the startup process is still running.
 *
 * @return true only when the builder reports the process as launched and
 *         `builder.process` is non-null and alive; false otherwise
 */
def startupProcessAlive: Boolean = {
  if (builder.processLaunched) {
    // Option(...) maps a null process handle to None, so exists yields false.
    Option(builder.process).exists(proc => proc.isAlive)
  } else {
    false
  }
}
104
107
@@ -212,6 +215,20 @@ class BatchJobSubmission(
212
215
metadata match {
213
216
case Some (metadata) if metadata.peerInstanceClosed =>
214
217
setState(OperationState .CANCELED )
218
+ case Some (metadata)
219
+ // in case it has been updated by peer kyuubi instance, see KYUUBI #6278
220
+ if StringUtils .isNotBlank(metadata.engineState) &&
221
+ ApplicationState .isTerminated(ApplicationState .withName(metadata.engineState)) =>
222
+ _applicationInfo = Some (new ApplicationInfo (
223
+ id = metadata.engineId,
224
+ name = metadata.engineName,
225
+ state = ApplicationState .withName(metadata.engineState),
226
+ url = Option (metadata.engineUrl),
227
+ error = metadata.engineError))
228
+ if (applicationFailed(_applicationInfo, appOperation)) {
229
+ throw new KyuubiException (
230
+ s " $batchType batch[ $batchId] job failed: ${_applicationInfo}" )
231
+ }
215
232
case Some (metadata) if metadata.state == OperationState .PENDING .toString =>
216
233
// case 1: new batch job created using batch impl v2
217
234
// case 2: batch job from recovery, do submission only when previous state is
@@ -275,7 +292,7 @@ class BatchJobSubmission(
275
292
try {
276
293
info(s " Submitting $batchType batch[ $batchId] job: \n $builder" )
277
294
val process = builder.start
278
- while (process.isAlive && ! applicationFailed(_applicationInfo)) {
295
+ while (process.isAlive && ! applicationFailed(_applicationInfo, appOperation )) {
279
296
doUpdateApplicationInfoMetadataIfNeeded()
280
297
process.waitFor(applicationCheckInterval, TimeUnit .MILLISECONDS )
281
298
}
@@ -284,7 +301,7 @@ class BatchJobSubmission(
284
301
doUpdateApplicationInfoMetadataIfNeeded()
285
302
}
286
303
287
- if (applicationFailed(_applicationInfo)) {
304
+ if (applicationFailed(_applicationInfo, appOperation )) {
288
305
Utils .terminateProcess(process, applicationStartupDestroyTimeout)
289
306
throw new KyuubiException (s " Batch job failed: ${_applicationInfo}" )
290
307
}
@@ -329,10 +346,9 @@ class BatchJobSubmission(
329
346
setStateIfNotCanceled(OperationState .RUNNING )
330
347
}
331
348
if (_applicationInfo.isEmpty) {
332
- info(s " The $batchType batch[ $batchId] job: $appId not found, assume that it has finished. " )
333
- return
349
+ _applicationInfo = Some (ApplicationInfo .NOT_FOUND )
334
350
}
335
- if (applicationFailed(_applicationInfo)) {
351
+ if (applicationFailed(_applicationInfo, appOperation )) {
336
352
throw new KyuubiException (s " $batchType batch[ $batchId] job failed: ${_applicationInfo}" )
337
353
}
338
354
updateBatchMetadata()
@@ -341,7 +357,7 @@ class BatchJobSubmission(
341
357
Thread .sleep(applicationCheckInterval)
342
358
updateApplicationInfoMetadataIfNeeded()
343
359
}
344
- if (applicationFailed(_applicationInfo)) {
360
+ if (applicationFailed(_applicationInfo, appOperation )) {
345
361
throw new KyuubiException (s " $batchType batch[ $batchId] job failed: ${_applicationInfo}" )
346
362
}
347
363
}
@@ -445,8 +461,12 @@ class BatchJobSubmission(
445
461
}
446
462
447
463
object BatchJobSubmission {
448
/**
 * Tells whether the given application status represents a failed application.
 *
 * @param applicationStatus the latest known application info, if any
 * @param appOperation the application operation forwarded to
 *                     `ApplicationState.isFailed`
 * @return false when no status is present; otherwise the result of
 *         `ApplicationState.isFailed` for the state carried by the status
 */
def applicationFailed(
    applicationStatus: Option[ApplicationInfo],
    appOperation: Option[ApplicationOperation]): Boolean = {
  applicationStatus match {
    case Some(info) => ApplicationState.isFailed(info.state, appOperation)
    case None => false
  }
}
451
471
452
472
def applicationTerminated (applicationStatus : Option [ApplicationInfo ]): Boolean = {
0 commit comments