blob: 8523e75ee368214bb6c0ebb42109aa289e935ef1 [file] [log] [blame]
package edu.uci.ics.asterix.jobgen;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionDescriptorTag;
import edu.uci.ics.asterix.external.library.ExternalFunctionDescriptorProvider;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.functions.IExternalFunctionInfo;
import edu.uci.ics.asterix.om.functions.IFunctionDescriptor;
import edu.uci.ics.asterix.runtime.evaluators.comparisons.ComparisonEvalFactory;
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ILogicalExpressionJobGen;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyRunningAggregateFunctionFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyUnnestingFunctionFactory;
import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
public class AqlLogicalExpressionJobGen implements ILogicalExpressionJobGen {
public static final AqlLogicalExpressionJobGen INSTANCE = new AqlLogicalExpressionJobGen();
private AqlLogicalExpressionJobGen() {
}
@Override
public ICopyAggregateFunctionFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException {
ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
IFunctionDescriptor fd = getFunctionDescriptor(expr, env, context);
switch (fd.getFunctionDescriptorTag()) {
case SERIALAGGREGATE:
return null;
case AGGREGATE:
return fd.createAggregateFunctionFactory(args);
default:
throw new IllegalStateException("Invalid function descriptor " + fd.getFunctionDescriptorTag()
+ " expected " + FunctionDescriptorTag.SERIALAGGREGATE + " or "
+ FunctionDescriptorTag.AGGREGATE);
}
}
@Override
public ICopyRunningAggregateFunctionFactory createRunningAggregateFunctionFactory(
StatefulFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
JobGenContext context) throws AlgebricksException {
ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
return getFunctionDescriptor(expr, env, context).createRunningAggregateFunctionFactory(args);
}
@Override
public ICopyUnnestingFunctionFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException {
ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
return getFunctionDescriptor(expr, env, context).createUnnestingFunctionFactory(args);
}
@Override
public ICopyEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env,
IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
ICopyEvaluatorFactory copyEvaluatorFactory = null;
switch (expr.getExpressionTag()) {
case VARIABLE: {
VariableReferenceExpression v = (VariableReferenceExpression) expr;
copyEvaluatorFactory = createVariableEvaluatorFactory(v, inputSchemas, context);
return copyEvaluatorFactory;
}
case CONSTANT: {
ConstantExpression c = (ConstantExpression) expr;
copyEvaluatorFactory = createConstantEvaluatorFactory(c, inputSchemas, context);
return copyEvaluatorFactory;
}
case FUNCTION_CALL: {
copyEvaluatorFactory = createScalarFunctionEvaluatorFactory((AbstractFunctionCallExpression) expr, env,
inputSchemas, context);
return copyEvaluatorFactory;
}
default:
throw new IllegalStateException();
}
}
private ICopyEvaluatorFactory createVariableEvaluatorFactory(VariableReferenceExpression expr,
IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
LogicalVariable variable = expr.getVariableReference();
for (IOperatorSchema scm : inputSchemas) {
int pos = scm.findVariable(variable);
if (pos >= 0) {
return new ColumnAccessEvalFactory(pos);
}
}
throw new AlgebricksException("Variable " + variable + " could not be found in any input schema.");
}
private ICopyEvaluatorFactory createScalarFunctionEvaluatorFactory(AbstractFunctionCallExpression expr,
IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context)
throws AlgebricksException {
ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
FunctionIdentifier fi = expr.getFunctionIdentifier();
ComparisonKind ck = AlgebricksBuiltinFunctions.getComparisonType(fi);
if (ck != null) {
return new ComparisonEvalFactory(args[0], args[1], ck);
}
IFunctionDescriptor fd = null;
if (!(expr.getFunctionInfo() instanceof IExternalFunctionInfo)) {
AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
IDataFormat format = FormatUtils.getDefaultFormat();
fd = format.resolveFunction(expr, env);
} else {
try {
fd = ExternalFunctionDescriptorProvider.getExternalFunctionDescriptor((IExternalFunctionInfo) expr
.getFunctionInfo());
} catch (AsterixException ae) {
throw new AlgebricksException(ae);
}
}
return fd.createEvaluatorFactory(args);
}
private ICopyEvaluatorFactory createConstantEvaluatorFactory(ConstantExpression expr,
IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
IDataFormat format = FormatUtils.getDefaultFormat();
return format.getConstantEvalFactory(expr.getValue());
}
private ICopyEvaluatorFactory[] codegenArguments(AbstractFunctionCallExpression expr, IVariableTypeEnvironment env,
IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException {
List<Mutable<ILogicalExpression>> arguments = expr.getArguments();
int n = arguments.size();
ICopyEvaluatorFactory[] args = new ICopyEvaluatorFactory[n];
int i = 0;
for (Mutable<ILogicalExpression> a : arguments) {
args[i++] = createEvaluatorFactory(a.getValue(), env, inputSchemas, context);
}
return args;
}
@Override
public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas,
JobGenContext context) throws AlgebricksException {
ICopyEvaluatorFactory[] args = codegenArguments(expr, env, inputSchemas, context);
IFunctionDescriptor fd = getFunctionDescriptor(expr, env, context);
switch (fd.getFunctionDescriptorTag()) {
case AGGREGATE: {
if (AsterixBuiltinFunctions.isAggregateFunctionSerializable(fd.getIdentifier())) {
AggregateFunctionCallExpression serialAggExpr = AsterixBuiltinFunctions
.makeSerializableAggregateFunctionExpression(fd.getIdentifier(), expr.getArguments());
IFunctionDescriptor afdd = getFunctionDescriptor(serialAggExpr, env, context);
return afdd.createSerializableAggregateFunctionFactory(args);
} else {
throw new AlgebricksException(
"Trying to create a serializable aggregate from a non-serializable aggregate function descriptor. (fi="
+ expr.getFunctionIdentifier() + ")");
}
}
case SERIALAGGREGATE: {
return fd.createSerializableAggregateFunctionFactory(args);
}
default:
throw new IllegalStateException("Invalid function descriptor " + fd.getFunctionDescriptorTag()
+ " expected " + FunctionDescriptorTag.SERIALAGGREGATE + " or "
+ FunctionDescriptorTag.AGGREGATE);
}
}
private IFunctionDescriptor getFunctionDescriptor(AbstractFunctionCallExpression expr,
IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
IFunctionDescriptor fd;
AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
fd = FormatUtils.getDefaultFormat().resolveFunction(expr, env);
return fd;
}
}