blob: 342f549233ef1212e01e45955ade895807d8af14 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import junit.framework.TestCase;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public class TestMapCollection {
private static final Log LOG = LogFactory.getLog(
TestMapCollection.class.getName());
/**
 * Writable that serializes a vint length followed by that many fill bytes,
 * and on read verifies that every payload byte equals {@link #fillChar}.
 * Used to stress the map-side collection/spill buffer with records of
 * controllable sizes.
 */
public static abstract class FillWritable implements Writable, Configurable {
  // Payload length in bytes for the next write/read cycle.
  private int len;
  // When true, readFields is a no-op and write omits the length prefix
  // (set by subclasses for zero-length key/value test cases).
  protected boolean disableRead;
  // Scratch buffer reused across writes of large (>1024 byte) records.
  private byte[] b;
  private final Random r;
  protected final byte fillChar;

  public FillWritable(byte fillChar) {
    this.fillChar = fillChar;
    r = new Random();
    // Log the seed so a failing run can be reproduced.
    final long seed = r.nextLong();
    LOG.info("seed: " + seed);
    r.setSeed(seed);
  }

  @Override
  public Configuration getConf() {
    return null;
  }

  /** Set the payload length for subsequent write()/readFields() calls. */
  public void setLength(int len) {
    this.len = len;
  }

  public int compareTo(FillWritable o) {
    if (o == this) return 0;
    // Integer.compare is overflow-safe, unlike the "len - o.len" idiom.
    return Integer.compare(len, o.len);
  }

  @Override
  public int hashCode() {
    return 37 * len;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof FillWritable)) return false;
    return 0 == compareTo((FillWritable)o);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    if (disableRead) {
      return;
    }
    len = WritableUtils.readVInt(in);
    // Verify every payload byte carries the expected fill value.
    for (int i = 0; i < len; ++i) {
      assertEquals("Invalid byte at " + i, fillChar, in.readByte());
    }
  }

  @Override
  public void write(DataOutput out) throws IOException {
    if (0 == len) {
      return;
    }
    int written = 0;
    if (!disableRead) {
      WritableUtils.writeVInt(out, len);
      // NOTE(review): starting the counter negative makes the loops below
      // emit len + vint-size fill bytes while readFields consumes only len
      // of them; the framework length-bounds each serialized record, so the
      // surplus appears harmless -- confirm before changing.
      written -= WritableUtils.getVIntSize(len);
    }
    if (len > 1024) {
      // Large records: fill a reusable buffer once, then emit it in
      // random-sized slices to vary the write pattern.
      if (null == b || b.length < len) {
        b = new byte[2 * len];
      }
      Arrays.fill(b, fillChar);
      do {
        final int write = Math.min(len - written, r.nextInt(len));
        out.write(b, 0, write);
        written += write;
      } while (written < len);
      assertEquals(len, written);
    } else {
      // Small records: write the fill byte one at a time.
      for (int i = written; i < len; ++i) {
        out.write(fillChar);
      }
    }
  }
}
/** Map-output key type: payload filled with {@link #keyFill} bytes. */
public static class KeyWritable extends FillWritable
    implements WritableComparable<FillWritable> {

  /** Byte value every serialized key payload is filled with. */
  static final byte keyFill = (byte)('K' & 0xFF);

  public KeyWritable() {
    super(keyFill);
  }

  @Override
  public void setConf(Configuration conf) {
    // Skip deserialization entirely for zero-length-key test cases.
    disableRead = conf.getBoolean("test.disable.key.read", false);
  }
}
/** Map-output value type: payload filled with 'V' bytes. */
public static class ValWritable extends FillWritable {

  /** Byte value every serialized value payload is filled with. */
  private static final byte VAL_FILL = (byte)('V' & 0xFF);

  public ValWritable() {
    super(VAL_FILL);
  }

  @Override
  public void setConf(Configuration conf) {
    // Skip deserialization entirely for zero-length-value test cases.
    disableRead = conf.getBoolean("test.disable.val.read", false);
  }
}
/**
 * RawComparator that orders serialized keys by their total serialized
 * length and, as a side effect, asserts that the payload bytes it scans
 * are all the key fill byte.
 */
public static class VariableComparator
implements RawComparator<KeyWritable>, Configurable {
// True when serialized keys begin with a vint length prefix.
private boolean readLen;
public VariableComparator() { }
@Override
public void setConf(Configuration conf) {
// Mirrors KeyWritable.setConf: zero-length keys are written without a
// length prefix, so there is no vint to skip over.
readLen = !conf.getBoolean("test.disable.key.read", false);
}
@Override
public Configuration getConf() { return null; }
// Deserialized comparison delegates to the key's own length ordering.
public int compare(KeyWritable k1, KeyWritable k2) {
return k1.compareTo(k2);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
// Width in bytes of each key's vint length prefix (0 when disabled).
final int n1;
final int n2;
if (readLen) {
n1 = WritableUtils.decodeVIntSize(b1[s1]);
n2 = WritableUtils.decodeVIntSize(b2[s2]);
} else {
n1 = 0;
n2 = 0;
}
// Verify fill bytes of both serialized keys.
// NOTE(review): the upper bound "l1 - n1" is an absolute index that only
// covers the full payload when s1 == 0; for keys at a non-zero offset in
// the spill buffer the scanned range may be empty or truncated -- confirm
// against how MapTask invokes the raw comparator before changing.
for (int i = s1 + n1; i < l1 - n1; ++i) {
assertEquals("Invalid key at " + s1, (int)KeyWritable.keyFill, b1[i]);
}
for (int i = s2 + n2; i < l2 - n2; ++i) {
assertEquals("Invalid key at " + s2, (int)KeyWritable.keyFill, b2[i]);
}
// Order solely by serialized length (includes any vint prefix).
return l1 - l2;
}
}
/**
 * Reducer that counts every record delivered to it and asserts in
 * cleanup that the total matches the configured
 * "test.spillmap.records" count.
 */
public static class SpillReducer
    extends Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> {
  private int numrecs;   // records observed so far
  private int expected;  // records the job should deliver

  @Override
  protected void setup(Context job) {
    numrecs = 0;
    expected = job.getConfiguration().getInt("test.spillmap.records", 100);
  }

  @Override
  protected void reduce(KeyWritable k, Iterable<ValWritable> values,
      Context context) throws IOException, InterruptedException {
    // Only the number of values matters here; the payload bytes were
    // already verified while deserializing each record.
    for (ValWritable ignored : values) {
      ++numrecs;
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    assertEquals("Unexpected record count", expected, numrecs);
  }
}
/** Zero-length, location-free split; all record data is synthesized. */
public static class FakeSplit extends InputSplit implements Writable {

  @Override
  public long getLength() { return 0L; }

  @Override
  public String[] getLocations() { return new String[0]; }

  @Override
  public void write(DataOutput out) throws IOException {
    // nothing to serialize
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // nothing to deserialize
  }
}
/**
 * Source of per-record key/value lengths for the fake record reader.
 * Implementations are instantiated reflectively via the
 * "test.mapcollection.class" configuration key.
 */
public abstract static class RecordFactory implements Configurable {
public Configuration getConf() { return null; }
/** Key length in bytes for record index i. */
public abstract int keyLen(int i);
/** Value length in bytes for record index i. */
public abstract int valLen(int i);
}
public static class FixedRecordFactory extends RecordFactory {
private int keylen;
private int vallen;
public FixedRecordFactory() { }
public void setConf(Configuration conf) {
keylen = conf.getInt("test.fixedrecord.keylen", 0);
vallen = conf.getInt("test.fixedrecord.vallen", 0);
}
public int keyLen(int i) { return keylen; }
public int valLen(int i) { return vallen; }
public static void setLengths(Configuration conf, int keylen, int vallen) {
conf.setInt("test.fixedrecord.keylen", keylen);
conf.setInt("test.fixedrecord.vallen", vallen);
conf.setBoolean("test.disable.key.read", 0 == keylen);
conf.setBoolean("test.disable.val.read", 0 == vallen);
}
}
/**
 * InputFormat producing synthetic records whose key/value lengths come
 * from the configured RecordFactory; no actual input data is read.
 */
public static class FakeIF extends InputFormat<KeyWritable,ValWritable> {

  public FakeIF() { }

  @Override
  public List<InputSplit> getSplits(JobContext ctxt) throws IOException {
    final int nSplits =
        ctxt.getConfiguration().getInt("test.mapcollection.num.maps", -1);
    final List<InputSplit> result = new ArrayList<InputSplit>(nSplits);
    for (int i = 0; i < nSplits; ++i) {
      result.add(new FakeSplit());
    }
    return result;
  }

  @Override
  public RecordReader<KeyWritable,ValWritable> createRecordReader(
      InputSplit ignored, final TaskAttemptContext taskContext) {
    return new RecordReader<KeyWritable,ValWritable>() {
      private RecordFactory factory;
      private final KeyWritable key = new KeyWritable();
      private final ValWritable val = new ValWritable();
      private int pos;    // index of the next record to emit
      private int total;  // number of records this reader will emit

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context) {
        final Configuration conf = context.getConfiguration();
        key.setConf(conf);
        val.setConf(conf);
        // Length policy is instantiated reflectively so tests can plug in
        // fixed, stepped, or random record-size generators.
        factory = ReflectionUtils.newInstance(
            conf.getClass("test.mapcollection.class",
                FixedRecordFactory.class, RecordFactory.class), conf);
        assertNotNull(factory);
        pos = 0;
        total = conf.getInt("test.spillmap.records", 100);
      }

      @Override
      public boolean nextKeyValue() {
        key.setLength(factory.keyLen(pos));
        val.setLength(factory.valLen(pos));
        return pos++ < total;
      }

      @Override
      public KeyWritable getCurrentKey() { return key; }

      @Override
      public ValWritable getCurrentValue() { return val; }

      @Override
      public float getProgress() { return (float) pos / total; }

      @Override
      public void close() {
        // nextKeyValue overshoots by exactly one before returning false.
        assertEquals("Unexpected count", total, pos - 1);
      }
    };
  }
}
/**
 * Configure and run a job that emits {@code records} fixed-size records
 * through a 1-map pipeline with the given sort-buffer size (MB) and
 * spill threshold.
 */
private static void runTest(String name, int keylen, int vallen,
    int records, int ioSortMB, float spillPer) throws Exception {
  final Configuration init = new Configuration();
  init.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
  final Job job = Job.getInstance(new Cluster(init), init);
  final Configuration conf = job.getConfiguration();
  conf.setInt(MRJobConfig.IO_SORT_MB, ioSortMB);
  conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(spillPer));
  conf.setClass("test.mapcollection.class", FixedRecordFactory.class,
      RecordFactory.class);
  FixedRecordFactory.setLengths(conf, keylen, vallen);
  conf.setInt("test.spillmap.records", records);
  runTest(name, job);
}
/**
 * Common job wiring: one reducer, fake input, null output, identity
 * mapper, counting reducer, and the length-based raw comparator. Fails
 * the calling test if the job does not complete successfully.
 */
private static void runTest(String name, Job job) throws Exception {
  final Configuration conf = job.getConfiguration();
  conf.setInt(MRJobConfig.IO_SORT_FACTOR, 1000);
  conf.set("fs.default.name", "file:///");
  conf.setInt("test.mapcollection.num.maps", 1);
  job.setNumReduceTasks(1);
  job.setInputFormatClass(FakeIF.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapperClass(Mapper.class);
  job.setReducerClass(SpillReducer.class);
  job.setMapOutputKeyClass(KeyWritable.class);
  job.setMapOutputValueClass(ValWritable.class);
  job.setSortComparatorClass(VariableComparator.class);
  LOG.info("Running " + name);
  assertTrue("Job failed!", job.waitForCompletion(false));
}
/**
 * Record sizes chosen so the final byte of a record/key lands exactly on
 * the last/first byte of the spill buffer boundary.
 */
@Test
public void testValLastByte() throws Exception {
// last byte of record/key is the last/first byte in the spill buffer
runTest("vallastbyte", 128, 896, 1344, 1, 0.5f);
runTest("keylastbyte", 512, 1024, 896, 1, 0.5f);
}
/**
 * Records larger than the whole 1MB collection buffer must still be
 * spilled successfully, including a 1MB key with a zero-length value.
 */
@Test
public void testLargeRecords() throws Exception {
// maps emitting records larger than mapreduce.task.io.sort.mb
runTest("largerec", 100, 1024*1024, 5, 1, .8f);
runTest("largekeyzeroval", 1024*1024, 0, 5, 1, .8f);
}
/**
 * Non-default spill thresholds: fill the buffer completely before
 * spilling (100%) at several record sizes, plus a sub-50% threshold.
 */
@Test
public void testSpillPer2B() throws Exception {
// set non-default, 100% speculative spill boundary
runTest("fullspill2B", 1, 1, 10000, 1, 1.0f);
runTest("fullspill200B", 100, 100, 10000, 1, 1.0f);
runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 1.0f);
runTest("lt50perspill", 100, 100, 10000, 1, 0.3f);
}
/**
 * Zero-length keys and/or values; zero-length sides are serialized with
 * no bytes at all (see FixedRecordFactory.setLengths).
 */
@Test
public void testZeroVal() throws Exception {
// test key/value at zero-length
runTest("zeroval", 1, 0, 10000, 1, .8f);
runTest("zerokey", 0, 1, 10000, 1, .8f);
runTest("zerokeyval", 0, 0, 10000, 1, .8f);
runTest("zerokeyvalfull", 0, 0, 10000, 1, 1.0f);
}
/** A single record per map, both with payload and fully zero-length. */
@Test
public void testSingleRecord() throws Exception {
runTest("singlerecord", 100, 100, 1, 1, 1.0f);
runTest("zerokeyvalsingle", 0, 0, 1, 1, 1.0f);
}
/** Very low spill threshold (0.39%) forcing frequent small spills. */
@Test
public void testLowSpill() throws Exception {
runTest("lowspill", 4000, 96, 20, 1, 0.00390625f);
}
/** Many tiny records so spill-index/metadata, not data, drives spills. */
@Test
public void testSplitMetaSpill() throws Exception {
runTest("splitmetaspill", 7, 1, 131072, 1, 0.8f);
}
public static class StepFactory extends RecordFactory {
public int prekey;
public int postkey;
public int preval;
public int postval;
public int steprec;
public void setConf(Configuration conf) {
prekey = conf.getInt("test.stepfactory.prekey", 0);
postkey = conf.getInt("test.stepfactory.postkey", 0);
preval = conf.getInt("test.stepfactory.preval", 0);
postval = conf.getInt("test.stepfactory.postval", 0);
steprec = conf.getInt("test.stepfactory.steprec", 0);
}
public static void setLengths(Configuration conf, int prekey, int postkey,
int preval, int postval, int steprec) {
conf.setInt("test.stepfactory.prekey", prekey);
conf.setInt("test.stepfactory.postkey", postkey);
conf.setInt("test.stepfactory.preval", preval);
conf.setInt("test.stepfactory.postval", postval);
conf.setInt("test.stepfactory.steprec", steprec);
}
public int keyLen(int i) {
return i > steprec ? postkey : prekey;
}
public int valLen(int i) {
return i > steprec ? postval : preval;
}
}
/**
 * Fill the buffer with large records until a spill triggers, then emit
 * records whose serialization writes no bytes at all, exercising the
 * metadata-only accounting path after a spill.
 */
@Test
public void testPostSpillMeta() throws Exception {
// write larger records until spill, then write records that generate
// no writes into the serialization buffer
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
// 2^20 * spill = 14336 bytes available post-spill, at most 896 meta
conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(.986328125f));
conf.setClass("test.mapcollection.class", StepFactory.class,
RecordFactory.class);
// records up to index 252: 4000-byte keys / 96-byte values; afterwards
// both sides are zero-length
StepFactory.setLengths(conf, 4000, 0, 96, 0, 252);
conf.setInt("test.spillmap.records", 1000);
// zero-length records are written without any length prefix, so
// deserialization must be disabled on both sides
conf.setBoolean("test.disable.key.read", true);
conf.setBoolean("test.disable.val.read", true);
runTest("postspillmeta", job);
}
/**
 * Switch from modest records to records far larger than the 1MB buffer
 * mid-stream, so oversized-record handling overlaps an in-flight spill.
 */
@Test
public void testLargeRecConcurrent() throws Exception {
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT, Float.toString(.986328125f));
conf.setClass("test.mapcollection.class", StepFactory.class,
RecordFactory.class);
// records up to index 251: 4000/96 bytes; afterwards 255KB keys with
// 1KB values
StepFactory.setLengths(conf, 4000, 261120, 96, 1024, 251);
conf.setInt("test.spillmap.records", 255);
conf.setBoolean("test.disable.key.read", false);
conf.setBoolean("test.disable.val.read", false);
runTest("largeconcurrent", job);
}
public static class RandomFactory extends RecordFactory {
public int minkey;
public int maxkey;
public int minval;
public int maxval;
private final Random r = new Random();
private static int nextRand(Random r, int max) {
return (int)Math.exp(r.nextDouble() * Math.log(max));
}
public void setConf(Configuration conf) {
r.setSeed(conf.getLong("test.randomfactory.seed", 0L));
minkey = conf.getInt("test.randomfactory.minkey", 0);
maxkey = conf.getInt("test.randomfactory.maxkey", 0) - minkey;
minval = conf.getInt("test.randomfactory.minval", 0);
maxval = conf.getInt("test.randomfactory.maxval", 0) - minval;
}
public static void setLengths(Configuration conf, Random r, int max) {
int k1 = nextRand(r, max);
int k2 = nextRand(r, max);
if (k1 > k2) {
final int tmp = k1;
k1 = k2;
k2 = k1;
}
int v1 = nextRand(r, max);
int v2 = nextRand(r, max);
if (v1 > v2) {
final int tmp = v1;
v1 = v2;
v2 = v1;
}
setLengths(conf, k1, ++k2, v1, ++v2);
}
public static void setLengths(Configuration conf, int minkey, int maxkey,
int minval, int maxval) {
assert minkey < maxkey;
assert minval < maxval;
conf.setInt("test.randomfactory.minkey", minkey);
conf.setInt("test.randomfactory.maxkey", maxkey);
conf.setInt("test.randomfactory.minval", minval);
conf.setInt("test.randomfactory.maxval", maxval);
conf.setBoolean("test.disable.key.read", minkey == 0);
conf.setBoolean("test.disable.val.read", minval == 0);
}
public int keyLen(int i) {
return minkey + nextRand(r, maxkey - minkey);
}
public int valLen(int i) {
return minval + nextRand(r, maxval - minval);
}
}
/**
 * Random record lengths and a random spill threshold; both the local
 * seed and the factory seed are logged/stored so a failing run can be
 * reproduced.
 */
@Test
public void testRandom() throws Exception {
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
conf.setClass("test.mapcollection.class", RandomFactory.class,
RecordFactory.class);
final Random r = new Random();
final long seed = r.nextLong();
LOG.info("SEED: " + seed);
r.setSeed(seed);
// spill threshold is random but floored at 10%
conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT,
Float.toString(Math.max(0.1f, r.nextFloat())));
RandomFactory.setLengths(conf, r, 1 << 14);
conf.setInt("test.spillmap.records", r.nextInt(500));
conf.setLong("test.randomfactory.seed", r.nextLong());
runTest("random", job);
}
/**
 * Same randomized scenario as {@code testRandom}, but with map-output
 * compression enabled to exercise the compressed spill path.
 */
@Test
public void testRandomCompress() throws Exception {
Configuration conf = new Configuration();
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 100);
Job job = Job.getInstance(new Cluster(conf), conf);
conf = job.getConfiguration();
conf.setInt(MRJobConfig.IO_SORT_MB, 1);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
conf.setClass("test.mapcollection.class", RandomFactory.class,
RecordFactory.class);
final Random r = new Random();
final long seed = r.nextLong();
LOG.info("SEED: " + seed);
r.setSeed(seed);
// spill threshold is random but floored at 10%
conf.set(MRJobConfig.MAP_SORT_SPILL_PERCENT,
Float.toString(Math.max(0.1f, r.nextFloat())));
RandomFactory.setLengths(conf, r, 1 << 14);
conf.setInt("test.spillmap.records", r.nextInt(500));
conf.setLong("test.randomfactory.seed", r.nextLong());
runTest("randomCompress", job);
}
}