Merge pull request #4832 from apache/master

diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
index fec2fe5..c072c32 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala
@@ -118,4 +118,10 @@
   val TASK_SUBMIT_WAIT_TIME_MS =
     CommonVars("linkis.ec.task.submit.wait.time.ms", 2L, "Task submit wait time(ms)").getValue
 
+  val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED =
+    CommonVars("linkis.ec.send.log.entrance.limit.enabled", true)
+
+  val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH =
+    CommonVars("linkis.ec.send.log.entrance.limit.length", 2000)
+
 }
diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala
index 7367dd5..377c32c 100644
--- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala
+++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala
@@ -193,8 +193,16 @@
   def appendStdout(log: String): Unit = if (executor.isInternalExecute) {
     logger.info(log)
   } else {
+    var taskLog = log
+    if (
+        ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue &&
+        log.length > ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue
+    ) {
+      taskLog =
+        s"${log.substring(0, ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue)}..."
+    }
     val listenerBus = getEngineSyncListenerBus
-    getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, log)))
+    getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, taskLog)))
   }
 
   override def close(): Unit = {
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
index 4e4654f..24d3ddc 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/context/SparkConfig.java
@@ -47,6 +47,7 @@
   private String k8sSparkVersion;
 
   private String k8sNamespace;
+  private String k8sFileUploadPath;
   private String deployMode = "client"; // ("client") // todo cluster
   private String appResource; // ("")
   private String appName; // ("")
@@ -73,6 +74,14 @@
   private String keytab; // ("--keytab", "")
   private String queue; // ("--queue", "")
 
+  public String getK8sFileUploadPath() {
+    return k8sFileUploadPath;
+  }
+
+  public void setK8sFileUploadPath(String k8sFileUploadPath) {
+    this.k8sFileUploadPath = k8sFileUploadPath;
+  }
+
   public String getK8sImagePullPolicy() {
     return k8sImagePullPolicy;
   }
@@ -421,6 +430,9 @@
         + ", k8sSparkVersion='"
         + k8sSparkVersion
         + '\''
+        + ", k8sFileUploadPath='"
+        + k8sFileUploadPath
+        + '\''
         + ", k8sNamespace='"
         + k8sNamespace
         + '\''
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
index 2f03182..fa62366 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/KubernetesOperatorClusterDescriptorAdapter.java
@@ -21,11 +21,13 @@
 import org.apache.linkis.engineplugin.spark.client.context.SparkConfig;
 import org.apache.linkis.engineplugin.spark.client.deployment.crds.*;
 import org.apache.linkis.engineplugin.spark.client.deployment.util.KubernetesHelper;
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.launcher.SparkAppHandle;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -79,6 +81,7 @@
 
     NonNamespaceOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>>
         sparkApplicationClient = getSparkApplicationClient(client);
+
     SparkApplication sparkApplication =
         getSparkApplication(sparkConfig.getAppName(), sparkConfig.getK8sNamespace());
 
@@ -88,12 +91,19 @@
             .memory(sparkConfig.getDriverMemory())
             .serviceAccount(sparkConfig.getK8sServiceAccount())
             .build();
+
     SparkPodSpec executor =
         SparkPodSpec.Builder()
             .cores(sparkConfig.getExecutorCores())
             .instances(sparkConfig.getNumExecutors())
             .memory(sparkConfig.getExecutorMemory())
             .build();
+
+    Map<String, String> sparkConfMap = new HashMap<>();
+    sparkConfMap.put(
+        SparkConfiguration.SPARK_KUBERNETES_FILE_UPLOAD_PATH().key(),
+        sparkConfig.getK8sFileUploadPath());
+
     SparkApplicationSpec sparkApplicationSpec =
         SparkApplicationSpec.Builder()
             .type(sparkConfig.getK8sLanguageType())
@@ -107,10 +117,12 @@
             .restartPolicy(new RestartPolicy(sparkConfig.getK8sRestartPolicy()))
             .driver(driver)
             .executor(executor)
+            .sparkConf(sparkConfMap)
             .build();
 
     logger.info("Spark k8s operator task parameters: {}", sparkApplicationSpec);
     sparkApplication.setSpec(sparkApplicationSpec);
+
     SparkApplication created = sparkApplicationClient.createOrReplace(sparkApplication);
     logger.info("Preparing to submit the Spark k8s operator Task: {}", created);
 
diff --git a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
index 80c03f2..be705ce 100644
--- a/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
+++ b/linkis-engineconn-plugins/spark/src/main/java/org/apache/linkis/engineplugin/spark/client/deployment/crds/SparkApplicationSpec.java
@@ -17,7 +17,9 @@
 
 package org.apache.linkis.engineplugin.spark.client.deployment.crds;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.KubernetesResource;
 
@@ -45,6 +47,16 @@
 
   private SparkPodSpec executor;
 
+  private Map<String, String> sparkConf;
+
+  public Map<String, String> getSparkConf() {
+    return sparkConf;
+  }
+
+  public void setSparkConf(Map<String, String> sparkConf) {
+    this.sparkConf = sparkConf;
+  }
+
   public String getType() {
     return type;
   }
@@ -165,6 +177,8 @@
         + driver
         + ", executor="
         + executor
+        + ", sparkConf="
+        + sparkConf
         + '}';
   }
 
@@ -185,6 +199,8 @@
     private SparkPodSpec driver;
     private SparkPodSpec executor;
 
+    private Map<String, String> sparkConf;
+
     private SparkApplicationSpecBuilder() {}
 
     public SparkApplicationSpecBuilder type(String type) {
@@ -242,6 +258,22 @@
       return this;
     }
 
+    public SparkApplicationSpecBuilder sparkConf(Map<String, String> sparkConf) {
+      if (sparkConf == null || sparkConf.size() == 0) {
+        return this;
+      }
+
+      if (this.sparkConf == null) {
+        this.sparkConf = new HashMap<>();
+      }
+
+      for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
+        this.sparkConf.put(entry.getKey(), entry.getValue());
+      }
+
+      return this;
+    }
+
     public SparkApplicationSpec build() {
       SparkApplicationSpec sparkApplicationSpec = new SparkApplicationSpec();
       sparkApplicationSpec.type = this.type;
@@ -255,6 +287,7 @@
       sparkApplicationSpec.executor = this.executor;
       sparkApplicationSpec.image = this.image;
       sparkApplicationSpec.restartPolicy = this.restartPolicy;
+      sparkApplicationSpec.sparkConf = this.sparkConf;
       return sparkApplicationSpec;
     }
   }
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index b81b21e..ecc3759 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -62,6 +62,9 @@
   val SPARK_K8S_SPARK_VERSION = CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
   val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace", "default")
 
+  val SPARK_KUBERNETES_FILE_UPLOAD_PATH =
+    CommonVars[String]("spark.kubernetes.file.upload.path", "local:///opt/spark/tmp")
+
   val SPARK_PYTHON_VERSION = CommonVars[String]("spark.python.version", "python")
 
   val SPARK_PYTHON_TEST_MODE_ENABLE =
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
index fef3f06..5bf90c6 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnFactory.scala
@@ -104,6 +104,7 @@
       sparkConfig.setK8sPassword(SPARK_K8S_PASSWORD.getValue(options))
       sparkConfig.setK8sImage(SPARK_K8S_IMAGE.getValue(options))
       sparkConfig.setK8sNamespace(SPARK_K8S_NAMESPACE.getValue(options))
+      sparkConfig.setK8sFileUploadPath(SPARK_KUBERNETES_FILE_UPLOAD_PATH.getValue(options))
       sparkConfig.setK8sSparkVersion(SPARK_K8S_SPARK_VERSION.getValue(options))
       sparkConfig.setK8sRestartPolicy(SPARK_K8S_RESTART_POLICY.getValue(options))
       sparkConfig.setK8sLanguageType(SPARK_K8S_LANGUAGE_TYPE.getValue(options))