/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;
import java.util.Random;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
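
/**
 * Tests for {@link SequenceFileAsBinaryOutputFormat}: writing raw binary
 * key/value pairs and reading them back as the original Writable types,
 * defaulting and overriding of the sequence-file output key/value classes,
 * and rejection of record-level compression by checkOutputSpecs.
 */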
public class TestMRSequenceFileAsBinaryOutputFormat extends TestCase {
private static final Log LOG =
LogFactory.getLog(TestMRSequenceFileAsBinaryOutputFormat.class.getName());
private static final int RECORDS = 10000;
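
  /**
   * Writes RECORDS random (IntWritable, DoubleWritable) pairs through
   * SequenceFileAsBinaryOutputFormat as raw serialized bytes, then reads the
   * file back with SequenceFileInputFormat and checks that every record
   * deserializes to the originally generated values.
   */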
public void testBinary() throws IOException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
Path outdir = new Path(System.getProperty("test.build.data", "/tmp"),
"outseq");
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
FileOutputFormat.setOutputPath(job, outdir);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
        IntWritable.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
        DoubleWritable.class);
SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
BytesWritable bkey = new BytesWritable();
BytesWritable bval = new BytesWritable();
TaskAttemptContext context =
MapReduceTestUtil.createDummyMapTaskAttemptContext(job.getConfiguration());
OutputFormat<BytesWritable, BytesWritable> outputFormat =
new SequenceFileAsBinaryOutputFormat();
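    // Drive the committer lifecycle (setupJob/commitTask/commitJob) by hand,
    // as the MapReduce framework would during a real task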
OutputCommitter committer = outputFormat.getOutputCommitter(context);
committer.setupJob(job);
    RecordWriter<BytesWritable, BytesWritable> writer =
        outputFormat.getRecordWriter(context);
IntWritable iwritable = new IntWritable();
DoubleWritable dwritable = new DoubleWritable();
DataOutputBuffer outbuf = new DataOutputBuffer();
LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
try {
for (int i = 0; i < RECORDS; ++i) {
iwritable = new IntWritable(r.nextInt());
iwritable.write(outbuf);
bkey.set(outbuf.getData(), 0, outbuf.getLength());
outbuf.reset();
dwritable = new DoubleWritable(r.nextDouble());
dwritable.write(outbuf);
bval.set(outbuf.getData(), 0, outbuf.getLength());
outbuf.reset();
writer.write(bkey, bval);
}
} finally {
writer.close(context);
}
committer.commitTask(context);
committer.commitJob(job);
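    // Read the output back with SequenceFileInputFormat, replaying the seeded
    // random sequence and expecting identical keys and values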
InputFormat<IntWritable, DoubleWritable> iformat =
new SequenceFileInputFormat<IntWritable, DoubleWritable>();
int count = 0;
r.setSeed(seed);
SequenceFileInputFormat.setInputPaths(job, outdir);
LOG.info("Reading data by SequenceFileInputFormat");
for (InputSplit split : iformat.getSplits(job)) {
RecordReader<IntWritable, DoubleWritable> reader =
iformat.createRecordReader(split, context);
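      // Wrap the reader in a dummy map context so it can be initialized
      // outside of a real map task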
      MapContext<IntWritable, DoubleWritable, BytesWritable, BytesWritable>
          mcontext = new MapContextImpl<IntWritable, DoubleWritable,
              BytesWritable, BytesWritable>(job.getConfiguration(),
              context.getTaskAttemptID(), reader, null, null,
              MapReduceTestUtil.createDummyReporter(), split);
reader.initialize(split, mcontext);
try {
int sourceInt;
double sourceDouble;
while (reader.nextKeyValue()) {
sourceInt = r.nextInt();
sourceDouble = r.nextDouble();
iwritable = reader.getCurrentKey();
dwritable = reader.getCurrentValue();
          assertEquals("Keys don't match: *" + iwritable.get() + ":"
              + sourceInt + "*", sourceInt, iwritable.get());
          assertTrue("Vals don't match: *" + dwritable.get() + ":"
              + sourceDouble + "*",
              Double.compare(dwritable.get(), sourceDouble) == 0);
++count;
}
} finally {
reader.close();
}
}
assertEquals("Some records not found", RECORDS, count);
}
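
  /**
   * Checks that getSequenceFileOutput{Key,Value}Class fall back to the job's
   * output key/value classes until sequence-file output classes are set
   * explicitly, after which the explicit settings take precedence.
   */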
public void testSequenceOutputClassDefaultsToMapRedOutputClass()
throws IOException {
Job job = Job.getInstance();
    // Set arbitrary output key/value classes to verify that
    // getSequenceFileOutput{Key,Value}Class default to them
job.setOutputKeyClass(FloatWritable.class);
job.setOutputValueClass(BooleanWritable.class);
assertEquals("SequenceFileOutputKeyClass should default to ouputKeyClass",
FloatWritable.class,
SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
assertEquals("SequenceFileOutputValueClass should default to "
+ "ouputValueClass",
BooleanWritable.class,
SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
        IntWritable.class);
    SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job,
        DoubleWritable.class);
assertEquals("SequenceFileOutputKeyClass not updated",
IntWritable.class,
SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(job));
assertEquals("SequenceFileOutputValueClass not updated",
DoubleWritable.class,
SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(job));
}
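
  /**
   * checkOutputSpecs should accept block compression but reject record
   * compression, which SequenceFileAsBinaryOutputFormat does not support.
   */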
  public void testCheckOutputSpecsForbidRecordCompression()
throws IOException {
Job job = Job.getInstance();
FileSystem fs = FileSystem.getLocal(job.getConfiguration());
Path outputdir = new Path(System.getProperty("test.build.data", "/tmp")
+ "/output");
fs.delete(outputdir, true);
    // Without an output path, FileOutputFormat.checkOutputSpecs will throw
    // an InvalidJobConfException
FileOutputFormat.setOutputPath(job, outputdir);
    // SequenceFileAsBinaryOutputFormat doesn't support record-level
    // compression, so checkOutputSpecs should reject it
SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
try {
new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
} catch (Exception e) {
fail("Block compression should be allowed for "
+ "SequenceFileAsBinaryOutputFormat:Caught " + e.getClass().getName());
}
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job,
CompressionType.RECORD);
try {
new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(job);
fail("Record compression should not be allowed for "
+ "SequenceFileAsBinaryOutputFormat");
} catch (InvalidJobConfException ie) {
// expected
} catch (Exception e) {
fail("Expected " + InvalidJobConfException.class.getName()
+ "but caught " + e.getClass().getName() );
}
}
}