blob: dc8f7c0a7b710226c2f4a06fe713884cb4fa286a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import junit.framework.TestCase;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.Arrays;
import java.util.Iterator;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
public class TestMapCollection extends TestCase {
// Logger for this test; runTest uses it to announce each job by name
// before the job is submitted.
private static final Log LOG = LogFactory.getLog(
    TestMapCollection.class.getName());
public static class KeyWritable
implements WritableComparable<KeyWritable>, JobConfigurable {
private final byte c = (byte)('K' & 0xFF);
static private boolean pedantic = false;
protected int expectedlen;
public void configure(JobConf conf) {
expectedlen = conf.getInt("test.keywritable.length", 1);
pedantic = conf.getBoolean("test.pedantic.verification", false);
}
public KeyWritable() { }
public KeyWritable(int len) {
this();
expectedlen = len;
}
public int getLength() {
return expectedlen;
}
public int compareTo(KeyWritable o) {
if (o == this) return 0;
return expectedlen - o.getLength();
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof KeyWritable)) return false;
return 0 == compareTo((KeyWritable)o);
}
public int hashCode() {
return 37 * expectedlen;
}
public void readFields(DataInput in) throws IOException {
if (expectedlen != 0) {
int bytesread;
if (pedantic) {
for (int i = 0; i < expectedlen; ++i)
assertEquals("Invalid byte at " + i, c, in.readByte());
bytesread = expectedlen;
} else {
bytesread = in.skipBytes(expectedlen);
}
assertEquals("Too few bytes in record", expectedlen, bytesread);
}
// cannot verify that the stream has been exhausted
}
public void write(DataOutput out) throws IOException {
if (expectedlen != 0) {
if (expectedlen > 1024) {
byte[] b = new byte[expectedlen];
Arrays.fill(b, c);
out.write(b);
} else {
for (int i = 0; i < expectedlen; ++i) {
out.write(c);
}
}
}
}
public static class Comparator extends WritableComparator {
public Comparator() {
super(KeyWritable.class);
}
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
if (pedantic) {
for (int i = s1; i < l1; ++i) {
assertEquals("Invalid key at " + s1, b1[i], (byte)('K' & 0xFF));
}
for (int i = s2; i < l2; ++i) {
assertEquals("Invalid key at " + s2, b2[i], (byte)('K' & 0xFF));
}
}
return l1 - l2;
}
}
static {
WritableComparator.define(KeyWritable.class, new Comparator());
}
}
public static class ValWritable extends KeyWritable {
private final byte c = (byte)('V' & 0xFF);
public ValWritable() { }
public ValWritable(int len) {
this();
expectedlen = len;
}
public void configure(JobConf conf) {
expectedlen = conf.getInt("test.valwritable.length", 1);
}
}
/**
 * Mapper that ignores its input and, on each map() call, emits the same
 * key/value pair a configured number of times.
 */
public static class SpillMapper
    implements Mapper<NullWritable,NullWritable,KeyWritable,ValWritable> {

  private int keylen = 1;
  private int vallen = 1;
  private int numrecs = 100;

  /** Picks up record count and key/value lengths from the job. */
  public void configure(JobConf job) {
    numrecs = job.getInt("test.spillmap.records", 100);
    keylen = job.getInt("test.keywritable.length", 1);
    vallen = job.getInt("test.valwritable.length", 1);
  }

  /**
   * Collects {@code numrecs} copies of one key/value pair, reporting
   * progress before every 1000th record (including the first).
   */
  public void map(NullWritable key, NullWritable value,
      OutputCollector<KeyWritable,ValWritable> out, Reporter reporter)
      throws IOException {
    final KeyWritable outKey = new KeyWritable(keylen);
    final ValWritable outVal = new ValWritable(vallen);
    int emitted = 0;
    while (emitted < numrecs) {
      if (emitted % 1000 == 0) {
        reporter.progress();
      }
      out.collect(outKey, outVal);
      ++emitted;
    }
  }

  public void close() { }
}
/**
 * Reducer that emits nothing; it only checks that each key arrives with
 * exactly the configured number of values.
 */
public static class SpillReducer
    implements Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> {

  private int numrecs = 100;

  /** Reads the expected record count ("test.spillmap.records", default 100). */
  public void configure(JobConf job) {
    numrecs = job.getInt("test.spillmap.records", 100);
  }

  /** Drains the value iterator and asserts on the number of values seen. */
  public void reduce(KeyWritable k, Iterator<ValWritable> values,
      OutputCollector<NullWritable,NullWritable> out, Reporter reporter) {
    int seen = 0;
    for (; values.hasNext(); ++seen) {
      values.next();
    }
    final String failure =
        "Unexpected record count (" + seen + "/" + numrecs + ")";
    assertEquals(failure, numrecs, seen);
  }

  public void close() { }
}
/**
 * Placeholder split: zero length, no locations, and nothing to serialize.
 */
public static class FakeSplit implements InputSplit {

  /** @return always zero */
  public long getLength() {
    return 0L;
  }

  /** @return a new empty array on every call */
  public String[] getLocations() {
    return new String[0];
  }

  /** Writes nothing. */
  public void write(DataOutput out) throws IOException {
  }

  /** Reads nothing. */
  public void readFields(DataInput in) throws IOException {
  }
}
/**
 * Input format that produces the requested number of FakeSplits, each of
 * which yields exactly one (NullWritable, NullWritable) record.
 */
public static class FakeIF
    implements InputFormat<NullWritable,NullWritable> {

  public FakeIF() { }

  /** @return {@code numSplits} distinct FakeSplit instances */
  public InputSplit[] getSplits(JobConf conf, int numSplits) {
    final InputSplit[] fakes = new InputSplit[numSplits];
    int idx = 0;
    while (idx < fakes.length) {
      fakes[idx] = new FakeSplit();
      ++idx;
    }
    return fakes;
  }

  /**
   * @return a reader whose first next() call succeeds and every later call
   *         reports end of input; position and progress are always zero
   */
  public RecordReader<NullWritable,NullWritable> getRecordReader(
      InputSplit ignored, JobConf conf, Reporter reporter) {
    return new RecordReader<NullWritable,NullWritable>() {
      // Set once the single record has been handed out.
      private boolean consumed = false;

      public boolean next(NullWritable key, NullWritable value)
          throws IOException {
        final boolean firstCall = !consumed;
        consumed = true;
        return firstCall;
      }

      public NullWritable createKey() {
        return NullWritable.get();
      }

      public NullWritable createValue() {
        return NullWritable.get();
      }

      public long getPos() throws IOException {
        return 0L;
      }

      public float getProgress() throws IOException {
        return 0.0f;
      }

      public void close() throws IOException {
      }
    };
  }
}
/**
 * Configures and runs one map/reduce job (one map task, one reduce task)
 * using SpillMapper and SpillReducer.
 *
 * @param name     label logged before the job is submitted
 * @param keylen   stored under "test.keywritable.length"
 * @param vallen   stored under "test.valwritable.length"
 * @param records  stored under "test.spillmap.records"
 * @param ioSortMB stored under "io.sort.mb"
 * @param recPer   stored under "io.sort.record.percent"
 * @param spillPer stored under "io.sort.spill.percent"
 * @param pedantic stored under "test.pedantic.verification"
 */
private static void runTest(String name, int keylen, int vallen,
    int records, int ioSortMB, float recPer, float spillPer,
    boolean pedantic) throws Exception {
  final JobConf job = new JobConf(new Configuration(), SpillMapper.class);

  // Sort-buffer settings under test.
  job.setInt("io.sort.mb", ioSortMB);
  job.set("io.sort.record.percent", Float.toString(recPer));
  job.set("io.sort.spill.percent", Float.toString(spillPer));

  // Settings read back by the test's writables, mapper and reducer.
  job.setInt("test.keywritable.length", keylen);
  job.setInt("test.valwritable.length", vallen);
  job.setInt("test.spillmap.records", records);
  job.setBoolean("test.pedantic.verification", pedantic);

  // Job wiring: fake input, no output, a single map and a single reduce.
  job.setNumMapTasks(1);
  job.setNumReduceTasks(1);
  job.setInputFormat(FakeIF.class);
  job.setOutputFormat(NullOutputFormat.class);
  job.setMapperClass(SpillMapper.class);
  job.setReducerClass(SpillReducer.class);
  job.setMapOutputKeyClass(KeyWritable.class);
  job.setMapOutputValueClass(ValWritable.class);

  LOG.info("Running " + name);
  JobClient.runJob(job);
}
/**
 * Convenience overload: runs the job with io.sort.mb = 1, a record
 * percentage of 0.05 and a spill percentage of 0.8.
 */
private static void runTest(String name, int keylen, int vallen, int records,
    boolean pedantic) throws Exception {
  final int sortMB = 1;
  final float recPer = 0.05f;
  final float spillPer = .8f;
  runTest(name, keylen, vallen, records, sortMB, recPer, spillPer, pedantic);
}
/**
 * Cases where the last byte of a record/key is the last/first byte in the
 * spill buffer. Both jobs run pedantically with a 1MB sort buffer.
 */
public void testLastFill() throws Exception {
  final int sortMB = 1;
  final float recPer = 0.125f;
  final float spillPer = 0.5f;
  runTest("vallastbyte", 128, 896, 1344, sortMB, recPer, spillPer, true);
  runTest("keylastbyte", 512, 1024, 896, sortMB, recPer, spillPer, true);
}
/**
 * Maps emitting records larger than io.sort.mb: five records each with a
 * 1MB value, then five with a 1MB key and an empty value.
 */
public void testLargeRecords() throws Exception {
  final int oneMB = 1024*1024;
  final int records = 5;
  runTest("largerec", 100, oneMB, records, false);
  runTest("largekeyzeroval", oneMB, 0, records, false);
}
/**
 * Non-default spill boundaries: three jobs with the spill percentage at
 * 100% and one with it at 30%.
 */
public void testSpillPer() throws Exception {
  final int sortMB = 1;
  final float fullSpill = 1.0f;
  runTest("fullspill2B", 1, 1, 10000, sortMB, 0.05f, fullSpill, true);
  runTest("fullspill200B", 100, 100, 10000, sortMB, 0.05f, fullSpill, true);
  runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, sortMB, 0.3f, fullSpill,
      true);
  runTest("lt50perspill", 100, 100, 10000, sortMB, 0.05f, 0.3f, true);
}
/**
 * Zero-length keys and/or values, 10000 records per job; the last job
 * also sets the spill percentage to 100%.
 */
public void testZeroLength() throws Exception {
  final int records = 10000;
  runTest("zeroval", 1, 0, records, true);
  runTest("zerokey", 0, 1, records, true);
  runTest("zerokeyval", 0, 0, records, false);
  runTest("zerokeyvalfull", 0, 0, records, 1, 0.05f, 1.0f, false);
}
}