blob: 955318c5d891740e310f6fbef1f0b92ec9a5d87f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.mapred;
import java.io.IOException;
import java.io.File;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.file.DataFileReader;
import org.junit.Test;
import static org.junit.Assert.*;
import test.Weather;
/** Tests mapred API with a specific record. */
public class TestWeather {
/** Uses default mapper with no reduces for a map-only identity job. */
@Test
@SuppressWarnings("deprecation")
public void testMapOnly() throws Exception {
JobConf job = new JobConf();
String inDir = System.getProperty("share.dir","../../../share")+"/test/data";
Path input = new Path(inDir+"/weather.avro");
Path output = new Path(System.getProperty("test.dir","target/test")+"/weather-ident");
output.getFileSystem(job).delete(output);
job.setJobName("identity map weather");
AvroJob.setInputSchema(job, Weather.SCHEMA$);
AvroJob.setOutputSchema(job, Weather.SCHEMA$);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
job.setNumReduceTasks(0); // map-only
JobClient.runJob(job);
// check output is correct
DatumReader<Weather> reader = new SpecificDatumReader<Weather>();
DataFileReader<Weather> check = new DataFileReader<Weather>
(new File(inDir+"/weather.avro"), reader);
DataFileReader<Weather> sorted = new DataFileReader<Weather>
(new File(output.toString()+"/part-00000.avro"), reader);
for (Weather w : sorted)
assertEquals(check.next(), w);
check.close();
sorted.close();
}
// maps input Weather to Pair<Weather,Void>, to sort by Weather
public static class SortMapper extends AvroMapper<Weather,Pair<Weather,Void>>{
@Override
public void map(Weather w, AvroCollector<Pair<Weather,Void>> collector,
Reporter reporter) throws IOException {
collector.collect(new Pair<Weather,Void>(w, (Void)null));
}
}
// output keys only, since values are empty
public static class SortReducer
extends AvroReducer<Weather, Void, Weather> {
@Override
public void reduce(Weather w, Iterable<Void> ignore,
AvroCollector<Weather> collector,
Reporter reporter) throws IOException {
collector.collect(w);
}
}
@Test
@SuppressWarnings("deprecation")
public void testSort() throws Exception {
JobConf job = new JobConf();
String inDir = System.getProperty("share.dir","../../../share")+"/test/data";
Path input = new Path(inDir+"/weather.avro");
Path output = new Path(System.getProperty("test.dir","target/test")+"/weather-sort");
output.getFileSystem(job).delete(output);
job.setJobName("sort weather");
AvroJob.setInputSchema(job, Weather.SCHEMA$);
AvroJob.setMapOutputSchema
(job, Pair.getPairSchema(Weather.SCHEMA$, Schema.create(Type.NULL)));
AvroJob.setOutputSchema(job, Weather.SCHEMA$);
AvroJob.setMapperClass(job, SortMapper.class);
AvroJob.setReducerClass(job, SortReducer.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
FileOutputFormat.setCompressOutput(job, true);
JobClient.runJob(job);
// check output is correct
DatumReader<Weather> reader = new SpecificDatumReader<Weather>();
DataFileReader<Weather> check = new DataFileReader<Weather>
(new File(inDir+"/weather-sorted.avro"), reader);
DataFileReader<Weather> sorted = new DataFileReader<Weather>
(new File(output.toString()+"/part-00000.avro"), reader);
for (Weather w : sorted)
assertEquals(check.next(), w);
check.close();
sorted.close();
}
}