MAPREDUCE-2408. [Gridmix] Compression emulation in Gridmix. (amarrk)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1128162 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0166f4a..2a74014 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@
 
   NEW FEATURES
 
+    MAPREDUCE-2408. [Gridmix] Compression emulation in Gridmix. (amarrk)
+
     MAPREDUCE-2473. Add "mapred groups" command to query the server-side groups
     resolved for a user. (Aaron T. Myers via todd)
 
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
index a7d4e81..9ba6e9a 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
@@ -42,6 +42,8 @@
   private final int keyLen;
   private long accBytes = 0L;
   private long accRecords = 0L;
+  private int unspilledBytes = 0;
+  private int minSpilledBytes = 0;
 
   /**
    * @param targetBytes Expected byte count.
@@ -50,6 +52,14 @@
    */
   public AvgRecordFactory(long targetBytes, long targetRecords,
       Configuration conf) {
+    this(targetBytes, targetRecords, conf, 0);
+  }
+  
+  /**
+   * @param minSpilledBytes Minimum amount of data expected per record
+   */
+  public AvgRecordFactory(long targetBytes, long targetRecords,
+      Configuration conf, int minSpilledBytes) {
     this.targetBytes = targetBytes;
     this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
       ? Math.max(1,
@@ -60,6 +70,7 @@
     avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
     keyLen = Math.max(1,
         (int)(tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+    this.minSpilledBytes = minSpilledBytes;
   }
 
   @Override
@@ -69,14 +80,33 @@
     }
     final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
     final int len = (int) Math.min(targetBytes - accBytes, reclen);
+    
+    unspilledBytes += len;
+    
     // len != reclen?
     if (key != null) {
-      key.setSize(keyLen);
-      val.setSize(len - key.getSize());
+      if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+        key.setSize(1);
+        val.setSize(1);
+        accBytes += key.getSize() + val.getSize();
+        unspilledBytes -= (key.getSize() + val.getSize());
+      } else {
+        key.setSize(keyLen);
+        val.setSize(unspilledBytes - key.getSize());
+        accBytes += unspilledBytes;
+        unspilledBytes = 0;
+      }
     } else {
-      val.setSize(len);
+      if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+        val.setSize(1);
+        accBytes += val.getSize();
+        unspilledBytes -= val.getSize();
+      } else {
+        val.setSize(unspilledBytes);
+        accBytes += unspilledBytes;
+        unspilledBytes = 0;
+      }
     }
-    accBytes += len;
     return true;
   }
 
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
new file mode 100644
index 0000000..aafb300
--- /dev/null
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenDataFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This is a utility class for all the compression related modules.
+ */
+class CompressionEmulationUtil {
+  static final Log LOG = LogFactory.getLog(CompressionEmulationUtil.class);
+  
+  /**
+   * Enable compression usage in GridMix runs.
+   */
+  private static final String COMPRESSION_EMULATION_ENABLE = 
+    "gridmix.compression-emulation.enable";
+  
+  /**
+   * Enable input data decompression.
+   */
+  private static final String INPUT_DECOMPRESSION_EMULATION_ENABLE = 
+    "gridmix.compression-emulation.input-decompression.enable";
+  
+  /**
+   * Configuration property for setting the compression ratio for map input 
+   * data.
+   */
+  private static final String GRIDMIX_MAP_INPUT_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.map-input.decompression-ratio";
+  
+  /**
+   * Configuration property for setting the compression ratio of map output.
+   */
+  private static final String GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.map-output.compression-ratio";
+  
+  /**
+   * Configuration property for setting the compression ratio of reduce output.
+   */
+  private static final String GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO = 
+    "gridmix.compression-emulation.reduce-output.compression-ratio";
+  
+  /**
+   * Default compression ratio.
+   */
+  static final float DEFAULT_COMPRESSION_RATIO = 0.5F;
+  
+  private static final CompressionRatioLookupTable COMPRESSION_LOOKUP_TABLE = 
+    new CompressionRatioLookupTable();
+  
+  /**
+   * This is a {@link Mapper} implementation for generating random text data.
+   * It uses {@link RandomTextDataGenerator} for generating text data and the
+   * output files are compressed.
+   */
+  public static class RandomTextDataMapper
+  extends Mapper<NullWritable, LongWritable, Text, Text> {
+    private RandomTextDataGenerator rtg;
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      int listSize = 
+        RandomTextDataGenerator.getRandomTextDataGeneratorListSize(conf);
+      int wordSize = 
+        RandomTextDataGenerator.getRandomTextDataGeneratorWordSize(conf);
+      rtg = new RandomTextDataGenerator(listSize, wordSize);
+    }
+    
+    /**
+     * Emits random words sequence of desired size. Note that the desired output
+     * size is passed as the value parameter to this map.
+     */
+    @Override
+    public void map(NullWritable key, LongWritable value, Context context)
+    throws IOException, InterruptedException {
+      //TODO Control the extra data written ..
+      //TODO Should the key\tvalue\n be considered for measuring size?
+      //     Can counters like BYTES_WRITTEN be used? What will be the value of
+      //     such counters in LocalJobRunner?
+      for (long bytes = value.get(); bytes > 0;) {
+        String randomKey = rtg.getRandomWord();
+        String randomValue = rtg.getRandomWord();
+        context.write(new Text(randomKey), new Text(randomValue));
+        bytes -= (randomValue.getBytes().length + randomKey.getBytes().length);
+      }
+    }
+  }
+  
+  /**
+   * Configure the {@link Job} for enabling compression emulation.
+   */
+  static void configure(final Job job) throws IOException, InterruptedException,
+                                              ClassNotFoundException {
+    // set the random text mapper
+    job.setMapperClass(RandomTextDataMapper.class);
+    job.setNumReduceTasks(0);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setInputFormatClass(GenDataFormat.class);
+    job.setJarByClass(GenerateData.class);
+
+    // set the output compression true
+    FileOutputFormat.setCompressOutput(job, true);
+    try {
+      FileInputFormat.addInputPath(job, new Path("ignored"));
+    } catch (IOException e) {
+      LOG.error("Error while adding input path ", e);
+    }
+  }
+
+  /**
+   * This is the lookup table for mapping compression ratio to the size of the 
+   * word in the {@link RandomTextDataGenerator}'s dictionary. 
+   * 
+   * Note that this table is computed (empirically) using a dictionary of 
+   * default length i.e {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+   */
+  private static class CompressionRatioLookupTable {
+    private static Map<Float, Integer> map = new HashMap<Float, Integer>(60);
+    private static final float MIN_RATIO = 0.07F;
+    private static final float MAX_RATIO = 0.68F;
+    
+    // add the empirically obtained data points in the lookup table
+    CompressionRatioLookupTable() {
+      map.put(.07F,30);
+      map.put(.08F,25);
+      map.put(.09F,60);
+      map.put(.10F,20);
+      map.put(.11F,70);
+      map.put(.12F,15);
+      map.put(.13F,80);
+      map.put(.14F,85);
+      map.put(.15F,90);
+      map.put(.16F,95);
+      map.put(.17F,100);
+      map.put(.18F,105);
+      map.put(.19F,110);
+      map.put(.20F,115);
+      map.put(.21F,120);
+      map.put(.22F,125);
+      map.put(.23F,130);
+      map.put(.24F,140);
+      map.put(.25F,145);
+      map.put(.26F,150);
+      map.put(.27F,155);
+      map.put(.28F,160);
+      map.put(.29F,170);
+      map.put(.30F,175);
+      map.put(.31F,180);
+      map.put(.32F,190);
+      map.put(.33F,195);
+      map.put(.34F,205);
+      map.put(.35F,215);
+      map.put(.36F,225);
+      map.put(.37F,230);
+      map.put(.38F,240);
+      map.put(.39F,250);
+      map.put(.40F,260);
+      map.put(.41F,270);
+      map.put(.42F,280);
+      map.put(.43F,295);
+      map.put(.44F,310);
+      map.put(.45F,325);
+      map.put(.46F,335);
+      map.put(.47F,355);
+      map.put(.48F,375);
+      map.put(.49F,395);
+      map.put(.50F,420);
+      map.put(.51F,440);
+      map.put(.52F,465);
+      map.put(.53F,500);
+      map.put(.54F,525);
+      map.put(.55F,550);
+      map.put(.56F,600);
+      map.put(.57F,640);
+      map.put(.58F,680);
+      map.put(.59F,734);
+      map.put(.60F,813);
+      map.put(.61F,905);
+      map.put(.62F,1000);
+      map.put(.63F,1055);
+      map.put(.64F,1160);
+      map.put(.65F,1355);
+      map.put(.66F,1510);
+      map.put(.67F,1805);
+      map.put(.68F,2170);
+    }
+    
+    /**
+     * Returns the size of the word in {@link RandomTextDataGenerator}'s 
+     * dictionary that can generate text with the desired compression ratio.
+     * 
+     * @throws RuntimeException If ratio is less than {@value #MIN_RATIO} or 
+     *                          greater than {@value #MAX_RATIO}.
+     */
+    int getWordSizeForRatio(float ratio) {
+      ratio = standardizeCompressionRatio(ratio);
+      if (ratio >= MIN_RATIO && ratio <= MAX_RATIO) {
+        return map.get(ratio);
+      } else {
+        throw new RuntimeException("Compression ratio should be in the range [" 
+          + MIN_RATIO + "," + MAX_RATIO + "]. Configured compression ratio is " 
+          + ratio + ".");
+      }
+    }
+  }
+  
+  /**
+   * Setup the data generator's configuration to generate compressible random 
+   * text data with the desired compression ratio.
+   * Note that the compression ratio, if configured, will set the 
+   * {@link RandomTextDataGenerator}'s list-size and word-size based on 
+   * empirical values using the compression ratio set in the configuration. 
+   * 
+   * Hence to achieve the desired compression ratio, 
+   * {@link RandomTextDataGenerator}'s list-size will be set to the default 
+   * value i.e {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+   */
+  static void setupDataGeneratorConfig(Configuration conf) {
+    boolean compress = isCompressionEmulationEnabled(conf);
+    if (compress) {
+      float ratio = getMapInputCompressionEmulationRatio(conf);
+      LOG.info("GridMix is configured to generate compressed input data with "
+               + " a compression ratio of " + ratio);
+      int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+      RandomTextDataGenerator.setRandomTextDataGeneratorWordSize(conf, 
+                                                                 wordSize);
+
+      // since the compression ratios are computed using the default value of 
+      // list size
+      RandomTextDataGenerator.setRandomTextDataGeneratorListSize(conf, 
+          RandomTextDataGenerator.DEFAULT_LIST_SIZE);
+    }
+  }
+  
+  /**
+   * Returns a {@link RandomTextDataGenerator} that generates random 
+   * compressible text with the desired compression ratio.
+   */
+  static RandomTextDataGenerator getRandomTextDataGenerator(float ratio, 
+                                                            long seed) {
+    int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+    RandomTextDataGenerator rtg = 
+      new RandomTextDataGenerator(RandomTextDataGenerator.DEFAULT_LIST_SIZE, 
+            seed, wordSize);
+    return rtg;
+  }
+  
+  /** Publishes compression related data statistics. Following statistics are
+   * published
+   * <ul>
+   *   <li>Total compressed input data size</li>
+   *   <li>Number of compressed input data files</li>
+   *   <li>Compression Ratio</li>
+   *   <li>Text data dictionary size</li>
+   *   <li>Random text word size</li>
+   * </ul>
+   */
+  static void publishCompressedDataStatistics(Path inputDir, Configuration conf,
+                                              long uncompressedDataSize) 
+  throws IOException {
+    FileSystem fs = inputDir.getFileSystem(conf);
+    CompressionCodecFactory compressionCodecs = 
+      new CompressionCodecFactory(conf);
+
+    // iterate over compressed files and sum up the compressed file sizes
+    long compressedDataSize = 0;
+    int numCompressedFiles = 0;
+    // obtain input data file statuses
+    FileStatus[] outFileStatuses = 
+      fs.listStatus(inputDir, new Utils.OutputFileUtils.OutputFilesFilter());
+    for (FileStatus status : outFileStatuses) {
+      // check if the input file is compressed
+      if (compressionCodecs != null) {
+        CompressionCodec codec = compressionCodecs.getCodec(status.getPath());
+        if (codec != null) {
+          ++numCompressedFiles;
+          compressedDataSize += status.getLen();
+        }
+      }
+    }
+
+    LOG.info("Gridmix is configured to use compressed input data.");
+    // publish the input data size
+    LOG.info("Total size of compressed input data : " 
+             + StringUtils.humanReadableInt(compressedDataSize));
+    LOG.info("Total number of compressed input data files : " 
+             + numCompressedFiles);
+
+    if (numCompressedFiles == 0) {
+      throw new RuntimeException("No compressed file found in the input" 
+          + " directory : " + inputDir.toString() + ". To enable compression"
+          + " emulation, run Gridmix either with "
+          + " an input directory containing compressed input file(s) or" 
+          + " use the -generate option to (re)generate it. If compression"
+          + " emulation is not desired, disable it by setting '" 
+          + COMPRESSION_EMULATION_ENABLE + "' to 'false'.");
+    }
+    
+    // publish compression ratio only if its generated in this gridmix run
+    if (uncompressedDataSize > 0) {
+      // compute the compression ratio
+      double ratio = ((double)compressedDataSize) / uncompressedDataSize;
+
+      // publish the compression ratio
+      LOG.info("Input Data Compression Ratio : " + ratio);
+    }
+  }
+  
+  /**
+   * Enables/Disables compression emulation.
+   * @param conf Target configuration where the parameter 
+   * {@value #COMPRESSION_EMULATION_ENABLE} will be set. 
+   * @param val The value to be set.
+   */
+  static void setCompressionEmulationEnabled(Configuration conf, boolean val) {
+    conf.setBoolean(COMPRESSION_EMULATION_ENABLE, val);
+  }
+  
+  /**
+   * Checks if compression emulation is enabled or not. Default is {@code true}.
+   */
+  static boolean isCompressionEmulationEnabled(Configuration conf) {
+    return conf.getBoolean(COMPRESSION_EMULATION_ENABLE, true);
+  }
+  
+  /**
+   * Enables/Disables input decompression emulation.
+   * @param conf Target configuration where the parameter 
+   * {@value #INPUT_DECOMPRESSION_EMULATION_ENABLE} will be set. 
+   * @param val The value to be set.
+   */
+  static void setInputCompressionEmulationEnabled(Configuration conf, 
+                                                  boolean val) {
+    conf.setBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, val);
+  }
+  
+  /**
+   * Check if input decompression emulation is enabled or not. 
+   * Default is {@code false}.
+   */
+  static boolean isInputCompressionEmulationEnabled(Configuration conf) {
+    return conf.getBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, false);
+  }
+  
+  /**
+   * Set the map input data compression ratio in the given conf.
+   */
+  static void setMapInputCompressionEmulationRatio(Configuration conf, 
+                                                   float ratio) {
+    conf.setFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO, ratio);
+  }
+  
+  /**
+   * Get the map input data compression ratio using the given configuration.
+   * If the compression ratio is not set in the configuration then use the 
+   * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+   */
+  static float getMapInputCompressionEmulationRatio(Configuration conf) {
+    return conf.getFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO, 
+                         DEFAULT_COMPRESSION_RATIO);
+  }
+  
+  /**
+   * Set the map output data compression ratio in the given configuration.
+   */
+  static void setMapOutputCompressionEmulationRatio(Configuration conf, 
+                                                    float ratio) {
+    conf.setFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO, ratio);
+  }
+  
+  /**
+   * Get the map output data compression ratio using the given configuration.
+   * If the compression ratio is not set in the configuration then use the 
+   * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+   */
+  static float getMapOutputCompressionEmulationRatio(Configuration conf) {
+    return conf.getFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO, 
+                         DEFAULT_COMPRESSION_RATIO);
+  }
+  
+  /**
+   * Set the reduce output data compression ratio in the given configuration.
+   */
+  static void setReduceOutputCompressionEmulationRatio(Configuration conf, 
+                                                       float ratio) {
+    conf.setFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, ratio);
+  }
+  
+  /**
+   * Get the reduce output data compression ratio using the given configuration.
+   * If the compression ratio is not set in the configuration then use the 
+   * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+   */
+  static float getReduceOutputCompressionEmulationRatio(Configuration conf) {
+    return conf.getFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, 
+                         DEFAULT_COMPRESSION_RATIO);
+  }
+  
+  /**
+   * Standardize the compression ratio i.e round off the compression ratio to
+   * only 2 significant digits.
+   */
+  static float standardizeCompressionRatio(float ratio) {
+    // round off to 2 significant digits
+    int significant = (int)Math.round(ratio * 100);
+    return ((float)significant)/100;
+  }
+  
+  /**
+   * Returns a {@link InputStream} for a file that might be compressed.
+   */
+  static InputStream getPossiblyDecompressedInputStream(Path file, 
+                                                        Configuration conf,
+                                                        long offset)
+  throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (isCompressionEmulationEnabled(conf)
+        && isInputCompressionEmulationEnabled(conf)) {
+      CompressionCodecFactory compressionCodecs = 
+        new CompressionCodecFactory(conf);
+      CompressionCodec codec = compressionCodecs.getCodec(file);
+      if (codec != null) {
+        Decompressor decompressor = CodecPool.getDecompressor(codec);
+        if (decompressor != null) {
+          CompressionInputStream in = 
+            codec.createInputStream(fs.open(file), decompressor);
+          //TODO Seek doesnt work with compressed input stream. 
+          //     Use SplittableCompressionCodec?
+          return (InputStream)in;
+        }
+      }
+    }
+    FSDataInputStream in = fs.open(file);
+    in.seek(offset);
+    return (InputStream)in;
+  }
+  
+  /**
+   * Returns a {@link OutputStream} for a file that might need 
+   * compression.
+   */
+  static OutputStream getPossiblyCompressedOutputStream(Path file, 
+                                                        Configuration conf)
+  throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    JobConf jConf = new JobConf(conf);
+    if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
+      // get the codec class
+      Class<? extends CompressionCodec> codecClass =
+        org.apache.hadoop.mapred.FileOutputFormat
+                                .getOutputCompressorClass(jConf, 
+                                                          GzipCodec.class);
+      // get the codec implementation
+      CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+
+      // add the appropriate extension
+      file = file.suffix(codec.getDefaultExtension());
+
+      if (isCompressionEmulationEnabled(conf)) {
+        FSDataOutputStream fileOut = fs.create(file, false);
+        return new DataOutputStream(codec.createOutputStream(fileOut));
+      }
+    }
+    return fs.create(file, false);
+  }
+  
+  /**
+   * Extracts compression/decompression related configuration parameters from 
+   * the source configuration to the target configuration.
+   */
+  static void configureCompressionEmulation(Configuration source, 
+                                            Configuration target) {
+    // enable output compression
+    target.setBoolean(FileOutputFormat.COMPRESS, 
+        source.getBoolean(FileOutputFormat.COMPRESS, false));
+
+    // set the job output compression codec
+    String jobOutputCompressionCodec = 
+      source.get(FileOutputFormat.COMPRESS_CODEC);
+    if (jobOutputCompressionCodec != null) {
+      target.set(FileOutputFormat.COMPRESS_CODEC, jobOutputCompressionCodec);
+    }
+
+    // set the job output compression type
+    String jobOutputCompressionType = 
+      source.get(FileOutputFormat.COMPRESS_TYPE);
+    if (jobOutputCompressionType != null) {
+      target.set(FileOutputFormat.COMPRESS_TYPE, jobOutputCompressionType);
+    }
+
+    // enable map output compression
+    target.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS,
+        source.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false));
+
+    // set the map output compression codecs
+    String mapOutputCompressionCodec = 
+      source.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC);
+    if (mapOutputCompressionCodec != null) {
+      target.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, 
+                 mapOutputCompressionCodec);
+    }
+
+    // enable input decompression
+    //TODO replace with mapInputBytes and hdfsBytesRead
+    Path[] inputs = 
+      org.apache.hadoop.mapred.FileInputFormat
+         .getInputPaths(new JobConf(source));
+    boolean needsCompressedInput = false;
+    CompressionCodecFactory compressionCodecs = 
+      new CompressionCodecFactory(source);
+    for (Path input : inputs) {
+      CompressionCodec codec = compressionCodecs.getCodec(input);
+      if (codec != null) {
+        needsCompressedInput = true;
+      }
+    }
+    setInputCompressionEmulationEnabled(target, needsCompressedInput);
+  }
+}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
index 0cd41ca..6a93428 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
@@ -458,7 +458,7 @@
 
     conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
     conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
-    LOG.info("Number of HDFS based distributed cache files to be generated is"
+    LOG.info("Number of HDFS based distributed cache files to be generated is "
         + fileCount + ". Total size of HDFS based distributed cache files "
         + "to be generated is " + byteCount);
 
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
index 840f83c..2e4222c 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
@@ -21,8 +21,6 @@
 import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
@@ -35,7 +33,7 @@
 
   private int idx = -1;
   private long curlen = -1L;
-  private FSDataInputStream input;
+  private InputStream input;
   private final byte[] z = new byte[1];
   private final Path[] paths;
   private final long[] lengths;
@@ -65,9 +63,9 @@
     idx = (idx + 1) % paths.length;
     curlen = lengths[idx];
     final Path file = paths[idx];
-    final FileSystem fs = file.getFileSystem(conf);
-    input = fs.open(file);
-    input.seek(startoffset[idx]);
+    input = 
+      CompressionEmulationUtil.getPossiblyDecompressedInputStream(file, 
+                                 conf, startoffset[idx]);
   }
 
   @Override
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
index 066cdf1..0f2ca04 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
@@ -31,7 +31,10 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -41,6 +44,7 @@
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -52,6 +56,7 @@
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 // TODO can replace with form of GridmixJob
 class GenerateData extends GridmixJob {
@@ -94,6 +99,43 @@
     FileOutputFormat.setOutputPath(job, outdir);
   }
 
+  /**
+   * Publish the data statistics.
+   */
+  static void publishDataStatistics(Path inputDir, long genBytes, 
+                                    Configuration conf) 
+  throws IOException {
+    if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
+      CompressionEmulationUtil.publishCompressedDataStatistics(inputDir, 
+                                                               conf, genBytes);
+    } else {
+      publishPlainDataStatistics(conf, inputDir);
+    }
+  }
+  
+  static void publishPlainDataStatistics(Configuration conf, Path inputDir) 
+  throws IOException {
+    FileSystem fs = inputDir.getFileSystem(conf);
+
+    // obtain input data file statuses
+    long dataSize = 0;
+    long fileCount = 0;
+    RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true);
+    PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
+    while (iter.hasNext()) {
+      LocatedFileStatus lStatus = iter.next();
+      if (filter.accept(lStatus.getPath())) {
+        dataSize += lStatus.getLen();
+        ++fileCount;
+      }
+    }
+
+    // publish the plain data statistics
+    LOG.info("Total size of input data : " 
+             + StringUtils.humanReadableInt(dataSize));
+    LOG.info("Total number of input data files : " + fileCount);
+  }
+  
   @Override
   public Job call() throws IOException, InterruptedException,
                            ClassNotFoundException {
@@ -101,6 +143,18 @@
     ugi.doAs( new PrivilegedExceptionAction <Job>() {
        public Job run() throws IOException, ClassNotFoundException,
                                InterruptedException {
+         // check if compression emulation is enabled
+         if (CompressionEmulationUtil
+             .isCompressionEmulationEnabled(job.getConfiguration())) {
+           CompressionEmulationUtil.configure(job);
+         } else {
+           configureRandomBytesDataGenerator();
+         }
+         job.submit();
+         return job;
+       }
+       
+       private void configureRandomBytesDataGenerator() {
         job.setMapperClass(GenDataMapper.class);
         job.setNumReduceTasks(0);
         job.setMapOutputKeyClass(NullWritable.class);
@@ -113,12 +167,15 @@
         } catch (IOException e) {
           LOG.error("Error while adding input path ", e);
         }
-        job.submit();
-        return job;
       }
     });
     return job;
   }
+  
+  @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
 
   public static class GenDataMapper
       extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
index f5acc9a..c90e17c 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
@@ -116,6 +116,11 @@
     return job;
   }
 
+  @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
+
   public static class GenDCDataMapper
       extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
 
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
index b64a036..455ea16 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
@@ -117,15 +117,24 @@
   // Shutdown hook
   private final Shutdown sdh = new Shutdown();
 
+  // Get the input data directory for Gridmix. Input directory is 
+  // <io-path>/input
+  static Path getGridmixInputDataPath(Path ioPath) {
+    return new Path(ioPath, "input");
+  }
+  
   /**
-   * Write random bytes at the path &lt;ioPath&gt;/input/
+   * Write random bytes at the path &lt;inputDir&gt;.
    * @see org.apache.hadoop.mapred.gridmix.GenerateData
    */
-  protected void writeInputData(long genbytes, Path ioPath)
+  protected void writeInputData(long genbytes, Path inputDir)
       throws IOException, InterruptedException {
-    Path inputDir = new Path(ioPath, "input");
     final Configuration conf = getConf();
-    final GridmixJob genData = new GenerateData(conf, inputDir, genbytes);
+    
+    // configure the compression ratio if needed
+    CompressionEmulationUtil.setupDataGeneratorConfig(conf);
+    
+    final GenerateData genData = new GenerateData(conf, inputDir, genbytes);
     LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
         " of test data...");
     launchGridmixJob(genData);
@@ -138,6 +147,8 @@
       LOG.error("Couldnt change the file permissions " , e);
       throw new IOException(e);
     }
+    
+    LOG.info("Input data generation successful.");
   }
 
   /**
@@ -209,7 +220,7 @@
       Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
       throws IOException {
     try {
-      Path inputDir = new Path(ioPath, "input");
+      Path inputDir = getGridmixInputDataPath(ioPath);
       GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
         conf, GridmixJobSubmissionPolicy.STRESS);
       LOG.info(" Submission policy is " + policy.name());
@@ -375,10 +386,17 @@
         // Create, start job submission threads
         startThreads(conf, traceIn, ioPath, scratchDir, startFlag,
                      userResolver);
+        
+        Path inputDir = getGridmixInputDataPath(ioPath);
+        
         // Write input data if specified
         if (genbytes > 0) {
-          writeInputData(genbytes, ioPath);
+          writeInputData(genbytes, inputDir);
         }
+        
+        // publish the data statistics
+        GenerateData.publishDataStatistics(inputDir, genbytes, conf);
+        
         // scan input dir contents
         submitter.refreshFilePool();
 
@@ -660,3 +678,4 @@
   }
 
 }
+
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
index 0ce5e76..81b7508 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Formatter;
 import java.util.List;
@@ -27,8 +28,6 @@
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
@@ -117,6 +116,16 @@
             setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
           }
 
+          // check if the job can emulate compression
+          if (canEmulateCompression()) {
+            // set the compression related configs if compression emulation is
+            // enabled
+            if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
+              CompressionEmulationUtil.configureCompressionEmulation(
+                  jobdesc.getJobConf(), ret.getConfiguration());
+            }
+          }
+          
           return ret;
         }
       });
@@ -129,6 +138,11 @@
     outdir = new Path(outRoot, "" + seq);
   }
 
+  /**
+   * Indicates whether this {@link GridmixJob} supports compression emulation.
+   */
+  protected abstract boolean canEmulateCompression();
+  
   protected GridmixJob(final Configuration conf, long submissionMillis, 
                        final String name) throws IOException {
     submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
@@ -293,13 +307,18 @@
         TaskAttemptContext job) throws IOException {
 
       Path file = getDefaultWorkFile(job, "");
-      FileSystem fs = file.getFileSystem(job.getConfiguration());
-      final FSDataOutputStream fileOut = fs.create(file, false);
+      final DataOutputStream fileOut;
+
+      fileOut = 
+        new DataOutputStream(CompressionEmulationUtil
+            .getPossiblyCompressedOutputStream(file, job.getConfiguration()));
+
       return new RecordWriter<K,GridmixRecord>() {
         @Override
         public void write(K ignored, GridmixRecord value)
             throws IOException {
-          value.writeRandom(fileOut, value.getSize());
+          // Let the Gridmix record fill itself.
+          value.write(fileOut);
         }
         @Override
         public void close(TaskAttemptContext ctxt) throws IOException {
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
index c4409e0..481799f 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 class GridmixRecord implements WritableComparable<GridmixRecord> {
 
@@ -39,6 +40,10 @@
   private final DataOutputBuffer dob =
     new DataOutputBuffer(Long.SIZE / Byte.SIZE);
   private byte[] literal = dob.getData();
+  private boolean compressible = false;
+  private float compressionRatio = 
+    CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
+  private RandomTextDataGenerator rtg = null;
 
   GridmixRecord() {
     this(1, 0L);
@@ -57,6 +62,19 @@
     setSizeInternal(size);
   }
 
+  void setCompressibility(boolean compressible, float ratio) {
+    this.compressible = compressible;
+    this.compressionRatio = ratio;
+    // Initialize the RandomTextDataGenerator once for every GridMix record
+    // Note that RandomTextDataGenerator is needed only when the GridMix record
+    // is configured to generate compressible text data.
+    if (compressible) {
+      rtg = 
+        CompressionEmulationUtil.getRandomTextDataGenerator(ratio, 
+                                   RandomTextDataGenerator.DEFAULT_SEED);
+    }
+  }
+  
   private void setSizeInternal(int size) {
     this.size = Math.max(1, size);
     try {
@@ -79,6 +97,39 @@
     return (x ^= (x << 17));
   }
 
+  /**
+   * Generate random text data that can be compressed. If the record is marked
+   * compressible (via {@link FileOutputFormat#COMPRESS}), only then the 
+   * random data will be text data else 
+   * {@link GridmixRecord#writeRandom(DataOutput, int)} will be invoked.
+   */
+  private void writeRandomText(DataOutput out, final int size) 
+  throws IOException {
+    long tmp = seed;
+    out.writeLong(tmp);
+    int i = size - (Long.SIZE / Byte.SIZE);
+    //TODO Should we use long for size. What if the data is more than 4G?
+    
+    String randomWord = rtg.getRandomWord();
+    byte[] bytes = randomWord.getBytes("UTF-8");
+    long randomWordSize = bytes.length;
+    while (i >= randomWordSize) {
+      out.write(bytes);
+      i -= randomWordSize;
+      
+      // get the next random word
+      randomWord = rtg.getRandomWord();
+      bytes = randomWord.getBytes("UTF-8");
+      // determine the random word size
+      randomWordSize = bytes.length;
+    }
+    
+    // pad the remaining bytes
+    if (i > 0) {
+      out.write(bytes, 0, i);
+    }
+  }
+  
   public void writeRandom(DataOutput out, final int size) throws IOException {
     long tmp = seed;
     out.writeLong(tmp);
@@ -120,8 +171,13 @@
     WritableUtils.writeVInt(out, size);
     final int payload = size - WritableUtils.getVIntSize(size);
     if (payload > Long.SIZE / Byte.SIZE) {
-      writeRandom(out, payload);
+      if (compressible) {
+        writeRandomText(out, payload);
+      } else {
+        writeRandom(out, payload);
+      }
     } else if (payload > 0) {
+      //TODO What is compressible is turned on? LOG is a bad idea!
       out.write(literal, 0, payload);
     }
   }
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
index 0576162..a9d404d 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
@@ -25,9 +25,12 @@
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +46,7 @@
   long currentStart;
   FileStatus current;
   final List<FileStatus> files = new ArrayList<FileStatus>();
+  final Configuration conf = new Configuration();
 
   /**
    * @param inputDir Pool from which files are requested.
@@ -92,7 +96,15 @@
       }
       currentStart += fromFile;
       bytes -= fromFile;
-      if (current.getLen() - currentStart == 0) {
+      // Switch to a new file if
+      //  - the current file is uncompressed and completely used
+      //  - the current file is compressed
+      
+      CompressionCodecFactory compressionCodecs = 
+        new CompressionCodecFactory(conf);
+      CompressionCodec codec = compressionCodecs.getCodec(current.getPath());
+      if (current.getLen() - currentStart == 0
+          || codec != null) {
         current = files.get(++idx % files.size());
         currentStart = 0;
       }
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
index e0df06b..c090a5c 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
@@ -26,12 +26,12 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
@@ -83,6 +83,11 @@
     return job;
   }
 
+  @Override
+  protected boolean canEmulateCompression() {
+    return true;
+  }
+  
   public static class LoadMapper
   extends Mapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> {
 
@@ -104,6 +109,20 @@
       final long[] reduceBytes = split.getOutputBytes();
       final long[] reduceRecords = split.getOutputRecords();
 
+      // enable gridmix map output record for compression
+      final boolean emulateMapOutputCompression = 
+        CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+        && conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
+      float compressionRatio = 1.0f;
+      if (emulateMapOutputCompression) {
+        compressionRatio = 
+          CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf);
+        LOG.info("GridMix is configured to use a compression ratio of " 
+                 + compressionRatio + " for the map output data.");
+        key.setCompressibility(true, compressionRatio);
+        val.setCompressibility(true, compressionRatio);
+      }
+      
       long totalRecords = 0L;
       final int nReduces = ctxt.getNumReduceTasks();
       if (nReduces > 0) {
@@ -117,14 +136,26 @@
             ++idx;
             id += maps;
           }
+          
+          // set the map output bytes such that the final reduce input bytes 
+          // match the expected value obtained from the original job
+          long mapOutputBytes = reduceBytes[i];
+          if (emulateMapOutputCompression) {
+            mapOutputBytes /= compressionRatio;
+          }
           reduces.add(new IntermediateRecordFactory(
-              new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+              new AvgRecordFactory(mapOutputBytes, reduceRecords[i], conf, 
+                                   5*1024),
               i, reduceRecords[i], spec, conf));
           totalRecords += reduceRecords[i];
         }
       } else {
-        reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
-                                         conf));
+        long mapOutputBytes = reduceBytes[0];
+        if (emulateMapOutputCompression) {
+          mapOutputBytes /= compressionRatio;
+        }
+        reduces.add(new AvgRecordFactory(mapOutputBytes, reduceRecords[0],
+                                         conf, 5*1024));
         totalRecords = reduceRecords[0];
       }
       final long splitRecords = split.getInputRecords();
@@ -199,8 +230,26 @@
         LOG.info("Spec output bytes w/o records. Using input record count");
         outRecords = inRecords;
       }
+      
+      // enable gridmix reduce output record for compression
+      Configuration conf = context.getConfiguration();
+      if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+          && FileOutputFormat.getCompressOutput(context)) {
+        float compressionRatio = 
+          CompressionEmulationUtil
+            .getReduceOutputCompressionEmulationRatio(conf);
+        LOG.info("GridMix is configured to use a compression ratio of " 
+                 + compressionRatio + " for the reduce output data.");
+        val.setCompressibility(true, compressionRatio);
+        
+        // Set the actual output data size to make sure that the actual output 
+        // data size is same after compression
+        outBytes /= compressionRatio;
+      }
+      
       factory =
-        new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+        new AvgRecordFactory(outBytes, outRecords, 
+                             context.getConfiguration(), 5*1024);
       ratio = outRecords / (1.0 * inRecords);
       acc = 0.0;
     }
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
new file mode 100644
index 0000000..877d434
--- /dev/null
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A random text generator. The words are simply sequences of alphabets.
+ */
+class RandomTextDataGenerator {
+  static final Log LOG = LogFactory.getLog(RandomTextDataGenerator.class);
+  
+  /**
+   * Configuration key for random text data generator's list size.
+   */
+  static final String GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE = 
+    "gridmix.datagenerator.randomtext.listsize";
+  
+  /**
+   * Configuration key for random text data generator's word size.
+   */
+  static final String GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE = 
+    "gridmix.datagenerator.randomtext.wordsize";
+  
+  /**
+   * Default random text data generator's list size.
+   */
+  static final int DEFAULT_LIST_SIZE = 200;
+  
+  /**
+   * Default random text data generator's word size.
+   */
+  static final int DEFAULT_WORD_SIZE = 10;
+  
+  /**
+   * Default random text data generator's seed.
+   */
+  static final long DEFAULT_SEED = 0L;
+  
+  /**
+   * A list of random words
+   */
+  private String[] words;
+  private Random random;
+  
+  /**
+   * Constructor for {@link RandomTextDataGenerator} with default seed.
+   * @param size the total number of words to consider.
+   * @param wordSize Size of each word
+   */
+  RandomTextDataGenerator(int size, int wordSize) {
+    this(size, DEFAULT_SEED , wordSize);
+  }
+  
+  /**
+   * Constructor for {@link RandomTextDataGenerator}.
+   * @param size the total number of words to consider.
+   * @param seed Random number generator seed for repeatability
+   * @param wordSize Size of each word
+   */
+  RandomTextDataGenerator(int size, Long seed, int wordSize) {
+    random = new Random(seed);
+    words = new String[size];
+    
+    //TODO change the default with the actual stats
+    //TODO do u need varied sized words?
+    for (int i = 0; i < size; ++i) {
+      words[i] = 
+        RandomStringUtils.random(wordSize, 0, 0, true, false, null, random);
+    }
+  }
+  
+  /**
+   * Get the configured random text data generator's list size.
+   */
+  static int getRandomTextDataGeneratorListSize(Configuration conf) {
+    return conf.getInt(GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE, DEFAULT_LIST_SIZE);
+  }
+  
+  /**
+   * Set the random text data generator's list size.
+   */
+  static void setRandomTextDataGeneratorListSize(Configuration conf, 
+                                                 int listSize) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Random text data generator is configured to use a dictionary " 
+                + " with " + listSize + " words");
+    }
+    conf.setInt(GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE, listSize);
+  }
+  
+  /**
+   * Get the configured random text data generator word size.
+   */
+  static int getRandomTextDataGeneratorWordSize(Configuration conf) {
+    return conf.getInt(GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, DEFAULT_WORD_SIZE);
+  }
+  
+  /**
+   * Set the random text data generator word size.
+   */
+  static void setRandomTextDataGeneratorWordSize(Configuration conf, 
+                                                 int wordSize) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Random text data generator is configured to use a dictionary " 
+                + " with words of length " + wordSize);
+    }
+    conf.setInt(GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, wordSize);
+  }
+  
+  /**
+   * Returns a randomly selected word from a list of random words.
+   */
+  String getRandomWord() {
+    int index = random.nextInt(words.length);
+    return words[index];
+  }
+  
+  /**
+   * This is mainly for testing.
+   */
+  List<String> getRandomWords() {
+    return Arrays.asList(words);
+  }
+}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
index 15a772e..a9f2999 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
@@ -94,6 +94,11 @@
   }
 
   @Override
+  protected boolean canEmulateCompression() {
+    return false;
+  }
+  
+  @Override
   public Job call()
     throws IOException, InterruptedException, ClassNotFoundException {
     ugi.doAs(
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
new file mode 100644
index 0000000..d1c1b98
--- /dev/null
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
@@ -0,0 +1,562 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.CompressionEmulationUtil.RandomTextDataMapper;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link CompressionEmulationUtil}
+ */
+public class TestCompressionEmulationUtils {
+  //TODO Remove this once LocalJobRunner can run Gridmix.
+  static class CustomInputFormat extends GenerateData.GenDataFormat {
+    @Override
+    public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+      // get the total data to be generated
+      long toGen =
+        jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1);
+      if (toGen < 0) {
+        throw new IOException("Invalid/missing generation bytes: " + toGen);
+      }
+      // get the total number of mappers configured
+      int totalMappersConfigured =
+        jobCtxt.getConfiguration().getInt(MRJobConfig.NUM_MAPS, -1);
+      if (totalMappersConfigured < 0) {
+        throw new IOException("Invalid/missing num mappers: " 
+                              + totalMappersConfigured);
+      }
+      
+      final long bytesPerTracker = toGen / totalMappersConfigured;
+      final ArrayList<InputSplit> splits = 
+        new ArrayList<InputSplit>(totalMappersConfigured);
+      for (int i = 0; i < totalMappersConfigured; ++i) {
+        splits.add(new GenSplit(bytesPerTracker, 
+                   new String[] { "tracker_local" }));
+      }
+      return splits;
+    }
+  }
+  
+  /**
+   * Test {@link RandomTextDataMapper} via {@link CompressionEmulationUtil}.
+   */
+  @Test
+  public void testRandomCompressedTextDataGenerator() throws Exception {
+    int wordSize = 10;
+    int listSize = 20;
+    long dataSize = 10*1024*1024;
+    
+    Configuration conf = new Configuration();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    // configure the RandomTextDataGenerator to generate desired sized data
+    conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE, 
+                listSize);
+    conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, 
+                wordSize);
+    conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, "TestRandomCompressedTextDataGenr");
+    lfs.delete(tempDir, true);
+    
+    runDataGenJob(conf, tempDir);
+    
+    // validate the output data
+    FileStatus[] files = 
+      lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+    long size = 0;
+    long maxLineSize = 0;
+    
+    for (FileStatus status : files) {
+      InputStream in = 
+        CompressionEmulationUtil
+          .getPossiblyDecompressedInputStream(status.getPath(), conf, 0);
+      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+      String line = reader.readLine();
+      if (line != null) {
+        long lineSize = line.getBytes().length;
+        if (lineSize > maxLineSize) {
+          maxLineSize = lineSize;
+        }
+        while (line != null) {
+          for (String word : line.split("\\s")) {
+            size += word.getBytes().length;
+          }
+          line = reader.readLine();
+        }
+      }
+      reader.close();
+    }
+
+    assertTrue(size >= dataSize);
+    assertTrue(size <= dataSize + maxLineSize);
+  }
+  
+  /**
+   * Runs a GridMix data-generation job.
+   */
+  private static void runDataGenJob(Configuration conf, Path tempDir) 
+  throws IOException, ClassNotFoundException, InterruptedException {
+    JobClient client = new JobClient(conf);
+    
+    // get the local job runner
+    conf.setInt(MRJobConfig.NUM_MAPS, 1);
+    
+    Job job = new Job(conf);
+    
+    CompressionEmulationUtil.configure(job);
+    job.setInputFormatClass(CustomInputFormat.class);
+    
+    // set the output path
+    FileOutputFormat.setOutputPath(job, tempDir);
+    
+    // submit and wait for completion
+    job.submit();
+    int ret = job.waitForCompletion(true) ? 0 : 1;
+
+    assertEquals("Job Failed", 0, ret);
+  }
+  
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate random text data 
+   * with the desired compression ratio. This involves
+   *   - using {@link CompressionEmulationUtil} to configure the MR job for 
+   *     generating the random text data with the desired compression ratio
+   *   - running the MR job
+   *   - test {@link RandomTextDataGenerator}'s output and match the output size
+   *     (compressed) with the expected compression ratio.
+   */
+  private void testCompressionRatioConfigure(float ratio)
+  throws Exception {
+    long dataSize = 10*1024*1024;
+    
+    Configuration conf = new Configuration();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+    
+    float expectedRatio = CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
+    if (ratio > 0) {
+      // set the compression ratio in the conf
+      CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+      expectedRatio = 
+        CompressionEmulationUtil.standardizeCompressionRatio(ratio);
+    }
+    
+    // invoke the utility to map from ratio to word-size
+    CompressionEmulationUtil.setupDataGeneratorConfig(conf);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = 
+      new Path(rootTempDir, "TestCustomRandomCompressedTextDataGenr");
+    lfs.delete(tempDir, true);
+    
+    runDataGenJob(conf, tempDir);
+    
+    // validate the output data
+    FileStatus[] files = 
+      lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+    long size = 0;
+    
+    for (FileStatus status : files) {
+      size += status.getLen();
+    }
+
+    float compressionRatio = ((float)size)/dataSize;
+    float stdRatio = 
+      CompressionEmulationUtil.standardizeCompressionRatio(compressionRatio);
+    
+    assertEquals(expectedRatio, stdRatio, 0.0D);
+  }
+  
+  /**
+   * Test compression ratio with multiple compression ratios.
+   */
+  @Test
+  public void testCompressionRatios() throws Exception {
+    // test default compression ratio i.e 0.5
+    testCompressionRatioConfigure(0F);
+    // test for a sample compression ratio of 0.2
+    testCompressionRatioConfigure(0.2F);
+    // test for a sample compression ratio of 0.4
+    testCompressionRatioConfigure(0.4F);
+    // test for a sample compression ratio of 0.65
+    testCompressionRatioConfigure(0.65F);
+    // test for a compression ratio of 0.682 which should be standardized
+    // to round(0.682) i.e 0.68
+    testCompressionRatioConfigure(0.682F);
+    // test for a compression ratio of 0.567 which should be standardized
+    // to round(0.567) i.e 0.57
+    testCompressionRatioConfigure(0.567F);
+    
+    // test with a compression ratio of 0.01 which less than the min supported
+    // value of 0.07
+    boolean failed = false;
+    try {
+      testCompressionRatioConfigure(0.01F);
+    } catch (RuntimeException re) {
+      failed = true;
+    }
+    assertTrue("Compression ratio min value (0.07) check failed!", failed);
+    
+    // test with a compression ratio of 0.01 which less than the max supported
+    // value of 0.68
+    failed = false;
+    try {
+      testCompressionRatioConfigure(0.7F);
+    } catch (RuntimeException re) {
+      failed = true;
+    }
+    assertTrue("Compression ratio max value (0.68) check failed!", failed);
+  }
+  
+  /**
+   * Test compression ratio standardization.
+   */
+  @Test
+  public void testCompressionRatioStandardization() throws Exception {
+    assertEquals(0.55F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.55F), 0.0D);
+    assertEquals(0.65F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.652F), 0.0D);
+    assertEquals(0.78F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.777F), 0.0D);
+    assertEquals(0.86F, 
+        CompressionEmulationUtil.standardizeCompressionRatio(0.855F), 0.0D);
+  }
+  
+  /**
+   * Test map input compression ratio configuration utilities.
+   */
+  @Test
+  public void testInputCompressionRatioConfiguration() throws Exception {
+    Configuration conf = new Configuration();
+    float ratio = 0.567F;
+    CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+    assertEquals(ratio, 
+        CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf), 
+        0.0D);
+  }
+  
+  /**
+   * Test map output compression ratio configuration utilities.
+   */
+  @Test
+  public void testIntermediateCompressionRatioConfiguration() 
+  throws Exception {
+    Configuration conf = new Configuration();
+    float ratio = 0.567F;
+    CompressionEmulationUtil.setMapOutputCompressionEmulationRatio(conf, ratio);
+    assertEquals(ratio, 
+        CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf), 
+        0.0D);
+  }
+  
+  /**
+   * Test reduce output compression ratio configuration utilities.
+   */
+  @Test
+  public void testOutputCompressionRatioConfiguration() throws Exception {
+    Configuration conf = new Configuration();
+    float ratio = 0.567F;
+    CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf, 
+                                                                      ratio);
+    assertEquals(ratio, 
+        CompressionEmulationUtil.getReduceOutputCompressionEmulationRatio(conf),
+        0.0D);
+  }
+  
+  /**
+   * Test compressible {@link GridmixRecord}.
+   */
+  @Test
+  public void testCompressibleGridmixRecord() throws IOException {
+    JobConf conf = new JobConf();
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    int dataSize = 1024 * 1024 * 10; // 10 MB
+    float ratio = 0.357F;
+    
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, 
+                            "TestPossiblyCompressibleGridmixRecord");
+    lfs.delete(tempDir, true);
+    
+    // define a compressible GridmixRecord
+    GridmixRecord record = new GridmixRecord(dataSize, 0);
+    record.setCompressibility(true, ratio); // enable compression
+    
+    conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class, 
+                  CompressionCodec.class);
+    org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+    
+    // write the record to a file
+    Path recordFile = new Path(tempDir, "record");
+    OutputStream outStream = CompressionEmulationUtil
+                               .getPossiblyCompressedOutputStream(recordFile, 
+                                                                  conf);    
+    DataOutputStream out = new DataOutputStream(outStream);
+    record.write(out);
+    out.close();
+    outStream.close();
+    
+    // open the compressed stream for reading
+    Path actualRecordFile = recordFile.suffix(".gz");
+    InputStream in = 
+      CompressionEmulationUtil
+        .getPossiblyDecompressedInputStream(actualRecordFile, conf, 0);
+    
+    // get the compressed file size
+    long compressedFileSize = lfs.listStatus(actualRecordFile)[0].getLen();
+    
+    GridmixRecord recordRead = new GridmixRecord();
+    recordRead.readFields(new DataInputStream(in));
+    
+    assertEquals("Record size mismatch in a compressible GridmixRecord",
+                 dataSize, recordRead.getSize());
+    assertTrue("Failed to generate a compressible GridmixRecord",
+               recordRead.getSize() > compressedFileSize);
+    
+    // check if the record can generate data with the desired compression ratio
+    float seenRatio = ((float)compressedFileSize)/dataSize;
+    assertEquals(CompressionEmulationUtil.standardizeCompressionRatio(ratio), 
+        CompressionEmulationUtil.standardizeCompressionRatio(seenRatio), 1.0D);
+  }
+  
+  /**
+   * Test 
+   * {@link CompressionEmulationUtil#isCompressionEmulationEnabled(
+   *          org.apache.hadoop.conf.Configuration)}.
+   */
+  @Test
+  public void testIsCompressionEmulationEnabled() {
+    Configuration conf = new Configuration();
+    // Check default values
+    assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+    
+    // Check disabled
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+    assertFalse(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+    
+    // Check enabled
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+  }
+  
+  /**
+   * Test 
+   * {@link CompressionEmulationUtil#getPossiblyDecompressedInputStream(Path, 
+   *                                   Configuration, long)}
+   *  and
+   *  {@link CompressionEmulationUtil#getPossiblyCompressedOutputStream(Path, 
+   *                                    Configuration)}.
+   */
+  @Test
+  public void testPossiblyCompressedDecompressedStreams() throws IOException {
+    JobConf conf = new JobConf();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    String inputLine = "Hi Hello!";
+
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+    conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class, 
+                  CompressionCodec.class);
+
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir =
+      new Path(rootTempDir, "TestPossiblyCompressedDecompressedStreams");
+    lfs.delete(tempDir, true);
+
+    // create a compressed file
+    Path compressedFile = new Path(tempDir, "test");
+    OutputStream out = 
+      CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile, 
+                                                                 conf);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    writer.write(inputLine);
+    writer.close();
+    
+    // now read back the data from the compressed stream
+    compressedFile = compressedFile.suffix(".gz");
+    InputStream in = 
+      CompressionEmulationUtil
+        .getPossiblyDecompressedInputStream(compressedFile, conf, 0);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+    String readLine = reader.readLine();
+    assertEquals("Compression/Decompression error", inputLine, readLine);
+    reader.close();
+  }
+  
+  /**
+   * Test if 
+   * {@link CompressionEmulationUtil#configureCompressionEmulation(
+   *        org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.JobConf)}
+   *  can extract compression related configuration parameters.
+   */
+  @Test
+  public void testExtractCompressionConfigs() {
+    JobConf source = new JobConf();
+    JobConf target = new JobConf();
+    
+    // set the default values
+    source.setBoolean(FileOutputFormat.COMPRESS, false);
+    source.set(FileOutputFormat.COMPRESS_CODEC, "MyDefaultCodec");
+    source.set(FileOutputFormat.COMPRESS_TYPE, "MyDefaultType");
+    source.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false); 
+    source.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, "MyDefaultCodec2");
+    
+    CompressionEmulationUtil.configureCompressionEmulation(source, target);
+    
+    // check default values
+    assertFalse(target.getBoolean(FileOutputFormat.COMPRESS, true));
+    assertEquals("MyDefaultCodec", target.get(FileOutputFormat.COMPRESS_CODEC));
+    assertEquals("MyDefaultType", target.get(FileOutputFormat.COMPRESS_TYPE));
+    assertFalse(target.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true));
+    assertEquals("MyDefaultCodec2", 
+                 target.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC));
+    assertFalse(CompressionEmulationUtil
+                .isInputCompressionEmulationEnabled(target));
+    
+    // set new values
+    source.setBoolean(FileOutputFormat.COMPRESS, true);
+    source.set(FileOutputFormat.COMPRESS_CODEC, "MyCodec");
+    source.set(FileOutputFormat.COMPRESS_TYPE, "MyType");
+    source.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); 
+    source.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, "MyCodec2");
+    org.apache.hadoop.mapred.FileInputFormat.setInputPaths(source, "file.gz");
+    
+    target = new JobConf(); // reset
+    CompressionEmulationUtil.configureCompressionEmulation(source, target);
+    
+    // check new values
+    assertTrue(target.getBoolean(FileOutputFormat.COMPRESS, false));
+    assertEquals("MyCodec", target.get(FileOutputFormat.COMPRESS_CODEC));
+    assertEquals("MyType", target.get(FileOutputFormat.COMPRESS_TYPE));
+    assertTrue(target.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false));
+    assertEquals("MyCodec2", 
+                 target.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC));
+    assertTrue(CompressionEmulationUtil
+               .isInputCompressionEmulationEnabled(target));
+  }
+  
+  /**
+   * Test of {@link FileQueue} can identify compressed file and provide
+   * readers to extract uncompressed data only if input-compression is enabled.
+   */
+  @Test
+  public void testFileQueueDecompression() throws IOException {
+    JobConf conf = new JobConf();
+    FileSystem lfs = FileSystem.getLocal(conf);
+    String inputLine = "Hi Hello!";
+    
+    CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+    CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+    org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+    org.apache.hadoop.mapred.FileOutputFormat.setOutputCompressorClass(conf, 
+                                                GzipCodec.class);
+
+    // define the test's root temp directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+            lfs.getUri(), lfs.getWorkingDirectory());
+
+    Path tempDir = new Path(rootTempDir, "TestFileQueueDecompression");
+    lfs.delete(tempDir, true);
+
+    // create a compressed file
+    Path compressedFile = new Path(tempDir, "test");
+    OutputStream out = 
+      CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile, 
+                                                                 conf);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    writer.write(inputLine);
+    writer.close();
+    
+    compressedFile = compressedFile.suffix(".gz");
+    // now read back the data from the compressed stream using FileQueue
+    long fileSize = lfs.listStatus(compressedFile)[0].getLen();
+    CombineFileSplit split = 
+      new CombineFileSplit(new Path[] {compressedFile}, new long[] {fileSize});
+    FileQueue queue = new FileQueue(split, conf);
+    byte[] bytes = new byte[inputLine.getBytes().length];
+    queue.read(bytes);
+    queue.close();
+    String readLine = new String(bytes);
+    assertEquals("Compression/Decompression error", inputLine, readLine);
+  }
+}
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
new file mode 100644
index 0000000..e302db5
--- /dev/null
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.mapred.gridmix.RandomTextDataGenerator;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link RandomTextDataGenerator}.
+ */
+public class TestRandomTextDataGenerator {
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate random words of 
+   * desired size.
+   */
+  @Test
+  public void testRandomTextDataGenerator() {
+    RandomTextDataGenerator rtdg = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words = rtdg.getRandomWords();
+
+    // check the size
+    assertEquals("List size mismatch", 10, words.size());
+
+    // check the words
+    Set<String> wordsSet = new HashSet<String>(words);
+    assertEquals("List size mismatch due to duplicates", 10, wordsSet.size());
+
+    // check the word lengths
+    for (String word : wordsSet) {
+      assertEquals("Word size mismatch", 5, word.length());
+    }
+  }
+  
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate same words given the
+   * same list-size, word-length and seed.
+   */
+  @Test
+  public void testRandomTextDataGeneratorRepeatability() {
+    RandomTextDataGenerator rtdg1 = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words1 = rtdg1.getRandomWords();
+
+    RandomTextDataGenerator rtdg2 = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words2 = rtdg2.getRandomWords();
+    
+    assertTrue("List mismatch", words1.equals(words2));
+  }
+  
+  /**
+   * Test if {@link RandomTextDataGenerator} can generate different words given 
+   * different seeds.
+   */
+  @Test
+  public void testRandomTextDataGeneratorUniqueness() {
+    RandomTextDataGenerator rtdg1 = new RandomTextDataGenerator(10, 1L, 5);
+    Set<String> words1 = new HashSet(rtdg1.getRandomWords());
+
+    RandomTextDataGenerator rtdg2 = new RandomTextDataGenerator(10, 0L, 5);
+    Set<String> words2 = new HashSet(rtdg2.getRandomWords());
+    
+    assertFalse("List size mismatch across lists", words1.equals(words2));
+  }
+}
diff --git a/src/docs/src/documentation/content/xdocs/gridmix.xml b/src/docs/src/documentation/content/xdocs/gridmix.xml
index 3c72713..fac3526 100644
--- a/src/docs/src/documentation/content/xdocs/gridmix.xml
+++ b/src/docs/src/documentation/content/xdocs/gridmix.xml
@@ -518,7 +518,7 @@
     </section>
 
   <section id="distributedcacheload">
-  <title>Emulation of Distributed Cache Load</title>
+  <title>Emulating Distributed Cache Load</title>
     <p>Gridmix emulates Distributed Cache load by default for LOADJOB type of
     jobs. This is done by precreating the needed Distributed Cache files for all
     the simulated jobs as part of a separate MapReduce job.</p>
@@ -570,6 +570,75 @@
       </table>
     </section>
 
+  <section id="compression-emulation">
+      <title>Emulating Compression/Decompression</title>
+      <p>MapReduce supports data compression and decompression. 
+         Input to a MapReduce job can be compressed. Similarly, output of Map
+         and Reduce tasks can also be compressed. Compression/Decompression 
+         emulation in GridMix is important because emulating 
+         compression/decompression will effect the CPU and Memory usage of the 
+         task. A task emulating compression/decompression will affect other 
+         tasks and daemons running on the same node.
+       </p>
+       <p>Compression emulation is enabled if 
+         <code>gridmix.compression-emulation.enable</code> is set to
+         <code>true</code>. By default compression emulation is enabled for 
+         jobs of type <em>LOADJOB</em>. With compression emulation enabled, 
+         GridMix will now generate compressed text data with a constant 
+         compression ratio. Hence a simulated GridMix job will now emulate 
+         compression/decompression using compressible text data (having a 
+         constant compression ratio), irrespective of the compression ratio 
+         observed in the actual job.
+      </p>
+      <p>A typical MapReduce Job deals with data compression/decompression in 
+         the following phases </p>
+      <ul>
+        <li><code>Job input data decompression: </code> GridMix generates 
+            compressible input data when compression emulation is enabled. 
+            Based on the original job's configuration, a simulated GridMix job 
+            will use a decompressor to read the compressed input data. 
+            Currently, GridMix uses
+            <code>mapreduce.input.fileinputformat.inputdir</code> to determine 
+            if the original job used compressed input data or 
+            not. If the original job input files are uncompressed then the 
+            simulated job will read the compressed input file with using a 
+            decompressor. 
+        </li>
+        <li><code>Intermediate data compression and decompression: </code>
+            If the original job has map output compression enabled then GridMix 
+            too will enable map output compression for the simulated job. 
+            Accordingly, the reducers will use a decompressor to read the map 
+            output data.
+        </li>
+        <li><code>Job output data compression: </code>
+            If the original job's output is compressed then GridMix 
+            too will enable job output compression for the simulated job. 
+        </li>
+      </ul>
+       
+      <p>The following configuration parameters affect compression emulation
+      </p>
+      <table>
+        <tr>
+          <th>Parameter</th>
+          <th>Description</th>
+        </tr>
+        <tr>
+          <td>gridmix.compression-emulation.enable</td>
+          <td>Enables compression emulation in simulated GridMix jobs. 
+              Default is true.</td>
+        </tr>
+      </table>
+      
+      <p>With compression emulation turned on, GridMix will generate compressed
+         input data. Hence the total size of the input 
+         data will be lesser than the expected size. Set 
+         <code>gridmix.min.file.size</code> to a smaller value (roughly 10% of
+         <code>gridmix.gen.bytes.per.file</code>) for enabling GridMix to 
+         correctly emulate compression.
+      </p>
+    </section>
+
     <section id="assumptions">
       <title>Simplifying Assumptions</title>
       <p>GridMix will be developed in stages, incorporating feedback and
@@ -587,9 +656,8 @@
 	sizes, namespace hierarchies, or any property of input, intermediate
 	or output data other than the bytes/records consumed and emitted from
 	a given task. This implies that some of the most heavily-used parts of
-	the system - the compression libraries, text processing, streaming,
-	etc. - cannot be meaningfully tested with the current
-	implementation.</li>
+	the system - text processing, streaming, etc. - cannot be meaningfully tested 
+	with the current implementation.</li>
 	<li><em>I/O Rates</em> - The rate at which records are
 	consumed/emitted is assumed to be limited only by the speed of the
 	reader/writer and constant throughout the task.</li>