package edu.uci.ics.hivesterix.runtime.factory.evaluator; | |
import java.util.ArrayList; | |
import java.util.Iterator; | |
import java.util.List; | |
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; | |
import edu.uci.ics.hivesterix.logical.expression.ExpressionConstant; | |
import edu.uci.ics.hivesterix.logical.expression.Schema; | |
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.AggregateFunctionFactoryAdapter; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.UnnestingFunctionFactoryAdapter; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; | |
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; | |
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; | |
import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; | |
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory; | |
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory; | |
import edu.uci.ics.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; | |
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; | |
import edu.uci.ics.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory; | |
public class HiveExpressionRuntimeProvider implements | |
IExpressionRuntimeProvider { | |
public static final IExpressionRuntimeProvider INSTANCE = new HiveExpressionRuntimeProvider(); | |
@Override | |
public IAggregateEvaluatorFactory createAggregateFunctionFactory( | |
AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
Schema schema = this.getSchema(inputSchemas[0], env); | |
return new AggregateFunctionFactoryAdapter( | |
new AggregationFunctionFactory(expr, schema, env)); | |
} | |
@Override | |
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory( | |
AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
Schema schema = this.getSchema(inputSchemas[0], env); | |
return new AggregationFunctionSerializableFactory(expr, schema, env); | |
} | |
@Override | |
public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory( | |
StatefulFunctionCallExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
return null; | |
} | |
@Override | |
public IUnnestingEvaluatorFactory createUnnestingFunctionFactory( | |
UnnestingFunctionCallExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
Schema schema = this.getSchema(inputSchemas[0], env); | |
return new UnnestingFunctionFactoryAdapter( | |
new UnnestingFunctionFactory(expr, schema, env)); | |
} | |
public IScalarEvaluatorFactory createEvaluatorFactory( | |
ILogicalExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
switch (expr.getExpressionTag()) { | |
case VARIABLE: { | |
VariableReferenceExpression v = (VariableReferenceExpression) expr; | |
return new ScalarEvaluatorFactoryAdapter( | |
createVariableEvaluatorFactory(v, env, inputSchemas, | |
context)); | |
} | |
case CONSTANT: { | |
ConstantExpression c = (ConstantExpression) expr; | |
return new ScalarEvaluatorFactoryAdapter( | |
createConstantEvaluatorFactory(c, env, inputSchemas, | |
context)); | |
} | |
case FUNCTION_CALL: { | |
AbstractFunctionCallExpression fun = (AbstractFunctionCallExpression) expr; | |
FunctionIdentifier fid = fun.getFunctionIdentifier(); | |
if (fid.getName().equals(ExpressionConstant.FIELDACCESS)) { | |
return new ScalarEvaluatorFactoryAdapter( | |
createFieldExpressionEvaluatorFactory(fun, env, | |
inputSchemas, context)); | |
} | |
if (fid.getName().equals(ExpressionConstant.FIELDACCESS)) { | |
return new ScalarEvaluatorFactoryAdapter( | |
createNullExpressionEvaluatorFactory(fun, env, | |
inputSchemas, context)); | |
} | |
if (fun.getKind() == FunctionKind.SCALAR) { | |
ScalarFunctionCallExpression scalar = (ScalarFunctionCallExpression) fun; | |
return new ScalarEvaluatorFactoryAdapter( | |
createScalarFunctionEvaluatorFactory(scalar, env, | |
inputSchemas, context)); | |
} else { | |
throw new AlgebricksException( | |
"Cannot create evaluator for function " + fun | |
+ " of kind " + fun.getKind()); | |
} | |
} | |
default: { | |
throw new IllegalStateException(); | |
} | |
} | |
} | |
private ICopyEvaluatorFactory createVariableEvaluatorFactory( | |
VariableReferenceExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
Schema schema = this.getSchema(inputSchemas[0], env); | |
return new ColumnExpressionEvaluatorFactory(expr, schema, env); | |
} | |
private ICopyEvaluatorFactory createScalarFunctionEvaluatorFactory( | |
AbstractFunctionCallExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
List<String> names = new ArrayList<String>(); | |
List<TypeInfo> types = new ArrayList<TypeInfo>(); | |
for (IOperatorSchema inputSchema : inputSchemas) { | |
Schema schema = this.getSchema(inputSchema, env); | |
names.addAll(schema.getNames()); | |
types.addAll(schema.getTypes()); | |
} | |
Schema inputSchema = new Schema(names, types); | |
return new ScalarFunctionExpressionEvaluatorFactory(expr, inputSchema, | |
env); | |
} | |
private ICopyEvaluatorFactory createFieldExpressionEvaluatorFactory( | |
AbstractFunctionCallExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
Schema schema = this.getSchema(inputSchemas[0], env); | |
return new FieldExpressionEvaluatorFactory(expr, schema, env); | |
} | |
private ICopyEvaluatorFactory createNullExpressionEvaluatorFactory( | |
AbstractFunctionCallExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
Schema schema = this.getSchema(inputSchemas[0], env); | |
return new NullExpressionEvaluatorFactory(expr, schema, env); | |
} | |
private ICopyEvaluatorFactory createConstantEvaluatorFactory( | |
ConstantExpression expr, IVariableTypeEnvironment env, | |
IOperatorSchema[] inputSchemas, JobGenContext context) | |
throws AlgebricksException { | |
Schema schema = this.getSchema(inputSchemas[0], env); | |
return new ConstantExpressionEvaluatorFactory(expr, schema, env); | |
} | |
private Schema getSchema(IOperatorSchema inputSchema, | |
IVariableTypeEnvironment env) throws AlgebricksException { | |
List<String> names = new ArrayList<String>(); | |
List<TypeInfo> types = new ArrayList<TypeInfo>(); | |
Iterator<LogicalVariable> variables = inputSchema.iterator(); | |
while (variables.hasNext()) { | |
LogicalVariable var = variables.next(); | |
names.add(var.toString()); | |
types.add((TypeInfo) env.getVarType(var)); | |
} | |
Schema schema = new Schema(names, types); | |
return schema; | |
} | |
} |