blob: 4110d5d67178484a4cb91cddeca63f0b9efc7f42 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.odpi.specs.runtime.hive;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;
/**
 * HCatalog word-count job used by the ODPi Hive runtime tests.
 *
 * <p>Reads the {@code line} column of an input table in the {@code default} database, splits it
 * into whitespace-delimited tokens, and writes {@code word}/{@code count} rows to an output table
 * in the same database. Table names and schema strings come from the command line; see
 * {@link #run(String[])}.
 */
public class HCatalogMR extends Configured implements Tool {
  /** Configuration key that carries the input schema string from the driver to the mappers. */
  private static final String INPUT_SCHEMA = "odpi.test.hcat.schema.input";
  /** Configuration key that carries the output schema string from the driver to the reducers. */
  private static final String OUTPUT_SCHEMA = "odpi.test.hcat.schema.output";
  /** Usage text appended to command-line errors. */
  private static final String USAGE = "Usage: HCatalogMR -it <input table> -ot <output table>"
      + " -is <input schema> -os <output schema>";

  /**
   * Configures and runs the word-count job and waits for it to finish.
   *
   * <p>Recognized options are matched case-insensitively, and each must be followed by a value:
   * <ul>
   *   <li>{@code -it}: input table name.</li>
   *   <li>{@code -ot}: output table name.</li>
   *   <li>{@code -is}: input schema string.</li>
   *   <li>{@code -os}: output schema string.</li>
   * </ul>
   * All four are required. Schema strings are parsed with
   * {@link HCatSchemaUtils#getHCatSchema(String)}. Unrecognized tokens are ignored.
   *
   * @param args command-line arguments; Hadoop generic options are applied to the configuration
   *     and removed before the options above are read
   * @return 0 if the job completed successfully, 1 otherwise
   * @throws IllegalArgumentException if an option has no value or a required option is absent
   * @throws Exception if job configuration, submission or execution fails
   */
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // ToolRunner usually strips generic options already; re-parsing is harmless and guarantees
    // the job-specific flags below are read only from what remains.
    String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();

    String inputTable = null;
    String outputTable = null;
    String inputSchemaStr = null;
    String outputSchemaStr = null;
    for (int i = 0; i < remaining.length; i++) {
      String flag = remaining[i];
      // "++i" advances past the value so it is not examined again as a flag.
      if (flag.equalsIgnoreCase("-it")) {
        inputTable = optionValue(remaining, ++i, flag);
      } else if (flag.equalsIgnoreCase("-ot")) {
        outputTable = optionValue(remaining, ++i, flag);
      } else if (flag.equalsIgnoreCase("-is")) {
        inputSchemaStr = optionValue(remaining, ++i, flag);
      } else if (flag.equalsIgnoreCase("-os")) {
        outputSchemaStr = optionValue(remaining, ++i, flag);
      }
    }
    requireOption(inputTable, "-it");
    requireOption(outputTable, "-ot");
    requireOption(inputSchemaStr, "-is");
    requireOption(outputSchemaStr, "-os");

    // Set before the Job is created, because the Job takes its own copy of the configuration.
    conf.set(INPUT_SCHEMA, inputSchemaStr);
    conf.set(OUTPUT_SCHEMA, outputSchemaStr);

    Job job = Job.getInstance(conf, "odpi_hcat_test");
    HCatInputFormat.setInput(job, "default", inputTable);
    job.setInputFormatClass(HCatInputFormat.class);
    job.setJarByClass(HCatalogMR.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(WritableComparable.class);
    job.setOutputValueClass(HCatRecord.class);
    HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", outputTable, null));
    HCatOutputFormat.setSchema(job, HCatSchemaUtils.getHCatSchema(outputSchemaStr));
    job.setOutputFormatClass(HCatOutputFormat.class);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  /**
   * Returns the value that follows an option flag.
   *
   * @param args the argument array
   * @param index position of the value (the element after the flag)
   * @param flag the flag being read, used in the error message
   * @return {@code args[index]}
   * @throws IllegalArgumentException if {@code index} is past the end of {@code args}
   */
  private static String optionValue(String[] args, int index, String flag) {
    if (index >= args.length) {
      throw new IllegalArgumentException("Missing value for option " + flag + ". " + USAGE);
    }
    return args[index];
  }

  /**
   * Fails fast when a required option was not supplied.
   *
   * @param value the parsed value, or {@code null} if the option was never given
   * @param flag the option's flag, used in the error message
   * @throws IllegalArgumentException if {@code value} is {@code null}
   */
  private static void requireOption(String value, String flag) {
    if (value == null) {
      throw new IllegalArgumentException("Missing required option " + flag + ". " + USAGE);
    }
  }

  /**
   * Emits {@code (token, 1)} for every whitespace-delimited token in the {@code line} column of
   * each input record. Records whose {@code line} value is null produce no output.
   */
  public static class Map extends Mapper<WritableComparable,
      HCatRecord, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    /** Reused output key, overwritten for each token. */
    private final Text word = new Text();
    private HCatSchema inputSchema = null;

    /** Parses the input schema string that the driver stored in the job configuration. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      inputSchema =
          HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(INPUT_SCHEMA));
    }

    @Override
    protected void map(WritableComparable key, HCatRecord value, Context context)
        throws IOException, InterruptedException {
      String line = value.getString("line", inputSchema);
      if (line == null) {
        // NULL column value: nothing to count, and StringTokenizer would throw NPE.
        return;
      }
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, ONE);
      }
    }
  }

  /**
   * Sums the counts for each word and writes one {@code (word, count)} HCatRecord per word.
   */
  public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
    private HCatSchema outputSchema = null;

    /** Parses the output schema string that the driver stored in the job configuration. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      outputSchema =
          HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(OUTPUT_SCHEMA));
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : values) {
        sum += count.get();
      }
      // Size the record from the schema so fields can be set by their schema position.
      HCatRecord output = new DefaultHCatRecord(outputSchema.size());
      // String columns are read back as java.lang.String (see HCatRecord.getString in the
      // mapper). The framework may also reuse the Text key between calls, so store a String copy.
      output.set("word", outputSchema, key.toString());
      output.set("count", outputSchema, sum);
      // Only the HCatRecord carries data; the key is presumably ignored by HCatOutputFormat.
      context.write(null, output);
    }
  }

  /**
   * Command-line entry point: runs the job through {@link ToolRunner} and exits with its status.
   *
   * @param args command-line arguments, including any Hadoop generic options
   * @throws Exception if the tool fails
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new HCatalogMR(), args);
    System.exit(exitCode);
  }
}