MAPREDUCE-2408. [Gridmix] Compression emulation in Gridmix. (amarrk)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1128162 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 0166f4a..2a74014 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@
NEW FEATURES
+ MAPREDUCE-2408. [Gridmix] Compression emulation in Gridmix. (amarrk)
+
MAPREDUCE-2473. Add "mapred groups" command to query the server-side groups
resolved for a user. (Aaron T. Myers via todd)
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
index a7d4e81..9ba6e9a 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/AvgRecordFactory.java
@@ -42,6 +42,8 @@
private final int keyLen;
private long accBytes = 0L;
private long accRecords = 0L;
+ private int unspilledBytes = 0;
+ private int minSpilledBytes = 0;
/**
* @param targetBytes Expected byte count.
@@ -50,6 +52,14 @@
*/
public AvgRecordFactory(long targetBytes, long targetRecords,
Configuration conf) {
+ this(targetBytes, targetRecords, conf, 0);
+ }
+
+ /**
+ * @param minSpilledBytes Minimum number of bytes to accumulate before emitting a full-sized record; until reached, 1-byte key/value records are emitted (0 disables the minimum)
+ */
+ public AvgRecordFactory(long targetBytes, long targetRecords,
+ Configuration conf, int minSpilledBytes) {
this.targetBytes = targetBytes;
this.targetRecords = targetRecords <= 0 && this.targetBytes >= 0
? Math.max(1,
@@ -60,6 +70,7 @@
avgrec = (int) Math.min(Integer.MAX_VALUE, tmp + 1);
keyLen = Math.max(1,
(int)(tmp * Math.min(1.0f, conf.getFloat(GRIDMIX_KEY_FRC, 0.1f))));
+ this.minSpilledBytes = minSpilledBytes;
}
@Override
@@ -69,14 +80,33 @@
}
final int reclen = accRecords++ >= step ? avgrec - 1 : avgrec;
final int len = (int) Math.min(targetBytes - accBytes, reclen);
+
+ unspilledBytes += len;
+
// len != reclen?
if (key != null) {
- key.setSize(keyLen);
- val.setSize(len - key.getSize());
+ if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+ key.setSize(1);
+ val.setSize(1);
+ accBytes += key.getSize() + val.getSize();
+ unspilledBytes -= (key.getSize() + val.getSize());
+ } else {
+ key.setSize(keyLen);
+ val.setSize(unspilledBytes - key.getSize());
+ accBytes += unspilledBytes;
+ unspilledBytes = 0;
+ }
} else {
- val.setSize(len);
+ if (unspilledBytes < minSpilledBytes && accRecords < targetRecords) {
+ val.setSize(1);
+ accBytes += val.getSize();
+ unspilledBytes -= val.getSize();
+ } else {
+ val.setSize(unspilledBytes);
+ accBytes += unspilledBytes;
+ unspilledBytes = 0;
+ }
}
- accBytes += len;
return true;
}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
new file mode 100644
index 0000000..aafb300
--- /dev/null
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/CompressionEmulationUtil.java
@@ -0,0 +1,571 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenDataFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This is a utility class for all the compression related modules.
+ */
+class CompressionEmulationUtil {
+ static final Log LOG = LogFactory.getLog(CompressionEmulationUtil.class);
+
+ /**
+ * Enable compression usage in GridMix runs.
+ */
+ private static final String COMPRESSION_EMULATION_ENABLE =
+ "gridmix.compression-emulation.enable";
+
+ /**
+ * Enable input data decompression.
+ */
+ private static final String INPUT_DECOMPRESSION_EMULATION_ENABLE =
+ "gridmix.compression-emulation.input-decompression.enable";
+
+ /**
+ * Configuration property for setting the compression ratio for map input
+ * data.
+ */
+ private static final String GRIDMIX_MAP_INPUT_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.map-input.decompression-ratio";
+
+ /**
+ * Configuration property for setting the compression ratio of map output.
+ */
+ private static final String GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.map-output.compression-ratio";
+
+ /**
+ * Configuration property for setting the compression ratio of reduce output.
+ */
+ private static final String GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO =
+ "gridmix.compression-emulation.reduce-output.compression-ratio";
+
+ /**
+ * Default compression ratio.
+ */
+ static final float DEFAULT_COMPRESSION_RATIO = 0.5F;
+
+ private static final CompressionRatioLookupTable COMPRESSION_LOOKUP_TABLE =
+ new CompressionRatioLookupTable();
+
+ /**
+ * This is a {@link Mapper} implementation for generating random text data.
+ * It uses {@link RandomTextDataGenerator} for generating text data and the
+ * output files are compressed.
+ */
+ public static class RandomTextDataMapper
+ extends Mapper<NullWritable, LongWritable, Text, Text> {
+ private RandomTextDataGenerator rtg;
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int listSize =
+ RandomTextDataGenerator.getRandomTextDataGeneratorListSize(conf);
+ int wordSize =
+ RandomTextDataGenerator.getRandomTextDataGeneratorWordSize(conf);
+ rtg = new RandomTextDataGenerator(listSize, wordSize);
+ }
+
+ /**
+ * Emits random words sequence of desired size. Note that the desired output
+ * size is passed as the value parameter to this map.
+ */
+ @Override
+ public void map(NullWritable key, LongWritable value, Context context)
+ throws IOException, InterruptedException {
+ //TODO Control the extra data written ..
+ //TODO Should the key\tvalue\n be considered for measuring size?
+ // Can counters like BYTES_WRITTEN be used? What will be the value of
+ // such counters in LocalJobRunner?
+ for (long bytes = value.get(); bytes > 0;) {
+ String randomKey = rtg.getRandomWord();
+ String randomValue = rtg.getRandomWord();
+ context.write(new Text(randomKey), new Text(randomValue));
+ bytes -= (randomValue.getBytes().length + randomKey.getBytes().length);
+ }
+ }
+ }
+
+ /**
+ * Configure the {@link Job} for enabling compression emulation.
+ */
+ static void configure(final Job job) throws IOException, InterruptedException,
+ ClassNotFoundException {
+ // set the random text mapper
+ job.setMapperClass(RandomTextDataMapper.class);
+ job.setNumReduceTasks(0);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setInputFormatClass(GenDataFormat.class);
+ job.setJarByClass(GenerateData.class);
+
+ // set the output compression true
+ FileOutputFormat.setCompressOutput(job, true);
+ try {
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ } catch (IOException e) {
+ LOG.error("Error while adding input path ", e);
+ }
+ }
+
+ /**
+ * This is the lookup table for mapping compression ratio to the size of the
+ * word in the {@link RandomTextDataGenerator}'s dictionary.
+ *
+ * Note that this table is computed (empirically) using a dictionary of
+ * default length i.e {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+ */
+ private static class CompressionRatioLookupTable {
+ private static Map<Float, Integer> map = new HashMap<Float, Integer>(60);
+ private static final float MIN_RATIO = 0.07F;
+ private static final float MAX_RATIO = 0.68F;
+
+ // add the empirically obtained data points in the lookup table
+ CompressionRatioLookupTable() {
+ map.put(.07F,30);
+ map.put(.08F,25);
+ map.put(.09F,60);
+ map.put(.10F,20);
+ map.put(.11F,70);
+ map.put(.12F,15);
+ map.put(.13F,80);
+ map.put(.14F,85);
+ map.put(.15F,90);
+ map.put(.16F,95);
+ map.put(.17F,100);
+ map.put(.18F,105);
+ map.put(.19F,110);
+ map.put(.20F,115);
+ map.put(.21F,120);
+ map.put(.22F,125);
+ map.put(.23F,130);
+ map.put(.24F,140);
+ map.put(.25F,145);
+ map.put(.26F,150);
+ map.put(.27F,155);
+ map.put(.28F,160);
+ map.put(.29F,170);
+ map.put(.30F,175);
+ map.put(.31F,180);
+ map.put(.32F,190);
+ map.put(.33F,195);
+ map.put(.34F,205);
+ map.put(.35F,215);
+ map.put(.36F,225);
+ map.put(.37F,230);
+ map.put(.38F,240);
+ map.put(.39F,250);
+ map.put(.40F,260);
+ map.put(.41F,270);
+ map.put(.42F,280);
+ map.put(.43F,295);
+ map.put(.44F,310);
+ map.put(.45F,325);
+ map.put(.46F,335);
+ map.put(.47F,355);
+ map.put(.48F,375);
+ map.put(.49F,395);
+ map.put(.50F,420);
+ map.put(.51F,440);
+ map.put(.52F,465);
+ map.put(.53F,500);
+ map.put(.54F,525);
+ map.put(.55F,550);
+ map.put(.56F,600);
+ map.put(.57F,640);
+ map.put(.58F,680);
+ map.put(.59F,734);
+ map.put(.60F,813);
+ map.put(.61F,905);
+ map.put(.62F,1000);
+ map.put(.63F,1055);
+ map.put(.64F,1160);
+ map.put(.65F,1355);
+ map.put(.66F,1510);
+ map.put(.67F,1805);
+ map.put(.68F,2170);
+ }
+
+ /**
+ * Returns the size of the word in {@link RandomTextDataGenerator}'s
+ * dictionary that can generate text with the desired compression ratio.
+ *
+ * @throws RuntimeException If ratio is less than {@value #MIN_RATIO} or
+ * greater than {@value #MAX_RATIO}.
+ */
+ int getWordSizeForRatio(float ratio) {
+ ratio = standardizeCompressionRatio(ratio);
+ if (ratio >= MIN_RATIO && ratio <= MAX_RATIO) {
+ return map.get(ratio);
+ } else {
+ throw new RuntimeException("Compression ratio should be in the range ["
+ + MIN_RATIO + "," + MAX_RATIO + "]. Configured compression ratio is "
+ + ratio + ".");
+ }
+ }
+ }
+
+ /**
+ * Setup the data generator's configuration to generate compressible random
+ * text data with the desired compression ratio.
+ * Note that the compression ratio, if configured, will set the
+ * {@link RandomTextDataGenerator}'s list-size and word-size based on
+ * empirical values using the compression ratio set in the configuration.
+ *
+ * Hence to achieve the desired compression ratio,
+ * {@link RandomTextDataGenerator}'s list-size will be set to the default
+ * value i.e {@value RandomTextDataGenerator#DEFAULT_LIST_SIZE}.
+ */
+ static void setupDataGeneratorConfig(Configuration conf) {
+ boolean compress = isCompressionEmulationEnabled(conf);
+ if (compress) {
+ float ratio = getMapInputCompressionEmulationRatio(conf);
+ LOG.info("GridMix is configured to generate compressed input data with "
+ + "a compression ratio of " + ratio);
+ int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+ RandomTextDataGenerator.setRandomTextDataGeneratorWordSize(conf,
+ wordSize);
+
+ // since the compression ratios are computed using the default value of
+ // list size
+ RandomTextDataGenerator.setRandomTextDataGeneratorListSize(conf,
+ RandomTextDataGenerator.DEFAULT_LIST_SIZE);
+ }
+ }
+
+ /**
+ * Returns a {@link RandomTextDataGenerator} that generates random
+ * compressible text with the desired compression ratio.
+ */
+ static RandomTextDataGenerator getRandomTextDataGenerator(float ratio,
+ long seed) {
+ int wordSize = COMPRESSION_LOOKUP_TABLE.getWordSizeForRatio(ratio);
+ RandomTextDataGenerator rtg =
+ new RandomTextDataGenerator(RandomTextDataGenerator.DEFAULT_LIST_SIZE,
+ seed, wordSize);
+ return rtg;
+ }
+
+ /** Publishes compression related data statistics. Following statistics are
+ * published
+ * <ul>
+ * <li>Total compressed input data size</li>
+ * <li>Number of compressed input data files</li>
+ * <li>Compression Ratio</li>
+ * <li>Text data dictionary size</li>
+ * <li>Random text word size</li>
+ * </ul>
+ */
+ static void publishCompressedDataStatistics(Path inputDir, Configuration conf,
+ long uncompressedDataSize)
+ throws IOException {
+ FileSystem fs = inputDir.getFileSystem(conf);
+ CompressionCodecFactory compressionCodecs =
+ new CompressionCodecFactory(conf);
+
+ // iterate over compressed files and sum up the compressed file sizes
+ long compressedDataSize = 0;
+ int numCompressedFiles = 0;
+ // obtain input data file statuses
+ FileStatus[] outFileStatuses =
+ fs.listStatus(inputDir, new Utils.OutputFileUtils.OutputFilesFilter());
+ for (FileStatus status : outFileStatuses) {
+ // check if the input file is compressed
+ if (compressionCodecs != null) {
+ CompressionCodec codec = compressionCodecs.getCodec(status.getPath());
+ if (codec != null) {
+ ++numCompressedFiles;
+ compressedDataSize += status.getLen();
+ }
+ }
+ }
+
+ LOG.info("Gridmix is configured to use compressed input data.");
+ // publish the input data size
+ LOG.info("Total size of compressed input data : "
+ + StringUtils.humanReadableInt(compressedDataSize));
+ LOG.info("Total number of compressed input data files : "
+ + numCompressedFiles);
+
+ if (numCompressedFiles == 0) {
+ throw new RuntimeException("No compressed file found in the input"
+ + " directory : " + inputDir.toString() + ". To enable compression"
+ + " emulation, run Gridmix either with "
+ + "an input directory containing compressed input file(s) or"
+ + " use the -generate option to (re)generate it. If compression"
+ + " emulation is not desired, disable it by setting '"
+ + COMPRESSION_EMULATION_ENABLE + "' to 'false'.");
+ }
+
+ // publish compression ratio only if its generated in this gridmix run
+ if (uncompressedDataSize > 0) {
+ // compute the compression ratio
+ double ratio = ((double)compressedDataSize) / uncompressedDataSize;
+
+ // publish the compression ratio
+ LOG.info("Input Data Compression Ratio : " + ratio);
+ }
+ }
+
+ /**
+ * Enables/Disables compression emulation.
+ * @param conf Target configuration where the parameter
+ * {@value #COMPRESSION_EMULATION_ENABLE} will be set.
+ * @param val The value to be set.
+ */
+ static void setCompressionEmulationEnabled(Configuration conf, boolean val) {
+ conf.setBoolean(COMPRESSION_EMULATION_ENABLE, val);
+ }
+
+ /**
+ * Checks if compression emulation is enabled or not. Default is {@code true}.
+ */
+ static boolean isCompressionEmulationEnabled(Configuration conf) {
+ return conf.getBoolean(COMPRESSION_EMULATION_ENABLE, true);
+ }
+
+ /**
+ * Enables/Disables input decompression emulation.
+ * @param conf Target configuration where the parameter
+ * {@value #INPUT_DECOMPRESSION_EMULATION_ENABLE} will be set.
+ * @param val The value to be set.
+ */
+ static void setInputCompressionEmulationEnabled(Configuration conf,
+ boolean val) {
+ conf.setBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, val);
+ }
+
+ /**
+ * Check if input decompression emulation is enabled or not.
+ * Default is {@code false}.
+ */
+ static boolean isInputCompressionEmulationEnabled(Configuration conf) {
+ return conf.getBoolean(INPUT_DECOMPRESSION_EMULATION_ENABLE, false);
+ }
+
+ /**
+ * Set the map input data compression ratio in the given conf.
+ */
+ static void setMapInputCompressionEmulationRatio(Configuration conf,
+ float ratio) {
+ conf.setFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO, ratio);
+ }
+
+ /**
+ * Get the map input data compression ratio using the given configuration.
+ * If the compression ratio is not set in the configuration then use the
+ * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+ */
+ static float getMapInputCompressionEmulationRatio(Configuration conf) {
+ return conf.getFloat(GRIDMIX_MAP_INPUT_COMPRESSION_RATIO,
+ DEFAULT_COMPRESSION_RATIO);
+ }
+
+ /**
+ * Set the map output data compression ratio in the given configuration.
+ */
+ static void setMapOutputCompressionEmulationRatio(Configuration conf,
+ float ratio) {
+ conf.setFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO, ratio);
+ }
+
+ /**
+ * Get the map output data compression ratio using the given configuration.
+ * If the compression ratio is not set in the configuration then use the
+ * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+ */
+ static float getMapOutputCompressionEmulationRatio(Configuration conf) {
+ return conf.getFloat(GRIDMIX_MAP_OUTPUT_COMPRESSION_RATIO,
+ DEFAULT_COMPRESSION_RATIO);
+ }
+
+ /**
+ * Set the reduce output data compression ratio in the given configuration.
+ */
+ static void setReduceOutputCompressionEmulationRatio(Configuration conf,
+ float ratio) {
+ conf.setFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO, ratio);
+ }
+
+ /**
+ * Get the reduce output data compression ratio using the given configuration.
+ * If the compression ratio is not set in the configuration then use the
+ * default value i.e {@value #DEFAULT_COMPRESSION_RATIO}.
+ */
+ static float getReduceOutputCompressionEmulationRatio(Configuration conf) {
+ return conf.getFloat(GRIDMIX_REDUCE_OUTPUT_COMPRESSION_RATIO,
+ DEFAULT_COMPRESSION_RATIO);
+ }
+
+ /**
+ * Standardize the compression ratio i.e round off the compression ratio to
+ * only 2 significant digits.
+ */
+ static float standardizeCompressionRatio(float ratio) {
+ // round off to 2 significant digits
+ int significant = (int)Math.round(ratio * 100);
+ return ((float)significant)/100;
+ }
+
+ /**
+ * Returns a {@link InputStream} for a file that might be compressed.
+ */
+ static InputStream getPossiblyDecompressedInputStream(Path file,
+ Configuration conf,
+ long offset)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(conf);
+ if (isCompressionEmulationEnabled(conf)
+ && isInputCompressionEmulationEnabled(conf)) {
+ CompressionCodecFactory compressionCodecs =
+ new CompressionCodecFactory(conf);
+ CompressionCodec codec = compressionCodecs.getCodec(file);
+ if (codec != null) {
+ Decompressor decompressor = CodecPool.getDecompressor(codec);
+ if (decompressor != null) {
+ CompressionInputStream in =
+ codec.createInputStream(fs.open(file), decompressor);
+ //TODO Seek doesn't work with compressed input stream.
+ // Use SplittableCompressionCodec?
+ return (InputStream)in;
+ }
+ }
+ }
+ FSDataInputStream in = fs.open(file);
+ in.seek(offset);
+ return (InputStream)in;
+ }
+
+ /**
+ * Returns a {@link OutputStream} for a file that might need
+ * compression.
+ */
+ static OutputStream getPossiblyCompressedOutputStream(Path file,
+ Configuration conf)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(conf);
+ JobConf jConf = new JobConf(conf);
+ if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
+ // get the codec class
+ Class<? extends CompressionCodec> codecClass =
+ org.apache.hadoop.mapred.FileOutputFormat
+ .getOutputCompressorClass(jConf,
+ GzipCodec.class);
+ // get the codec implementation
+ CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
+
+ // add the appropriate extension
+ file = file.suffix(codec.getDefaultExtension());
+
+ if (isCompressionEmulationEnabled(conf)) {
+ FSDataOutputStream fileOut = fs.create(file, false);
+ return new DataOutputStream(codec.createOutputStream(fileOut));
+ }
+ }
+ return fs.create(file, false);
+ }
+
+ /**
+ * Extracts compression/decompression related configuration parameters from
+ * the source configuration to the target configuration.
+ */
+ static void configureCompressionEmulation(Configuration source,
+ Configuration target) {
+ // enable output compression
+ target.setBoolean(FileOutputFormat.COMPRESS,
+ source.getBoolean(FileOutputFormat.COMPRESS, false));
+
+ // set the job output compression codec
+ String jobOutputCompressionCodec =
+ source.get(FileOutputFormat.COMPRESS_CODEC);
+ if (jobOutputCompressionCodec != null) {
+ target.set(FileOutputFormat.COMPRESS_CODEC, jobOutputCompressionCodec);
+ }
+
+ // set the job output compression type
+ String jobOutputCompressionType =
+ source.get(FileOutputFormat.COMPRESS_TYPE);
+ if (jobOutputCompressionType != null) {
+ target.set(FileOutputFormat.COMPRESS_TYPE, jobOutputCompressionType);
+ }
+
+ // enable map output compression
+ target.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS,
+ source.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false));
+
+ // set the map output compression codecs
+ String mapOutputCompressionCodec =
+ source.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC);
+ if (mapOutputCompressionCodec != null) {
+ target.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+ mapOutputCompressionCodec);
+ }
+
+ // enable input decompression
+ //TODO replace with mapInputBytes and hdfsBytesRead
+ Path[] inputs =
+ org.apache.hadoop.mapred.FileInputFormat
+ .getInputPaths(new JobConf(source));
+ boolean needsCompressedInput = false;
+ CompressionCodecFactory compressionCodecs =
+ new CompressionCodecFactory(source);
+ for (Path input : inputs) {
+ CompressionCodec codec = compressionCodecs.getCodec(input);
+ if (codec != null) {
+ needsCompressedInput = true;
+ }
+ }
+ setInputCompressionEmulationEnabled(target, needsCompressedInput);
+ }
+}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
index 0cd41ca..6a93428 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
@@ -458,7 +458,7 @@
conf.setInt(GenerateDistCacheData.GRIDMIX_DISTCACHE_FILE_COUNT, fileCount);
conf.setLong(GenerateDistCacheData.GRIDMIX_DISTCACHE_BYTE_COUNT, byteCount);
- LOG.info("Number of HDFS based distributed cache files to be generated is"
+ LOG.info("Number of HDFS based distributed cache files to be generated is "
+ fileCount + ". Total size of HDFS based distributed cache files "
+ "to be generated is " + byteCount);
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
index 840f83c..2e4222c 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/FileQueue.java
@@ -21,8 +21,6 @@
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
@@ -35,7 +33,7 @@
private int idx = -1;
private long curlen = -1L;
- private FSDataInputStream input;
+ private InputStream input;
private final byte[] z = new byte[1];
private final Path[] paths;
private final long[] lengths;
@@ -65,9 +63,9 @@
idx = (idx + 1) % paths.length;
curlen = lengths[idx];
final Path file = paths[idx];
- final FileSystem fs = file.getFileSystem(conf);
- input = fs.open(file);
- input.seek(startoffset[idx]);
+ input =
+ CompressionEmulationUtil.getPossiblyDecompressedInputStream(file,
+ conf, startoffset[idx]);
}
@Override
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
index 066cdf1..0f2ca04 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateData.java
@@ -31,7 +31,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -41,6 +44,7 @@
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -52,6 +56,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
// TODO can replace with form of GridmixJob
class GenerateData extends GridmixJob {
@@ -94,6 +99,43 @@
FileOutputFormat.setOutputPath(job, outdir);
}
+ /**
+ * Publish the data statistics.
+ */
+ static void publishDataStatistics(Path inputDir, long genBytes,
+ Configuration conf)
+ throws IOException {
+ if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
+ CompressionEmulationUtil.publishCompressedDataStatistics(inputDir,
+ conf, genBytes);
+ } else {
+ publishPlainDataStatistics(conf, inputDir);
+ }
+ }
+
+ static void publishPlainDataStatistics(Configuration conf, Path inputDir)
+ throws IOException {
+ FileSystem fs = inputDir.getFileSystem(conf);
+
+ // obtain input data file statuses
+ long dataSize = 0;
+ long fileCount = 0;
+ RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inputDir, true);
+ PathFilter filter = new Utils.OutputFileUtils.OutputFilesFilter();
+ while (iter.hasNext()) {
+ LocatedFileStatus lStatus = iter.next();
+ if (filter.accept(lStatus.getPath())) {
+ dataSize += lStatus.getLen();
+ ++fileCount;
+ }
+ }
+
+ // publish the plain data statistics
+ LOG.info("Total size of input data : "
+ + StringUtils.humanReadableInt(dataSize));
+ LOG.info("Total number of input data files : " + fileCount);
+ }
+
@Override
public Job call() throws IOException, InterruptedException,
ClassNotFoundException {
@@ -101,6 +143,18 @@
ugi.doAs( new PrivilegedExceptionAction <Job>() {
public Job run() throws IOException, ClassNotFoundException,
InterruptedException {
+ // check if compression emulation is enabled
+ if (CompressionEmulationUtil
+ .isCompressionEmulationEnabled(job.getConfiguration())) {
+ CompressionEmulationUtil.configure(job);
+ } else {
+ configureRandomBytesDataGenerator();
+ }
+ job.submit();
+ return job;
+ }
+
+ private void configureRandomBytesDataGenerator() {
job.setMapperClass(GenDataMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
@@ -113,12 +167,15 @@
} catch (IOException e) {
LOG.error("Error while adding input path ", e);
}
- job.submit();
- return job;
}
});
return job;
}
+
+ @Override
+ protected boolean canEmulateCompression() {
+ return false;
+ }
public static class GenDataMapper
extends Mapper<NullWritable,LongWritable,NullWritable,BytesWritable> {
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
index f5acc9a..c90e17c 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GenerateDistCacheData.java
@@ -116,6 +116,11 @@
return job;
}
+ @Override
+ protected boolean canEmulateCompression() {
+ return false;
+ }
+
public static class GenDCDataMapper
extends Mapper<LongWritable, BytesWritable, NullWritable, BytesWritable> {
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
index b64a036..455ea16 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
@@ -117,15 +117,24 @@
// Shutdown hook
private final Shutdown sdh = new Shutdown();
+ // Get the input data directory for Gridmix. Input directory is
+ // <io-path>/input
+ static Path getGridmixInputDataPath(Path ioPath) {
+ return new Path(ioPath, "input");
+ }
+
/**
- * Write random bytes at the path <ioPath>/input/
+ * Write random bytes at the path <inputDir>.
* @see org.apache.hadoop.mapred.gridmix.GenerateData
*/
- protected void writeInputData(long genbytes, Path ioPath)
+ protected void writeInputData(long genbytes, Path inputDir)
throws IOException, InterruptedException {
- Path inputDir = new Path(ioPath, "input");
final Configuration conf = getConf();
- final GridmixJob genData = new GenerateData(conf, inputDir, genbytes);
+
+ // configure the compression ratio if needed
+ CompressionEmulationUtil.setupDataGeneratorConfig(conf);
+
+ final GenerateData genData = new GenerateData(conf, inputDir, genbytes);
LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
" of test data...");
launchGridmixJob(genData);
@@ -138,6 +147,8 @@
LOG.error("Couldnt change the file permissions " , e);
throw new IOException(e);
}
+
+ LOG.info("Input data generation successful.");
}
/**
@@ -209,7 +220,7 @@
Path scratchDir, CountDownLatch startFlag, UserResolver userResolver)
throws IOException {
try {
- Path inputDir = new Path(ioPath, "input");
+ Path inputDir = getGridmixInputDataPath(ioPath);
GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
conf, GridmixJobSubmissionPolicy.STRESS);
LOG.info(" Submission policy is " + policy.name());
@@ -375,10 +386,17 @@
// Create, start job submission threads
startThreads(conf, traceIn, ioPath, scratchDir, startFlag,
userResolver);
+
+ Path inputDir = getGridmixInputDataPath(ioPath);
+
// Write input data if specified
if (genbytes > 0) {
- writeInputData(genbytes, ioPath);
+ writeInputData(genbytes, inputDir);
}
+
+ // publish the data statistics
+ GenerateData.publishDataStatistics(inputDir, genbytes, conf);
+
// scan input dir contents
submitter.refreshFilePool();
@@ -660,3 +678,4 @@
}
}
+
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
index 0ce5e76..81b7508 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred.gridmix;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Formatter;
import java.util.List;
@@ -27,8 +28,6 @@
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
@@ -117,6 +116,16 @@
setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
}
+ // check if the job can emulate compression
+ if (canEmulateCompression()) {
+ // set the compression related configs if compression emulation is
+ // enabled
+ if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
+ CompressionEmulationUtil.configureCompressionEmulation(
+ jobdesc.getJobConf(), ret.getConfiguration());
+ }
+ }
+
return ret;
}
});
@@ -129,6 +138,11 @@
outdir = new Path(outRoot, "" + seq);
}
+ /**
+ * Indicates whether this {@link GridmixJob} supports compression emulation.
+ */
+ protected abstract boolean canEmulateCompression();
+
protected GridmixJob(final Configuration conf, long submissionMillis,
final String name) throws IOException {
submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
@@ -293,13 +307,18 @@
TaskAttemptContext job) throws IOException {
Path file = getDefaultWorkFile(job, "");
- FileSystem fs = file.getFileSystem(job.getConfiguration());
- final FSDataOutputStream fileOut = fs.create(file, false);
+ final DataOutputStream fileOut;
+
+ fileOut =
+ new DataOutputStream(CompressionEmulationUtil
+ .getPossiblyCompressedOutputStream(file, job.getConfiguration()));
+
return new RecordWriter<K,GridmixRecord>() {
@Override
public void write(K ignored, GridmixRecord value)
throws IOException {
- value.writeRandom(fileOut, value.getSize());
+ // Let the Gridmix record fill itself.
+ value.write(fileOut);
}
@Override
public void close(TaskAttemptContext ctxt) throws IOException {
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
index c4409e0..481799f 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixRecord.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
class GridmixRecord implements WritableComparable<GridmixRecord> {
@@ -39,6 +40,10 @@
private final DataOutputBuffer dob =
new DataOutputBuffer(Long.SIZE / Byte.SIZE);
private byte[] literal = dob.getData();
+ private boolean compressible = false;
+ private float compressionRatio =
+ CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
+ private RandomTextDataGenerator rtg = null;
GridmixRecord() {
this(1, 0L);
@@ -57,6 +62,19 @@
setSizeInternal(size);
}
+ void setCompressibility(boolean compressible, float ratio) {
+ this.compressible = compressible;
+ this.compressionRatio = ratio;
+ // Initialize the RandomTextDataGenerator once for every GridMix record
+ // Note that RandomTextDataGenerator is needed only when the GridMix record
+ // is configured to generate compressible text data.
+ if (compressible) {
+ rtg =
+ CompressionEmulationUtil.getRandomTextDataGenerator(ratio,
+ RandomTextDataGenerator.DEFAULT_SEED);
+ }
+ }
+
private void setSizeInternal(int size) {
this.size = Math.max(1, size);
try {
@@ -79,6 +97,39 @@
return (x ^= (x << 17));
}
+ /**
+ * Generate random text data that can be compressed. If the record is marked
+ * compressible (via {@link FileOutputFormat#COMPRESS}), only then will the
+ * random data be text data; otherwise
+ * {@link GridmixRecord#writeRandom(DataOutput, int)} will be invoked.
+ */
+ private void writeRandomText(DataOutput out, final int size)
+ throws IOException {
+ long tmp = seed;
+ out.writeLong(tmp);
+ int i = size - (Long.SIZE / Byte.SIZE);
+ //TODO Should we use long for size. What if the data is more than 4G?
+
+ String randomWord = rtg.getRandomWord();
+ byte[] bytes = randomWord.getBytes("UTF-8");
+ long randomWordSize = bytes.length;
+ while (i >= randomWordSize) {
+ out.write(bytes);
+ i -= randomWordSize;
+
+ // get the next random word
+ randomWord = rtg.getRandomWord();
+ bytes = randomWord.getBytes("UTF-8");
+ // determine the random word size
+ randomWordSize = bytes.length;
+ }
+
+ // pad the remaining bytes
+ if (i > 0) {
+ out.write(bytes, 0, i);
+ }
+ }
+
public void writeRandom(DataOutput out, final int size) throws IOException {
long tmp = seed;
out.writeLong(tmp);
@@ -120,8 +171,13 @@
WritableUtils.writeVInt(out, size);
final int payload = size - WritableUtils.getVIntSize(size);
if (payload > Long.SIZE / Byte.SIZE) {
- writeRandom(out, payload);
+ if (compressible) {
+ writeRandomText(out, payload);
+ } else {
+ writeRandom(out, payload);
+ }
} else if (payload > 0) {
+ //TODO What if compressible is turned on? LOG is a bad idea!
out.write(literal, 0, payload);
}
}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
index 0576162..a9d404d 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/InputStriper.java
@@ -25,9 +25,12 @@
import java.util.List;
import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.commons.logging.Log;
@@ -43,6 +46,7 @@
long currentStart;
FileStatus current;
final List<FileStatus> files = new ArrayList<FileStatus>();
+ final Configuration conf = new Configuration();
/**
* @param inputDir Pool from which files are requested.
@@ -92,7 +96,15 @@
}
currentStart += fromFile;
bytes -= fromFile;
- if (current.getLen() - currentStart == 0) {
+ // Switch to a new file if
+ // - the current file is uncompressed and completely used
+ // - the current file is compressed
+
+ CompressionCodecFactory compressionCodecs =
+ new CompressionCodecFactory(conf);
+ CompressionCodec codec = compressionCodecs.getCodec(current.getPath());
+ if (current.getLen() - currentStart == 0
+ || codec != null) {
current = files.get(++idx % files.size());
currentStart = 0;
}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
index e0df06b..c090a5c 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/LoadJob.java
@@ -26,12 +26,12 @@
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
@@ -83,6 +83,11 @@
return job;
}
+ @Override
+ protected boolean canEmulateCompression() {
+ return true;
+ }
+
public static class LoadMapper
extends Mapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> {
@@ -104,6 +109,20 @@
final long[] reduceBytes = split.getOutputBytes();
final long[] reduceRecords = split.getOutputRecords();
+ // enable gridmix map output record for compression
+ final boolean emulateMapOutputCompression =
+ CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+ && conf.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
+ float compressionRatio = 1.0f;
+ if (emulateMapOutputCompression) {
+ compressionRatio =
+ CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf);
+ LOG.info("GridMix is configured to use a compression ratio of "
+ + compressionRatio + " for the map output data.");
+ key.setCompressibility(true, compressionRatio);
+ val.setCompressibility(true, compressionRatio);
+ }
+
long totalRecords = 0L;
final int nReduces = ctxt.getNumReduceTasks();
if (nReduces > 0) {
@@ -117,14 +136,26 @@
++idx;
id += maps;
}
+
+ // set the map output bytes such that the final reduce input bytes
+ // match the expected value obtained from the original job
+ long mapOutputBytes = reduceBytes[i];
+ if (emulateMapOutputCompression) {
+ mapOutputBytes /= compressionRatio;
+ }
reduces.add(new IntermediateRecordFactory(
- new AvgRecordFactory(reduceBytes[i], reduceRecords[i], conf),
+ new AvgRecordFactory(mapOutputBytes, reduceRecords[i], conf,
+ 5*1024),
i, reduceRecords[i], spec, conf));
totalRecords += reduceRecords[i];
}
} else {
- reduces.add(new AvgRecordFactory(reduceBytes[0], reduceRecords[0],
- conf));
+ long mapOutputBytes = reduceBytes[0];
+ if (emulateMapOutputCompression) {
+ mapOutputBytes /= compressionRatio;
+ }
+ reduces.add(new AvgRecordFactory(mapOutputBytes, reduceRecords[0],
+ conf, 5*1024));
totalRecords = reduceRecords[0];
}
final long splitRecords = split.getInputRecords();
@@ -199,8 +230,26 @@
LOG.info("Spec output bytes w/o records. Using input record count");
outRecords = inRecords;
}
+
+ // enable gridmix reduce output record for compression
+ Configuration conf = context.getConfiguration();
+ if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)
+ && FileOutputFormat.getCompressOutput(context)) {
+ float compressionRatio =
+ CompressionEmulationUtil
+ .getReduceOutputCompressionEmulationRatio(conf);
+ LOG.info("GridMix is configured to use a compression ratio of "
+ + compressionRatio + " for the reduce output data.");
+ val.setCompressibility(true, compressionRatio);
+
+ // Set the actual output data size to make sure that the actual output
+ // data size is same after compression
+ outBytes /= compressionRatio;
+ }
+
factory =
- new AvgRecordFactory(outBytes, outRecords, context.getConfiguration());
+ new AvgRecordFactory(outBytes, outRecords,
+ context.getConfiguration(), 5*1024);
ratio = outRecords / (1.0 * inRecords);
acc = 0.0;
}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
new file mode 100644
index 0000000..877d434
--- /dev/null
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/RandomTextDataGenerator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A random text generator. The words are simply sequences of alphabets.
+ */
+class RandomTextDataGenerator {
+ static final Log LOG = LogFactory.getLog(RandomTextDataGenerator.class);
+
+ /**
+ * Configuration key for random text data generator's list size.
+ */
+ static final String GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE =
+ "gridmix.datagenerator.randomtext.listsize";
+
+ /**
+ * Configuration key for random text data generator's word size.
+ */
+ static final String GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE =
+ "gridmix.datagenerator.randomtext.wordsize";
+
+ /**
+ * Default random text data generator's list size.
+ */
+ static final int DEFAULT_LIST_SIZE = 200;
+
+ /**
+ * Default random text data generator's word size.
+ */
+ static final int DEFAULT_WORD_SIZE = 10;
+
+ /**
+ * Default random text data generator's seed.
+ */
+ static final long DEFAULT_SEED = 0L;
+
+ /**
+ * A list of random words
+ */
+ private String[] words;
+ private Random random;
+
+ /**
+ * Constructor for {@link RandomTextDataGenerator} with default seed.
+ * @param size the total number of words to consider.
+ * @param wordSize Size of each word
+ */
+ RandomTextDataGenerator(int size, int wordSize) {
+ this(size, DEFAULT_SEED , wordSize);
+ }
+
+ /**
+ * Constructor for {@link RandomTextDataGenerator}.
+ * @param size the total number of words to consider.
+ * @param seed Random number generator seed for repeatability
+ * @param wordSize Size of each word
+ */
+ RandomTextDataGenerator(int size, Long seed, int wordSize) {
+ random = new Random(seed);
+ words = new String[size];
+
+ //TODO change the default with the actual stats
+ //TODO do we need varied-sized words?
+ for (int i = 0; i < size; ++i) {
+ words[i] =
+ RandomStringUtils.random(wordSize, 0, 0, true, false, null, random);
+ }
+ }
+
+ /**
+ * Get the configured random text data generator's list size.
+ */
+ static int getRandomTextDataGeneratorListSize(Configuration conf) {
+ return conf.getInt(GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE, DEFAULT_LIST_SIZE);
+ }
+
+ /**
+ * Set the random text data generator's list size.
+ */
+ static void setRandomTextDataGeneratorListSize(Configuration conf,
+ int listSize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Random text data generator is configured to use a dictionary "
+ + " with " + listSize + " words");
+ }
+ conf.setInt(GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE, listSize);
+ }
+
+ /**
+ * Get the configured random text data generator word size.
+ */
+ static int getRandomTextDataGeneratorWordSize(Configuration conf) {
+ return conf.getInt(GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, DEFAULT_WORD_SIZE);
+ }
+
+ /**
+ * Set the random text data generator word size.
+ */
+ static void setRandomTextDataGeneratorWordSize(Configuration conf,
+ int wordSize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Random text data generator is configured to use a dictionary "
+ + " with words of length " + wordSize);
+ }
+ conf.setInt(GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE, wordSize);
+ }
+
+ /**
+ * Returns a randomly selected word from a list of random words.
+ */
+ String getRandomWord() {
+ int index = random.nextInt(words.length);
+ return words[index];
+ }
+
+ /**
+ * This is mainly for testing.
+ */
+ List<String> getRandomWords() {
+ return Arrays.asList(words);
+ }
+}
diff --git a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
index 15a772e..a9f2999 100644
--- a/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
+++ b/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/SleepJob.java
@@ -94,6 +94,11 @@
}
@Override
+ protected boolean canEmulateCompression() {
+ return false;
+ }
+
+ @Override
public Job call()
throws IOException, InterruptedException, ClassNotFoundException {
ugi.doAs(
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
new file mode 100644
index 0000000..d1c1b98
--- /dev/null
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestCompressionEmulationUtils.java
@@ -0,0 +1,562 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Utils;
+import org.apache.hadoop.mapred.gridmix.CompressionEmulationUtil.RandomTextDataMapper;
+import org.apache.hadoop.mapred.gridmix.GenerateData.GenSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link CompressionEmulationUtil}
+ */
+public class TestCompressionEmulationUtils {
+ //TODO Remove this once LocalJobRunner can run Gridmix.
+ static class CustomInputFormat extends GenerateData.GenDataFormat {
+ @Override
+ public List<InputSplit> getSplits(JobContext jobCtxt) throws IOException {
+ // get the total data to be generated
+ long toGen =
+ jobCtxt.getConfiguration().getLong(GenerateData.GRIDMIX_GEN_BYTES, -1);
+ if (toGen < 0) {
+ throw new IOException("Invalid/missing generation bytes: " + toGen);
+ }
+ // get the total number of mappers configured
+ int totalMappersConfigured =
+ jobCtxt.getConfiguration().getInt(MRJobConfig.NUM_MAPS, -1);
+ if (totalMappersConfigured < 0) {
+ throw new IOException("Invalid/missing num mappers: "
+ + totalMappersConfigured);
+ }
+
+ final long bytesPerTracker = toGen / totalMappersConfigured;
+ final ArrayList<InputSplit> splits =
+ new ArrayList<InputSplit>(totalMappersConfigured);
+ for (int i = 0; i < totalMappersConfigured; ++i) {
+ splits.add(new GenSplit(bytesPerTracker,
+ new String[] { "tracker_local" }));
+ }
+ return splits;
+ }
+ }
+
+ /**
+ * Test {@link RandomTextDataMapper} via {@link CompressionEmulationUtil}.
+ */
+ @Test
+ public void testRandomCompressedTextDataGenerator() throws Exception {
+ int wordSize = 10;
+ int listSize = 20;
+ long dataSize = 10*1024*1024;
+
+ Configuration conf = new Configuration();
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+
+ // configure the RandomTextDataGenerator to generate desired sized data
+ conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_LISTSIZE,
+ listSize);
+ conf.setInt(RandomTextDataGenerator.GRIDMIX_DATAGEN_RANDOMTEXT_WORDSIZE,
+ wordSize);
+ conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+
+ FileSystem lfs = FileSystem.getLocal(conf);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir = new Path(rootTempDir, "TestRandomCompressedTextDataGenr");
+ lfs.delete(tempDir, true);
+
+ runDataGenJob(conf, tempDir);
+
+ // validate the output data
+ FileStatus[] files =
+ lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+ long size = 0;
+ long maxLineSize = 0;
+
+ for (FileStatus status : files) {
+ InputStream in =
+ CompressionEmulationUtil
+ .getPossiblyDecompressedInputStream(status.getPath(), conf, 0);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String line = reader.readLine();
+ if (line != null) {
+ long lineSize = line.getBytes().length;
+ if (lineSize > maxLineSize) {
+ maxLineSize = lineSize;
+ }
+ while (line != null) {
+ for (String word : line.split("\\s")) {
+ size += word.getBytes().length;
+ }
+ line = reader.readLine();
+ }
+ }
+ reader.close();
+ }
+
+ assertTrue(size >= dataSize);
+ assertTrue(size <= dataSize + maxLineSize);
+ }
+
+ /**
+ * Runs a GridMix data-generation job.
+ */
+ private static void runDataGenJob(Configuration conf, Path tempDir)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ JobClient client = new JobClient(conf);
+
+ // get the local job runner
+ conf.setInt(MRJobConfig.NUM_MAPS, 1);
+
+ Job job = new Job(conf);
+
+ CompressionEmulationUtil.configure(job);
+ job.setInputFormatClass(CustomInputFormat.class);
+
+ // set the output path
+ FileOutputFormat.setOutputPath(job, tempDir);
+
+ // submit and wait for completion
+ job.submit();
+ int ret = job.waitForCompletion(true) ? 0 : 1;
+
+ assertEquals("Job Failed", 0, ret);
+ }
+
+ /**
+ * Test if {@link RandomTextDataGenerator} can generate random text data
+ * with the desired compression ratio. This involves
+ * - using {@link CompressionEmulationUtil} to configure the MR job for
+ * generating the random text data with the desired compression ratio
+ * - running the MR job
+ * - test {@link RandomTextDataGenerator}'s output and match the output size
+ * (compressed) with the expected compression ratio.
+ */
+ private void testCompressionRatioConfigure(float ratio)
+ throws Exception {
+ long dataSize = 10*1024*1024;
+
+ Configuration conf = new Configuration();
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+
+ conf.setLong(GenerateData.GRIDMIX_GEN_BYTES, dataSize);
+
+ float expectedRatio = CompressionEmulationUtil.DEFAULT_COMPRESSION_RATIO;
+ if (ratio > 0) {
+ // set the compression ratio in the conf
+ CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+ expectedRatio =
+ CompressionEmulationUtil.standardizeCompressionRatio(ratio);
+ }
+
+ // invoke the utility to map from ratio to word-size
+ CompressionEmulationUtil.setupDataGeneratorConfig(conf);
+
+ FileSystem lfs = FileSystem.getLocal(conf);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir =
+ new Path(rootTempDir, "TestCustomRandomCompressedTextDataGenr");
+ lfs.delete(tempDir, true);
+
+ runDataGenJob(conf, tempDir);
+
+ // validate the output data
+ FileStatus[] files =
+ lfs.listStatus(tempDir, new Utils.OutputFileUtils.OutputFilesFilter());
+ long size = 0;
+
+ for (FileStatus status : files) {
+ size += status.getLen();
+ }
+
+ float compressionRatio = ((float)size)/dataSize;
+ float stdRatio =
+ CompressionEmulationUtil.standardizeCompressionRatio(compressionRatio);
+
+ assertEquals(expectedRatio, stdRatio, 0.0D);
+ }
+
+ /**
+ * Test compression ratio with multiple compression ratios.
+ */
+ @Test
+ public void testCompressionRatios() throws Exception {
+ // test default compression ratio, i.e. 0.5
+ testCompressionRatioConfigure(0F);
+ // test for a sample compression ratio of 0.2
+ testCompressionRatioConfigure(0.2F);
+ // test for a sample compression ratio of 0.4
+ testCompressionRatioConfigure(0.4F);
+ // test for a sample compression ratio of 0.65
+ testCompressionRatioConfigure(0.65F);
+ // test for a compression ratio of 0.682 which should be standardized
+ // to round(0.682), i.e. 0.68
+ testCompressionRatioConfigure(0.682F);
+ // test for a compression ratio of 0.567 which should be standardized
+ // to round(0.567), i.e. 0.57
+ testCompressionRatioConfigure(0.567F);
+
+ // test with a compression ratio of 0.01 which is less than the minimum
+ // supported value of 0.07
+ boolean failed = false;
+ try {
+ testCompressionRatioConfigure(0.01F);
+ } catch (RuntimeException re) {
+ failed = true;
+ }
+ assertTrue("Compression ratio min value (0.07) check failed!", failed);
+
+ // test with a compression ratio of 0.7 which is greater than the maximum
+ // supported value of 0.68
+ failed = false;
+ try {
+ testCompressionRatioConfigure(0.7F);
+ } catch (RuntimeException re) {
+ failed = true;
+ }
+ assertTrue("Compression ratio max value (0.68) check failed!", failed);
+ }
+
+ /**
+ * Test compression ratio standardization.
+ */
+ @Test
+ public void testCompressionRatioStandardization() throws Exception {
+ assertEquals(0.55F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.55F), 0.0D);
+ assertEquals(0.65F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.652F), 0.0D);
+ assertEquals(0.78F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.777F), 0.0D);
+ assertEquals(0.86F,
+ CompressionEmulationUtil.standardizeCompressionRatio(0.855F), 0.0D);
+ }
+
+ /**
+ * Test map input compression ratio configuration utilities.
+ */
+ @Test
+ public void testInputCompressionRatioConfiguration() throws Exception {
+ Configuration conf = new Configuration();
+ float ratio = 0.567F;
+ CompressionEmulationUtil.setMapInputCompressionEmulationRatio(conf, ratio);
+ assertEquals(ratio,
+ CompressionEmulationUtil.getMapInputCompressionEmulationRatio(conf),
+ 0.0D);
+ }
+
+ /**
+ * Test map output compression ratio configuration utilities.
+ */
+ @Test
+ public void testIntermediateCompressionRatioConfiguration()
+ throws Exception {
+ Configuration conf = new Configuration();
+ float ratio = 0.567F;
+ CompressionEmulationUtil.setMapOutputCompressionEmulationRatio(conf, ratio);
+ assertEquals(ratio,
+ CompressionEmulationUtil.getMapOutputCompressionEmulationRatio(conf),
+ 0.0D);
+ }
+
+ /**
+ * Test reduce output compression ratio configuration utilities.
+ */
+ @Test
+ public void testOutputCompressionRatioConfiguration() throws Exception {
+ Configuration conf = new Configuration();
+ float ratio = 0.567F;
+ CompressionEmulationUtil.setReduceOutputCompressionEmulationRatio(conf,
+ ratio);
+ assertEquals(ratio,
+ CompressionEmulationUtil.getReduceOutputCompressionEmulationRatio(conf),
+ 0.0D);
+ }
+
+ /**
+ * Test compressible {@link GridmixRecord}.
+ */
+ @Test
+ public void testCompressibleGridmixRecord() throws IOException {
+ JobConf conf = new JobConf();
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+
+ FileSystem lfs = FileSystem.getLocal(conf);
+ int dataSize = 1024 * 1024 * 10; // 10 MB
+ float ratio = 0.357F;
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir = new Path(rootTempDir,
+ "TestPossiblyCompressibleGridmixRecord");
+ lfs.delete(tempDir, true);
+
+ // define a compressible GridmixRecord
+ GridmixRecord record = new GridmixRecord(dataSize, 0);
+ record.setCompressibility(true, ratio); // enable compression
+
+ conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class,
+ CompressionCodec.class);
+ org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+
+ // write the record to a file
+ Path recordFile = new Path(tempDir, "record");
+ OutputStream outStream = CompressionEmulationUtil
+ .getPossiblyCompressedOutputStream(recordFile,
+ conf);
+ DataOutputStream out = new DataOutputStream(outStream);
+ record.write(out);
+ out.close();
+ outStream.close();
+
+ // open the compressed stream for reading
+ Path actualRecordFile = recordFile.suffix(".gz");
+ InputStream in =
+ CompressionEmulationUtil
+ .getPossiblyDecompressedInputStream(actualRecordFile, conf, 0);
+
+ // get the compressed file size
+ long compressedFileSize = lfs.listStatus(actualRecordFile)[0].getLen();
+
+ GridmixRecord recordRead = new GridmixRecord();
+ recordRead.readFields(new DataInputStream(in));
+
+ assertEquals("Record size mismatch in a compressible GridmixRecord",
+ dataSize, recordRead.getSize());
+ assertTrue("Failed to generate a compressible GridmixRecord",
+ recordRead.getSize() > compressedFileSize);
+
+ // check if the record can generate data with the desired compression ratio
+ float seenRatio = ((float)compressedFileSize)/dataSize;
+ assertEquals(CompressionEmulationUtil.standardizeCompressionRatio(ratio),
+ CompressionEmulationUtil.standardizeCompressionRatio(seenRatio), 1.0D);
+ }
+
+ /**
+ * Test
+ * {@link CompressionEmulationUtil#isCompressionEmulationEnabled(
+ * org.apache.hadoop.conf.Configuration)}.
+ */
+ @Test
+ public void testIsCompressionEmulationEnabled() {
+ Configuration conf = new Configuration();
+ // Check default values
+ assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+
+ // Check disabled
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, false);
+ assertFalse(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+
+ // Check enabled
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ assertTrue(CompressionEmulationUtil.isCompressionEmulationEnabled(conf));
+ }
+
+ /**
+ * Test
+ * {@link CompressionEmulationUtil#getPossiblyDecompressedInputStream(Path,
+ * Configuration, long)}
+ * and
+ * {@link CompressionEmulationUtil#getPossiblyCompressedOutputStream(Path,
+ * Configuration)}.
+ */
+ @Test
+ public void testPossiblyCompressedDecompressedStreams() throws IOException {
+ JobConf conf = new JobConf();
+ FileSystem lfs = FileSystem.getLocal(conf);
+ String inputLine = "Hi Hello!";
+
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+ conf.setBoolean(FileOutputFormat.COMPRESS, true);
+ conf.setClass(FileOutputFormat.COMPRESS_CODEC, GzipCodec.class,
+ CompressionCodec.class);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir =
+ new Path(rootTempDir, "TestPossiblyCompressedDecompressedStreams");
+ lfs.delete(tempDir, true);
+
+ // create a compressed file
+ Path compressedFile = new Path(tempDir, "test");
+ OutputStream out =
+ CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile,
+ conf);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+ writer.write(inputLine);
+ writer.close();
+
+ // now read back the data from the compressed stream
+ compressedFile = compressedFile.suffix(".gz");
+ InputStream in =
+ CompressionEmulationUtil
+ .getPossiblyDecompressedInputStream(compressedFile, conf, 0);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String readLine = reader.readLine();
+ assertEquals("Compression/Decompression error", inputLine, readLine);
+ reader.close();
+ }
+
+ /**
+ * Test if
+ * {@link CompressionEmulationUtil#configureCompressionEmulation(
+ * org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.JobConf)}
+ * can extract compression related configuration parameters.
+ */
+ @Test
+ public void testExtractCompressionConfigs() {
+ JobConf source = new JobConf();
+ JobConf target = new JobConf();
+
+ // set the default values
+ source.setBoolean(FileOutputFormat.COMPRESS, false);
+ source.set(FileOutputFormat.COMPRESS_CODEC, "MyDefaultCodec");
+ source.set(FileOutputFormat.COMPRESS_TYPE, "MyDefaultType");
+ source.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false);
+ source.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, "MyDefaultCodec2");
+
+ CompressionEmulationUtil.configureCompressionEmulation(source, target);
+
+ // check default values
+ assertFalse(target.getBoolean(FileOutputFormat.COMPRESS, true));
+ assertEquals("MyDefaultCodec", target.get(FileOutputFormat.COMPRESS_CODEC));
+ assertEquals("MyDefaultType", target.get(FileOutputFormat.COMPRESS_TYPE));
+ assertFalse(target.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true));
+ assertEquals("MyDefaultCodec2",
+ target.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC));
+ assertFalse(CompressionEmulationUtil
+ .isInputCompressionEmulationEnabled(target));
+
+ // set new values
+ source.setBoolean(FileOutputFormat.COMPRESS, true);
+ source.set(FileOutputFormat.COMPRESS_CODEC, "MyCodec");
+ source.set(FileOutputFormat.COMPRESS_TYPE, "MyType");
+ source.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+ source.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, "MyCodec2");
+ org.apache.hadoop.mapred.FileInputFormat.setInputPaths(source, "file.gz");
+
+ target = new JobConf(); // reset
+ CompressionEmulationUtil.configureCompressionEmulation(source, target);
+
+ // check new values
+ assertTrue(target.getBoolean(FileOutputFormat.COMPRESS, false));
+ assertEquals("MyCodec", target.get(FileOutputFormat.COMPRESS_CODEC));
+ assertEquals("MyType", target.get(FileOutputFormat.COMPRESS_TYPE));
+ assertTrue(target.getBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, false));
+ assertEquals("MyCodec2",
+ target.get(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC));
+ assertTrue(CompressionEmulationUtil
+ .isInputCompressionEmulationEnabled(target));
+ }
+
+ /**
+   * Test if {@link FileQueue} can identify a compressed file and provide
+ * readers to extract uncompressed data only if input-compression is enabled.
+ */
+ @Test
+ public void testFileQueueDecompression() throws IOException {
+ JobConf conf = new JobConf();
+ FileSystem lfs = FileSystem.getLocal(conf);
+ String inputLine = "Hi Hello!";
+
+ CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
+ CompressionEmulationUtil.setInputCompressionEmulationEnabled(conf, true);
+ org.apache.hadoop.mapred.FileOutputFormat.setCompressOutput(conf, true);
+ org.apache.hadoop.mapred.FileOutputFormat.setOutputCompressorClass(conf,
+ GzipCodec.class);
+
+ // define the test's root temp directory
+ Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ Path tempDir = new Path(rootTempDir, "TestFileQueueDecompression");
+ lfs.delete(tempDir, true);
+
+ // create a compressed file
+ Path compressedFile = new Path(tempDir, "test");
+ OutputStream out =
+ CompressionEmulationUtil.getPossiblyCompressedOutputStream(compressedFile,
+ conf);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+ writer.write(inputLine);
+ writer.close();
+
+ compressedFile = compressedFile.suffix(".gz");
+ // now read back the data from the compressed stream using FileQueue
+ long fileSize = lfs.listStatus(compressedFile)[0].getLen();
+ CombineFileSplit split =
+ new CombineFileSplit(new Path[] {compressedFile}, new long[] {fileSize});
+ FileQueue queue = new FileQueue(split, conf);
+ byte[] bytes = new byte[inputLine.getBytes().length];
+ queue.read(bytes);
+ queue.close();
+ String readLine = new String(bytes);
+ assertEquals("Compression/Decompression error", inputLine, readLine);
+ }
+}
diff --git a/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
new file mode 100644
index 0000000..e302db5
--- /dev/null
+++ b/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestRandomTextDataGenerator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.mapred.gridmix.RandomTextDataGenerator;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Test {@link RandomTextDataGenerator}.
+ */
+public class TestRandomTextDataGenerator {
+ /**
+ * Test if {@link RandomTextDataGenerator} can generate random words of
+ * desired size.
+ */
+ @Test
+ public void testRandomTextDataGenerator() {
+ RandomTextDataGenerator rtdg = new RandomTextDataGenerator(10, 0L, 5);
+ List<String> words = rtdg.getRandomWords();
+
+ // check the size
+ assertEquals("List size mismatch", 10, words.size());
+
+ // check the words
+ Set<String> wordsSet = new HashSet<String>(words);
+ assertEquals("List size mismatch due to duplicates", 10, wordsSet.size());
+
+ // check the word lengths
+ for (String word : wordsSet) {
+ assertEquals("Word size mismatch", 5, word.length());
+ }
+ }
+
+ /**
+ * Test if {@link RandomTextDataGenerator} can generate same words given the
+ * same list-size, word-length and seed.
+ */
+  @Test
+  public void testRandomTextDataGeneratorRepeatability() {
+    RandomTextDataGenerator rtdg1 = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words1 = rtdg1.getRandomWords();
+
+    RandomTextDataGenerator rtdg2 = new RandomTextDataGenerator(10, 0L, 5);
+    List<String> words2 = rtdg2.getRandomWords();
+
+    // assertEquals reports both lists on failure, unlike assertTrue(equals())
+    assertEquals("List mismatch", words1, words2);
+  }
+
+ /**
+ * Test if {@link RandomTextDataGenerator} can generate different words given
+ * different seeds.
+ */
+  @Test
+  public void testRandomTextDataGeneratorUniqueness() {
+    RandomTextDataGenerator rtdg1 = new RandomTextDataGenerator(10, 1L, 5);
+    Set<String> words1 = new HashSet<String>(rtdg1.getRandomWords());
+
+    RandomTextDataGenerator rtdg2 = new RandomTextDataGenerator(10, 0L, 5);
+    Set<String> words2 = new HashSet<String>(rtdg2.getRandomWords());
+
+    assertFalse("List size mismatch across lists", words1.equals(words2));
+  }
+}
diff --git a/src/docs/src/documentation/content/xdocs/gridmix.xml b/src/docs/src/documentation/content/xdocs/gridmix.xml
index 3c72713..fac3526 100644
--- a/src/docs/src/documentation/content/xdocs/gridmix.xml
+++ b/src/docs/src/documentation/content/xdocs/gridmix.xml
@@ -518,7 +518,7 @@
</section>
<section id="distributedcacheload">
- <title>Emulation of Distributed Cache Load</title>
+ <title>Emulating Distributed Cache Load</title>
<p>Gridmix emulates Distributed Cache load by default for LOADJOB type of
jobs. This is done by precreating the needed Distributed Cache files for all
the simulated jobs as part of a separate MapReduce job.</p>
@@ -570,6 +570,75 @@
</table>
</section>
+ <section id="compression-emulation">
+ <title>Emulating Compression/Decompression</title>
+ <p>MapReduce supports data compression and decompression.
+ Input to a MapReduce job can be compressed. Similarly, output of Map
+ and Reduce tasks can also be compressed. Compression/Decompression
+ emulation in GridMix is important because emulating
+      compression/decompression will affect the CPU and Memory usage of the
+ task. A task emulating compression/decompression will affect other
+ tasks and daemons running on the same node.
+ </p>
+ <p>Compression emulation is enabled if
+ <code>gridmix.compression-emulation.enable</code> is set to
+ <code>true</code>. By default compression emulation is enabled for
+ jobs of type <em>LOADJOB</em>. With compression emulation enabled,
+ GridMix will now generate compressed text data with a constant
+ compression ratio. Hence a simulated GridMix job will now emulate
+ compression/decompression using compressible text data (having a
+ constant compression ratio), irrespective of the compression ratio
+ observed in the actual job.
+ </p>
+ <p>A typical MapReduce Job deals with data compression/decompression in
+ the following phases </p>
+ <ul>
+ <li><code>Job input data decompression: </code> GridMix generates
+ compressible input data when compression emulation is enabled.
+ Based on the original job's configuration, a simulated GridMix job
+ will use a decompressor to read the compressed input data.
+ Currently, GridMix uses
+ <code>mapreduce.input.fileinputformat.inputdir</code> to determine
+ if the original job used compressed input data or
+          not. If the original job input files are compressed then the
+          simulated job will read the compressed input file using a
+ decompressor.
+ </li>
+ <li><code>Intermediate data compression and decompression: </code>
+ If the original job has map output compression enabled then GridMix
+ too will enable map output compression for the simulated job.
+ Accordingly, the reducers will use a decompressor to read the map
+ output data.
+ </li>
+ <li><code>Job output data compression: </code>
+ If the original job's output is compressed then GridMix
+ too will enable job output compression for the simulated job.
+ </li>
+ </ul>
+
+ <p>The following configuration parameters affect compression emulation
+ </p>
+ <table>
+ <tr>
+ <th>Parameter</th>
+ <th>Description</th>
+ </tr>
+ <tr>
+ <td>gridmix.compression-emulation.enable</td>
+ <td>Enables compression emulation in simulated GridMix jobs.
+ Default is true.</td>
+ </tr>
+ </table>
+
+ <p>With compression emulation turned on, GridMix will generate compressed
+ input data. Hence the total size of the input
+      data will be less than the expected size. Set
+ <code>gridmix.min.file.size</code> to a smaller value (roughly 10% of
+ <code>gridmix.gen.bytes.per.file</code>) for enabling GridMix to
+ correctly emulate compression.
+ </p>
+ </section>
+
<section id="assumptions">
<title>Simplifying Assumptions</title>
<p>GridMix will be developed in stages, incorporating feedback and
@@ -587,9 +656,8 @@
sizes, namespace hierarchies, or any property of input, intermediate
or output data other than the bytes/records consumed and emitted from
a given task. This implies that some of the most heavily-used parts of
- the system - the compression libraries, text processing, streaming,
- etc. - cannot be meaningfully tested with the current
- implementation.</li>
+ the system - text processing, streaming, etc. - cannot be meaningfully tested
+ with the current implementation.</li>
<li><em>I/O Rates</em> - The rate at which records are
consumed/emitted is assumed to be limited only by the speed of the
reader/writer and constant throughout the task.</li>