MAPREDUCE-2544. [Gridmix] Add compression emulation system tests to Gridmix. (Vinay Kumar Thota via amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1130147 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f0e9858..3727df5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,9 @@
 
   IMPROVEMENTS
 
+    MAPREDUCE-2544. [Gridmix] Add compression emulation system tests to 
+    Gridmix. (Vinay Kumar Thota via amarrk)
+
     MAPREDUCE-2517. [Gridmix] Add system tests to Gridmix. 
     (Vinay Kumar Thota via amarrk)
 
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationEnableForAllTypesOfJobs.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationEnableForAllTypesOfJobs.java
new file mode 100644
index 0000000..4144bae
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationEnableForAllTypesOfJobs.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the compression emulation for all the jobs in the trace 
+ * irrespective of compressed inputs.
+ */
+public class TestCompressionEmulationEnableForAllTypesOfJobs 
+    extends GridmixSystemTestCase { 
+  private static final Log LOG = 
+      LogFactory.getLog(
+          "TestCompressionEmulationEnableForAllTypesOfJobs.class");
+
+  /**
+   *  Generate compressed input data and verify the compression emulation
+   *  for all the jobs in the trace irrespective of whether the original
+   *  job uses the compressed input or not.Also use the custom compression
+   *  ratios for map input, map output and reduce output.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testInputCompressionEmualtionEnableForAllJobsWithDefaultRatios() 
+      throws Exception { 
+    final long inputSizeInMB = 1024 * 6;
+    final String tracePath = getTraceFile("compression_case4_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "REPLAY",
+                                     inputSizeInMB + "m",
+                                     tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO + "=0.46",
+        "-D", GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO + "=0.35",
+        "-D", GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO + "=0.36"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath, 
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+  
+  /**
+   *  Use existing compressed input data and turn off the compression 
+   *  emulation. Verify the compression emulation whether it uses 
+   *  by the jobs or not.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testInputCompressionEmulationEnableForAllJobsWithCustomRatios() 
+      throws Exception { 
+     final String tracePath = getTraceFile("compression_case4_trace");
+     Assert.assertNotNull("Trace file has not found.", tracePath);
+     final String [] runtimeValues = {"LOADJOB",
+                                      SubmitterUserResolver.class.getName(),
+                                      "SERIAL",
+                                      tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=false"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath, 
+                        GridMixRunMode.RUN_GRIDMIX.getValue());
+  }  
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForCompressInAndUncompressOut.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForCompressInAndUncompressOut.java
new file mode 100644
index 0000000..6f0dcbf
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForCompressInAndUncompressOut.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.junit.Assert;
+import org.junit.Test;
+/**
+ * Verify the gridmix jobs compression ratio's of input, 
+ * intermediate input and with default/custom ratios.Also verify
+ * the compressed output file format is enabled or not.
+ *
+ */
+public class TestCompressionEmulationForCompressInAndUncompressOut 
+    extends GridmixSystemTestCase { 
+  private static final Log LOG = 
+      LogFactory.getLog(
+          "TestCompressionEmulationForCompressInAndUncompressOut.class");
+  final long inputSizeInMB = 1024 * 6;
+
+  /**
+   * Generate a compressed input data and verify the compression ratios 
+   * of map input and map output against default compression ratios 
+   * and also verify the whether the compressed output file output format 
+   * is enabled or not.
+   * @throws Exception -if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedInputWithDefaultRatios() 
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case2_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "STRESS",
+                                     inputSizeInMB + "m",
+                                     tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath, 
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+  
+  /**
+   * Use existing compressed input data and verify the compression ratios 
+   * of input and intermediate input against custom compression ratios 
+   * and also verify the compressed output file output format is enabled or not.
+   * @throws Exception -if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedInputWithCustomRatios() 
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case2_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "STRESS",
+                                     inputSizeInMB + "m",
+                                     tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO + "=0.58",
+        "-D", GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO + "=0.42"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath, 
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForUncompressInAndCompressOut.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForUncompressInAndCompressOut.java
new file mode 100644
index 0000000..70dc0d1
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationForUncompressInAndCompressOut.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.junit.Assert;
+import org.junit.Test;
+/**
+ * Verify the gridmix jobs compression ratio's of reduce output and 
+ * with default and custom ratios.
+ */
+public class TestCompressionEmulationForUncompressInAndCompressOut
+   extends GridmixSystemTestCase { 
+   private static final Log LOG = 
+       LogFactory.getLog(
+           "TestCompressionEmulationForUncompressInAndCompressOut.class");
+   final long inputSizeInMB = 1024 * 6;
+
+  /**
+   * Generate a uncompressed input data and verify the compression ratios 
+   * of reduce output against default output compression ratio.
+   * @throws Exception -if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedOuputWithDefaultRatios() 
+      throws Exception { 
+    final String tracePath = getTraceFile("compression_case3_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    final String [] runtimeValues = 
+                     {"LOADJOB",
+                      RoundRobinUserResolver.class.getName(),
+                      "REPLAY",
+                      inputSizeInMB + "m",
+                      "file://" + UtilsForGridmix.getProxyUsersFile(conf),
+                      tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+  
+  /**
+   * Use existing uncompressed input data and verify the compression ratio 
+   * of reduce output against custom output compression ratio and also verify 
+   * the compression output file output format.
+   * @throws Exception -if an error occurs.
+   */
+  @Test
+  public void testCompressionEmulationOfCompressedOutputWithCustomRatios() 
+      throws Exception {
+    final String tracePath = getTraceFile("compression_case3_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+    final String [] runtimeValues = { "LOADJOB",
+                                      SubmitterUserResolver.class.getName(),
+                                      "STRESS",
+                                      inputSizeInMB + "m",
+                                      tracePath };
+
+    final String [] otherArgs = { 
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO + "=0.38"
+    };
+
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath, 
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+}
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressedInputGeneration.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressedInputGeneration.java
new file mode 100644
index 0000000..1dfc897
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressedInputGeneration.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.gridmix.Gridmix;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the Gridmix generated input if compression emulation turn on.
+ */
+public class TestGridmixCompressedInputGeneration 
+    extends GridmixSystemTestCase { 
+
+  private static final Log LOG = 
+      LogFactory.getLog("TestGridmixCompressedInputGeneration.class");
+
+  /**
+   * Generate input data and verify whether input files are compressed
+   * or not.
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testGridmixCompressionInputGeneration() throws Exception {
+    final long inputSizeInMB = 1024 * 7;
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "STRESS",
+                                     inputSizeInMB  + "m",
+                                     "file:///dev/null"};
+    final String [] otherArgs = { 
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+    LOG.info("Verify the generated compressed input data.");
+    runAndVerify(true, inputSizeInMB, runtimeValues, otherArgs);
+  }
+
+  /**
+   * Disable compression emulation and verify whether input files are 
+   * compressed or not.
+   * @throws Exception
+   */
+  @Test
+  public void testGridmixInputGenerationWithoutCompressionEnable() 
+      throws Exception { 
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+    final long inputSizeInMB = 1024 * 6;
+    final String [] runtimeValues = {"LOADJOB",
+                                     SubmitterUserResolver.class.getName(),
+                                     "STRESS",
+                                     inputSizeInMB + "m",
+                                     "file:///dev/null"};
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=false"
+    };
+
+    LOG.info("Verify the generated uncompressed input data.");
+    runAndVerify(false, inputSizeInMB, runtimeValues, otherArgs);
+  }
+  
+  private void runAndVerify(boolean isCompressed, long INPUT_SIZE, 
+      String [] runtimeValues, String [] otherArgs) throws Exception { 
+    int exitCode = 
+        UtilsForGridmix.runGridmixJob(gridmixDir, conf, 
+                                      GridMixRunMode.DATA_GENERATION.getValue(),
+                                      runtimeValues,otherArgs);
+    Assert.assertEquals("Data generation has failed.", 0, exitCode);
+    verifyJobStatus();
+    verifyInputDataSize(INPUT_SIZE);
+    verifyInputFiles(isCompressed);
+  }
+  
+  private void verifyInputFiles(boolean isCompressed) throws IOException { 
+    List<String> inputFiles = 
+        getInputFiles(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
+    for (String inputFile: inputFiles) {
+      boolean fileStatus = (inputFile.contains(".gz") 
+                         || inputFile.contains(".tgz"))? true : false;
+      if (isCompressed) { 
+        Assert.assertTrue("Compressed input split file was not found.",
+                          fileStatus);
+      } else {
+        Assert.assertFalse("Uncompressed input split file was not found.",
+                           fileStatus);
+      }
+    }
+  }
+
+  private void verifyInputDataSize(long INPUT_SIZE) throws IOException {
+    long actDataSize = 
+        getInputDataSizeInMB(conf, Gridmix.getGridmixInputDataPath(gridmixDir));
+    double ratio = ((double)actDataSize)/INPUT_SIZE;
+    long expDataSize = (long)(INPUT_SIZE * ratio);
+    Assert.assertEquals("Generated data has not matched with given size.", 
+                        expDataSize, actDataSize);
+  }
+
+  private void verifyJobStatus() throws IOException { 
+    JobClient jobClient = jtClient.getClient();
+    int len = jobClient.getAllJobs().length;
+    LOG.info("Verify the job status after completion of job...");
+    Assert.assertEquals("Job has not succeeded.", JobStatus.SUCCEEDED, 
+                        jobClient.getAllJobs()[len -1].getRunState());
+  }
+
+  private long getInputDataSizeInMB(Configuration conf, Path inputDir) 
+      throws IOException { 
+    FileSystem fs = inputDir.getFileSystem(conf);
+    ContentSummary csmry = fs.getContentSummary(inputDir);
+    long dataSize = csmry.getLength();
+    dataSize = dataSize/(1024 * 1024);
+    return dataSize;
+  }
+
+  private List<String> getInputFiles(Configuration conf, Path inputDir) 
+      throws IOException {
+    FileSystem fs = inputDir.getFileSystem(conf);
+    FileStatus [] listStatus = fs.listStatus(inputDir);
+    List<String> files = new ArrayList<String>();
+    for (FileStatus fileStat : listStatus) {
+      files.add(getInputFile(fileStat, conf));
+    }
+    return files;
+  }
+
+  private String getInputFile(FileStatus fstatus, Configuration conf) 
+      throws IOException {
+    String fileName = null;
+    if (!fstatus.isDirectory()) {
+      fileName = fstatus.getPath().getName();
+    } else {
+      FileSystem fs = fstatus.getPath().getFileSystem(conf);
+      FileStatus [] listStatus = fs.listStatus(fstatus.getPath());
+      for (FileStatus fileStat : listStatus) {
+         return getInputFile(fileStat, conf);
+      }
+    }
+    return fileName;
+  }
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressionEmulationWithCompressInput.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressionEmulationWithCompressInput.java
new file mode 100644
index 0000000..adaa0d2
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridmixCompressionEmulationWithCompressInput.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixConfig;
+import org.apache.hadoop.mapred.gridmix.test.system.GridMixRunMode;
+import org.apache.hadoop.mapred.gridmix.test.system.UtilsForGridmix;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the gridmix jobs compression ratios of map input, 
+ * map output and reduce output with default and user specified 
+ * compression ratios.
+ *
+ */
+public class TestGridmixCompressionEmulationWithCompressInput 
+    extends GridmixSystemTestCase {
+  private static final Log LOG = 
+      LogFactory.getLog(
+              "TestGridmixCompressionEmulationWithCompressInput.class");
+  final long inputSizeInMB = 1024 * 6;
+
+  /**
+   * Generate compressed input data and verify the map input, 
+   * map output and reduce output compression ratios of gridmix jobs 
+   * against the default compression ratios. 
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testGridmixCompressionRatiosAgainstDefaultCompressionRatio() 
+      throws Exception { 
+    final String tracePath = getTraceFile("compression_case1_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+
+    final String [] runtimeValues = 
+                     {"LOADJOB",
+                      RoundRobinUserResolver.class.getName(),
+                      "STRESS",
+                      inputSizeInMB + "m",
+                      "file://" + UtilsForGridmix.getProxyUsersFile(conf),
+                      tracePath};
+
+    final String [] otherArgs = { 
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true"
+    };
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath,
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+  
+  /**
+   * Verify map input, map output and  reduce output compression ratios of
+   * gridmix jobs against user specified compression ratios. 
+   * @throws Exception - if an error occurs.
+   */
+  @Test
+  public void testGridmixOuputCompressionRatiosAgainstCustomRatios() 
+      throws Exception { 
+    final String tracePath = getTraceFile("compression_case1_trace");
+    Assert.assertNotNull("Trace file has not found.", tracePath);
+    UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
+
+    final String [] runtimeValues = 
+                     {"LOADJOB",
+                      RoundRobinUserResolver.class.getName(),
+                      "STRESS",
+                      inputSizeInMB + "m",
+                      "file://" + UtilsForGridmix.getProxyUsersFile(conf),
+                      tracePath};
+
+    final String [] otherArgs = {
+        "-D", GridMixConfig.GRIDMIX_DISTCACHE_ENABLE + "=false",
+        "-D", GridMixConfig.GRIDMIX_COMPRESSION_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE + "=true",
+        "-D", GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO + "=0.68",
+        "-D", GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO + "=0.35",
+        "-D", GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO + "=0.40"
+    };
+    runGridmixAndVerify(runtimeValues, otherArgs, tracePath, 
+        GridMixRunMode.DATA_GENERATION_AND_RUN_GRIDMIX.getValue());
+  }
+}
+
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
index 29adaf1..2cecc40 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
@@ -100,6 +100,30 @@
       "gridmix.distributed-cache-emulation.enable";
 
   /**
+   * Gridmix input decompression enable.
+   */
+  public static final String GRIDMIX_INPUT_DECOMPRESS_ENABLE = 
+    "gridmix.compression-emulation.input-decompression.enable";
+
+  /**
+   * Gridmix input compression ratio.
+   */
+  public static final String GRIDMIX_INPUT_COMPRESS_RATIO = 
+    "gridmix.compression-emulation.map-input.decompression-ratio";
+
+  /**
+   * Gridmix intermediate compression ratio.
+   */
+  public static final String GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.map-output.compression-ratio";
+
+  /**
+   * Gridmix output compression ratio.
+   */
+  public static final String GRIDMIX_OUTPUT_COMPRESSION_RATIO = 
+      "gridmix.compression-emulation.reduce-output.compression-ratio";
+
+  /**
    *  Gridmix logger mode.
    */
   public static final String GRIDMIX_LOG_MODE =
diff --git a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
index 1083a83..a3ad0e8 100644
--- a/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
+++ b/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
@@ -37,6 +37,8 @@
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.test.system.JTClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.tools.rumen.LoggedJob;
@@ -61,6 +63,17 @@
   static final String jobTypeKey = GridMixConfig.GRIDMIX_JOB_TYPE;
   static final String mapTaskKey = GridMixConfig.GRIDMIX_SLEEPJOB_MAPTASK_ONLY;
   static final String usrResolver = GridMixConfig.GRIDMIX_USER_RESOLVER;
+  static final String fileOutputFormatKey = FileOutputFormat.COMPRESS;
+  static final String fileInputFormatKey = FileInputFormat.INPUT_DIR;
+  static final String compEmulKey = GridMixConfig.GRIDMIX_COMPRESSION_ENABLE;
+  static final String inputDecompKey = 
+      GridMixConfig.GRIDMIX_INPUT_DECOMPRESS_ENABLE;
+  static final String mapInputCompRatio = 
+      GridMixConfig.GRIDMIX_INPUT_COMPRESS_RATIO;
+  static final String mapOutputCompRatio = 
+      GridMixConfig.GRIDMIX_INTERMEDIATE_COMPRESSION_RATIO;
+  static final String reduceOutputCompRatio = 
+      GridMixConfig.GRIDMIX_OUTPUT_COMPRESSION_RATIO;
 
   /**
    * Gridmix job verification constructor
@@ -118,6 +131,8 @@
       verifySimulatedJobSummary(zombieJob, jhInfo, simuJobConf);
       verifyJobMapCounters(counters, mapJobCounters, simuJobConf);
       verifyJobReduceCounters(counters, reduceJobCounters, simuJobConf); 
+      verifyCompressionEmulation(zombieJob.getJobConf(), simuJobConf, counters, 
+                                 reduceJobCounters, mapJobCounters);
       LOG.info("Done.");
     }
   }
@@ -329,6 +344,159 @@
     return userResolverVal;
   }
 
+  /**
+   * It verifies the compression ratios of mapreduce jobs.
+   * @param origJobConf - original job configuration.
+   * @param simuJobConf - simulated job configuration.
+   * @param counters  - simulated job counters.
+   * @param origReduceCounters - original job reduce counters.
+   * @param origMapCounters - original job map counters.
+   * @throws ParseException - if a parser error occurs.
+   * @throws IOException - if an I/O error occurs.
+   */
+  public void verifyCompressionEmulation(JobConf origJobConf, 
+                                         JobConf simuJobConf,Counters counters, 
+                                         Map<String, Long> origReduceCounters, 
+                                         Map<String, Long> origMapJobCounters) 
+                                         throws ParseException,IOException { 
+    if (simuJobConf.getBoolean(compEmulKey, false)) {
+      String inputDir = origJobConf.get(fileInputFormatKey);
+      Assert.assertNotNull(fileInputFormatKey + " is Null",inputDir);
+      // Verify input compression whether it's enable or not.
+      if (inputDir.contains(".gz") || inputDir.contains(".tgz") 
+         || inputDir.contains(".bz")) { 
+        Assert.assertTrue("Input decompression attribute has been not set for " 
+                         + "for compressed input",
+                         simuJobConf.getBoolean(inputDecompKey, false));
+
+        float INPUT_COMP_RATIO = 
+            getExpectedCompressionRatio(simuJobConf, mapInputCompRatio);
+        float INTERMEDIATE_COMP_RATIO = 
+            getExpectedCompressionRatio(simuJobConf, mapOutputCompRatio);
+
+        // Verify Map Input Compression Ratio.
+        long simMapInputBytes = getCounterValue(counters, "HDFS_BYTES_READ");
+        long uncompressedInputSize = origMapJobCounters.get("MAP_INPUT_BYTES"); 
+        assertMapInputCompressionRatio(simMapInputBytes, uncompressedInputSize, 
+                                       INPUT_COMP_RATIO);
+
+        // Verify Map Output Compression Ratio.
+        long simReduceInputBytes = 
+            getCounterValue(counters, "REDUCE_SHUFFLE_BYTES");
+        long simMapOutputBytes = getCounterValue(counters, "MAP_OUTPUT_BYTES");
+        assertMapOuputCompressionRatio(simReduceInputBytes, simMapOutputBytes, 
+                                       INTERMEDIATE_COMP_RATIO);
+      } else {
+        Assert.assertFalse("Input decompression attribute has been enabled " 
+                          + "for uncompressed input. ", 
+                          Boolean.valueOf(
+                          simuJobConf.getBoolean(inputDecompKey, false)));
+      }
+
+      Assert.assertEquals("Simulated job output format has not matched with " 
+                         + "original job output format.",
+                         origJobConf.getBoolean(fileOutputFormatKey,false), 
+                         simuJobConf.getBoolean(fileOutputFormatKey,false));
+
+      if (simuJobConf.getBoolean(fileOutputFormatKey,false)) { 
+        float OUTPUT_COMP_RATIO = 
+            getExpectedCompressionRatio(simuJobConf, reduceOutputCompRatio);
+
+         //Verify reduce output compression ratio.
+         long simReduceOutputBytes = 
+             getCounterValue(counters, "HDFS_BYTES_WRITTEN");
+         long origReduceOutputBytes = 
+             origReduceCounters.get("REDUCE_OUTPUT_BYTES");
+         assertReduceOutputCompressionRatio(simReduceOutputBytes, 
+                                            origReduceOutputBytes, 
+                                            OUTPUT_COMP_RATIO);
+      }
+    }
+  }
+
+  private void assertMapInputCompressionRatio(long simMapInputBytes, 
+                                   long origMapInputBytes, 
+                                   float expInputCompRatio) { 
+    LOG.info("***Verify the map input bytes compression ratio****");
+    LOG.info("Simulated job's map input bytes(REDUCE_SHUFFLE_BYTES): " 
+            + simMapInputBytes);
+    LOG.info("Original job's map input bytes: " + origMapInputBytes);
+
+    final float actInputCompRatio = 
+        getActualCompressionRatio(simMapInputBytes, origMapInputBytes);
+    LOG.info("Expected Map Input Compression Ratio:" + expInputCompRatio);
+    LOG.info("Actual Map Input Compression Ratio:" + actInputCompRatio);
+
+    float diffVal = (float)(expInputCompRatio * 0.06);
+    LOG.info("Expected Difference of Map Input Compression Ratio is <= " + 
+            + diffVal);
+    float delta = Math.abs(expInputCompRatio - actInputCompRatio);
+    LOG.info("Actual Difference of Map Iput Compression Ratio:" + delta);
+    Assert.assertTrue("Simulated job input compression ratio has mismatched.", 
+                      delta <= diffVal);
+    LOG.info("******Done******");
+  }
+
+  private void assertMapOuputCompressionRatio(long simReduceInputBytes, 
+                                              long simMapoutputBytes, 
+                                              float expMapOuputCompRatio) { 
+    LOG.info("***Verify the map output bytes compression ratio***");
+    LOG.info("Simulated job reduce input bytes:" + simReduceInputBytes);
+    LOG.info("Simulated job map output bytes:" + simMapoutputBytes);
+
+    final float actMapOutputCompRatio = 
+        getActualCompressionRatio(simReduceInputBytes, simMapoutputBytes);
+    LOG.info("Expected Map Output Compression Ratio:" + expMapOuputCompRatio);
+    LOG.info("Actual Map Output Compression Ratio:" + actMapOutputCompRatio);
+
+    float diffVal = 0.05f;
+    LOG.info("Expected Difference Of Map Output Compression Ratio is <= " 
+            + diffVal);
+    float delta = Math.abs(expMapOuputCompRatio - actMapOutputCompRatio);
+    LOG.info("Actual Difference Of Map Ouput Compression Ratio :" + delta);
+
+    Assert.assertTrue("Simulated job map output compression ratio " 
+                     + "has not been matched.", delta <= diffVal);
+    LOG.info("******Done******");
+  }
+
+  private void assertReduceOutputCompressionRatio(long simReduceOutputBytes, 
+      long origReduceOutputBytes , float expOutputCompRatio ) {
+      LOG.info("***Verify the reduce output bytes compression ratio***");
+      final float actOuputputCompRatio = 
+          getActualCompressionRatio(simReduceOutputBytes, origReduceOutputBytes);
+      LOG.info("Simulated job's reduce output bytes:" + simReduceOutputBytes);
+      LOG.info("Original job's reduce output bytes:" + origReduceOutputBytes);
+      LOG.info("Expected output compression ratio:" + expOutputCompRatio);
+      LOG.info("Actual output compression ratio:" + actOuputputCompRatio);
+      long diffVal = (long)(origReduceOutputBytes * 0.15);
+      long delta = Math.abs(origReduceOutputBytes - simReduceOutputBytes);
+      LOG.info("Expected difference of output compressed bytes is <= " 
+              + diffVal);
+      LOG.info("Actual difference of compressed ouput bytes:" + delta);
+      Assert.assertTrue("Simulated job reduce output compression ratio " +
+         "has not been matched.", delta <= diffVal);
+      LOG.info("******Done******");
+  }
+
+  private float getExpectedCompressionRatio(JobConf simuJobConf, 
+                                            String RATIO_TYPE) {
+    // Default decompression ratio is 0.50f irrespective of original 
+    //job compression ratio.
+    if (simuJobConf.get(RATIO_TYPE) != null) {
+      return Float.parseFloat(simuJobConf.get(RATIO_TYPE));
+    } else {
+      return 0.50f;
+    }
+  }
+
+  private float getActualCompressionRatio(long compressBytes, 
+                                          long uncompessBytes) {
+    double ratio = ((double)compressBytes) / uncompessBytes; 
+    int significant = (int)Math.round(ratio * 100);
+    return ((float)significant)/100; 
+  }
+
   private String convertJobStatus(String jobStatus) {
     if (jobStatus.equals("SUCCEEDED")) { 
       return "SUCCESS";
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case1_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case1_trace.json.gz
new file mode 100644
index 0000000..4e5615f
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case1_trace.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case2_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case2_trace.json.gz
new file mode 100644
index 0000000..faba98b
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case2_trace.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case3_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case3_trace.json.gz
new file mode 100644
index 0000000..5adbf43
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case3_trace.json.gz
Binary files differ
diff --git a/src/contrib/gridmix/src/test/system/resources/compression_case4_trace.json.gz b/src/contrib/gridmix/src/test/system/resources/compression_case4_trace.json.gz
new file mode 100644
index 0000000..cdff79a
--- /dev/null
+++ b/src/contrib/gridmix/src/test/system/resources/compression_case4_trace.json.gz
Binary files differ