/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.CustomOutputCommitter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.gridmix.GridmixKey.Spec;
import org.apache.hadoop.mapred.gridmix.SleepJob.SleepReducer;
import org.apache.hadoop.mapred.gridmix.SleepJob.SleepSplit;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl.DummyReporter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
import org.apache.hadoop.tools.rumen.ZombieJobProducer;
import org.apache.hadoop.util.Progress;
import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;
public class TestGridMixClasses {
private static final Logger LOG = LoggerFactory.getLogger(TestGridMixClasses.class);
/*
 * simple test of LoadSplit (getters, copy, write, read...)
 */
@Test (timeout=1000)
public void testLoadSplit() throws Exception {
LoadSplit test = getLoadSplit();
ByteArrayOutputStream data = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(data);
test.write(out);
LoadSplit copy = new LoadSplit();
copy.readFields(new DataInputStream(new ByteArrayInputStream(data
.toByteArray())));
// data should be the same
assertEquals(test.getId(), copy.getId());
assertEquals(test.getMapCount(), copy.getMapCount());
assertEquals(test.getInputRecords(), copy.getInputRecords());
assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));
assertEquals(test.getMapResourceUsageMetrics().getCumulativeCpuUsage(),
copy.getMapResourceUsageMetrics().getCumulativeCpuUsage());
assertEquals(test.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage(),
copy.getReduceResourceUsageMetrics(0).getCumulativeCpuUsage());
}
/*
 * simple test of GridmixSplit (copy, getters, write, read...)
 */
@Test (timeout=1000)
public void testGridmixSplit() throws Exception {
Path[] files = {new Path("one"), new Path("two")};
long[] start = {1, 2};
long[] lengths = {100, 200};
String[] locations = {"locOne", "loctwo"};
CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
locations);
ResourceUsageMetrics metrics = new ResourceUsageMetrics();
metrics.setCumulativeCpuUsage(200);
double[] reduceBytes = {8.1d, 8.2d};
double[] reduceRecords = {9.1d, 9.2d};
long[] reduceOutputBytes = {101L, 102L};
long[] reduceOutputRecords = {111L, 112L};
GridmixSplit test = new GridmixSplit(cfSplit, 2, 3, 4L, 5L, 6L, 7L,
reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords);
ByteArrayOutputStream data = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(data);
test.write(out);
GridmixSplit copy = new GridmixSplit();
copy.readFields(new DataInputStream(new ByteArrayInputStream(data
.toByteArray())));
// data should be the same
assertEquals(test.getId(), copy.getId());
assertEquals(test.getMapCount(), copy.getMapCount());
assertEquals(test.getInputRecords(), copy.getInputRecords());
assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));
}
/*
 * test LoadMapper; the mapper should write one record to the writer for each reduce
 */
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
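// build the fake map-side plumbing: record reader, writer, committer, reporter and split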
TaskAttemptID taskId = new TaskAttemptID();
RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
LoadSplit split = getLoadSplit();
MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
conf, taskId, reader, writer, committer, reporter, split);
// context
Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
.getMapContext(mapContext);
reader.initialize(split, ctx);
ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(
ctx.getConfiguration(), true);
LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
// setup, map, clean
mapper.run(ctx);
Map<GridmixKey, GridmixRecord> data = writer.getData();
// check result
assertEquals(2, data.size());
}
private LoadSplit getLoadSplit() throws Exception {
Path[] files = {new Path("one"), new Path("two")};
long[] start = {1, 2};
long[] lengths = {100, 200};
String[] locations = {"locOne", "loctwo"};
CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
locations);
ResourceUsageMetrics metrics = new ResourceUsageMetrics();
metrics.setCumulativeCpuUsage(200);
ResourceUsageMetrics[] rMetrics = {metrics};
double[] reduceBytes = {8.1d, 8.2d};
double[] reduceRecords = {9.1d, 9.2d};
long[] reduceOutputBytes = {101L, 102L};
long[] reduceOutputRecords = {111L, 112L};
return new LoadSplit(cfSplit, 2, 1, 4L, 5L, 6L, 7L,
reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
metrics, rMetrics);
}
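// fake RecordReader that serves a fixed number of LongWritable key/value pairs from a decrementing counter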
private class FakeRecordLLReader extends
RecordReader<LongWritable, LongWritable> {
int counter = 10;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
counter--;
return counter > 0;
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
return new LongWritable(counter);
}
@Override
public LongWritable getCurrentValue() throws IOException,
InterruptedException {
return new LongWritable(counter * 10);
}
@Override
public float getProgress() throws IOException, InterruptedException {
return counter / 10.0f;
}
@Override
public void close() throws IOException {
// reset the counter so the reader can be reused
counter = 10;
}
}
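// fake RecordReader that serves a fixed number of NullWritable/GridmixRecord pairs from a decrementing counter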
private class FakeRecordReader extends
RecordReader<NullWritable, GridmixRecord> {
int counter = 10;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
counter--;
return counter > 0;
}
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return NullWritable.get();
}
@Override
public GridmixRecord getCurrentValue() throws IOException,
InterruptedException {
return new GridmixRecord(100, 100L);
}
@Override
public float getProgress() throws IOException, InterruptedException {
return counter / 10.0f;
}
@Override
public void close() throws IOException {
// reset the counter so the reader can be reused
counter = 10;
}
}
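// in-memory RecordWriter that captures written GridmixKey/GridmixRecord pairs for later inspection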
private class LoadRecordGkGrWriter extends
RecordWriter<GridmixKey, GridmixRecord> {
private Map<GridmixKey, GridmixRecord> data = new HashMap<GridmixKey, GridmixRecord>();
@Override
public void write(GridmixKey key, GridmixRecord value) throws IOException,
InterruptedException {
data.put(key, value);
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
}
public Map<GridmixKey, GridmixRecord> getData() {
return data;
}
}
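// in-memory RecordWriter that captures written GridmixKey/NullWritable pairs for later inspection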
private class LoadRecordGkNullWriter extends
RecordWriter<GridmixKey, NullWritable> {
private Map<GridmixKey, NullWritable> data = new HashMap<GridmixKey, NullWritable>();
@Override
public void write(GridmixKey key, NullWritable value) throws IOException,
InterruptedException {
data.put(key, value);
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
}
public Map<GridmixKey, NullWritable> getData() {
return data;
}
}
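// in-memory RecordWriter that captures written NullWritable/GridmixRecord pairs for later inspection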
private class LoadRecordWriter extends
RecordWriter<NullWritable, GridmixRecord> {
private Map<NullWritable, GridmixRecord> data = new HashMap<NullWritable, GridmixRecord>();
@Override
public void write(NullWritable key, GridmixRecord value)
throws IOException, InterruptedException {
data.put(key, value);
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
}
public Map<NullWritable, GridmixRecord> getData() {
return data;
}
}
/*
* test LoadSortComparator
*/
@Test (timeout=3000)
public void testLoadJobLoadSortComparator() throws Exception {
LoadJob.LoadSortComparator test = new LoadJob.LoadSortComparator();
ByteArrayOutputStream data = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(data);
WritableUtils.writeVInt(dos, 2);
WritableUtils.writeVInt(dos, 1);
WritableUtils.writeVInt(dos, 4);
WritableUtils.writeVInt(dos, 7);
WritableUtils.writeVInt(dos, 4);
byte[] b1 = data.toByteArray();
byte[] b2 = data.toByteArray();
// identical data should compare as equal
assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1));
b2[2] = 5;
// compared like a GridmixKey first byte: 4 - 5 = -1
assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
b2[2] = 2;
// compared like a GridmixKey first byte: 4 - 2 = 2
assertEquals(2, test.compare(b1, 0, 1, b2, 0, 1));
// compare arrays by first byte with offset (2-1) because 4 == 4
b2[2] = 4;
assertEquals(1, test.compare(b1, 0, 1, b2, 1, 1));
}
/*
* test SpecGroupingComparator
*/
@Test (timeout=3000)
public void testGridmixJobSpecGroupingComparator() throws Exception {
GridmixJob.SpecGroupingComparator test = new GridmixJob.SpecGroupingComparator();
ByteArrayOutputStream data = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(data);
WritableUtils.writeVInt(dos, 2);
WritableUtils.writeVInt(dos, 1);
// 0: REDUCE SPEC
WritableUtils.writeVInt(dos, 0);
WritableUtils.writeVInt(dos, 7);
WritableUtils.writeVInt(dos, 4);
byte[] b1 = data.toByteArray();
byte[] b2 = data.toByteArray();
// identical data should compare as equal
assertEquals(0, test.compare(b1, 0, 1, b2, 0, 1));
b2[2] = 1;
// for Reduce
assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
// by Reduce spec
b2[2] = 1; // 1: DATA SPEC
assertEquals(-1, test.compare(b1, 0, 1, b2, 0, 1));
// comparing GridmixKey objects: equal objects should compare as equal
assertEquals(0, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2),
new GridmixKey(GridmixKey.DATA, 100, 2)));
// REDUCE_SPEC
assertEquals(-1, test.compare(
new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2), new GridmixKey(
GridmixKey.DATA, 100, 2)));
assertEquals(1, test.compare(new GridmixKey(GridmixKey.DATA, 100, 2),
new GridmixKey(GridmixKey.REDUCE_SPEC, 100, 2)));
// only DATA
assertEquals(2, test.compare(new GridmixKey(GridmixKey.DATA, 102, 2),
new GridmixKey(GridmixKey.DATA, 100, 2)));
}
/*
 * test GridmixJob comparison: equals() and compareTo() only
 */
@Test (timeout=30000)
public void testCompareGridmixJob() throws Exception {
Configuration conf = new Configuration();
Path outRoot = new Path("target");
JobStory jobDesc = mock(JobStory.class);
when(jobDesc.getName()).thenReturn("JobName");
when(jobDesc.getJobConf()).thenReturn(new JobConf(conf));
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
GridmixJob j1 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 0);
GridmixJob j2 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 0);
GridmixJob j3 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 1);
GridmixJob j4 = new LoadJob(conf, 1000L, jobDesc, outRoot, ugi, 1);
assertTrue(j1.equals(j2));
assertEquals(0, j1.compareTo(j2));
// Only one parameter matters
assertFalse(j1.equals(j3));
// compare id and submissionMillis
assertEquals(-1, j1.compareTo(j3));
assertEquals(-1, j1.compareTo(j4));
}
/*
 * test ReadRecordFactory; it should read all data from the input stream
 */
@Test (timeout=3000)
public void testReadRecordFactory() throws Exception {
// RecordFactory factory, InputStream src, Configuration conf
RecordFactory rf = new FakeRecordFactory();
FakeInputStream input = new FakeInputStream();
ReadRecordFactory test = new ReadRecordFactory(rf, input,
new Configuration());
GridmixKey key = new GridmixKey(GridmixKey.DATA, 100, 2);
GridmixRecord val = new GridmixRecord(200, 2);
while (test.next(key, val)) {
}
// 10 * (GridmixKey size + GridmixRecord size) bytes should have been read
assertEquals(3000, input.getCounter());
// should be -1 because all data has been read
assertEquals(-1, rf.getProgress(), 0.01);
test.close();
}
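// fake RecordFactory that serves a fixed number of records and exposes its counter as progress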
private class FakeRecordFactory extends RecordFactory {
private int counter = 10;
@Override
public void close() throws IOException {
}
@Override
public boolean next(GridmixKey key, GridmixRecord val) throws IOException {
counter--;
return counter >= 0;
}
@Override
public float getProgress() throws IOException {
return counter;
}
}
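// fake seekable, positioned-readable InputStream that returns zeroed data and tracks how many bytes have been requested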
private class FakeInputStream extends InputStream implements Seekable,
PositionedReadable {
private long counter;
@Override
public int read() throws IOException {
return 0;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
int realLen = len - off;
counter += realLen;
for (int i = 0; i < b.length; i++) {
b[i] = 0;
}
return realLen;
}
public long getCounter() {
return counter;
}
@Override
public void seek(long pos) throws IOException {
}
@Override
public long getPos() throws IOException {
return counter;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
return 0;
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
}
@Override
public void readFully(long position, byte[] buffer) throws IOException {
}
}
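// FSDataInputStream wrapper with a public constructor so FakeInputStream can be handed to mocked file systems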
private class FakeFSDataInputStream extends FSDataInputStream {
public FakeFSDataInputStream(InputStream in) throws IOException {
super(in);
}
}
/*
 * test LoadRecordReader. This class reads data from files.
 */
@Test (timeout=3000)
public void testLoadJobLoadRecordReader() throws Exception {
LoadJob.LoadRecordReader test = new LoadJob.LoadRecordReader();
Configuration conf = new Configuration();
FileSystem fs1 = mock(FileSystem.class);
when(fs1.open((Path) anyObject())).thenReturn(
new FakeFSDataInputStream(new FakeInputStream()));
Path p1 = mock(Path.class);
when(p1.getFileSystem((JobConf) anyObject())).thenReturn(fs1);
FileSystem fs2 = mock(FileSystem.class);
when(fs2.open((Path) anyObject())).thenReturn(
new FakeFSDataInputStream(new FakeInputStream()));
Path p2 = mock(Path.class);
when(p2.getFileSystem((JobConf) anyObject())).thenReturn(fs2);
Path[] paths = {p1, p2};
long[] start = {0, 0};
long[] lengths = {1000, 1000};
String[] locations = {"temp1", "temp2"};
CombineFileSplit cfsplit = new CombineFileSplit(paths, start, lengths,
locations);
double[] reduceBytes = {100, 100};
double[] reduceRecords = {2, 2};
long[] reduceOutputBytes = {500, 500};
long[] reduceOutputRecords = {2, 2};
ResourceUsageMetrics metrics = new ResourceUsageMetrics();
ResourceUsageMetrics[] rMetrics = {new ResourceUsageMetrics(),
new ResourceUsageMetrics()};
LoadSplit input = new LoadSplit(cfsplit, 2, 3, 1500L, 2L, 3000L, 2L,
reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
metrics, rMetrics);
TaskAttemptID taskId = new TaskAttemptID();
TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, taskId);
test.initialize(input, ctx);
GridmixRecord gr = test.getCurrentValue();
int counter = 0;
while (test.nextKeyValue()) {
gr = test.getCurrentValue();
if (counter == 0) {
// read first file
assertEquals(0.5, test.getProgress(), 0.001);
} else if (counter == 1) {
// read second file
assertEquals(1.0, test.getProgress(), 0.001);
}
//
assertEquals(1000, gr.getSize());
counter++;
}
assertEquals(1000, gr.getSize());
// Two files have been read
assertEquals(2, counter);
test.close();
}
/*
* test LoadReducer
*/
@Test (timeout=3000)
public void testLoadJobLoadReducer() throws Exception {
LoadJob.LoadReducer test = new LoadJob.LoadReducer();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(FileOutputFormat.COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskid = new TaskAttemptID();
RawKeyValueIterator input = new FakeRawKeyValueIterator();
Counter counter = new GenericCounter();
Counter inputValueCounter = new GenericCounter();
LoadRecordWriter output = new LoadRecordWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new DummyReporter();
RawComparator<GridmixKey> comparator = new FakeRawComparator();
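// assemble a real ReduceContextImpl backed by the fake iterator, writer and comparator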
ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reduceContext = new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
conf, taskid, input, counter, inputValueCounter, output, committer,
reporter, comparator, GridmixKey.class, GridmixRecord.class);
// advance to the first key/value pair before handing the context to the reducer
reduceContext.nextKeyValue();
org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context context = new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>()
.getReducerContext(reduceContext);
// test.setup(context);
test.run(context);
// 9 records have been counted (one was consumed above)
assertEquals(9, counter.getValue());
assertEquals(10, inputValueCounter.getValue());
assertEquals(1, output.getData().size());
GridmixRecord record = output.getData().values().iterator()
.next();
assertEquals(1593, record.getSize());
}
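// fake RawKeyValueIterator that serves serialized GridmixKey/GridmixRecord pairs built from a decrementing counter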
protected class FakeRawKeyValueIterator implements RawKeyValueIterator {
int counter = 10;
@Override
public DataInputBuffer getKey() throws IOException {
ByteArrayOutputStream dt = new ByteArrayOutputStream();
GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
Spec spec = new Spec();
spec.rec_in = counter;
spec.rec_out = counter;
spec.bytes_out = counter * 100;
key.setSpec(spec);
key.write(new DataOutputStream(dt));
DataInputBuffer result = new DataInputBuffer();
byte[] b = dt.toByteArray();
result.reset(b, 0, b.length);
return result;
}
@Override
public DataInputBuffer getValue() throws IOException {
ByteArrayOutputStream dt = new ByteArrayOutputStream();
GridmixRecord key = new GridmixRecord(100, 1);
key.write(new DataOutputStream(dt));
DataInputBuffer result = new DataInputBuffer();
byte[] b = dt.toByteArray();
result.reset(b, 0, b.length);
return result;
}
@Override
public boolean next() throws IOException {
counter--;
return counter >= 0;
}
@Override
public void close() throws IOException {
}
@Override
public Progress getProgress() {
return null;
}
}
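// simple RawComparator for GridmixKey used by the reduce contexts in these tests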
private class FakeRawComparator implements RawComparator<GridmixKey> {
@Override
public int compare(GridmixKey o1, GridmixKey o2) {
return o1.compareTo(o2);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
if ((l1 - s1) != (l2 - s2)) {
return (l1 - s1) - (l2 - s2);
}
int len = l1 - s1;
for (int i = 0; i < len; i++) {
if (b1[s1 + i] != b2[s2 + i]) {
return b1[s1 + i] - b2[s2 + i];
}
}
return 0;
}
}
/*
* test SerialJobFactory
*/
@Test (timeout=120000)
public void testSerialReaderThread() throws Exception {
Configuration conf = new Configuration();
File fin = new File("src" + File.separator + "test" + File.separator
+ "resources" + File.separator + "data" + File.separator
+ "wordcount2.json");
// read a couple of jobs from wordcount2.json
JobStoryProducer jobProducer = new ZombieJobProducer(new Path(
fin.getAbsolutePath()), null, conf);
CountDownLatch startFlag = new CountDownLatch(1);
UserResolver resolver = new SubmitterUserResolver();
FakeJobSubmitter submitter = new FakeJobSubmitter();
File ws = new File("target" + File.separator + this.getClass().getName());
if (!ws.exists()) {
Assert.assertTrue(ws.mkdirs());
}
SerialJobFactory jobFactory = new SerialJobFactory(submitter, jobProducer,
new Path(ws.getAbsolutePath()), conf, startFlag, resolver);
Path ioPath = new Path(ws.getAbsolutePath());
jobFactory.setDistCacheEmulator(new DistributedCacheEmulator(conf, ioPath));
Thread test = jobFactory.createReaderThread();
test.start();
Thread.sleep(1000);
// SerialReaderThread waits for startFlag
assertEquals(0, submitter.getJobs().size());
// start!
startFlag.countDown();
while (test.isAlive()) {
Thread.sleep(1000);
jobFactory.update(null);
}
// submitter was called twice
assertEquals(2, submitter.getJobs().size());
}
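// JobSubmitter that records submitted jobs instead of running them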
private class FakeJobSubmitter extends JobSubmitter {
// submitted jobs collected for later inspection
private List<GridmixJob> jobs = new ArrayList<GridmixJob>();
public FakeJobSubmitter() {
super(null, 1, 1, null, null);
}
@Override
public void add(GridmixJob job) throws InterruptedException {
jobs.add(job);
}
public List<GridmixJob> getJobs() {
return jobs;
}
}
/*
* test SleepMapper
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@Test (timeout=30000)
public void testSleepMapper() throws Exception {
SleepJob.SleepMapper test = new SleepJob.SleepMapper();
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
FakeRecordLLReader reader = new FakeRecordLLReader();
LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
SleepSplit split = getSleepSplit();
MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>(
conf, taskId, reader, writer, committer, reporter, split);
Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>()
.getMapContext(mapcontext);
long start = System.currentTimeMillis();
LOG.info("start:" + start);
LongWritable key = new LongWritable(start + 2000);
LongWritable value = new LongWritable(start + 2000);
// should sleep about 2 seconds
test.map(key, value, context);
LOG.info("finish:" + System.currentTimeMillis());
assertTrue(System.currentTimeMillis() >= (start + 2000));
test.cleanup(context);
assertEquals(1, writer.getData().size());
}
private SleepSplit getSleepSplit() throws Exception {
String[] locations = {"locOne", "loctwo"};
long[] reduceDurations = {101L, 102L};
return new SleepSplit(0, 2000L, reduceDurations, 2, locations);
}
/*
* test SleepReducer
*/
@Test (timeout=3000)
public void testSleepReducer() throws Exception {
Configuration conf = new Configuration();
conf.setInt(JobContext.NUM_REDUCES, 2);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(FileOutputFormat.COMPRESS, true);
CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
TaskAttemptID taskId = new TaskAttemptID();
RawKeyValueIterator input = new FakeRawKeyValueReducerIterator();
Counter counter = new GenericCounter();
Counter inputValueCounter = new GenericCounter();
RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter();
OutputCommitter committer = new CustomOutputCommitter();
StatusReporter reporter = new DummyReporter();
RawComparator<GridmixKey> comparator = new FakeRawComparator();
ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reducecontext = new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
conf, taskId, input, counter, inputValueCounter, output, committer,
reporter, comparator, GridmixKey.class, NullWritable.class);
org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context context = new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>()
.getReducerContext(reducecontext);
SleepReducer test = new SleepReducer();
long start = System.currentTimeMillis();
test.setup(context);
long sleeper = context.getCurrentKey().getReduceOutputBytes();
// status has been changed
assertEquals("Sleeping... " + sleeper + " ms left", context.getStatus());
// should sleep 0.9 sec
assertTrue(System.currentTimeMillis() >= (start + sleeper));
test.cleanup(context);
// status has been changed again
assertEquals("Slept for " + sleeper, context.getStatus());
}
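// no-op RecordWriter used by the SleepReducer test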
private class LoadRecordReduceWriter extends
RecordWriter<NullWritable, NullWritable> {
@Override
public void write(NullWritable key, NullWritable value) throws IOException,
InterruptedException {
}
@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
}
}
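// fake RawKeyValueIterator that serves serialized GridmixKey keys with NullWritable values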
protected class FakeRawKeyValueReducerIterator implements RawKeyValueIterator {
int counter = 10;
@Override
public DataInputBuffer getKey() throws IOException {
ByteArrayOutputStream dt = new ByteArrayOutputStream();
GridmixKey key = new GridmixKey(GridmixKey.REDUCE_SPEC, 10 * counter, 1L);
Spec spec = new Spec();
spec.rec_in = counter;
spec.rec_out = counter;
spec.bytes_out = counter * 100;
key.setSpec(spec);
key.write(new DataOutputStream(dt));
DataInputBuffer result = new DataInputBuffer();
byte[] b = dt.toByteArray();
result.reset(b, 0, b.length);
return result;
}
@Override
public DataInputBuffer getValue() throws IOException {
ByteArrayOutputStream dt = new ByteArrayOutputStream();
NullWritable key = NullWritable.get();
key.write(new DataOutputStream(dt));
DataInputBuffer result = new DataInputBuffer();
byte[] b = dt.toByteArray();
result.reset(b, 0, b.length);
return result;
}
@Override
public boolean next() throws IOException {
counter--;
return counter >= 0;
}
@Override
public void close() throws IOException {
}
@Override
public Progress getProgress() {
return null;
}
}
}