KYLIN-5023 Fixes for Spark standalone mode: resolve spark.master from config, skip YARN-only orphan-application cleanup, correct app-state polling loop, and force client deploy mode for Sparder
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bccda65..8915669 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1591,8 +1591,20 @@
         return getPropertiesByPrefix("kylin.engine.flink-conf.");
     }
 
-    public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
-        return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
+    public String getSparkEngineConfigOverrideWithSpecificName(String configName) {
+        Map<String, String> config = getPropertiesByPrefix("kylin.engine.spark-conf." + configName);
+        if (config.size() != 0) {
+            return String.valueOf(config.values().iterator().next());
+        }
+        return null;
+    }
+
+    public String getSparderConfigOverrideWithSpecificName(String configName) {
+        Map<String, String> config = getPropertiesByPrefix("kylin.query.spark-conf." + configName);
+        if (config.size() != 0) {
+            return String.valueOf(config.values().iterator().next());
+        }
+        return null;
     }
 
     public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 6859e65..fbfae2e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -161,7 +161,10 @@
             return runLocalMode(filePath, config);
         } else {
             logger.info("Task id: {}", getId());
-            killOrphanApplicationIfExists(config, getId());
+            if ("yarn".equals(config.getSparkEngineConfigOverrideWithSpecificName("spark.master"))) {
+                logger.info("Try to kill orphan application on yarn.");
+                killOrphanApplicationIfExists(config, getId());
+            }
             return runSparkSubmit(config, hadoopConf, jars, kylinJobJar,
                     "-className " + getSparkSubmitClassName() + " " + filePath, getParent().getId());
         }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
index 5b2caf4..db562b1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -43,11 +43,12 @@
       case STANDALONE_CLUSTER =>
         var appState = StandaloneAppClient.getAppState(stepId)
         while (true) {
-          logInfo(s"$stepId state is $appState .")
-          if (!finalStates.contains(appState)) {
-            Thread.sleep(10000)
-          }
           appState = StandaloneAppClient.getAppState(stepId)
+          logInfo(s"$stepId state is $appState .")
+          if (finalStates.contains(appState)) {
+            return appState
+          }
+          Thread.sleep(10000)
         }
         appState
       case m => throw new UnsupportedOperationException("waitAndCheckAppState " + m)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 530cbae..c6916d2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -136,10 +136,12 @@
                 case "true" =>
                   "local"
                 case _ =>
-                  "yarn-client"
+                  kylinConf.getSparderConfigOverrideWithSpecificName("spark.master")
               }
+              logInfo("SparderContext deploy with spark master: " + master)
               val sparkSession = SparkSession.builder
                 .master(master)
+                .config("spark.submit.deployMode", "client")
                 .appName(kylinConf.getSparderAppName)
                 .withExtensions { ext =>
                   ext.injectPlannerStrategy(_ => KylinSourceStrategy)