// blob: d6864c1412ebc8b9f03203c3d6d01dc3d2f92142 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.api.common;
import java.io.IOException;
import java.io.PrintWriter;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslatorFactory;
import org.apache.asterix.api.common.Job.SubmissionMode;
import org.apache.asterix.common.config.AsterixCompilerProperties;
import org.apache.asterix.common.config.AsterixExternalProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.dataflow.data.common.AqlExpressionTypeComputer;
import org.apache.asterix.dataflow.data.common.AqlMergeAggregationExpressionFactory;
import org.apache.asterix.dataflow.data.common.AqlMissableTypeComputer;
import org.apache.asterix.dataflow.data.common.AqlPartialAggregationTypeComputer;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen;
import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory;
import org.apache.asterix.lang.common.base.IQueryRewriter;
import org.apache.asterix.lang.common.base.IRewriterFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.rewrites.LangRewritingContext;
import org.apache.asterix.lang.common.statement.FunctionDecl;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.metadata.declared.AqlMetadataProvider;
import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.optimizer.base.RuleCollections;
import org.apache.asterix.result.ResultUtils;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder;
import org.apache.hyracks.algebricks.compiler.api.ICompiler;
import org.apache.hyracks.algebricks.compiler.api.ICompilerFactory;
import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController;
import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
import org.apache.hyracks.algebricks.core.algebra.expressions.IMissableTypeComputer;
import org.apache.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPlotter;
import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory;
import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.json.JSONException;
/**
* Provides helper methods for compilation of a query into a JobSpec and submission
* to Hyracks through the Hyracks client interface.
*/
public class APIFramework {
// Marker string (an HTML comment). It is not referenced anywhere else in this class.
// NOTE(review): presumably emitted by callers between statements in HTML output -- confirm.
public static final String HTML_STATEMENT_SEPARATOR = "<!-- BEGIN -->";
// Creates the query rewriter used by reWriteQuery.
private final IRewriterFactory rewriterFactory;
// Creates the AST print visitors used to dump expression trees to the session output.
private final IAstPrintVisitorFactory astPrintVisitorFactory;
// Creates the translator that turns a rewritten query or a DML statement into a logical plan.
private final ILangExpressionToPlanTranslatorFactory translatorFactory;
public APIFramework(ILangCompilationProvider compilationProvider) {
this.rewriterFactory = compilationProvider.getRewriterFactory();
this.astPrintVisitorFactory = compilationProvider.getAstPrintVisitorFactory();
this.translatorFactory = compilationProvider.getExpressionToPlanTranslatorFactory();
}
/**
 * Assembles the ordered list of logical rewrite stages that is handed to the compiler builder.
 * Each stage pairs a rule controller with one rule collection from {@link RuleCollections}.
 * The three controllers are created once and shared by all stages that use them.
 *
 * @return a new, mutable list of (controller, rules) stages in application order
 */
private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildDefaultLogicalRewrites() {
    SequentialFixpointRuleController fixpointNoDfs = new SequentialFixpointRuleController(false);
    SequentialFixpointRuleController fixpointFullDfs = new SequentialFixpointRuleController(true);
    SequentialOnceRuleController onceCtrl = new SequentialOnceRuleController(true);

    List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> stages = new ArrayList<>();

    // Initial one-pass stages.
    stages.add(new Pair<>(onceCtrl, RuleCollections.buildInitialTranslationRuleCollection()));
    stages.add(new Pair<>(onceCtrl, RuleCollections.buildTypeInferenceRuleCollection()));
    stages.add(new Pair<>(onceCtrl, RuleCollections.buildAutogenerateIDRuleCollection()));

    // First round: normalization, condition push-down / join inference, load fields.
    stages.add(new Pair<>(fixpointFullDfs, RuleCollections.buildNormalizationRuleCollection()));
    stages.add(new Pair<>(fixpointNoDfs, RuleCollections.buildCondPushDownAndJoinInferenceRuleCollection()));
    stages.add(new Pair<>(fixpointFullDfs, RuleCollections.buildLoadFieldsRuleCollection()));

    // Fuzzy-join rules ("fj").
    stages.add(new Pair<>(fixpointFullDfs, RuleCollections.buildFuzzyJoinRuleCollection()));

    // Second round: the same three stages again, after the fuzzy-join stage.
    stages.add(new Pair<>(fixpointFullDfs, RuleCollections.buildNormalizationRuleCollection()));
    stages.add(new Pair<>(fixpointNoDfs, RuleCollections.buildCondPushDownAndJoinInferenceRuleCollection()));
    stages.add(new Pair<>(fixpointFullDfs, RuleCollections.buildLoadFieldsRuleCollection()));

    // Closing stages.
    stages.add(new Pair<>(onceCtrl, RuleCollections.buildDataExchangeRuleCollection()));
    stages.add(new Pair<>(fixpointNoDfs, RuleCollections.buildConsolidationRuleCollection()));
    stages.add(new Pair<>(fixpointNoDfs, RuleCollections.buildAccessMethodRuleCollection()));
    stages.add(new Pair<>(fixpointNoDfs, RuleCollections.buildPlanCleanupRuleCollection()));

    // TODO (note carried over from the original author): put TXnRuleCollection!
    return stages;
}
/**
 * Assembles the ordered list of physical rewrite stages that is handed to the compiler builder.
 * Two run-once controllers are created (constructed with {@code true} and {@code false}
 * respectively); the first one is shared by the first and last stage.
 *
 * @return a new, mutable list of (controller, rules) stages in application order
 */
private static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildDefaultPhysicalRewrites() {
    SequentialOnceRuleController onceCtrl = new SequentialOnceRuleController(true);
    SequentialOnceRuleController onceTopLevelCtrl = new SequentialOnceRuleController(false);

    List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> stages = new ArrayList<>();
    stages.add(new Pair<>(onceCtrl, RuleCollections.buildPhysicalRewritesAllLevelsRuleCollection()));
    stages.add(new Pair<>(onceTopLevelCtrl, RuleCollections.buildPhysicalRewritesTopLevelRuleCollection()));
    stages.add(new Pair<>(onceCtrl, RuleCollections.prepareForJobGenRuleCollection()));
    return stages;
}
/**
 * {@link IOptimizationContextFactory} that builds an {@link AlgebricksOptimizationContext}
 * directly from the arguments it receives. Only one instance exists ({@link #INSTANCE});
 * the constructor is private.
 */
private static class AqlOptimizationContextFactory implements IOptimizationContextFactory {

    /** The single shared instance of this factory. */
    public static final AqlOptimizationContextFactory INSTANCE = new AqlOptimizationContextFactory();

    private AqlOptimizationContextFactory() {
        // Instantiated only through INSTANCE.
    }

    /**
     * Passes every argument, unchanged and in the same order, to the
     * {@link AlgebricksOptimizationContext} constructor.
     */
    @Override
    public IOptimizationContext createOptimizationContext(int varCounter,
            IExpressionEvalSizeComputer expressionEvalSizeComputer,
            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
            IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer,
            PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations) {
        IOptimizationContext context = new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer,
                mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer,
                physicalOptimizationConfig, clusterLocations);
        return context;
    }
}
/**
 * Writes the heading that precedes a dumped plan or tree to the session output.
 * In HTML mode this is an {@code <h4>} title followed by an opening {@code <pre>};
 * otherwise it is a single dashed title line.
 *
 * @param conf
 *            session configuration providing the output writer and the HTML flag
 * @param planName
 *            title of the artifact about to be printed
 */
private void printPlanPrefix(SessionConfig conf, String planName) {
    if (!conf.is(SessionConfig.FORMAT_HTML)) {
        conf.out().println("----------" + planName + ":");
        return;
    }
    conf.out().println("<h4>" + planName + ":</h4>");
    conf.out().println("<pre>");
}
/**
 * Closes the {@code <pre>} element opened by the plan heading when the session is in
 * HTML mode; prints nothing otherwise.
 *
 * @param conf
 *            session configuration providing the output writer and the HTML flag
 */
private void printPlanPostfix(SessionConfig conf) {
    if (!conf.is(SessionConfig.FORMAT_HTML)) {
        return;
    }
    conf.out().println("</pre>");
}
/**
 * Runs the language's query rewriter over {@code q} and, if requested by the session
 * configuration, first dumps the original expression tree to the session output.
 *
 * @param declaredFunctions
 *            function declarations passed through to the rewriter
 * @param metadataProvider
 *            metadata provider passed through to the rewriter
 * @param q
 *            the query to rewrite; may be {@code null}
 * @param conf
 *            session configuration (output writer and print flags)
 * @return the same query object paired with its variable counter as read after the rewrite,
 *         or {@code null} when {@code q} is {@code null}
 * @throws AsterixException
 *             propagated from printing or rewriting the query
 */
public Pair<Query, Integer> reWriteQuery(List<FunctionDecl> declaredFunctions, AqlMetadataProvider metadataProvider,
        Query q, SessionConfig conf) throws AsterixException {
    if (q == null) {
        return null;
    }

    // The tree is dumped only when asked for and when output is not restricted to physical operators.
    boolean dumpExpressionTree =
            !conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_EXPR_TREE);
    if (dumpExpressionTree) {
        conf.out().println();
        printPlanPrefix(conf, "Expression tree");
        q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
        printPlanPostfix(conf);
    }

    IQueryRewriter rewriter = rewriterFactory.createQueryRewriter();
    LangRewritingContext rewritingContext = new LangRewritingContext(q.getVarCounter());
    rewriter.rewrite(declaredFunctions, q, metadataProvider, rewritingContext);

    // The counter is read again here, after the rewrite has run.
    return new Pair<>(q, q.getVarCounter());
}
/**
 * Compiles a rewritten query, or a compiled DML statement, into a Hyracks job specification.
 * <p>
 * The method translates the input into a logical plan, optionally optimizes it, prints the
 * intermediate artifacts selected through {@code conf}, and finally asks the compiler to
 * create the job.
 *
 * @param declaredFunctions
 *            not read by this method
 * @param queryMetadataProvider
 *            receives the generated Asterix job id; supplies the data format, the cluster
 *            locations and the write-transaction flag
 * @param rwQ
 *            the rewritten query; may be {@code null}, in which case the tree and job dumps
 *            print only their heading (logical plans are still printed for LOAD statements)
 * @param varCounter
 *            variable counter handed to the plan translator
 * @param outputDatasetName
 *            passed through to the translator for non-LOAD input
 * @param conf
 *            session configuration: output writer, output format and the flags selecting
 *            which artifacts are printed
 * @param statement
 *            the compiled DML statement, or {@code null} when the input is a plain query
 * @return the generated job specification, or {@code null} when {@code rwQ} is an explain
 *         query (its plan is displayed as the result instead) or when
 *         {@code conf.isGenerateJobSpec()} is false
 * @throws AlgebricksException
 *             also used to wrap an {@link IOException} raised while displaying an explain plan
 * @throws JSONException
 *             presumably from serializing the job specification to JSON -- confirm
 * @throws RuntimeException
 *             if {@code conf.fmt()} is not one of the four handled output formats
 */
public JobSpecification compileQuery(List<FunctionDecl> declaredFunctions,
AqlMetadataProvider queryMetadataProvider, Query rwQ, int varCounter, String outputDatasetName,
SessionConfig conf, ICompiledDmlStatement statement)
throws AlgebricksException, JSONException, RemoteException, ACIDException {
// Optional dump of the rewritten AST (skipped when only physical operators are wanted).
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
conf.out().println();
printPlanPrefix(conf, "Rewritten expression tree");
if (rwQ != null) {
rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
}
printPlanPostfix(conf);
}
// A fresh Asterix job id is recorded on the metadata provider; the same id is later
// handed to the JobEventListenerFactory.
org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId();
queryMetadataProvider.setJobId(asterixJobId);
ILangExpressionToPlanTranslator t =
translatorFactory.createExpressionToPlanTranslator(queryMetadataProvider, varCounter);
ILogicalPlan plan;
// statement = null when it's a query
// LOAD statements go through translateLoad; everything else through translate.
if (statement == null || statement.getKind() != Statement.Kind.LOAD) {
plan = t.translate(rwQ, outputDatasetName, statement);
} else {
plan = t.translateLoad(statement);
}
// Optional dump of the not-yet-optimized logical plan.
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
conf.out().println();
printPlanPrefix(conf, "Logical plan");
if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
}
printPlanPostfix(conf);
}
//print the plot for the logical plan
AsterixExternalProperties xProps = AsterixAppContextInfo.getInstance().getExternalProperties();
// NOTE(review): the boxed Boolean is auto-unboxed by the if-conditions below; a null
// return from getIsPlottingEnabled() would raise a NullPointerException -- confirm it cannot be null.
Boolean plot = xProps.getIsPlottingEnabled();
if (plot) {
PlanPlotter.printLogicalPlan(plan);
}
// Each frame limit is the configured memory budget divided by the frame size.
AsterixCompilerProperties compilerProperties = AsterixAppContextInfo.getInstance().getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
int sortFrameLimit = (int) (compilerProperties.getSortMemorySize() / frameSize);
int groupFrameLimit = (int) (compilerProperties.getGroupMemorySize() / frameSize);
int joinFrameLimit = (int) (compilerProperties.getJoinMemorySize() / frameSize);
// NOTE(review): these setters mutate the object returned by OptimizationConfUtil; it looks
// like a shared configuration -- confirm whether concurrent compilations can interfere.
OptimizationConfUtil.getPhysicalOptimizationConfig().setFrameSize(frameSize);
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalSort(sortFrameLimit);
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesExternalGroupBy(groupFrameLimit);
OptimizationConfUtil.getPhysicalOptimizationConfig().setMaxFramesForJoin(joinFrameLimit);
HeuristicCompilerFactoryBuilder builder =
new HeuristicCompilerFactoryBuilder(AqlOptimizationContextFactory.INSTANCE);
builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig());
builder.setLogicalRewrites(buildDefaultLogicalRewrites());
builder.setPhysicalRewrites(buildDefaultPhysicalRewrites());
IDataFormat format = queryMetadataProvider.getFormat();
// NOTE(review): the factory is created here, before the remaining builder setters below are
// called. Presumably the factory reads the builder's state lazily when the compiler or job is
// created -- confirm against HeuristicCompilerFactoryBuilder before reordering anything.
ICompilerFactory compilerFactory = builder.create();
builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer());
builder.setIMergeAggregationExpressionFactory(new AqlMergeAggregationExpressionFactory());
builder.setPartialAggregationTypeComputer(new AqlPartialAggregationTypeComputer());
builder.setExpressionTypeComputer(AqlExpressionTypeComputer.INSTANCE);
builder.setMissableTypeComputer(AqlMissableTypeComputer.INSTANCE);
builder.setClusterLocations(queryMetadataProvider.getClusterLocations());
// The compiler receives the translator's variable counter, not the varCounter parameter.
ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter());
if (conf.isOptimize()) {
compiler.optimize();
//plot optimized logical plan
if (plot) {
PlanPlotter.printOptimizedLogicalPlan(plan);
}
if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) {
if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) {
// For Optimizer tests.
// Only the physical operators are printed, without heading or footer.
AlgebricksAppendable buffer = new AlgebricksAppendable(conf.out());
PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
} else {
printPlanPrefix(conf, "Optimized logical plan");
if (rwQ != null || (statement != null && statement.getKind() == Statement.Kind.LOAD)) {
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(conf.out());
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
}
printPlanPostfix(conf);
}
}
}
// Explain queries: the plan is pretty-printed with a visitor created without a target
// writer, displayed as the query result, and no job is generated.
if (rwQ != null && rwQ.isExplain()) {
try {
LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
ResultUtils.displayResults(pvisitor.get().toString(), conf, new ResultUtils.Stats(), null);
return null;
} catch (IOException e) {
throw new AlgebricksException(e);
}
}
// Stop here when the session does not want a job specification.
if (!conf.isGenerateJobSpec()) {
return null;
}
// Remaining builder settings are taken from the data format.
builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory());
builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory());
builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider());
builder.setExpressionRuntimeProvider(
new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(QueryLogicalExpressionJobGen.INSTANCE));
builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider());
builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider());
builder.setMissingWriterFactory(format.getMissingWriterFactory());
builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider());
// The printer provider is chosen from the session's output format.
switch (conf.fmt()) {
case LOSSLESS_JSON:
builder.setPrinterProvider(format.getLosslessJSONPrinterFactoryProvider());
break;
case CSV:
builder.setPrinterProvider(format.getCSVPrinterFactoryProvider());
break;
case ADM:
builder.setPrinterProvider(format.getADMPrinterFactoryProvider());
break;
case CLEAN_JSON:
builder.setPrinterProvider(format.getCleanJSONPrinterFactoryProvider());
break;
default:
throw new RuntimeException("Unexpected OutputFormat!");
}
builder.setSerializerDeserializerProvider(format.getSerdeProvider());
builder.setTypeTraitProvider(format.getTypeTraitProvider());
builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider());
// The listener factory is built from the job id generated above and the provider's
// write-transaction flag.
JobEventListenerFactory jobEventListenerFactory =
new JobEventListenerFactory(asterixJobId, queryMetadataProvider.isWriteTransaction());
JobSpecification spec = compiler.createJob(AsterixAppContextInfo.getInstance(), jobEventListenerFactory);
// Optional dump of the generated job (JSON form plus user constraints); only for queries.
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
printPlanPrefix(conf, "Hyracks job");
if (rwQ != null) {
conf.out().println(spec.toJSON().toString(1));
conf.out().println(spec.getUserConstraints());
}
printPlanPostfix(conf);
}
return spec;
}
/**
 * Runs the given job specifications one after another, waiting for each to complete
 * and printing how long the wait took.
 * <p>
 * Re-attempts are disabled on every specification. The timer starts after the job has
 * been started, so the reported duration covers only the wait for completion.
 *
 * @param hcc
 *            connection used to start the jobs and wait for them
 * @param specs
 *            job specifications to run, in order
 * @param out
 *            writer that receives one {@code <pre>Duration: ... sec</pre>} line per job
 * @throws Exception
 *             propagated from starting or waiting for a job; remaining jobs are not run
 */
public void executeJobArray(IHyracksClientConnection hcc, JobSpecification[] specs, PrintWriter out)
        throws Exception {
    for (int i = 0; i < specs.length; i++) {
        JobSpecification current = specs[i];
        current.setMaxReattempts(0);
        JobId runningJob = hcc.startJob(current);
        long begin = System.currentTimeMillis();
        hcc.waitForCompletion(runningJob);
        long elapsedMillis = System.currentTimeMillis() - begin;
        double seconds = elapsedMillis / 1000.00;
        out.println("<pre>Duration: " + seconds + " sec</pre>");
    }
}
/**
 * Runs the given jobs one after another on a best-effort basis.
 * <p>
 * Re-attempts are disabled on every job specification. Jobs submitted in
 * {@link SubmissionMode#ASYNCHRONOUS} mode are started but not waited for, and no
 * duration is printed for them. For all other jobs the method waits for completion and
 * prints the elapsed time, which here includes the time spent starting the job.
 * <p>
 * A job that fails to start or complete is logged and skipped (no duration is printed),
 * and the remaining jobs are still run. The one exception is thread interruption: the
 * interrupt flag is restored and the {@link InterruptedException} is rethrown, so an
 * interrupted caller does not keep submitting jobs.
 *
 * @param hcc
 *            connection used to start the jobs and wait for them
 * @param jobs
 *            jobs to run, in order
 * @param out
 *            writer that receives one {@code <pre>Duration: ... sec</pre>} line per
 *            successfully completed synchronous job
 * @throws Exception
 *             an {@link InterruptedException} if the thread is interrupted while a job is
 *             being started or awaited
 */
public void executeJobArray(IHyracksClientConnection hcc, Job[] jobs, PrintWriter out) throws Exception {
    for (Job job : jobs) {
        job.getJobSpec().setMaxReattempts(0);
        long startTime = System.currentTimeMillis();
        try {
            JobId jobId = hcc.startJob(job.getJobSpec());
            if (job.getSubmissionMode() == SubmissionMode.ASYNCHRONOUS) {
                // Fire-and-forget: nothing to wait for and nothing to time.
                continue;
            }
            hcc.waitForCompletion(jobId);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not swallow interruption: restore the flag and stop submitting jobs.
                Thread.currentThread().interrupt();
                throw e;
            }
            // Best effort: report the failure through the logger (instead of
            // printStackTrace) and move on to the next job.
            Logger.getLogger(APIFramework.class.getName()).log(Level.WARNING,
                    "Job failed to start or complete; continuing with the next job", e);
            continue;
        }
        long endTime = System.currentTimeMillis();
        double duration = (endTime - startTime) / 1000.00;
        out.println("<pre>Duration: " + duration + " sec</pre>");
    }
}
}