KYLIN-4320 number of replicas of Cuboid files cannot be configured for Spark engine
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3490b7a..16c07c1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1432,6 +1432,10 @@
         return getPropertiesByPrefix("kylin.engine.mr.base-cuboid-config-override.");
     }
 
+    public String getCuboidDfsReplication() {
+        return getOptional("kylin.engine.cuboid.dfs.replication", "2");
+    }
+
     public Map<String, String> getSparkConfigOverride() {
         return getPropertiesByPrefix("kylin.engine.spark-conf.");
     }
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 3c532a1..bf0b686 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -223,7 +223,7 @@
             final KylinConfig kylinConfig) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
         final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
-        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
+        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress
         FlinkUtil.setHadoopConfForCuboid(job, cubeSeg, metaUrl);
 
         HadoopOutputFormat<Text, Text> hadoopOF =
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
index 938939f..3d8aac3 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingMerge.java
@@ -121,7 +121,7 @@
         }
 
         Job job = Job.getInstance();
-        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication=2 and enable compress
+        FlinkUtil.modifyFlinkHadoopConfiguration(job); // set dfs.replication and enable compress
 
         HadoopUtil.deletePath(job.getConfiguration(), new Path(outputPath));
         final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
@@ -206,7 +206,7 @@
                 final String cuboidOutputPath = FlinkBatchMergeJobBuilder2.getCuboidOutputPathsByLevel(outputPath, level);
 
                 Job jobInstanceForEachOutputFormat = Job.getInstance();
-                FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication=2 and enable compress
+                FlinkUtil.modifyFlinkHadoopConfiguration(jobInstanceForEachOutputFormat); // set dfs.replication and enable compress
                 FlinkUtil.setHadoopConfForCuboid(jobInstanceForEachOutputFormat, cubeSegment, metaUrl);
 
                 FileOutputFormat.setOutputPath(jobInstanceForEachOutputFormat, new Path(cuboidOutputPath));
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
index 73f532a..188ddef 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkUtil.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
@@ -102,7 +103,7 @@
     }
 
     public static void modifyFlinkHadoopConfiguration(Job job) throws Exception {
-        job.getConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication()); // cuboid intermediate files
         job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
         job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
         job.getConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d35f6b6..f12c731 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -144,6 +144,9 @@
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
+            //set dfs.replication
+            job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication());
+
             // set input
             configureMapperInputFormat(segment);
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 311ec4f..da7c25e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -76,6 +76,9 @@
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
 
+            //set dfs.replication
+            job.getConfiguration().set("dfs.replication", KylinConfig.getInstanceFromEnv().getCuboidDfsReplication());
+
             // set inputs
             IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getOutputFormat();
             outputFormat.configureJobInput(job, input);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 1c0c18f..381ef13 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -130,7 +130,7 @@
         JavaSparkContext sc = new JavaSparkContext(conf);
         sc.sc().addSparkListener(jobListener);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
-        SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+        SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
         final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
         KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index a3b13a8..c329f3c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -112,7 +112,7 @@
         conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
 
         try (JavaSparkContext sc = new JavaSparkContext(conf)) {
-            SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
+            SparkUtil.modifySparkHadoopConfiguration(sc.sc(), AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(sc.hadoopConfiguration()), metaUrl)); // set dfs.replication and enable compress
             KylinSparkJobListener jobListener = new KylinSparkJobListener();
             sc.sc().addSparkListener(jobListener);
 
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index fcd24f1..aabc767 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -137,8 +137,8 @@
         job.setOutputFormatClass(SequenceFileOutputFormat.class);
     }
 
-    public static void modifySparkHadoopConfiguration(SparkContext sc) throws Exception {
-        sc.hadoopConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+    public static void modifySparkHadoopConfiguration(SparkContext sc, KylinConfig kylinConfig) throws Exception {
+        sc.hadoopConfiguration().set("dfs.replication", kylinConfig.getCuboidDfsReplication()); // cuboid intermediate files
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
         sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec