/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.mapreduce;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.Random;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
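
/**
 * Tests for {@link MultiOutputFormat}, which lets a single MapReduce job write
 * to multiple named outputs, each with its own OutputFormat, key/value classes
 * and configuration.
 */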
public class TestMultiOutputFormat {

    private static final Logger LOG = LoggerFactory.getLogger(TestMultiOutputFormat.class);
    private static File workDir;
    private static Configuration mrConf = null;
    private static FileSystem fs = null;
    private static MiniMRCluster mrCluster = null;

    @BeforeClass
    public static void setup() throws IOException {
        createWorkDir();
        Configuration conf = new Configuration(true);
        fs = FileSystem.get(conf);
        System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
        // LocalJobRunner does not work with the mapreduce OutputCommitter, so a
        // MiniMRCluster is needed. See MAPREDUCE-2350.
        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
            new JobConf(conf));
        mrConf = mrCluster.createJobConf();
    }

    private static void createWorkDir() throws IOException {
        String testDir = System.getProperty("test.data.dir", "./");
        testDir = testDir + "/test_multiout_" + Math.abs(new Random().nextLong()) + "/";
        workDir = new File(new File(testDir).getCanonicalPath());
        FileUtil.fullyDelete(workDir);
        workDir.mkdirs();
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (mrCluster != null) {
            mrCluster.shutdown();
        }
        FileUtil.fullyDelete(workDir);
    }

    /**
     * A test job that reads an input file and writes each word and the index at
     * which it was encountered to a text file and a sequence file, with the key
     * and value types swapped between the two outputs.
     */
    @Test
    public void testMultiOutputFormatWithoutReduce() throws Throwable {
        Job job = new Job(mrConf, "MultiOutNoReduce");
        job.setMapperClass(MultiOutWordIndexMapper.class);
        job.setJarByClass(this.getClass());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MultiOutputFormat.class);
        job.setNumReduceTasks(0);
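
        // MultiOutputFormat usage pattern: create a JobConfigurer for the job,
        // register each named output with its OutputFormat and key/value
        // classes, set any per-output configuration such as output paths, and
        // finally call configure() to fold everything back into the parent job.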
        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
        configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
        configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
            IntWritable.class);
        Path outDir = new Path(workDir.getPath(), job.getJobName());
        FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
        FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));

        String fileContent = "Hello World";
        String inputFile = createInputFile(fileContent);
        FileInputFormat.setInputPaths(job, new Path(inputFile));

        // Test merging of the per-output configs into the parent job config
        DistributedCache.addFileToClassPath(new Path(inputFile), job.getConfiguration(), fs);
        String dummyFile = createInputFile("dummy file");
        DistributedCache.addFileToClassPath(new Path(dummyFile), configurer.getJob("out1")
            .getConfiguration(), fs);
        // Duplicate of a value already in the parent config; merging should
        // remove duplicates.
        DistributedCache.addFileToClassPath(new Path(inputFile), configurer.getJob("out2")
            .getConfiguration(), fs);
        configurer.configure();

        // Verify that the configs were merged
        Path[] fileClassPaths = DistributedCache.getFileClassPaths(job.getConfiguration());
        Assert.assertArrayEquals(new Path[]{new Path(inputFile), new Path(dummyFile)},
            fileClassPaths);
        URI[] expectedCacheFiles = new URI[]{new Path(inputFile).makeQualified(fs).toUri(),
            new Path(dummyFile).makeQualified(fs).toUri()};
        URI[] cacheFiles = DistributedCache.getCacheFiles(job.getConfiguration());
        Assert.assertArrayEquals(expectedCacheFiles, cacheFiles);

        Assert.assertTrue(job.waitForCompletion(true));

        Path textOutPath = new Path(outDir, "out1/part-m-00000");
        String[] textOutput = readFully(textOutPath).split("\n");
        Path seqOutPath = new Path(outDir, "out2/part-m-00000");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
        Text key = new Text();
        IntWritable value = new IntWritable();
        String[] words = fileContent.split(" ");
        Assert.assertEquals(words.length, textOutput.length);
        LOG.info("Verifying file contents");
        for (int i = 0; i < words.length; i++) {
            Assert.assertEquals((i + 1) + "\t" + words[i], textOutput[i]);
            reader.next(key, value);
            Assert.assertEquals(words[i], key.toString());
            Assert.assertEquals((i + 1), value.get());
        }
        Assert.assertFalse(reader.next(key, value));
    }

    /**
     * A word count test job that reads an input file and writes the count of
     * each word to a text file and a sequence file, with the key and value
     * types swapped between the two outputs.
     */
    @Test
    public void testMultiOutputFormatWithReduce() throws Throwable {
        Job job = new Job(mrConf, "MultiOutWithReduce");
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(MultiOutWordCountReducer.class);
        job.setJarByClass(this.getClass());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(MultiOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
        configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
        configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
            IntWritable.class);
        configurer.addOutputFormat("out3", NullOutputFormat.class, Text.class,
            IntWritable.class);
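        // out3 writes to the custom NullOutputFormat below, whose committer
        // asserts that commitTask is never invoked when needsTaskCommit
        // returns false.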
        Path outDir = new Path(workDir.getPath(), job.getJobName());
        FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
        FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));
        configurer.configure();

        String fileContent = "Hello World Hello World World";
        String inputFile = createInputFile(fileContent);
        FileInputFormat.setInputPaths(job, new Path(inputFile));
        Assert.assertTrue(job.waitForCompletion(true));

        Path textOutPath = new Path(outDir, "out1/part-r-00000");
        String[] textOutput = readFully(textOutPath).split("\n");
        Path seqOutPath = new Path(outDir, "out2/part-r-00000");
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
        Text key = new Text();
        IntWritable value = new IntWritable();
        // Expected counts: Hello = 2, World = 3, emitted in key-sorted order
        String[] words = "Hello World".split(" ");
        Assert.assertEquals(words.length, textOutput.length);
        for (int i = 0; i < words.length; i++) {
            Assert.assertEquals((i + 2) + "\t" + words[i], textOutput[i]);
            reader.next(key, value);
            Assert.assertEquals(words[i], key.toString());
            Assert.assertEquals((i + 2), value.get());
        }
        Assert.assertFalse(reader.next(key, value));
    }

    /**
     * Create a file for map input.
     *
     * @param content the content to write to the file
     * @return absolute path of the file
     * @throws IOException if any error is encountered
     */
    private String createInputFile(String content) throws IOException {
        File f = File.createTempFile("input", "txt");
        FileWriter writer = new FileWriter(f);
        writer.write(content);
        writer.close();
        return f.getAbsolutePath();
    }

    private String readFully(Path file) throws IOException {
        FSDataInputStream in = fs.open(file);
        // Relies on available() returning the remaining file length, which
        // holds for the small test files read here.
        byte[] b = new byte[in.available()];
        in.readFully(b);
        in.close();
        return new String(b);
    }

    private static class MultiOutWordIndexMapper extends
        Mapper<LongWritable, Text, Writable, Writable> {

        private IntWritable index = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                // Write (index, word) to out1 and (word, index) to out2
                MultiOutputFormat.write("out1", index, word, context);
                MultiOutputFormat.write("out2", word, index, context);
                index.set(index.get() + 1);
            }
        }
    }

    private static class WordCountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    private static class MultiOutWordCountReducer extends
        Reducer<Text, IntWritable, Writable, Writable> {

        private IntWritable count = new IntWritable();

        @Override
        protected void reduce(Text word, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            count.set(sum);
            MultiOutputFormat.write("out1", count, word, context);
            MultiOutputFormat.write("out2", word, count, context);
            MultiOutputFormat.write("out3", word, count, context);
        }
    }

    /**
     * A NullOutputFormat whose committer fails the test if commitTask is
     * called even though needsTaskCommit returns false.
     */
    private static class NullOutputFormat<K, V> extends
        org.apache.hadoop.mapreduce.lib.output.NullOutputFormat<K, V> {

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
            return new OutputCommitter() {
                @Override
                public void abortTask(TaskAttemptContext taskContext) {
                }

                @Override
                public void cleanupJob(JobContext jobContext) {
                }

                @Override
                public void commitJob(JobContext jobContext) {
                }

                @Override
                public void commitTask(TaskAttemptContext taskContext) {
                    Assert.fail("needsTaskCommit is false but commitTask was called");
                }

                @Override
                public boolean needsTaskCommit(TaskAttemptContext taskContext) {
                    return false;
                }

                @Override
                public void setupJob(JobContext jobContext) {
                }

                @Override
                public void setupTask(TaskAttemptContext taskContext) {
                }
            };
        }
    }
}