/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.avro.mapred;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
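// The next three imports are used only by the dumpOutput() sketch near the
// end of this class; they are not needed by the original test itself.
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;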
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
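
/**
 * Tests a map-only job whose Avro output schema is a {@link Pair} of a long
 * and a deeply nested generic record: an array of a union of five inner
 * record types, each with a single long field.
 */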
@SuppressWarnings("deprecation")
public class TestGenericJob {
  private static final String dir =
      System.getProperty("test.dir", ".") + "/target/testGenericJob";

  /** Builds the top-level "Container" record schema with one array field named "Optional". */
  private static Schema createSchema() {
    List<Field> fields = new ArrayList<Schema.Field>();
    fields.add(new Field("Optional", createArraySchema(), "", new ArrayList<Object>()));
    Schema recordSchema =
        Schema.createRecord("Container", "", "org.apache.avro.mapred", false);
    recordSchema.setFields(fields);
    return recordSchema;
  }

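  // For reference, createSchema() yields approximately this JSON (abbreviated;
  // the union actually has five branches, optional_field_0..optional_field_4):
  //
  //   {"type": "record", "name": "Container",
  //    "namespace": "org.apache.avro.mapred",
  //    "fields": [{"name": "Optional", "default": [],
  //      "type": {"type": "array",
  //        "items": [{"type": "record", "name": "optional_field_0",
  //          "fields": [{"name": "optional_field_0",
  //                      "type": "long", "default": 0}]},
  //          ...]}}]}
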
  /** Builds the array element type: a union of five distinct inner record schemas. */
  private static Schema createArraySchema() {
    List<Schema> schemas = new ArrayList<Schema>();
    for (int i = 0; i < 5; i++) {
      schemas.add(createInnerSchema("optional_field_" + i));
    }
    Schema unionSchema = Schema.createUnion(schemas);
    return Schema.createArray(unionSchema);
  }

  /** Builds a record schema with a single long field; record and field share the given name. */
  private static Schema createInnerSchema(String name) {
    Schema innerrecord = Schema.createRecord(name, "", "", false);
    innerrecord.setFields(Arrays.asList(new Field(name, Schema.create(Type.LONG), "", 0L)));
    return innerrecord;
  }

  @Before
  public void setup() throws IOException {
    // Input is needed only to satisfy the framework; the mapper ignores it.
    File indir = new File(dir);
    indir.mkdirs();
    File infile = new File(dir + "/in");
    RandomAccessFile file = new RandomAccessFile(infile, "rw");
    // Add some data so the framework actually calls our mapper.
    file.writeChars("aa bb cc\ndd ee ff\n");
    file.close();
  }

  @After
  public void tearDown() throws IOException {
    FileUtil.fullyDelete(new File(dir));
  }

  static class AvroTestConverter extends MapReduceBase
      implements Mapper<LongWritable, Text,
          AvroWrapper<Pair<Long, GenericData.Record>>, NullWritable> {

    public void map(LongWritable key, Text value,
        OutputCollector<AvroWrapper<Pair<Long, GenericData.Record>>, NullWritable> out,
        Reporter reporter) throws IOException {
      // Build one inner record, wrap it in the array-of-union field, and emit
      // it keyed by the input offset.
      GenericData.Record optionalEntry =
          new GenericData.Record(createInnerSchema("optional_field_1"));
      optionalEntry.put("optional_field_1", 0L);
      GenericData.Array<GenericData.Record> array =
          new GenericData.Array<GenericData.Record>(1, createArraySchema());
      array.add(optionalEntry);
      GenericData.Record container = new GenericData.Record(createSchema());
      container.put("Optional", array);
      out.collect(new AvroWrapper<Pair<Long, GenericData.Record>>(
              new Pair<Long, GenericData.Record>(key.get(), container)),
          NullWritable.get());
    }
  }

  @Test
  public void testJob() throws Exception {
    JobConf job = new JobConf();
    Path outputPath = new Path(dir + "/out");
    outputPath.getFileSystem(job).delete(outputPath, true);

    job.setInputFormat(TextInputFormat.class);
    FileInputFormat.setInputPaths(job, dir + "/in");
    job.setMapperClass(AvroTestConverter.class);
    job.setNumReduceTasks(0);

    FileOutputFormat.setOutputPath(job, outputPath);
    System.out.println(createSchema());
    AvroJob.setOutputSchema(job,
        Pair.getPairSchema(Schema.create(Schema.Type.LONG), createSchema()));
    job.setOutputFormat(AvroOutputFormat.class);

    JobClient.runJob(job);
  }
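
  // A minimal sketch (not part of the original test) of how the job output
  // could be read back for inspection. It assumes AvroOutputFormat's default
  // part-file naming ("part-00000.avro"); the method name dumpOutput is
  // illustrative only, and the test does not call it.
  @SuppressWarnings("unused")
  private void dumpOutput() throws IOException {
    File output = new File(dir + "/out/part-00000.avro");
    DataFileReader<GenericRecord> reader =
        new DataFileReader<GenericRecord>(output, new GenericDatumReader<GenericRecord>());
    try {
      for (GenericRecord record : reader) {
        System.out.println(record);
      }
    } finally {
      reader.close();
    }
  }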
}