Skip to content

Commit 02a6b13

Browse files
committed
[KYUUBI #7028] Persist the kubernetes application terminate state into metastore for app info store fallback
### Why are the changes needed? 1. Persist the kubernetes application terminate info into metastore to prevent the event lose. 2. If it can not get the application info from informer application info store, fallback to get the application info from metastore instead of return NOT_FOUND directly. 3. It is critical because if we return false application state, it might cause data quality issue. ### How was this patch tested? UT and IT. <img width="1917" alt="image" src="https://github.com/user-attachments/assets/306f417c-5037-4869-904d-dcf657ff8f60" /> ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7029 from turboFei/kubernetes_state. Closes #7028 9f2bade [Wang, Fei] generic dialect 186cc69 [Wang, Fei] nit 82ea626 [Wang, Fei] Add pod name 4c59beb [Wang, Fei] Refine 327a0d5 [Wang, Fei] Remove create_time from k8s engine info 12c24b1 [Wang, Fei] do not use MYSQL deprecated VALUES(col) becf9d1 [Wang, Fei] insert or replace d167623 [Wang, Fei] migration Authored-by: Wang, Fei <[email protected]> Signed-off-by: Wang, Fei <[email protected]>
1 parent 70c03ef commit 02a6b13

17 files changed

+639
-14
lines changed

.rat-excludes

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ build/scala-*/**
4343
**/metadata-store-schema*.sql
4444
**/*.derby.sql
4545
**/*.mysql.sql
46+
**/*.postgresql.sql
4647
**/*.sqlite.sql
4748
**/node/**
4849
**/web-ui/dist/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' ';
2+
3+
CREATE TABLE IF NOT EXISTS k8s_engine_info(
4+
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
5+
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
6+
context varchar(32) COMMENT 'the kubernetes context',
7+
namespace varchar(255) COMMENT 'the kubernetes namespace',
8+
pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name',
9+
pod_state varchar(32) COMMENT 'the kubernetes pod state',
10+
container_state mediumtext COMMENT 'the kubernetes container state',
11+
engine_id varchar(128) COMMENT 'the engine id',
12+
engine_name mediumtext COMMENT 'the engine name',
13+
engine_state varchar(32) COMMENT 'the engine state',
14+
engine_error mediumtext COMMENT 'the engine diagnose',
15+
update_time bigint COMMENT 'the metadata update time',
16+
UNIQUE INDEX unique_identifier_index(identifier)
17+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
-- the metadata table ddl
2+
3+
CREATE TABLE IF NOT EXISTS metadata(
4+
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
5+
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
6+
session_type varchar(32) NOT NULL COMMENT 'the session type, SQL or BATCH',
7+
real_user varchar(255) NOT NULL COMMENT 'the real user',
8+
user_name varchar(255) NOT NULL COMMENT 'the user name, might be a proxy user',
9+
ip_address varchar(128) COMMENT 'the client ip address',
10+
kyuubi_instance varchar(1024) COMMENT 'the kyuubi instance that creates this',
11+
state varchar(128) NOT NULL COMMENT 'the session state',
12+
resource varchar(1024) COMMENT 'the main resource',
13+
class_name varchar(1024) COMMENT 'the main class name',
14+
request_name varchar(1024) COMMENT 'the request name',
15+
request_conf mediumtext COMMENT 'the request config map',
16+
request_args mediumtext COMMENT 'the request arguments',
17+
create_time BIGINT NOT NULL COMMENT 'the metadata create time',
18+
engine_type varchar(32) NOT NULL COMMENT 'the engine type',
19+
cluster_manager varchar(128) COMMENT 'the engine cluster manager',
20+
engine_open_time bigint COMMENT 'the engine open time',
21+
engine_id varchar(128) COMMENT 'the engine application id',
22+
engine_name mediumtext COMMENT 'the engine application name',
23+
engine_url varchar(1024) COMMENT 'the engine tracking url',
24+
engine_state varchar(32) COMMENT 'the engine application state',
25+
engine_error mediumtext COMMENT 'the engine application diagnose',
26+
end_time bigint COMMENT 'the metadata end time',
27+
priority int NOT NULL DEFAULT 10 COMMENT 'the application priority, high value means high priority',
28+
peer_instance_closed boolean default '0' COMMENT 'closed by peer kyuubi instance',
29+
UNIQUE INDEX unique_identifier_index(identifier),
30+
INDEX user_name_index(user_name),
31+
INDEX engine_type_index(engine_type),
32+
INDEX create_time_index(create_time),
33+
-- See more detail about this index in ./005-KYUUBI-5327.mysql.sql
34+
INDEX priority_create_time_index(priority DESC, create_time ASC)
35+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
36+
37+
CREATE TABLE IF NOT EXISTS k8s_engine_info(
38+
key_id bigint PRIMARY KEY AUTO_INCREMENT COMMENT 'the auto increment key id',
39+
identifier varchar(36) NOT NULL COMMENT 'the identifier id, which is an UUID',
40+
context varchar(32) COMMENT 'the kubernetes context',
41+
namespace varchar(255) COMMENT 'the kubernetes namespace',
42+
pod_name varchar(255) NOT NULL COMMENT 'the kubernetes pod name',
43+
pod_state varchar(32) COMMENT 'the kubernetes pod state',
44+
container_state mediumtext COMMENT 'the kubernetes container state',
45+
engine_id varchar(128) COMMENT 'the engine id',
46+
engine_name mediumtext COMMENT 'the engine name',
47+
engine_state varchar(32) COMMENT 'the engine state',
48+
engine_error mediumtext COMMENT 'the engine diagnose',
49+
update_time bigint COMMENT 'the metadata update time',
50+
UNIQUE INDEX unique_identifier_index(identifier)
51+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
2+
SOURCE 006-KYUUBI-7028.mysql.sql;
3+
SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
SELECT '< KYUUBI-7028: Persist Kubernetes metadata into metastore' AS ' ';
2+
3+
CREATE TABLE IF NOT EXISTS k8s_engine_info(
4+
key_id bigserial PRIMARY KEY,
5+
identifier varchar(36) NOT NULL,
6+
context varchar(32),
7+
namespace varchar(255),
8+
pod_name varchar(255) NOT NULL,
9+
pod_state varchar(32),
10+
container_state text,
11+
engine_id varchar(128),
12+
engine_name text,
13+
engine_state varchar(32),
14+
engine_error text,
15+
update_time bigint NOT NULL
16+
);
17+
18+
COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id';
19+
COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID';
20+
COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context';
21+
COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace';
22+
COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name';
23+
COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state';
24+
COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state';
25+
COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id';
26+
COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name';
27+
COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state';
28+
COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose';
29+
COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time';
30+
31+
CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
CREATE TABLE IF NOT EXISTS metadata(
2+
key_id bigserial PRIMARY KEY,
3+
identifier varchar(36) NOT NULL,
4+
session_type varchar(32) NOT NULL,
5+
real_user varchar(255) NOT NULL,
6+
user_name varchar(255) NOT NULL,
7+
ip_address varchar(128),
8+
kyuubi_instance varchar(1024),
9+
state varchar(128) NOT NULL,
10+
resource varchar(1024),
11+
class_name varchar(1024),
12+
request_name varchar(1024),
13+
request_conf text,
14+
request_args text,
15+
create_time bigint NOT NULL,
16+
engine_type varchar(32) NOT NULL,
17+
cluster_manager varchar(128),
18+
engine_open_time bigint,
19+
engine_id varchar(128),
20+
engine_name text,
21+
engine_url varchar(1024),
22+
engine_state varchar(32),
23+
engine_error text,
24+
end_time bigint,
25+
priority int NOT NULL DEFAULT 10,
26+
peer_instance_closed boolean DEFAULT FALSE
27+
);
28+
29+
COMMENT ON COLUMN metadata.key_id IS 'the auto increment key id';
30+
COMMENT ON COLUMN metadata.identifier IS 'the identifier id, which is an UUID';
31+
COMMENT ON COLUMN metadata.session_type IS 'the session type, SQL or BATCH';
32+
COMMENT ON COLUMN metadata.real_user IS 'the real user';
33+
COMMENT ON COLUMN metadata.user_name IS 'the user name, might be a proxy user';
34+
COMMENT ON COLUMN metadata.ip_address IS 'the client ip address';
35+
COMMENT ON COLUMN metadata.kyuubi_instance IS 'the kyuubi instance that creates this';
36+
COMMENT ON COLUMN metadata.state IS 'the session state';
37+
COMMENT ON COLUMN metadata.resource IS 'the main resource';
38+
COMMENT ON COLUMN metadata.class_name IS 'the main class name';
39+
COMMENT ON COLUMN metadata.request_name IS 'the request name';
40+
COMMENT ON COLUMN metadata.request_conf IS 'the request config map';
41+
COMMENT ON COLUMN metadata.request_args IS 'the request arguments';
42+
COMMENT ON COLUMN metadata.create_time IS 'the metadata create time';
43+
COMMENT ON COLUMN metadata.engine_type IS 'the engine type';
44+
COMMENT ON COLUMN metadata.cluster_manager IS 'the engine cluster manager';
45+
COMMENT ON COLUMN metadata.engine_open_time IS 'the engine open time';
46+
COMMENT ON COLUMN metadata.engine_id IS 'the engine application id';
47+
COMMENT ON COLUMN metadata.engine_name IS 'the engine application name';
48+
COMMENT ON COLUMN metadata.engine_url IS 'the engine tracking url';
49+
COMMENT ON COLUMN metadata.engine_state IS 'the engine application state';
50+
COMMENT ON COLUMN metadata.engine_error IS 'the engine application diagnose';
51+
COMMENT ON COLUMN metadata.end_time IS 'the metadata end time';
52+
COMMENT ON COLUMN metadata.priority IS 'the application priority, high value means high priority';
53+
COMMENT ON COLUMN metadata.peer_instance_closed IS 'closed by peer kyuubi instance';
54+
55+
CREATE UNIQUE INDEX IF NOT EXISTS unique_identifier_index ON metadata(identifier);
56+
CREATE INDEX IF NOT EXISTS user_name_index ON metadata(user_name);
57+
CREATE INDEX IF NOT EXISTS engine_type_index ON metadata(engine_type);
58+
CREATE INDEX IF NOT EXISTS create_time_index ON metadata(create_time);
59+
CREATE INDEX IF NOT EXISTS priority_create_time_index ON metadata(priority DESC, create_time ASC);
60+
61+
CREATE TABLE IF NOT EXISTS k8s_engine_info(
62+
key_id bigserial PRIMARY KEY,
63+
identifier varchar(36) NOT NULL,
64+
context varchar(32),
65+
namespace varchar(255),
66+
pod_name varchar(255) NOT NULL,
67+
pod_state varchar(32),
68+
container_state text,
69+
engine_id varchar(128),
70+
engine_name text,
71+
engine_state varchar(32),
72+
engine_error text,
73+
update_time bigint NOT NULL
74+
);
75+
76+
COMMENT ON COLUMN k8s_engine_info.key_id IS 'the auto increment key id';
77+
COMMENT ON COLUMN k8s_engine_info.identifier IS 'the identifier id, which is an UUID';
78+
COMMENT ON COLUMN k8s_engine_info.context IS 'the kubernetes context';
79+
COMMENT ON COLUMN k8s_engine_info.namespace IS 'the kubernetes namespace';
80+
COMMENT ON COLUMN k8s_engine_info.pod_name IS 'the kubernetes pod name';
81+
COMMENT ON COLUMN k8s_engine_info.pod_state IS 'the kubernetes pod state';
82+
COMMENT ON COLUMN k8s_engine_info.container_state IS 'the kubernetes container state';
83+
COMMENT ON COLUMN k8s_engine_info.engine_id IS 'the engine id';
84+
COMMENT ON COLUMN k8s_engine_info.engine_name IS 'the engine name';
85+
COMMENT ON COLUMN k8s_engine_info.engine_state IS 'the engine state';
86+
COMMENT ON COLUMN k8s_engine_info.engine_error IS 'the engine diagnose';
87+
COMMENT ON COLUMN k8s_engine_info.update_time IS 'the metadata update time';
88+
89+
CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT '< Upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS ' ';
2+
\i 001-KYUUBI-7028.postgresql.sql
3+
SELECT '< Finished upgrading MetaStore schema from 1.9.0 to 1.11.0 >' AS ' ';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-- the k8s_engine_info table ddl
2+
CREATE TABLE IF NOT EXISTS k8s_engine_info(
3+
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
4+
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
5+
context varchar(32), -- the kubernetes context
6+
namespace varchar(255), -- the kubernetes namespace
7+
pod_name varchar(255) NOT NULL, -- the kubernetes pod name
8+
pod_state varchar(32), -- the kubernetes pod state
9+
container_state mediumtext, -- the kubernetes container state
10+
engine_id varchar(128), -- the engine id
11+
engine_name mediumtext, -- the engine name
12+
engine_state varchar(32), -- the engine state
13+
engine_error mediumtext, -- the engine diagnose
14+
update_time bigint -- the metadata update time
15+
);
16+
17+
CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
-- the metadata table ddl
2+
3+
CREATE TABLE IF NOT EXISTS metadata(
4+
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
5+
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
6+
session_type varchar(32) NOT NULL, -- the session type, SQL or BATCH
7+
real_user varchar(255) NOT NULL, -- the real user
8+
user_name varchar(255) NOT NULL, -- the user name, might be a proxy user
9+
ip_address varchar(128), -- the client ip address
10+
kyuubi_instance varchar(1024), -- the kyuubi instance that creates this
11+
state varchar(128) NOT NULL, -- the session state
12+
resource varchar(1024), -- the main resource
13+
class_name varchar(1024), -- the main class name
14+
request_name varchar(1024), -- the request name
15+
request_conf mediumtext, -- the request config map
16+
request_args mediumtext, -- the request arguments
17+
create_time BIGINT NOT NULL, -- the metadata create time
18+
engine_type varchar(32) NOT NULL, -- the engine type
19+
cluster_manager varchar(128), -- the engine cluster manager
20+
engine_open_time bigint, -- the engine open time
21+
engine_id varchar(128), -- the engine application id
22+
engine_name mediumtext, -- the engine application name
23+
engine_url varchar(1024), -- the engine tracking url
24+
engine_state varchar(32), -- the engine application state
25+
engine_error mediumtext, -- the engine application diagnose
26+
end_time bigint, -- the metadata end time
27+
priority INTEGER NOT NULL DEFAULT 10, -- the application priority, high value means high priority
28+
peer_instance_closed boolean default '0' -- closed by peer kyuubi instance
29+
);
30+
31+
CREATE UNIQUE INDEX IF NOT EXISTS metadata_unique_identifier_index ON metadata(identifier);
32+
33+
CREATE INDEX IF NOT EXISTS metadata_user_name_index ON metadata(user_name);
34+
35+
CREATE INDEX IF NOT EXISTS metadata_engine_type_index ON metadata(engine_type);
36+
37+
CREATE INDEX IF NOT EXISTS metadata_create_time_index ON metadata(create_time);
38+
39+
CREATE INDEX IF NOT EXISTS metadata_priority_create_time_index ON metadata(priority, create_time);
40+
41+
-- the k8s_engine_info table ddl
42+
CREATE TABLE IF NOT EXISTS k8s_engine_info(
43+
key_id INTEGER PRIMARY KEY AUTOINCREMENT, -- the auto increment key id
44+
identifier varchar(36) NOT NULL, -- the identifier id, which is an UUID
45+
context varchar(32), -- the kubernetes context
46+
namespace varchar(255), -- the kubernetes namespace
47+
pod_name varchar(255) NOT NULL, -- the kubernetes pod name
48+
pod_state varchar(32), -- the kubernetes pod state
49+
container_state mediumtext, -- the kubernetes container state
50+
engine_id varchar(128), -- the engine id
51+
engine_name mediumtext, -- the engine name
52+
engine_state varchar(32), -- the engine state
53+
engine_error mediumtext, -- the engine diagnose
54+
update_time bigint -- the metadata update time
55+
);
56+
57+
CREATE UNIQUE INDEX IF NOT EXISTS k8s_engine_info_unique_identifier_index ON k8s_engine_info(identifier);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SELECT '< Upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';
2+
.read 001-KYUUBI-7028.sqlite.sql
3+
SELECT '< Finished upgrading MetaStore schema from 1.8.0 to 1.11.0 >' AS ' ';

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala

+28-7
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState
3838
import org.apache.kyuubi.engine.KubernetesResourceEventTypes.KubernetesResourceEventType
3939
import org.apache.kyuubi.operation.OperationState
4040
import org.apache.kyuubi.server.metadata.MetadataManager
41+
import org.apache.kyuubi.server.metadata.api.KubernetesEngineInfo
4142
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
4243

4344
class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@@ -255,8 +256,13 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
255256
try {
256257
// need to initialize the kubernetes client if not exists
257258
getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
258-
val (_, appInfo) =
259-
appInfoStore.getOrDefault(tag, appMgrInfo.kubernetesInfo -> ApplicationInfo.NOT_FOUND)
259+
val appInfo = appInfoStore.get(tag) match {
260+
case (_, info) => info
261+
case _ =>
262+
// try to get the application info from kubernetes engine info store
263+
metadataManager.flatMap(
264+
_.getKubernetesApplicationInfo(tag)).getOrElse(ApplicationInfo.NOT_FOUND)
265+
}
260266
(appInfo.state, submitTime) match {
261267
// Kyuubi should wait second if pod is not be created
262268
case (NOT_FOUND, Some(_submitTime)) =>
@@ -340,7 +346,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
340346
updateApplicationState(kubernetesInfo, newPod, eventType)
341347
val appState = toApplicationState(newPod, appStateSource, appStateContainer, eventType)
342348
if (isTerminated(appState)) {
343-
markApplicationTerminated(newPod, eventType)
349+
markApplicationTerminated(kubernetesInfo, newPod, eventType)
344350
}
345351
KubernetesApplicationAuditLogger.audit(
346352
eventType,
@@ -358,7 +364,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
358364
if (isSparkEnginePod(pod)) {
359365
val eventType = KubernetesResourceEventTypes.DELETE
360366
updateApplicationState(kubernetesInfo, pod, eventType)
361-
markApplicationTerminated(pod, eventType)
367+
markApplicationTerminated(kubernetesInfo, pod, eventType)
362368
KubernetesApplicationAuditLogger.audit(
363369
eventType,
364370
kubernetesInfo,
@@ -456,13 +462,28 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
456462
}
457463

458464
private def markApplicationTerminated(
465+
kubernetesInfo: KubernetesInfo,
459466
pod: Pod,
460467
eventType: KubernetesResourceEventType): Unit = synchronized {
461468
val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
469+
val (appState, appError) =
470+
toApplicationStateAndError(pod, appStateSource, appStateContainer, eventType)
471+
// upsert the kubernetes engine info store when the application is terminated
472+
metadataManager.foreach(_.upsertKubernetesMetadata(
473+
KubernetesEngineInfo(
474+
identifier = key,
475+
context = kubernetesInfo.context,
476+
namespace = kubernetesInfo.namespace,
477+
podName = pod.getMetadata.getName,
478+
podState = pod.getStatus.getPhase,
479+
containerState = pod.getStatus.getContainerStatuses.asScala.map(cs =>
480+
s"${cs.getName}->${cs.getState}").mkString(","),
481+
engineId = getPodAppId(pod),
482+
engineName = getPodAppName(pod),
483+
engineState = appState.toString,
484+
engineError = appError)))
462485
if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) {
463-
cleanupTerminatedAppInfoTrigger.put(
464-
key,
465-
toApplicationState(pod, appStateSource, appStateContainer, eventType))
486+
cleanupTerminatedAppInfoTrigger.put(key, appState)
466487
}
467488
}
468489

0 commit comments

Comments
 (0)