KYLIN-5023 Some fix for spark standalone
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bccda65..8915669 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1591,8 +1591,20 @@
return getPropertiesByPrefix("kylin.engine.flink-conf.");
}
- public Map<String, String> getSparkConfigOverrideWithSpecificName(String configName) {
- return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
+ public String getSparkEngineConfigOverrideWithSpecificName(String configName) {
+ Map<String, String> config = getPropertiesByPrefix("kylin.engine.spark-conf." + configName);
+ if (config.size() != 0) {
+ return String.valueOf(config.values().iterator().next());
+ }
+ return null;
+ }
+
+ public String getSparderConfigOverrideWithSpecificName(String configName) {
+ Map<String, String> config = getPropertiesByPrefix("kylin.query.spark-conf." + configName);
+ if (config.size() != 0) {
+ return String.valueOf(config.values().iterator().next());
+ }
+ return null;
}
public Map<String, String> getFlinkConfigOverrideWithSpecificName(String configName) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 6859e65..fbfae2e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -161,7 +161,10 @@
return runLocalMode(filePath, config);
} else {
logger.info("Task id: {}", getId());
- killOrphanApplicationIfExists(config, getId());
+ if ("yarn".equals(config.getSparkEngineConfigOverrideWithSpecificName("spark.master"))) {
+ logger.info("Try to kill orphan application on yarn.");
+ killOrphanApplicationIfExists(config, getId());
+ }
return runSparkSubmit(config, hadoopConf, jars, kylinJobJar,
"-className " + getSparkSubmitClassName() + " " + filePath, getParent().getId());
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
index 5b2caf4..db562b1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/deploy/SparkApplicationClient.scala
@@ -43,11 +43,12 @@
case STANDALONE_CLUSTER =>
var appState = StandaloneAppClient.getAppState(stepId)
while (true) {
- logInfo(s"$stepId state is $appState .")
- if (!finalStates.contains(appState)) {
- Thread.sleep(10000)
- }
appState = StandaloneAppClient.getAppState(stepId)
+ logInfo(s"$stepId state is $appState .")
+ if (finalStates.contains(appState)) {
+ return appState
+ }
+ Thread.sleep(10000)
}
appState
case m => throw new UnsupportedOperationException("waitAndCheckAppState " + m)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 530cbae..c6916d2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -136,10 +136,12 @@
case "true" =>
"local"
case _ =>
- "yarn-client"
+ kylinConf.getSparderConfigOverrideWithSpecificName("spark.master")
}
+ logInfo("SparderContext deploy with spark master: " + master)
val sparkSession = SparkSession.builder
.master(master)
+ .config("spark.submit.deployMode", "client")
.appName(kylinConf.getSparderAppName)
.withExtensions { ext =>
ext.injectPlannerStrategy(_ => KylinSourceStrategy)