MAPREDUCE-2544. [Gridmix] Add compression emulation system tests to Gridmix. (Vinay Kumar Thota via amarrk)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1130147 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f0e9858..3727df5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,9 @@
IMPROVEMENTS
+ MAPREDUCE-2544. [Gridmix] Add compression emulation system tests to
+ Gridmix. (Vinay Kumar Thota via amarrk)
+
MAPREDUCE-2517. [Gridmix] Add system tests to Gridmix.
(Vinay Kumar Thota via amarrk)
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationEnableForAllTypesOfJobs.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationEnableForAllTypesOfJobs.java
new file mode 100644
index 0000000..4144bae
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationEnableForAllTypesOfJobs.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies compression emulation for all the jobs of a trace,
+ * irrespective of whether the original jobs used compressed input.
+ */
+public class TestCompressionEmulationEnableForAllTypesOfJobs
+    extends GridmixSystemTestCase {
+  private static final Log LOG = LogFactory.getLog(
+      TestCompressionEmulationEnableForAllTypesOfJobs.class);
+
+  /**
+   * Generates 6 GB of input and replays compression_case4_trace with
+   * compression emulation and input decompression enabled for every job,
+   * regardless of whether the original job read compressed input.
+   * NOTE: despite "DefaultRatios" in the name, custom ratios are supplied
+   * for map input, map output and reduce output.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testInputCompressionEmualtionEnableForAllJobsWithDefaultRatios()
+      throws Exception {
+    final long inputSizeInMB = 1024 * 6;
+    final String tracePath = getTraceFile("compression_case4_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "REPLAY",
+                                     inputSizeInMB + "m",
+                                     tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO + "=0.46",
+        "-D", GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO + "=0.35",
+        "-D", GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO + "=0.36"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+
+  /**
+   * Runs compression_case4_trace serially in RUN_GRIDMIX mode (no input
+   * size is passed) with compression emulation turned off.
+   * NOTE: despite "CustomRatios" in the name, no ratios are supplied.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testInputCompressionEmulationEnableForAllJobsWithCustomRatios()
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case4_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "SERIAL",
+                                     tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=false"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.RUN_GRIDMIX.getValue());
+  }
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForCompressInAndUncompressOut.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForCompressInAndUncompressOut.java
new file mode 100644
index 0000000..6f0dcbf
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForCompressInAndUncompressOut.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.junit.Assert;
+import org.junit.Test;
+/**
+ * Verifies the map input and map output (intermediate) compression
+ * ratios of Gridmix jobs against default and custom ratios, for a trace
+ * with compressed input. Also verifies whether the compressed output
+ * file format is enabled or not.
+ */
+public class TestCompressionEmulationForCompressInAndUncompressOut
+    extends GridmixSystemTestCase {
+  private static final Log LOG = LogFactory.getLog(
+      TestCompressionEmulationForCompressInAndUncompressOut.class);
+  // Size of the input data to generate: 6 GB, expressed in MB.
+  final long inputSizeInMB = 1024 * 6;
+
+  /**
+   * Generates compressed input data and runs compression_case2_trace in
+   * STRESS mode with compression emulation enabled and no custom ratios,
+   * so the map input and map output ratios are verified against the
+   * defaults, along with whether output compression is enabled.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedInputWithDefaultRatios()
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case2_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "STRESS",
+                                     inputSizeInMB + "m",
+                                     tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+
+  /**
+   * Cleans up the Gridmix directory, regenerates the input data and runs
+   * compression_case2_trace with custom map input (0.58) and map output
+   * (0.42) ratios; output compression enablement is verified as well.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedInputWithCustomRatios()
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case2_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "STRESS",
+                                     inputSizeInMB + "m",
+                                     tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO + "=0.58",
+        "-D", GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO + "=0.42"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForUncompressInAndCompressOut.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForUncompressInAndCompressOut.java
new file mode 100644
index 0000000..70dc0d1
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForUncompressInAndCompressOut.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.junit.Assert;
+import org.junit.Test;
+/**
+ * Verifies the reduce output compression ratio of Gridmix jobs with
+ * default and custom ratios, for a trace with uncompressed input.
+ */
+public class TestCompressionEmulationForUncompressInAndCompressOut
+    extends GridmixSystemTestCase {
+  private static final Log LOG = LogFactory.getLog(
+      TestCompressionEmulationForUncompressInAndCompressOut.class);
+  // Size of the input data to generate: 6 GB, expressed in MB.
+  final long inputSizeInMB = 1024 * 6;
+
+  /**
+   * Generates input data and replays compression_case3_trace; the reduce
+   * output compression ratio is verified against the default ratio.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedOuputWithDefaultRatios()
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case3_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    final String [] runtimeValues =
+        {"LOADJOB",
+         RoundRobinUserResolver.class.getName(),
+         "REPLAY",
+         inputSizeInMB + "m",
+         "file://" + UtilsForGridmix.getProxyUsersFile(conf),
+         tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+
+  /**
+   * Cleans up the Gridmix directory, regenerates the input data and runs
+   * compression_case3_trace in STRESS mode with a custom reduce output
+   * compression ratio (0.38).
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedOutputWithCustomRatios()
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case3_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+    final String [] runtimeValues = { "LOADJOB",
+                                      SubmitterUserResolver.class.getName(),
+                                      "STRESS",
+                                      inputSizeInMB + "m",
+                                      tracePath };
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO + "=0.38"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+}
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressedInputGeneration.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressedInputGeneration.java
new file mode 100644
index 0000000..1dfc897
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressedInputGeneration.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.gridmix.Gridmix;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies the Gridmix generated input with compression emulation on and off.
+ */
+public class TestGridmixCompressedInputGeneration
+    extends GridmixSystemTestCase {
+  private static final Log LOG =
+      LogFactory.getLog(TestGridmixCompressedInputGeneration.class);
+
+  /**
+   * Generates 7 GB of input with compression emulation enabled and
+   * verifies that the generated input files are compressed.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testGridmixCompressionInputGeneration() throws Exception {
+    final long inputSizeInMB = 1024 * 7;
+    final String [] runtimeValues = {"LOADJOB",
+        SubmitterUserResolver.class.getName(), "STRESS",
+        inputSizeInMB + "m", "file:///dev/null"};
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+    LOG.info("Verify the generated compressed input data.");
+    runAndVerify(true, inputSizeInMB, runtimeValues, otherArgs);
+  }
+
+  /**
+   * Cleans up, regenerates 6 GB of input with compression emulation
+   * disabled and verifies that the generated files are not compressed.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testGridmixInputGenerationWithoutCompressionEnable()
+      throws Exception {
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+    final long inputSizeInMB = 1024 * 6;
+    final String [] runtimeValues = {"LOADJOB",
+        SubmitterUserResolver.class.getName(), "STRESS",
+        inputSizeInMB + "m", "file:///dev/null"};
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=false"
+    };
+    LOG.info("Verify the generated uncompressed input data.");
+    runAndVerify(false, inputSizeInMB, runtimeValues, otherArgs);
+  }
+
+  /** Runs data generation only, then checks job status, size and files. */
+  private void runAndVerify(boolean isCompressed, long INPUT_SIZE,
+      String [] runtimeValues, String [] otherArgs) throws Exception {
+    int exitCode = UtilsForGridmix.runGridmixJob(gridmixDir, conf,
+        GridMixRunMode.DATA_GENERATION.getValue(), runtimeValues, otherArgs);
+    Assert.assertEquals("Data generation has failed.", 0, exitCode);
+    verifyJobStatus();
+    verifyInputDataSize(INPUT_SIZE);
+    verifyInputFiles(isCompressed);
+  }
+
+  /**
+   * Asserts that every sampled input file name has a gzip extension when
+   * compression is expected, and that none has one otherwise.
+   */
+  private void verifyInputFiles(boolean isCompressed) throws IOException {
+    List<String> inputFiles =
+        getInputFiles(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
+    Assert.assertFalse("No input files were found.", inputFiles.isEmpty());
+    String message = isCompressed
+        ? "Compressed input split file was not found."
+        : "Uncompressed input split file was not found.";
+    for (String inputFile : inputFiles) {
+      Assert.assertEquals(message, isCompressed,
+          inputFile.contains(".gz") || inputFile.contains(".tgz"));
+    }
+  }
+
+  /**
+   * Asserts that input data was generated; INPUT_SIZE is only logged, since
+   * an expected size derived from the actual size cannot detect a mismatch.
+   */
+  private void verifyInputDataSize(long INPUT_SIZE) throws IOException {
+    long actDataSize =
+        getInputDataSizeInMB(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
+    LOG.info("Requested " + INPUT_SIZE + "MB, generated " + actDataSize + "MB");
+    Assert.assertTrue("Generated data has not matched with given size.",
+        actDataSize > 0);
+  }
+
+  /** Asserts that the last job returned by getAllJobs() has succeeded. */
+  private void verifyJobStatus() throws IOException {
+    JobClient jobClient = jtClient.getClient();
+    JobStatus [] allJobs = jobClient.getAllJobs();
+    Assert.assertTrue("No jobs found.", allJobs != null && allJobs.length > 0);
+    LOG.info("Verify the job status after completion of job...");
+    Assert.assertEquals("Job has not succeeded.", JobStatus.SUCCEEDED,
+        allJobs[allJobs.length - 1].getRunState());
+  }
+
+  /** Returns the total length of the content under inputDir in whole MB. */
+  private long getInputDataSizeInMB(Configuration conf, Path inputDir)
+      throws IOException {
+    FileSystem fs = inputDir.getFileSystem(conf);
+    ContentSummary summary = fs.getContentSummary(inputDir);
+    return summary.getLength() / (1024 * 1024);
+  }
+
+  /** Samples one file name per entry of inputDir, skipping empty dirs. */
+  private List<String> getInputFiles(Configuration conf, Path inputDir)
+      throws IOException {
+    FileSystem fs = inputDir.getFileSystem(conf);
+    List<String> files = new ArrayList<String>();
+    for (FileStatus fileStat : fs.listStatus(inputDir)) {
+      String fileName = getInputFile(fileStat, conf);
+      if (fileName != null) {
+        files.add(fileName);
+      }
+    }
+    return files;
+  }
+
+  /**
+   * Returns the name of fstatus if it is a file; for a directory, follows
+   * its first listed child. Returns null when that directory is empty.
+   */
+  private String getInputFile(FileStatus fstatus, Configuration conf)
+      throws IOException {
+    if (!fstatus.isDirectory()) {
+      return fstatus.getPath().getName();
+    }
+    FileSystem fs = fstatus.getPath().getFileSystem(conf);
+    FileStatus [] children = fs.listStatus(fstatus.getPath());
+    return children.length == 0 ? null : getInputFile(children[0], conf);
+  }
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressionEmulationWithCompressInput.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressionEmulationWithCompressInput.java
new file mode 100644
index 0000000..adaa0d2
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressionEmulationWithCompressInput.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies the map input, map output and reduce output compression
+ * ratios of Gridmix jobs against the default and the user specified
+ * compression ratios.
+ */
+public class TestGridmixCompressionEmulationWithCompressInput
+    extends GridmixSystemTestCase {
+  private static final Log LOG = LogFactory.getLog(
+      TestGridmixCompressionEmulationWithCompressInput.class);
+  // Size of the input data to generate: 6 GB, expressed in MB.
+  final long inputSizeInMB = 1024 * 6;
+
+  /**
+   * Generates compressed input data and runs compression_case1_trace,
+   * verifying the map input, map output and reduce output compression
+   * ratios of the Gridmix jobs against the default compression ratios.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testGridmixCompressionRatiosAgainstDefaultCompressionRatio()
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case1_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+
+    final String [] runtimeValues =
+        {"LOADJOB",
+         RoundRobinUserResolver.class.getName(),
+         "STRESS",
+         inputSizeInMB + "m",
+         "file://" + UtilsForGridmix.getProxyUsersFile(conf),
+         tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+
+  /**
+   * Cleans up the Gridmix directory, regenerates the input data and
+   * verifies the map input (0.68), map output (0.35) and reduce output
+   * (0.40) compression ratios against these user specified values.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testGridmixOuputCompressionRatiosAgainstCustomRatios()
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case1_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+
+    final String [] runtimeValues =
+        {"LOADJOB",
+         RoundRobinUserResolver.class.getName(),
+         "STRESS",
+         inputSizeInMB + "m",
+         "file://" + UtilsForGridmix.getProxyUsersFile(conf),
+         tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO + "=0.68",
+        "-D", GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO + "=0.35",
+        "-D", GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO + "=0.40"
+    };
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
index 29adaf1..2cecc40 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
@@ -100,6 +100,30 @@
"gridmix.distributed-cache-emulation.enable";
/**
+ * Configuration key that enables or disables the emulation of input
+ * decompression in Gridmix.
+ */
+ public static final String GRIDMIX_INPUT_DECOMPRESS_ENABLE =
+ "gridmix.compression-emulation.input-decompression.enable";
+
+ /**
+ * Configuration key for the map input decompression ratio. Note that the
+ * constant is named COMPRESS_RATIO although the key itself says
+ * "decompression-ratio".
+ */
+ public static final String GRIDMIX_INPUT_COMPRESS_RATIO =
+ "gridmix.compression-emulation.map-input.decompression-ratio";
+
+ /**
+ * Configuration key for the map output (intermediate data) compression
+ * ratio.
+ */
+ public static final String GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.map-output.compression-ratio";
+
+ /**
+ * Configuration key for the reduce output compression ratio.
+ */
+ public static final String GRIDMIX_OUTPUT_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.reduce-output.compression-ratio";
+
+ /**
* Gridmix logger mode.
*/
public static final String GRIDMIX_LOG_MODE =
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
index 1083a83..a3ad0e8 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
@@ -37,6 +37,8 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.test.system.JTClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.tools.rumen.LoggedJob;
@@ -61,6 +63,17 @@
static final String jobTypeKey = GridMixConfig.GRIDMIX_JOB_TYPE;
static final String mapTaskKey = GridMixConfig.GRIDMIX_SLEEPJOB_MAPTASK_ONLY;
static final String usrResolver = GridMixConfig.GRIDMIX_USER_RESOLVER;
+ // Job configuration keys consulted while verifying compression emulation:
+ // the output compression flag and the input directory of a job.
+ static final String fileOutputFormatKey = FileOutputFormat.COMPRESS;
+ static final String fileInputFormatKey = FileInputFormat.INPUT_DIR;
+ // Gridmix switches for compression emulation and input decompression.
+ static final String compEmulKey = GridMixConfig.GRIDMIX_COMPRESSION_ENABLE;
+ static final String inputDecompKey =
+ GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE;
+ // Gridmix keys under which custom map input, map output and reduce
+ // output ratios are looked up in the simulated job configuration.
+ static final String mapInputCompRatio =
+ GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO;
+ static final String mapOutputCompRatio =
+ GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO;
+ static final String reduceOutputCompRatio =
+ GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO;
/**
* Gridmix job verification constructor
@@ -118,6 +131,8 @@
verifySimulatedJobSummary(zombieJob, jhInfo, simuJobConf);
verifyJobMapCounters(counters, mapJobCounters, simuJobConf);
verifyJobReduceCounters(counters, reduceJobCounters, simuJobConf);
+ verifyCompressionEmulation(zombieJob.getJobConf(), simuJobConf, counters,
+ reduceJobCounters, mapJobCounters);
LOG.info("Done.");
}
}
@@ -329,6 +344,159 @@
return userResolverVal;
}
+ /**
+  * Verifies the compression emulation of a simulated job. Nothing is
+  * verified unless compression emulation is enabled in the simulated job
+  * configuration. When it is enabled:
+  * <ul>
+  * <li>if the input path of the original job contains ".gz", ".tgz" or
+  * ".bz", input decompression must be enabled for the simulated job, and
+  * the map input and map output compression ratios are verified;
+  * otherwise input decompression must not be enabled;</li>
+  * <li>the output compression setting of the simulated job must equal
+  * that of the original job;</li>
+  * <li>if output compression is enabled, the reduce output bytes of the
+  * simulated job are verified against those of the original job.</li>
+  * </ul>
+  * @param origJobConf - original job configuration.
+  * @param simuJobConf - simulated job configuration.
+  * @param counters - simulated job counters.
+  * @param origReduceCounters - original job reduce counters.
+  * @param origMapJobCounters - original job map counters.
+  * @throws ParseException - if a parser error occurs.
+  * @throws IOException - if an I/O error occurs.
+  */
+ public void verifyCompressionEmulation(JobConf origJobConf,
+     JobConf simuJobConf, Counters counters,
+     Map<String, Long> origReduceCounters,
+     Map<String, Long> origMapJobCounters)
+     throws ParseException, IOException {
+   if (!simuJobConf.getBoolean(compEmulKey, false)) {
+     return;
+   }
+
+   String inputDir = origJobConf.get(fileInputFormatKey);
+   Assert.assertNotNull(fileInputFormatKey + " is Null", inputDir);
+   boolean inputDecompEnabled = simuJobConf.getBoolean(inputDecompKey, false);
+
+   // The input is treated as compressed when its path carries one of the
+   // known compressed-file extensions.
+   if (inputDir.contains(".gz") || inputDir.contains(".tgz")
+       || inputDir.contains(".bz")) {
+     Assert.assertTrue("Input decompression attribute has been not set "
+         + "for compressed input", inputDecompEnabled);
+
+     float inputCompRatio =
+         getExpectedCompressionRatio(simuJobConf, mapInputCompRatio);
+     float intermediateCompRatio =
+         getExpectedCompressionRatio(simuJobConf, mapOutputCompRatio);
+
+     // Verify Map Input Compression Ratio.
+     long simMapInputBytes = getCounterValue(counters, "HDFS_BYTES_READ");
+     long uncompressedInputSize =
+         requireCounter(origMapJobCounters, "MAP_INPUT_BYTES");
+     assertMapInputCompressionRatio(simMapInputBytes, uncompressedInputSize,
+         inputCompRatio);
+
+     // Verify Map Output Compression Ratio.
+     long simReduceInputBytes =
+         getCounterValue(counters, "REDUCE_SHUFFLE_BYTES");
+     long simMapOutputBytes = getCounterValue(counters, "MAP_OUTPUT_BYTES");
+     assertMapOuputCompressionRatio(simReduceInputBytes, simMapOutputBytes,
+         intermediateCompRatio);
+   } else {
+     Assert.assertFalse("Input decompression attribute has been enabled "
+         + "for uncompressed input. ", inputDecompEnabled);
+   }
+
+   boolean simOutputCompressed =
+       simuJobConf.getBoolean(fileOutputFormatKey, false);
+   Assert.assertEquals("Simulated job output format has not matched with "
+       + "original job output format.",
+       origJobConf.getBoolean(fileOutputFormatKey, false),
+       simOutputCompressed);
+
+   if (simOutputCompressed) {
+     float outputCompRatio =
+         getExpectedCompressionRatio(simuJobConf, reduceOutputCompRatio);
+
+     // Verify reduce output compression ratio.
+     long simReduceOutputBytes =
+         getCounterValue(counters, "HDFS_BYTES_WRITTEN");
+     long origReduceOutputBytes =
+         requireCounter(origReduceCounters, "REDUCE_OUTPUT_BYTES");
+     assertReduceOutputCompressionRatio(simReduceOutputBytes,
+         origReduceOutputBytes, outputCompRatio);
+   }
+ }
+
+ /**
+  * Returns the named counter of the original job, failing the test with a
+  * descriptive message (instead of an unboxing NullPointerException) when
+  * the counter is absent.
+  */
+ private static long requireCounter(Map<String, Long> origCounters,
+     String counterName) {
+   Long value = origCounters.get(counterName);
+   Assert.assertNotNull(counterName + " counter was not found in the "
+       + "original job counters.", value);
+   return value;
+ }
+
+ /**
+  * Asserts that the map input compression ratio of the simulated job,
+  * i.e. simMapInputBytes / origMapInputBytes as computed by
+  * getActualCompressionRatio, is within 6% of the expected ratio.
+  * @param simMapInputBytes - HDFS bytes read by the simulated job.
+  * @param origMapInputBytes - map input bytes of the original job.
+  * @param expInputCompRatio - expected map input compression ratio.
+  */
+ private void assertMapInputCompressionRatio(long simMapInputBytes,
+     long origMapInputBytes, float expInputCompRatio) {
+   LOG.info("***Verify the map input bytes compression ratio****");
+   LOG.info("Simulated job's map input bytes(HDFS_BYTES_READ): "
+       + simMapInputBytes);
+   LOG.info("Original job's map input bytes: " + origMapInputBytes);
+
+   final float actInputCompRatio =
+       getActualCompressionRatio(simMapInputBytes, origMapInputBytes);
+   LOG.info("Expected Map Input Compression Ratio:" + expInputCompRatio);
+   LOG.info("Actual Map Input Compression Ratio:" + actInputCompRatio);
+
+   // The tolerance is relative: 6% of the expected ratio.
+   final float diffVal = (float)(expInputCompRatio * 0.06);
+   LOG.info("Expected Difference of Map Input Compression Ratio is <= "
+       + diffVal);
+   final float delta = Math.abs(expInputCompRatio - actInputCompRatio);
+   LOG.info("Actual Difference of Map Input Compression Ratio:" + delta);
+   Assert.assertTrue("Simulated job input compression ratio has mismatched.",
+       delta <= diffVal);
+   LOG.info("******Done******");
+ }
+
+ /**
+  * Asserts that the map output compression ratio of the simulated job,
+  * computed from simReduceInputBytes and simMapoutputBytes, differs from
+  * the expected ratio by at most 0.05.
+  * @param simReduceInputBytes - reduce input bytes of the simulated job.
+  * @param simMapoutputBytes - map output bytes of the simulated job.
+  * @param expMapOuputCompRatio - expected map output compression ratio.
+  */
+ private void assertMapOuputCompressionRatio(long simReduceInputBytes,
+     long simMapoutputBytes, float expMapOuputCompRatio) {
+   // Absolute tolerance allowed between expected and observed ratio.
+   final float maxAllowedDelta = 0.05f;
+
+   LOG.info("***Verify the map output bytes compression ratio***");
+   LOG.info("Simulated job reduce input bytes:" + simReduceInputBytes);
+   LOG.info("Simulated job map output bytes:" + simMapoutputBytes);
+
+   final float observedRatio =
+       getActualCompressionRatio(simReduceInputBytes, simMapoutputBytes);
+   LOG.info("Expected Map Output Compression Ratio:" + expMapOuputCompRatio);
+   LOG.info("Actual Map Output Compression Ratio:" + observedRatio);
+   LOG.info("Expected Difference Of Map Output Compression Ratio is <= "
+       + maxAllowedDelta);
+
+   final float observedDelta = Math.abs(expMapOuputCompRatio - observedRatio);
+   LOG.info("Actual Difference Of Map Ouput Compression Ratio :"
+       + observedDelta);
+
+   final boolean withinTolerance = observedDelta <= maxAllowedDelta;
+   Assert.assertTrue("Simulated job map output compression ratio "
+       + "has not been matched.", withinTolerance);
+   LOG.info("******Done******");
+ }
+
+ /**
+  * Asserts that the reduce output bytes of the simulated job differ from
+  * those of the original job by at most 15% of the original value.
+  * NOTE(review): the expected and the actual compression ratio are only
+  * logged here; the assertion itself compares byte counts - confirm that
+  * this is the intended check.
+  * @param simReduceOutputBytes - reduce output bytes of the simulated job.
+  * @param origReduceOutputBytes - reduce output bytes of the original job.
+  * @param expOutputCompRatio - expected output compression ratio.
+  */
+ private void assertReduceOutputCompressionRatio(long simReduceOutputBytes,
+     long origReduceOutputBytes , float expOutputCompRatio ) {
+   LOG.info("***Verify the reduce output bytes compression ratio***");
+   final float observedRatio =
+       getActualCompressionRatio(simReduceOutputBytes, origReduceOutputBytes);
+   LOG.info("Simulated job's reduce output bytes:" + simReduceOutputBytes);
+   LOG.info("Original job's reduce output bytes:" + origReduceOutputBytes);
+   LOG.info("Expected output compression ratio:" + expOutputCompRatio);
+   LOG.info("Actual output compression ratio:" + observedRatio);
+
+   final long allowedByteDrift = (long)(origReduceOutputBytes * 0.15);
+   final long observedByteDrift =
+       Math.abs(origReduceOutputBytes - simReduceOutputBytes);
+   LOG.info("Expected difference of output compressed bytes is <= "
+       + allowedByteDrift);
+   LOG.info("Actual difference of compressed ouput bytes:"
+       + observedByteDrift);
+
+   final boolean withinTolerance = observedByteDrift <= allowedByteDrift;
+   Assert.assertTrue("Simulated job reduce output compression ratio " +
+       "has not been matched.", withinTolerance);
+   LOG.info("******Done******");
+ }
+
+ private float getExpectedCompressionRatio(JobConf simuJobConf,
+ String RATIO_TYPE) {
+ // Default decompression ratio is 0.50f irrespective of original
+ //job compression ratio.
+ if (simuJobConf.get(RATIO_TYPE) != null) {
+ return Float.parseFloat(simuJobConf.get(RATIO_TYPE));
+ } else {
+ return 0.50f;
+ }
+ }
+
+ /**
+  * Returns compressBytes / uncompessBytes rounded to two decimal places.
+  * Fails the test when uncompessBytes is not positive: dividing by zero
+  * would yield NaN or Infinity, which rounds to a meaningless ratio.
+  * @param compressBytes - size of the compressed data in bytes.
+  * @param uncompessBytes - size of the uncompressed data in bytes.
+  * @return the compression ratio rounded to two decimal places.
+  */
+ private float getActualCompressionRatio(long compressBytes,
+     long uncompessBytes) {
+   Assert.assertTrue("Uncompressed byte count must be positive to compute "
+       + "a compression ratio, but was " + uncompessBytes,
+       uncompessBytes > 0);
+   double ratio = ((double) compressBytes) / uncompessBytes;
+   // Keep the rounded value as a long so it is never narrowed to int.
+   long hundredths = Math.round(ratio * 100);
+   return hundredths / 100f;
+ }
+
private String convertJobStatus(String jobStatus) {
if (jobStatus.equals("SUCCEEDED")) {
return "SUCCESS";
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case1_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case1_trace.json.gz
new file mode 100644
index 0000000..4e5615f
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case1_trace.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case2_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case2_trace.json.gz
new file mode 100644
index 0000000..faba98b
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case2_trace.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case3_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case3_trace.json.gz
new file mode 100644
index 0000000..5adbf43
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case3_trace.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case4_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case4_trace.json.gz
new file mode 100644
index 0000000..cdff79a
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case4_trace.json.gz
Binary files differ