KYLIN-4320 number of replicas of Cuboid files cannot be configured for Spark engine
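Introduce a new configuration property, kylin.engine.cuboid.dfs.replication (default "2", matching the previously hard-coded value), so that the HDFS replication factor of intermediate cuboid files can be configured for the MR, Spark, and Flink engines instead of being fixed at 2.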
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3490b7a..16c07c1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1432,6 +1432,10 @@
return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override.");
}
+ public String getCuboidDfsReplication() {
+ return getOptional("kylin.engine.cuboid.dfs.replication", "2");
+ }
+
public Map<String, String> getSparkConfigOverride() {
return getPropertiesByPrefix("kylin.engine.spark-conf.");
}
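Note: for reference, a minimal sketch of how the new getter resolves; the wrapper class, the main method, and the assumption of a resolvable Kylin environment are illustrative scaffolding, not part of the patch:

    import org.apache.kylin.common.KylinConfig;

    public class ReplicationConfigExample {
        public static void main(String[] args) {
            // Requires a resolvable Kylin environment (KYLIN_HOME/kylin.properties).
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            // Prints the configured value; falls back to "2" (the previously
            // hard-coded replication) when kylin.engine.cuboid.dfs.replication
            // is absent from kylin.properties.
            System.out.println(config.getCuboidDfsReplication());
        }
    }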
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 3c532a1..bf0b686 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -223,7 +223,7 @@
final KylinConfig kylinConfig) throws Exception {
final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
- FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
+ FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress
FlinkUtil.setHadoopConfForCuboid(job, cubeSeg, metaUrl);
HadoopOutputFormat<Text, Text> hadoopOF =
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 938939f..3d8aac3 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -121,7 +121,7 @@
}
Job job = Job.getInstance();
- FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
+ FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress
HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
@@ -206,7 +206,7 @@
final String cuboidOutputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
Job jobInstanceForEachOutputFormat = Job.getInstance();
- FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compress
+ FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication and enable compress
FlinkUtil.setHadoopConfForCuboid(jobInstanceForEachOutputFormat, cubeSegment, metaUrl);
FileOutputFormat.setOutputPath(jobInstanceForEachOutputFormat, new Path(cuboidOutputPath));
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
index 73f532a..188ddef 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
@@ -102,7 +103,7 @@
}
public static void modifyFlinkHadoopConfiguration(Job job) throws Exception {
- job.getConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+ job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication()); // cuboid intermediate files
job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
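Note: a usage sketch mirroring the call sites in FlinkCubingByLayer and FlinkCubingMerge above; the standalone class is hypothetical and assumes a resolvable Kylin environment:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.engine.flink.FlinkUtil;

    public class FlinkCuboidConfExample {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            // Applies the configured dfs.replication and enables BLOCK-compressed
            // output, as the patched method above does.
            FlinkUtil.modifyFlinkHadoopConfiguration(job);
            // Now reflects kylin.engine.cuboid.dfs.replication (default "2").
            System.out.println(job.getConfiguration().get("dfs.replication"));
        }
    }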
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d35f6b6..f12c731 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -144,6 +144,9 @@
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
+ // set dfs.replication for cuboid files (kylin.engine.cuboid.dfs.replication)
+ job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication());
+
// set input
configureMapperInputFormat(segment);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 311ec4f..da7c25e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -76,6 +76,9 @@
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
+ // set dfs.replication for cuboid files (kylin.engine.cuboid.dfs.replication)
+ job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication());
+
// set inputs
IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getOutputFormat();
outputFormat.configureJobInput(job, input);
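Note: both MR jobs above gain the same two lines; a minimal sketch of the effective behavior, where the class and the kylin.properties override value "3" are assumed examples:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.common.KylinConfig;

    public class MrCuboidReplicationExample {
        public static void main(String[] args) throws Exception {
            // Assumed override in kylin.properties (illustrative):
            //   kylin.engine.cuboid.dfs.replication=3
            Job job = Job.getInstance();
            // As in CuboidJob and MergeCuboidJob: cuboid output files are written
            // with the configured replication rather than the HDFS default.
            job.getConfiguration().set("dfs.replication",
                    KylinConfig.getInstanceFromEnv().getCuboidDfsReplication());
        }
    }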
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 1c0c18f..381ef13 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -130,7 +130,7 @@
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
- SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index a3b13a8..c329f3c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -112,7 +112,7 @@
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
- SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
KylinSparkJobListener jobListener = new KylinSparkJobListener();
sc.sc().addSparkListener(jobListener);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index fcd24f1..aabc767 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -137,8 +137,8 @@
job.setOutputFormatClass(SequenceFileOutputFormat.class);
}
- public static void modifySparkHadoopConfiguration(SparkContext sc) throws Exception {
- sc.hadoopConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+ public static void modifySparkHadoopConfiguration(SparkContext sc, KylinConfig kylinConfig) throws Exception {
+ sc.hadoopConfiguration().set("dfs.replication", kylinConfig.getCuboidDfsReplication()); // cuboid intermediate files
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
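Note: a sketch of the updated Spark call pattern with the new KylinConfig parameter; the class, the local master, and the metaUrl argument are placeholders for illustration:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
    import org.apache.kylin.engine.mr.common.SerializableConfiguration;
    import org.apache.kylin.engine.spark.SparkUtil;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkCuboidConfExample {
        public static void main(String[] args) throws Exception {
            String metaUrl = args[0]; // placeholder; the real jobs parse it from CLI options
            SparkConf conf = new SparkConf().setAppName("example").setMaster("local[1]");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // The KylinConfig loaded from HDFS now supplies the replication
                // factor, replacing the previously hard-coded "2".
                KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(
                        new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl);
                SparkUtil.modifySparkHadoopConfiguration(sc.sc(), kylinConfig);
            }
        }
    }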