/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.builtin;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.Collector;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.hive.HiveShims;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.hive.HiveUtils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import com.esotericsoftware.kryo.Serializer;
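/**
 * Base class for the Pig wrappers around Hive functions (HiveUDF, HiveUDTF,
 * HiveUDAF). It provides the shared plumbing: parsing constant parameters,
 * resolving a function name against the Hive FunctionRegistry, bridging
 * progress and counter reporting to PigStatusReporter, and setting up the
 * MapredContext that Hive functions expect.
 */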
abstract class HiveUDFBase extends EvalFunc<Object> {
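/**
 * Holds the ConstantObjectInspectors parsed from the constant-parameter
 * string supplied to the wrapper, so those arguments can be exposed to the
 * Hive function as constants instead of regular columns.
 */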
static protected class ConstantObjectInspectInfo {
ConstantObjectInspector[] constants;
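/**
 * Parses a constant-parameter string (a single value or a tuple of values)
 * into ConstantObjectInspectors; null tuple entries are left as null so the
 * corresponding arguments stay non-constant.
 */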
static ConstantObjectInspectInfo parse(String params) throws IOException {
ConstantObjectInspectInfo info = new ConstantObjectInspectInfo();
params = params.replaceAll("\"", "'");
Object constant = Utils.parseConstant(params);
if (DataType.findType(constant) == DataType.TUPLE) {
Tuple t = (Tuple)constant;
info.constants = new ConstantObjectInspector[t.size()];
for (int i=0;i<t.size();i++) {
if (t.get(i) != null) {
info.constants[i] = HiveUtils.getConstantObjectInspector(t.get(i));
}
}
} else {
info.constants = new ConstantObjectInspector[1];
info.constants[0] = HiveUtils.getConstantObjectInspector(constant);
}
return info;
}
boolean isEmpty() {
return (constants == null);
}
int size() {
return constants.length;
}
ConstantObjectInspector get(int i) {
return constants[i];
}
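/**
 * Replaces the matching fields of the input StructObjectInspector with the
 * parsed ConstantObjectInspectors so the Hive function sees those arguments
 * as constants.
 */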
void injectConstantObjectInspector(StructObjectInspector inputObjectInspector) {
if (!isEmpty()) {
for (int i=0;i<size();i++) {
if (get(i)!=null) {
StructField origField = inputObjectInspector.getAllStructFieldRefs().get(i);
StructField newfield = new HiveUtils.Field(origField.getFieldName(), get(i), i);
((List<HiveUtils.Field>)inputObjectInspector.getAllStructFieldRefs()).set(i, (HiveUtils.Field)newfield);
}
}
}
}
}
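/**
 * Resolves a Hive function name to its implementing class: first through the
 * Hive FunctionRegistry (for registered names such as built-ins), otherwise
 * as a class name resolvable by PigContext.
 */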
static protected Class resolveFunc(String funcName) throws IOException {
String className = funcName;
Class udfClass;
if (FunctionRegistry.getFunctionNames().contains(funcName)) {
FunctionInfo func;
try {
func = FunctionRegistry.getFunctionInfo(funcName);
} catch (SemanticException e) {
throw new IOException(e);
}
udfClass = func.getFunctionClass();
} else {
udfClass = PigContext.resolveClassName(className);
if (udfClass == null) {
throw new IOException("Cannot find Hive UDF " + funcName);
}
}
return udfClass;
}
/**
 * A Reporter implementation that forwards status, progress and counter
 * updates to Pig's PigStatusReporter.
 */
static protected class HiveReporter implements Reporter {
PigStatusReporter rep;
HiveReporter(PigStatusReporter rep) {
this.rep = rep;
}
public void setStatus(String s) {
rep.setStatus(s);
}
public void progress() {
rep.progress();
}
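// The old mapred Reporter API returns Counters.Counter, so snapshot the
// current value from PigStatusReporter into a fresh Counters instance.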
public Counter getCounter(Enum<?> name) {
try {
Counters counters = new Counters();
counters.incrCounter(name, rep.getCounter(name).getValue());
return counters.findCounter(name);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public Counter getCounter(String group, String name) {
try {
Counters counters = new Counters();
counters.incrCounter(group, name, rep.getCounter(group, name).getValue());
return counters.findCounter(group, name);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void incrCounter(Enum<?> key, long amount) {
rep.incrCounter(key, amount);
}
public void incrCounter(String group, String counter, long amount) {
rep.incrCounter(group, counter, amount);
}
public InputSplit getInputSplit() throws UnsupportedOperationException {
throw new UnsupportedOperationException("NULL reporter has no input");
}
public float getProgress() {
return 0;
}
}
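/**
 * Creates the MapredContext that Hive functions expect, based on the job
 * configuration held by Pig's UDFContext. Under Tez the context is always
 * initialized as a map task and hive.execution.engine is set to "tez".
 */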
protected static MapredContext instantiateMapredContext() {
Configuration conf = UDFContext.getUDFContext().getJobConf();
boolean isMap = conf.getBoolean(MRConfiguration.TASK_IS_MAP, false);
if (conf.get("exectype").startsWith("TEZ")) {
isMap = true;
HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "tez");
}
MapredContext context = MapredContext.init(isMap, new JobConf(UDFContext.getUDFContext().getJobConf()));
context.setReporter(new HiveReporter(PigStatusReporter.getInstance()));
return context;
}
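/**
 * Ships the jars containing Hive's UDF-related classes to the backend so the
 * wrapped function can be instantiated on task nodes.
 */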
@Override
public List<String> getShipFiles() {
List<String> files = FuncUtils.getShipFiles(HiveShims.getHiveUDFDependentClasses(Hadoop23Shims.class));
return files;
}
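/**
 * Builds an error message telling the user which Pig wrapper (HiveUDF,
 * HiveUDTF or HiveUDAF) to declare for the given Hive function class.
 */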
static protected String getErrorMessage(Class c) {
StringBuffer message = new StringBuffer("Please declare " + c.getName() + " as ");
if (UDF.class.isAssignableFrom(c) || GenericUDF.class.isAssignableFrom(c)) {
message.append(HiveUDF.class.getName());
} else if (GenericUDTF.class.isAssignableFrom(c)) {
message.append(HiveUDTF.class.getName());
} else if (UDAF.class.isAssignableFrom(c) || GenericUDAFResolver.class.isAssignableFrom(c)) {
message.append(HiveUDAF.class.getName());
} else {
message = new StringBuffer(c.getName() + " is not a Hive UDF");
}
return message.toString();
}
}