Skip to content

Commit 3bd61ca

Browse files
committed
add
1 parent 339df47 commit 3bd61ca

File tree

5 files changed

+34
-13
lines changed

5 files changed

+34
-13
lines changed

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,17 @@ object ApplicationState extends Enumeration {
9191
type ApplicationState = Value
9292
val PENDING, RUNNING, FINISHED, KILLED, FAILED, ZOMBIE, NOT_FOUND, UNKNOWN = Value
9393

94-
def isFailed(state: ApplicationState): Boolean = state match {
94+
def isFailed(
95+
state: ApplicationState,
96+
appOperation: Option[ApplicationOperation]): Boolean = state match {
9597
case FAILED => true
9698
case KILLED => true
97-
case NOT_FOUND => true
99+
case NOT_FOUND => appOperation match {
100+
// For YARN and Kubernetes operations, if the application is not found, treat it as failed
101+
// to prevent mistakenly set unsuccessful applications to the finished state.
102+
case Some(_: YarnApplicationOperation) | Some(_: KubernetesApplicationOperation) => true
103+
case _ => false
104+
}
98105
case _ => false
99106
}
100107

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
136136
val shouldDelete = cleanupDriverPodStrategy match {
137137
case NONE => false
138138
case ALL => true
139-
case COMPLETED => !ApplicationState.isFailed(notification.getValue)
139+
case COMPLETED => !ApplicationState.isFailed(notification.getValue, Some(this))
140140
}
141141
if (shouldDelete) {
142142
deletePod(kubernetesInfo, removed.podName.orNull, appLabel)
@@ -573,8 +573,9 @@ object KubernetesApplicationOperation extends Logging {
573573
case Some(containerAppState) => containerAppState
574574
case None => podAppState
575575
}
576-
val applicationError =
577-
if (ApplicationState.isFailed(applicationState)) {
576+
val applicationError = {
577+
// here the applicationState could not been NOT_FOUND, safe to use None ApplicationOperation
578+
if (ApplicationState.isFailed(applicationState, appOperation = None)) {
578579
val errorMap = containerStatusToBuildAppState.map { cs =>
579580
Map(
580581
"Pod" -> podName,
@@ -588,6 +589,7 @@ object KubernetesApplicationOperation extends Logging {
588589
} else {
589590
None
590591
}
592+
}
591593
applicationState -> applicationError
592594
}
593595

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ class KyuubiApplicationManager(metadataManager: Option[MetadataManager])
103103
operations.find(_.isInstanceOf[KubernetesApplicationOperation])
104104
.map(_.asInstanceOf[KubernetesApplicationOperation])
105105
}
106+
107+
private[kyuubi] def getApplicationOperation(appMgrInfo: ApplicationManagerInfo)
108+
: Option[ApplicationOperation] = {
109+
operations.find(_.isSupported(appMgrInfo))
110+
}
106111
}
107112

108113
object KyuubiApplicationManager {

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import com.google.common.annotations.VisibleForTesting
2525

2626
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
2727
import org.apache.kyuubi.config.KyuubiConf
28-
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
28+
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationOperation, ApplicationState, KillResponse, ProcBuilder}
2929
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
3030
import org.apache.kyuubi.metrics.MetricsConstants.OPERATION_OPEN
3131
import org.apache.kyuubi.metrics.MetricsSystem
@@ -99,6 +99,8 @@ class BatchJobSubmission(
9999
getOperationLog)
100100
}
101101

102+
private lazy val appOperation = applicationManager.getApplicationOperation(builder.appMgrInfo())
103+
102104
def startupProcessAlive: Boolean =
103105
builder.processLaunched && Option(builder.process).exists(_.isAlive)
104106

@@ -275,7 +277,7 @@ class BatchJobSubmission(
275277
try {
276278
info(s"Submitting $batchType batch[$batchId] job:\n$builder")
277279
val process = builder.start
278-
while (process.isAlive && !applicationFailed(_applicationInfo)) {
280+
while (process.isAlive && !applicationFailed(_applicationInfo, appOperation)) {
279281
doUpdateApplicationInfoMetadataIfNeeded()
280282
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
281283
}
@@ -284,7 +286,7 @@ class BatchJobSubmission(
284286
doUpdateApplicationInfoMetadataIfNeeded()
285287
}
286288

287-
if (applicationFailed(_applicationInfo)) {
289+
if (applicationFailed(_applicationInfo, appOperation)) {
288290
Utils.terminateProcess(process, applicationStartupDestroyTimeout)
289291
throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
290292
}
@@ -332,7 +334,7 @@ class BatchJobSubmission(
332334
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
333335
return
334336
}
335-
if (applicationFailed(_applicationInfo)) {
337+
if (applicationFailed(_applicationInfo, appOperation)) {
336338
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
337339
}
338340
updateBatchMetadata()
@@ -341,7 +343,7 @@ class BatchJobSubmission(
341343
Thread.sleep(applicationCheckInterval)
342344
updateApplicationInfoMetadataIfNeeded()
343345
}
344-
if (applicationFailed(_applicationInfo)) {
346+
if (applicationFailed(_applicationInfo, appOperation)) {
345347
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
346348
}
347349
}
@@ -445,8 +447,12 @@ class BatchJobSubmission(
445447
}
446448

447449
object BatchJobSubmission {
448-
def applicationFailed(applicationStatus: Option[ApplicationInfo]): Boolean = {
449-
applicationStatus.map(_.state).exists(ApplicationState.isFailed)
450+
def applicationFailed(
451+
applicationStatus: Option[ApplicationInfo],
452+
appOperation: Option[ApplicationOperation]): Boolean = {
453+
applicationStatus.map(_.state).exists { state =>
454+
ApplicationState.isFailed(state, appOperation)
455+
}
450456
}
451457

452458
def applicationTerminated(applicationStatus: Option[ApplicationInfo]): Boolean = {

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,9 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
129129
metadata: Metadata,
130130
batchAppStatus: Option[ApplicationInfo]): Batch = {
131131
batchAppStatus.map { appStatus =>
132+
val appOp = sessionManager.applicationManager.getApplicationOperation(metadata.appMgrInfo)
132133
val currentBatchState =
133-
if (BatchJobSubmission.applicationFailed(batchAppStatus)) {
134+
if (BatchJobSubmission.applicationFailed(batchAppStatus, appOp)) {
134135
OperationState.ERROR.toString
135136
} else if (BatchJobSubmission.applicationTerminated(batchAppStatus)) {
136137
OperationState.FINISHED.toString

0 commit comments

Comments
 (0)