[Feature] User can use a different hadoop-user to submit an application (#3401)

* user can use a different hadoop-user to submit an application

* implement the hadoop-user switch with a Hadoop proxy user (UserGroupInformation.createProxyUser)

* add the file missing from the previous commit

* adjust code so the tests pass

* copy the hadoop-user when copying an application
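
How it works: when Kerberos is not enabled and the application specifies a
hadoop-user, the YARN submission runs inside a proxy-user
UserGroupInformation, so YARN attributes the job to that user. A minimal
sketch of the mechanism (illustrative only, not code from this patch;
runAs is a hypothetical helper, and impersonation assumes the console's
process user is whitelisted via hadoop.proxyuser.<user>.hosts/.groups in
the cluster's core-site.xml):

    import java.security.PrivilegedAction
    import org.apache.hadoop.security.UserGroupInformation

    // Run `action` as the given Hadoop user; fall back to the current
    // process user when no hadoop-user is set.
    def runAs[T](hadoopUser: String)(action: => T): T = {
      val current = UserGroupInformation.getCurrentUser
      val ugi =
        if (hadoopUser == null || hadoopUser.isEmpty) current
        else UserGroupInformation.createProxyUser(hadoopUser, current)
      ugi.doAs(new PrivilegedAction[T] { override def run(): T = action })
    }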

---------

Co-authored-by: shenk-b <shenk-b@glodon.com>
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 79a890d..c35c13c 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -46,6 +46,9 @@
 alter table `t_flink_app`
     add column `probing` tinyint default 0;
 
+alter table `t_flink_app`
+    add column `hadoop_user` varchar(64) default null comment 'hadoop user for application submission';
+
 alter table `t_flink_cluster`
     add column `job_manager_url` varchar(150) default null comment 'url address of jobmanager' after `address`,
     add column `start_time` datetime default null comment 'start time',
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 2234fa4..198cfe1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -187,6 +187,10 @@
   @TableField("TOTAL_TM")
   private Integer totalTM;
 
+  @TableField("HADOOP_USER")
+  private String hadoopUser;
+
   private Integer totalSlot;
   private Integer availableSlot;
   private Integer jmMemory;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 17de0e1..963a92d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -487,6 +487,7 @@
                 ? null
                 : FlinkRestoreMode.of(appParam.getRestoreMode()),
             applicationArgs,
+            application.getHadoopUser(),
             buildResult,
             kubernetesSubmitParam,
             extraParameter);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
index 2a4dd9d..992ece6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationManageServiceImpl.java
@@ -438,6 +438,7 @@
     newApp.setJarCheckSum(oldApp.getJarCheckSum());
     newApp.setTags(oldApp.getTags());
     newApp.setTeamId(oldApp.getTeamId());
+    newApp.setHadoopUser(oldApp.getHadoopUser());
 
     boolean saved = save(newApp);
     if (saved) {
@@ -559,7 +560,10 @@
 
     switch (appParam.getFlinkExecutionMode()) {
       case YARN_APPLICATION:
+        // fall through: both YARN modes record the hadoop-user
       case YARN_PER_JOB:
+        application.setHadoopUser(appParam.getHadoopUser());
+        // fall through: these modes also clear the cluster id, as before
       case KUBERNETES_NATIVE_APPLICATION:
         application.setFlinkClusterId(null);
         break;
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index 1af50f2..596801a 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -96,6 +96,7 @@
   `default_mode_ingress` text ,
   `tags` varchar(500) default null,
   `probing` tinyint default 0,
+  `hadoop_user` varchar(64) default null,
   primary key(`id`)
 );
 
diff --git a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
index 71778e6..7fdf36c 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app.type.ts
@@ -138,6 +138,7 @@
     empty: boolean;
   };
   streamParkJob: boolean;
+  hadoopUser: string;
 }
 
 interface AppControl {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index f9fd462..e9b778a 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -69,6 +69,7 @@
   status: 'Run Status',
   startTime: 'Start Time',
   endTime: 'End Time',
+  hadoopUser: 'Hadoop User',
   restoreModeTip:
     'restore mode is supported since flink 1.15, usually, you do not have to set this parameter',
   release: {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 51f72ce..17652ec 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -68,6 +68,7 @@
   status: '运行状态',
   startTime: '启动时间',
   endTime: '结束时间',
+  hadoopUser: 'Hadoop User',
   restoreModeTip: 'flink 1.15开始支持restore模式,一般情况下不用设置该参数',
   release: {
     releaseTitle: '该应用程序的当前启动正在进行中.',
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
index d456421..23a6b91 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue
@@ -87,6 +87,7 @@
         args: app.args || '',
         jar: app.jar,
         description: app.description,
+        hadoopUser: app.hadoopUser,
         dynamicProperties: app.dynamicProperties,
         resolveOrder: app.resolveOrder,
         executionMode: app.executionMode,
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index ef33460..7e693ea 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -489,6 +489,11 @@
         ifShow: ({ values }) => (edit?.mode ? true : values.jobType != JobTypeEnum.SQL),
       },
       {
+        field: 'hadoopUser',
+        label: t('flink.app.hadoopUser'),
+        component: 'Input',
+      },
+      {
         field: 'description',
         label: t('common.description'),
         component: 'InputTextArea',
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
index 7739033..f50c5cf 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/index.ts
@@ -288,6 +288,7 @@
     restartSize: values.restartSize,
     alertId: values.alertId,
     description: values.description,
+    hadoopUser: values.hadoopUser,
     k8sNamespace: values.k8sNamespace || null,
     clusterId: values.clusterId || null,
     flinkClusterId:
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 87ec66d..b74ecae 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -52,6 +52,8 @@
     savePoint: String,
     restoreMode: FlinkRestoreMode,
     args: String,
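+    // hadoop user to impersonate on YARN submission; null means the current process user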
+    @Nullable hadoopUser: String,
     @Nullable buildResult: BuildResult,
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
     @Nullable extraParameter: JavaMap[String, Any]) {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f4fe457..ec0a809 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -26,6 +26,7 @@
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.client.program.ClusterClient
@@ -37,6 +38,7 @@
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 
+import java.security.PrivilegedAction
 import java.util
 import java.util.Collections
 
@@ -140,9 +142,20 @@
   override def doSubmit(
       submitRequest: SubmitRequest,
       flinkConfig: Configuration): SubmitResponse = {
-    SecurityUtils.install(new SecurityConfiguration(flinkConfig))
-    SecurityUtils.getInstalledContext.runSecured(
-      () => {
+    // When Kerberos is off and a hadoop-user is set on the application, submit
+    // through a proxy-user UGI so YARN attributes the job to that user.
+    // (Requires hadoop.proxyuser.* settings for the console's process user.)
+    val currentUser = UserGroupInformation.getCurrentUser
+    val proxyUserUgi: UserGroupInformation =
+      if (
+        !HadoopUtils.isKerberosSecurityEnabled(currentUser) &&
+          StringUtils.isNotEmpty(submitRequest.hadoopUser)
+      ) {
+        UserGroupInformation.createProxyUser(submitRequest.hadoopUser, currentUser)
+      } else currentUser
+
+    proxyUserUgi.doAs[SubmitResponse](new PrivilegedAction[SubmitResponse] {
+      override def run(): SubmitResponse = {
         val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
         val clientFactory =
           clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
@@ -174,7 +186,8 @@
         } finally {
           Utils.close(clusterDescriptor, clusterClient)
         }
-      })
+      }
+    })
   }
 
 }