Skip to content

[KYUUBI #7028] Persist the kubernetes application terminate state into metastore for app info store fallback #7029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,35 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {

}

override def upsertKubernetesEngineInfo(metadata: KubernetesEngineInfo): Unit = {
insertKubernetesEngineInfo(metadata)
updateKubernetesEngineInfo(metadata)
override def upsertKubernetesEngineInfo(engineInfo: KubernetesEngineInfo): Unit = {
dialect.insertOrReplace(
KUBERNETES_ENGINE_INFO_TABLE,
KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT,
KUBERNETES_ENGINE_INFO_COLUMNS_TO_REPLACE,
KUBERNETES_ENGINE_INFO_KEY_COLUMN) match {
case Some(query) =>
JdbcUtils.withConnection { connection =>
val currentTime = System.currentTimeMillis()
execute(
connection,
query,
engineInfo.identifier,
engineInfo.context.orNull,
engineInfo.namespace.orNull,
engineInfo.podName,
engineInfo.podState,
engineInfo.containerState,
engineInfo.engineId,
engineInfo.engineName,
engineInfo.engineState,
engineInfo.engineError.orNull,
currentTime,
currentTime)
}
case None =>
insertKubernetesEngineInfo(engineInfo)
updateKubernetesEngineInfo(engineInfo)
}
}

override def getKubernetesMetaEngineInfo(identifier: String): KubernetesEngineInfo = {
Expand Down Expand Up @@ -785,7 +811,8 @@ object JDBCMetadataStore {
"end_time",
"peer_instance_closed").mkString(",")
private val KUBERNETES_ENGINE_INFO_TABLE = "k8s_engine_info"
private val KUBERNETES_ENGINE_INFO_COLUMNS = Seq(
private val KUBERNETES_ENGINE_INFO_KEY_COLUMN = "identifier"
private val KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT = Seq(
"identifier",
"context",
"namespace",
Expand All @@ -797,5 +824,19 @@ object JDBCMetadataStore {
"engine_state",
"engine_error",
"create_time",
"update_time").mkString(",")
"update_time")
private val KUBERNETES_ENGINE_INFO_COLUMNS_TO_REPLACE = Seq(
"context",
"namespace",
"pod_name",
"pod_state",
"container_state",
"engine_id",
"engine_name",
"engine_state",
"engine_error",
"update_time")
private val KUBERNETES_ENGINE_INFO_COLUMNS =
KUBERNETES_ENGINE_INFO_COLUMNS_TO_INSERT.mkString(",")

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ package org.apache.kyuubi.server.metadata.jdbc

trait JdbcDatabaseDialect {
def limitClause(limit: Int, offset: Int): String
def insertOrReplace(
table: String,
colsToInsert: Seq[String],
colsToReplace: Seq[String],
keyCol: String): Option[String] = None
}

class GenericDatabaseDialect extends JdbcDatabaseDialect {
Expand All @@ -27,6 +32,47 @@ class GenericDatabaseDialect extends JdbcDatabaseDialect {
}
}

class SQLiteDatabaseDialect extends GenericDatabaseDialect {}
class MySQLDatabaseDialect extends GenericDatabaseDialect {}
class PostgreSQLDatabaseDialect extends GenericDatabaseDialect {}
class SQLiteDatabaseDialect extends GenericDatabaseDialect {
override def insertOrReplace(
table: String,
colsToInsert: Seq[String],
colsToReplace: Seq[String],
keyCol: String): Option[String] = {
Some(
s"""
|INSERT OR REPLACE INTO $table (${colsToInsert.mkString(",")})
|VALUES (${colsToInsert.map(_ => "?").mkString(",")})
|""".stripMargin)
}
}
class MySQLDatabaseDialect extends GenericDatabaseDialect {
override def insertOrReplace(
table: String,
colsToInsert: Seq[String],
colsToReplace: Seq[String],
keyCol: String): Option[String] = {
Some(
s"""
|INSERT INTO $table (${colsToInsert.mkString(",")})
|VALUES (${colsToInsert.map(_ => "?").mkString(",")})
|ON DUPLICATE KEY UPDATE
|${colsToReplace.map(c => s"$c = VALUES($c)").mkString(",")}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

|""".stripMargin)
}
}
class PostgreSQLDatabaseDialect extends GenericDatabaseDialect {
override def insertOrReplace(
table: String,
colsToInsert: Seq[String],
colsToReplace: Seq[String],
keyCol: String): Option[String] = {
Some(
s"""
|INSERT INTO $table (${colsToInsert.mkString(",")})
|VALUES (${colsToInsert.map(_ => "?").mkString(",")})
|ON CONFLICT ($keyCol)
|DO UPDATE SET
|${colsToReplace.map(c => s"$c = EXCLUDED.$c").mkString(",")}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

|""".stripMargin)
}
}
Loading