blob: 1b6ab68599414eb87f78a5af819b9dda5473f7d9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.mapred;
import java.io.IOException;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.BufferedInputStream;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.reflect.ReflectDatumReader;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestReflectJob {

  /** The input class: a simple reflect-serializable wrapper around a String. */
  public static class Text {
    private String text = "";
    public Text() {}
    public Text(String text) { this.text = text; }
    public String toString() { return text; }
  }

  /** The intermediate data class: a single occurrence count. */
  public static class Count {
    private long count;
    public Count() {}
    public Count(long count) { this.count = count; }
  }

  /** The output class: a word paired with its total count. */
  public static class WordCount {
    private String word;
    private long count;
    public WordCount() {}
    public WordCount(String word, long count) {
      this.word = word;
      this.count = count;
    }
  }

  /** Mapper: tokenizes each input line on whitespace and emits (word, 1). */
  public static class MapImpl extends AvroMapper<Text, Pair<Text,Count>> {
    @Override
    public void map(Text text, AvroCollector<Pair<Text,Count>> collector,
                    Reporter reporter) throws IOException {
      StringTokenizer tokens = new StringTokenizer(text.toString());
      while (tokens.hasMoreTokens())
        collector.collect(new Pair<Text,Count>(new Text(tokens.nextToken()),
                                               new Count(1L)));
    }
  }

  /** Reducer: sums the counts for each word and emits a WordCount. */
  public static class ReduceImpl
    extends AvroReducer<Text, Count, WordCount> {
    @Override
    public void reduce(Text word, Iterable<Count> counts,
                       AvroCollector<WordCount> collector,
                       Reporter reporter) throws IOException {
      long sum = 0;
      for (Count count : counts)
        sum += count.count;
      collector.collect(new WordCount(word.text, sum));
    }
  }

  /**
   * Runs a reflection-based word-count MapReduce job end to end:
   * writes an Avro input file, configures and runs the job, then
   * validates the counts found in the job's output file.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testJob() throws Exception {
    JobConf job = new JobConf();
    // BUG FIX: the original concatenated "target/..." without a path
    // separator, producing e.g. ".target/testReflectJob" instead of
    // "./target/testReflectJob".
    String dir = System.getProperty("test.dir", ".") + "/target/testReflectJob";
    Path inputPath = new Path(dir + "/in");
    Path outputPath = new Path(dir + "/out");
    // Remove any leftovers from a previous run so the job starts clean.
    outputPath.getFileSystem(job).delete(outputPath);
    inputPath.getFileSystem(job).delete(inputPath);
    writeLinesFile(new File(dir + "/in"));

    job.setJobName("reflect");

    AvroJob.setInputSchema(job, ReflectData.get().getSchema(Text.class));
    // Parameterized Pair (the original used the raw type).
    AvroJob.setMapOutputSchema
      (job, new Pair<Text,Count>(new Text(""), new Count(0L)).getSchema());
    AvroJob.setOutputSchema(job, ReflectData.get().getSchema(WordCount.class));

    AvroJob.setMapperClass(job, MapImpl.class);
    //AvroJob.setCombinerClass(job, ReduceImpl.class);
    AvroJob.setReducerClass(job, ReduceImpl.class);

    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    AvroJob.setReflect(job);                          // use reflection

    JobClient.runJob(job);

    validateCountsFile(new File(new File(dir, "out"), "part-00000.avro"));
  }

  /**
   * Writes the shared test lines ({@link WordCountUtil#LINES}) into an
   * Avro data file named {@code lines.avro} under {@code dir}.
   */
  private void writeLinesFile(File dir) throws IOException {
    DatumWriter<Text> writer = new ReflectDatumWriter<Text>();
    DataFileWriter<Text> out = new DataFileWriter<Text>(writer);
    File linesFile = new File(dir, "lines.avro");
    dir.mkdirs();
    out.create(ReflectData.get().getSchema(Text.class), linesFile);
    try {
      for (String line : WordCountUtil.LINES)
        out.append(new Text(line));
    } finally {
      // Close even if an append fails, so the file handle is released.
      out.close();
    }
  }

  /**
   * Reads the job's output file and asserts that every word's count
   * matches {@link WordCountUtil#COUNTS}, and that the number of
   * distinct words is exactly as expected.
   */
  private void validateCountsFile(File file) throws Exception {
    DatumReader<WordCount> reader = new ReflectDatumReader<WordCount>();
    InputStream in = new BufferedInputStream(new FileInputStream(file));
    DataFileStream<WordCount> counts =
      new DataFileStream<WordCount>(in, reader);
    try {
      int numWords = 0;
      for (WordCount wc : counts) {
        assertEquals(wc.word,
                     WordCountUtil.COUNTS.get(wc.word),
                     (Long)wc.count);
        numWords++;
      }
      assertEquals(WordCountUtil.COUNTS.size(), numWords);
    } finally {
      // Close both even if an assertion fails, so the file is released.
      counts.close();
      in.close();
    }
  }
}