[Feature] User can use different hadoop-user to submit application (#3401)
* user can use different hadoop-user to submit application
* user can use different hadoop-user to submit application:do it with proxyuser
* add miss commit file
* add for pass tests
* copy hadoop-user when copy application
---------
Co-authored-by: shenk-b <shenk-b@glodon.com>
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 79a890d..c35c13c 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -46,6 +46,9 @@
alter table `t_flink_app`
add column `probing` tinyint default 0;
+alter table `t_flink_app`
+ add column `hadoop_user` varchar(64) default null;
+
alter table `t_flink_cluster`
add column `job_manager_url` varchar(150) default null comment 'url address of jobmanager' after `address`,
add column `start_time` datetime default null comment 'start time',
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 2234fa4..198cfe1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -187,6 +187,9 @@
@TableField("TOTAL_TM")
private Integer totalTM;
+ @TableField("HADOOP_USER")
+ private String hadoopUser;
+
private Integer totalSlot;
private Integer availableSlot;
private Integer jmMemory;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 17de0e1..963a92d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -487,6 +487,7 @@
? null
: FlinkRestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
+ application.getHadoopUser(),
buildResult,
kubernetesSubmitParam,
extraParameter);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 2a4dd9d..992ece6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -438,6 +438,7 @@
newApp.setJarCheckSum(oldApp.getJarCheckSum());
newApp.setTags(oldApp.getTags());
newApp.setTeamId(oldApp.getTeamId());
+ newApp.setHadoopUser(oldApp.getHadoopUser());
boolean saved = save(newApp);
if (saved) {
@@ -559,7 +560,11 @@
switch (appParam.getFlinkExecutionMode()) {
case YARN_APPLICATION:
+ application.setHadoopUser(appParam.getHadoopUser());
+ break;
case YARN_PER_JOB:
+ application.setHadoopUser(appParam.getHadoopUser());
+ break;
case KUBERNETES_NATIVE_APPLICATION:
application.setFlinkClusterId(null);
break;
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 1af50f2..596801a 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -96,6 +96,7 @@
`default_mode_ingress` text ,
`tags` varchar(500) default null,
`probing` tinyint default 0,
+ `hadoop_user` varchar(500) default null,
primary key(`id`)
);
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
index 71778e6..7fdf36c 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
@@ -138,6 +138,7 @@
empty: boolean;
};
streamParkJob: boolean;
+ hadoopUser: string;
}
interface AppControl {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index f9fd462..e9b778a 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -69,6 +69,7 @@
status: 'Run Status',
startTime: 'Start Time',
endTime: 'End Time',
+ hadoopUser: 'Hadoop User',
restoreModeTip:
'restore mode is supported since flink 1.15, usually, you do not have to set this parameter',
release: {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 51f72ce..17652ec 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -68,6 +68,7 @@
status: '运行状态',
startTime: '启动时间',
endTime: '结束时间',
+ hadoopUser: 'Hadoop User',
restoreModeTip: 'flink 1.15开始支持restore模式,一般情况下不用设置该参数',
release: {
releaseTitle: '该应用程序的当前启动正在进行中.',
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index d456421..23a6b91 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -87,6 +87,7 @@
args: app.args || '',
jar: app.jar,
description: app.description,
+ hadoopUser: app.hadoopUser,
dynamicProperties: app.dynamicProperties,
resolveOrder: app.resolveOrder,
executionMode: app.executionMode,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index ef33460..7e693ea 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -489,6 +489,11 @@
ifShow: ({ values }) => (edit?.mode ? true : values.jobType != JobTypeEnum.SQL),
},
{
+ field: 'hadoopUser',
+ label: t('flink.app.hadoopUser'),
+ component: 'Input'
+ },
+ {
field: 'description',
label: t('common.description'),
component: 'InputTextArea',
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 7739033..f50c5cf 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -288,6 +288,7 @@
restartSize: values.restartSize,
alertId: values.alertId,
description: values.description,
+ hadoopUser: values.hadoopUser,
k8sNamespace: values.k8sNamespace || null,
clusterId: values.clusterId || null,
flinkClusterId:
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 87ec66d..b74ecae 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -52,6 +52,7 @@
savePoint: String,
restoreMode: FlinkRestoreMode,
args: String,
+ @Nullable hadoopUser: String,
@Nullable buildResult: BuildResult,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f4fe457..ec0a809 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -26,6 +26,7 @@
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.client.program.ClusterClient
@@ -37,6 +38,7 @@
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
+import java.security.PrivilegedAction
import java.util
import java.util.Collections
@@ -140,9 +142,19 @@
override def doSubmit(
submitRequest: SubmitRequest,
flinkConfig: Configuration): SubmitResponse = {
- SecurityUtils.install(new SecurityConfiguration(flinkConfig))
- SecurityUtils.getInstalledContext.runSecured(
- () => {
+ var proxyUserUgi: UserGroupInformation = UserGroupInformation.getCurrentUser
+ val currentUser = UserGroupInformation.getCurrentUser
+ if (!HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
+ if (StringUtils.isNotEmpty(submitRequest.hadoopUser)) {
+ proxyUserUgi = UserGroupInformation.createProxyUser(
+ submitRequest.hadoopUser,
+ currentUser
+ )
+ }
+ }
+
+ proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
+ override def run(): SubmitResponse = {
val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
val clientFactory =
clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
@@ -174,7 +186,44 @@
} finally {
Utils.close(clusterDescriptor, clusterClient)
}
- })
+ }
+ })
+
+// SecurityUtils.install(new SecurityConfiguration(flinkConfig))
+// SecurityUtils.getInstalledContext.runSecured(
+// () => {
+// val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
+// val clientFactory =
+// clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
+// val clusterDescriptor = clientFactory.createClusterDescriptor(flinkConfig)
+// var clusterClient: ClusterClient[ApplicationId] = null
+// try {
+// val clusterSpecification = clientFactory.getClusterSpecification(flinkConfig)
+// logInfo(s"""
+// |------------------------<<specification>>-------------------------
+// |$clusterSpecification
+// |------------------------------------------------------------------
+// |""".stripMargin)
+//
+// val applicationConfiguration = ApplicationConfiguration.fromConfiguration(flinkConfig)
+// var applicationId: ApplicationId = null
+// var jobManagerUrl: String = null
+// clusterClient = clusterDescriptor
+// .deployApplicationCluster(clusterSpecification, applicationConfiguration)
+// .getClusterClient
+// applicationId = clusterClient.getClusterId
+// jobManagerUrl = clusterClient.getWebInterfaceURL
+// logInfo(s"""
+// |-------------------------<<applicationId>>------------------------
+// |Flink Job Started: applicationId: $applicationId
+// |__________________________________________________________________
+// |""".stripMargin)
+//
+// SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl)
+// } finally {
+// Utils.close(clusterDescriptor, clusterClient)
+// }
+// })
}
}