blob: 6495af33fbe8da5abc743043a477b8e87461b375 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.plan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.QueryVars;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.*;
import org.apache.tajo.algebra.WindowSpec;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.*;
import org.apache.tajo.plan.LogicalPlan.QueryBlock;
import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.exprrewrite.EvalTreeOptimizer;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.nameresolver.NameResolvingMode;
import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
import org.apache.tajo.plan.util.ExprFinder;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.StringUtils;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.*;
import static org.apache.tajo.algebra.CreateTable.PartitionType;
import static org.apache.tajo.plan.ExprNormalizer.ExprNormalizedResult;
import static org.apache.tajo.plan.LogicalPlan.BlockType;
import static org.apache.tajo.plan.verifier.SyntaxErrorUtil.makeSyntaxError;
/**
* This class creates a logical plan from a nested tajo algebra expression ({@link org.apache.tajo.algebra})
*/
public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContext, LogicalNode> {
// Shared logger for all LogicalPlanner instances.
private static Log LOG = LogFactory.getLog(LogicalPlanner.class);
// Catalog service; handed to the annotator/preprocessor to resolve metadata (see constructor).
private final CatalogService catalog;
// Storage service handle — usage not visible in this chunk; presumably resolves tablespaces. TODO confirm.
private final StorageService storage;
// Pre-pass that walks the algebra tree before the main visit (see createPlan).
private final LogicalPlanPreprocessor preprocessor;
// Optimizer threaded into each PlanContext for eval-tree optimization.
private final EvalTreeOptimizer evalOptimizer;
// Converts algebra expressions into annotated EvalNode trees.
private final ExprAnnotator exprAnnotator;
// Dissects expressions into base / aggregation / scalar / window parts.
private final ExprNormalizer normalizer;
/**
 * Creates a planner backed by the given catalog and storage services.
 */
public LogicalPlanner(CatalogService catalog, StorageService storage) {
  this.catalog = catalog;
  this.storage = storage;
  this.normalizer = new ExprNormalizer();
  this.evalOptimizer = new EvalTreeOptimizer();
  // The preprocessor reuses the annotator, so the annotator must be created first.
  this.exprAnnotator = new ExprAnnotator(catalog);
  this.preprocessor = new LogicalPlanPreprocessor(catalog, exprAnnotator);
}
/**
 * Per-plan mutable state threaded through every visit method: the plan being built,
 * the current query block, and session-derived settings such as the time zone.
 */
public static class PlanContext {
  OverridableConf queryContext;
  LogicalPlan plan;
  QueryBlock queryBlock;
  EvalTreeOptimizer evalOptimizer;
  TimeZone timeZone;
  List<Expr> unplannedExprs = new ArrayList<>();
  boolean debugOrUnitTests;
  // Counter used to generate unique names for anonymous table subqueries.
  Integer noNameSubqueryId = 0;

  public PlanContext(OverridableConf context, LogicalPlan plan, QueryBlock block, EvalTreeOptimizer evalOptimizer,
                     boolean debugOrUnitTests) {
    this.queryContext = context;
    this.plan = plan;
    this.queryBlock = block;
    this.evalOptimizer = evalOptimizer;

    // session's time zone
    String timezoneId = context.get(SessionVars.TIMEZONE);
    this.timeZone = TimeZone.getTimeZone(timezoneId);

    this.debugOrUnitTests = debugOrUnitTests;
  }

  /**
   * Derives a context for a child query block, sharing everything from the parent
   * context except the current block.
   */
  public PlanContext(PlanContext context, QueryBlock block) {
    this.queryContext = context.queryContext;
    this.plan = context.plan;
    this.queryBlock = block;
    this.evalOptimizer = context.evalOptimizer;
    // BUG FIX: the session time zone was previously not propagated, leaving it null
    // in contexts created for sub-blocks. Copy it from the parent context.
    this.timeZone = context.timeZone;
    this.debugOrUnitTests = context.debugOrUnitTests;
  }

  public QueryBlock getQueryBlock() {
    return queryBlock;
  }

  public LogicalPlan getPlan() {
    return plan;
  }

  public OverridableConf getQueryContext() {
    return queryContext;
  }

  @Override
  public String toString() {
    return "block=" + queryBlock.getName() + ", relNum=" + queryBlock.getRelations().size() + ", "+
        queryBlock.namedExprsMgr.toString();
  }

  /**
   * It generates a unique table subquery name
   */
  public String generateUniqueSubQueryName() {
    return LogicalPlan.NONAME_SUBQUERY_PREFIX + noNameSubqueryId++;
  }
}
/**
 * This generates a logical plan.
 *
 * @param expr A relational algebraic expression for a query.
 * @return A logical plan
 */
public LogicalPlan createPlan(OverridableConf context, Expr expr) throws TajoException {
  final boolean debug = false; // non-test callers never need the raw-target bookkeeping
  return createPlan(context, expr, debug);
}
/**
 * Builds a logical plan for the given algebra expression. When {@code debug} is true,
 * extra raw-target information is recorded for unit tests.
 */
@VisibleForTesting
public LogicalPlan createPlan(OverridableConf queryContext, Expr expr, boolean debug) throws TajoException {
  final LogicalPlan plan = new LogicalPlan();
  final QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
  final PlanContext context = new PlanContext(queryContext, plan, rootBlock, evalOptimizer, debug);

  // Pre-pass over the algebra tree, then reset generated ids before the main visit.
  preprocessor.process(context, expr);
  plan.resetGeneratedId();

  final LogicalNode topMostNode = visit(context, new Stack<>(), expr);

  // Wrap the visited plan with a single LogicalRootNode.
  final LogicalRootNode root = plan.createNode(LogicalRootNode.class);
  root.setInSchema(topMostNode.getOutSchema());
  root.setOutSchema(topMostNode.getOutSchema());
  root.setChild(topMostNode);
  plan.getRootBlock().setRoot(root);

  return plan;
}
/** Returns the expression annotator shared by this planner. */
public ExprAnnotator getExprAnnotator() {
  return exprAnnotator;
}
/** Invoked before each algebra node is visited; records it as the block's current node. */
public void preHook(PlanContext context, Stack<Expr> stack, Expr expr) throws TajoException {
  context.queryBlock.updateCurrentNode(expr);
}
/**
 * Invoked after each algebra node is visited. Registers the produced logical node with the
 * query block and, when the node is the topmost of the block, marks it as the block root.
 */
public LogicalNode postHook(PlanContext context, Stack<Expr> stack, Expr expr, LogicalNode current)
    throws TajoException {
  // Generated logical nodes (e.g., implicit aggregation) are passed with a null expr; skip
  // the expr-based checks for them. A RelationList containing a single relation yields a
  // ScanNode that already went through postHook, so it must not be processed twice.
  boolean alreadyVisitedScan =
      expr != null && expr.getType() == OpType.RelationList && current.getType() == NodeType.SCAN;
  if (alreadyVisitedScan) {
    return current;
  }

  QueryBlock queryBlock = context.queryBlock;
  queryBlock.updateLatestNode(current);

  if (stack.isEmpty()) {
    // this node is the topmost of the block
    queryBlock.setRoot(current);
  } else {
    queryBlock.updateCurrentNode(stack.peek());
  }
  return current;
}
/** Plans a SET SESSION statement by initializing its pre-created node with name and value. */
@Override
public LogicalNode visitSetSession(PlanContext context, Stack<Expr> stack, SetSession expr) throws TajoException {
  final SetSessionNode node = context.queryBlock.getNodeFromExpr(expr);
  node.init(expr.getName(), expr.getValue());
  return node;
}
/** Records the explain mode (global or not) on the plan, then plans the wrapped query as usual. */
public LogicalNode visitExplain(PlanContext ctx, Stack<Expr> stack, Explain expr) throws TajoException {
  ctx.plan.setExplain(expr.isGlobal());
  LogicalNode childPlan = visit(ctx, stack, expr.getChild());
  return childPlan;
}
/*===============================================================================================
Data Manupulation Language (DML) SECTION
===============================================================================================*/
/*===============================================================================================
PROJECTION SECTION
===============================================================================================*/
/**
 * Plans a projection (the SELECT list). Normalizes and registers the target expressions,
 * visits the child plan, inserts implicit group-by / window-aggregation nodes when needed,
 * and finally materializes the ProjectionNode on top.
 */
@Override
public LogicalNode visitProjection(PlanContext context, Stack<Expr> stack, Projection projection)
    throws TajoException {
  QueryBlock block = context.queryBlock;

  // If a non-from statement is given
  if (!projection.hasChild()) {
    return buildPlanForNoneFromStatement(context, stack, projection);
  }

  String [] referenceNames;
  // in prephase, insert all target list into NamedExprManagers.
  // Then it gets reference names, each of which points an expression in target list.
  Pair<String [], ExprNormalizer.WindowSpecReferences []> referencesPair = doProjectionPrephase(context, projection);
  referenceNames = referencesPair.getFirst();

  ////////////////////////////////////////////////////////
  // Visit and Build Child Plan
  ////////////////////////////////////////////////////////
  stack.push(projection);
  LogicalNode child = visit(context, stack, projection.getChild());

  // check if it is implicit aggregation. If so, it inserts group-by node to its child.
  if (block.isAggregationRequired()) {
    child = insertGroupbyNode(context, child, stack);
  }

  // insertWindowAggNode() returns null when the window aggregation was spliced beneath an
  // existing sort/limit node; in that case the original child stays the direct child here.
  if (block.hasWindowSpecs()) {
    LogicalNode windowAggNode =
        insertWindowAggNode(context, child, stack, referenceNames, referencesPair.getSecond());
    if (windowAggNode != null) {
      child = windowAggNode;
    }
  }
  stack.pop();
  ////////////////////////////////////////////////////////

  ProjectionNode projectionNode;
  List<Target> targets;
  targets = buildTargets(context, referenceNames);

  // Set ProjectionNode
  projectionNode = context.queryBlock.getNodeFromExpr(projection);
  projectionNode.init(projection.isDistinct(), targets);
  projectionNode.setChild(child);
  projectionNode.setInSchema(child.getOutSchema());
  projectionNode.setOutSchema(PlannerUtil.targetToSchema(targets));

  if (projection.isDistinct() && block.hasNode(NodeType.GROUP_BY)) {
    throw makeSyntaxError("Cannot support grouping and distinct at the same time yet");
  } else {
    if (projection.isDistinct()) {
      // DISTINCT is realized by inserting a duplicate-removal group-by below this node.
      insertDistinctOperator(context, projectionNode, child, stack);
    }
  }

  // It's for debugging and unit tests purpose.
  // It sets raw targets, all of them are raw expressions instead of references.
  if (context.debugOrUnitTests) {
    setRawTargets(context, targets, referenceNames, projection);
  }

  verifyProjectedFields(block, projectionNode);
  return projectionNode;
}
/**
 * Rebuilds the projection targets from the raw (un-normalized) expressions and stores them
 * on the query block. The raw targets are used only for debugging and unit tests, where the
 * original expressions are easier to inspect than normalized references.
 * (The {@code targets} parameter is unused but kept for interface compatibility.)
 */
private void setRawTargets(PlanContext context, List<Target> targets, String[] referenceNames,
                           Projection projection) throws TajoException {
  QueryBlock block = context.queryBlock;

  // It's for debugging or unit tests.
  List<Target> rawTargets = new ArrayList<>();
  for (int i = 0; i < projection.getNamedExprs().size(); i++) {
    NamedExpr namedExpr = projection.getNamedExprs().get(i);
    EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
        NameResolvingMode.RELS_AND_SUBEXPRS);
    rawTargets.add(new Target(evalNode, referenceNames[i]));
  }
  // it's for debugging or unit testing
  block.setRawTargets(rawTargets);
}
/**
 * Implements SELECT DISTINCT by inserting a duplicate-removal GroupbyNode below the
 * projection. When the child is a sort node, the group-by is spliced beneath the sort so
 * the output ordering is preserved; otherwise it is placed directly below the projection.
 */
private void insertDistinctOperator(PlanContext context, ProjectionNode projectionNode, LogicalNode child,
                                    Stack<Expr> stack) throws TajoException {
  QueryBlock block = context.queryBlock;

  if (child.getType() == NodeType.SORT) {
    // projection -> sort -> x   ==>   projection -> sort -> dupRemoval -> x
    SortNode sortNode = (SortNode) child;
    GroupbyNode dupRemoval = context.plan.createNode(GroupbyNode.class);
    dupRemoval.setForDistinctBlock();
    dupRemoval.setChild(sortNode.getChild());
    dupRemoval.setInSchema(sortNode.getInSchema());
    dupRemoval.setTargets(PlannerUtil.schemaToTargets(sortNode.getInSchema()));
    dupRemoval.setGroupingColumns(sortNode.getInSchema().toArray());

    block.registerNode(dupRemoval);
    postHook(context, stack, null, dupRemoval);

    sortNode.setChild(dupRemoval);
    sortNode.setInSchema(dupRemoval.getOutSchema());
    sortNode.setOutSchema(dupRemoval.getOutSchema());
    projectionNode.setInSchema(sortNode.getOutSchema());
    projectionNode.setChild(sortNode);
  } else {
    // projection -> x   ==>   projection -> dupRemoval -> x
    Schema outSchema = projectionNode.getOutSchema();
    GroupbyNode dupRemoval = context.plan.createNode(GroupbyNode.class);
    dupRemoval.setForDistinctBlock();
    dupRemoval.setChild(child);
    dupRemoval.setInSchema(projectionNode.getInSchema());
    dupRemoval.setTargets(PlannerUtil.schemaToTargets(outSchema));
    dupRemoval.setGroupingColumns(outSchema.toArray());

    block.registerNode(dupRemoval);
    postHook(context, stack, null, dupRemoval);

    projectionNode.setChild(dupRemoval);
    projectionNode.setInSchema(dupRemoval.getOutSchema());
  }
}
/**
 * Normalizes every target expression of the projection and registers it in the block's
 * NamedExprsManager. Targets WITHOUT window functions are processed first, then targets
 * WITH window functions. Returns one reference name per target (in target order) paired
 * with the window-spec references collected during normalization.
 */
private Pair<String [], ExprNormalizer.WindowSpecReferences []> doProjectionPrephase(PlanContext context,
                                                                                     Projection projection)
    throws TajoException {
  QueryBlock block = context.queryBlock;

  int finalTargetNum = projection.size();
  String [] referenceNames = new String[finalTargetNum];
  ExprNormalizedResult[] normalizedExprList = new ExprNormalizedResult[finalTargetNum];

  List<ExprNormalizer.WindowSpecReferences> windowSpecReferencesList = new ArrayList<>();

  // Pass 1: normalize only the targets that contain NO window function.
  List<Integer> targetsIds = normalize(context, projection, normalizedExprList, new Matcher() {
    @Override
    public boolean isMatch(Expr expr) {
      return ExprFinder.finds(expr, OpType.WindowFunction).size() == 0;
    }
  });

  // Note: Why separate normalization and add(Named)Expr?
  //
  // ExprNormalizer internally makes use of the named exprs in NamedExprsManager.
  // If we don't separate normalization work and addExprWithName, addExprWithName will find named exprs evaluated
  // the same logical node. It will cause impossible evaluation in physical executors.
  addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds);

  // Pass 2: normalize the targets that DO contain window functions, after the plain ones
  // have been registered.
  targetsIds = normalize(context, projection, normalizedExprList, new Matcher() {
    @Override
    public boolean isMatch(Expr expr) {
      return ExprFinder.finds(expr, OpType.WindowFunction).size() > 0;
    }
  });
  addNamedExprs(block, referenceNames, normalizedExprList, windowSpecReferencesList, projection, targetsIds);

  return new Pair<>(referenceNames,
      windowSpecReferencesList.toArray(new ExprNormalizer.WindowSpecReferences[windowSpecReferencesList.size()]));
}
/**
 * Predicate deciding which projection expressions a single normalization pass handles
 * (used to separate window-function targets from ordinary targets).
 */
private interface Matcher {
  boolean isMatch(Expr expr);
}
/**
 * Normalizes every projection target accepted by {@code matcher}, filling the corresponding
 * slot of {@code normalizedExprList}, and returns the indexes of the targets it handled.
 * Also flags the current block as requiring aggregation when any target contains an
 * aggregation function (checked for every target, matched or not).
 */
public List<Integer> normalize(PlanContext context, Projection projection, ExprNormalizedResult [] normalizedExprList,
                               Matcher matcher) throws TajoException {
  List<Integer> matchedIds = new ArrayList<>();
  for (int idx = 0; idx < projection.size(); idx++) {
    NamedExpr rawTarget = projection.getNamedExprs().get(idx);

    // Any aggregation function in the target list forces an (implicit) group-by.
    if (PlannerUtil.existsAggregationFunction(rawTarget)) {
      context.queryBlock.setAggregationRequire();
    }

    if (!matcher.isMatch(rawTarget.getExpr())) {
      continue;
    }

    boolean unnamedConstant = !rawTarget.hasAlias() && OpType.isLiteralType(rawTarget.getExpr().getType());
    if (unnamedConstant) {
      // Register the constant under a generated name and project a reference to it instead.
      String generatedName = context.plan.generateUniqueColumnName(rawTarget.getExpr());
      ConstEval constEval = (ConstEval) exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
          NameResolvingMode.RELS_ONLY);
      context.getQueryBlock().addConstReference(generatedName, rawTarget.getExpr(), constEval);

      normalizedExprList[idx] = new ExprNormalizedResult(context, false);
      normalizedExprList[idx].baseExpr = new ColumnReferenceExpr(generatedName);
    } else {
      // Dissect the expression into multiple parts (at most dissected into three parts).
      normalizedExprList[idx] = normalizer.normalize(context, rawTarget.getExpr());
    }
    matchedIds.add(idx);
  }
  return matchedIds;
}
/**
 * Registers the normalized expressions (for the given target indexes) in the block's
 * NamedExprsManager, recording the resulting reference name per target and collecting any
 * window-spec references encountered.
 */
private void addNamedExprs(QueryBlock block, String [] referenceNames, ExprNormalizedResult [] normalizedExprList,
                           List<ExprNormalizer.WindowSpecReferences> windowSpecReferencesList, Projection projection,
                           List<Integer> targetIds) throws TajoException {
  for (int id : targetIds) {
    ExprNormalizedResult normalized = normalizedExprList[id];
    NamedExpr namedExpr = projection.getNamedExprs().get(id);

    // Register the base expression, honoring the user-given alias when one exists.
    if (namedExpr.hasAlias()) {
      NamedExpr aliasedExpr = new NamedExpr(normalized.baseExpr, namedExpr.getAlias());
      referenceNames[id] = block.namedExprsMgr.addNamedExpr(aliasedExpr);
    } else {
      referenceNames[id] = block.namedExprsMgr.addExpr(normalized.baseExpr);
    }

    // Register the dissected sub-expressions: aggregation, scalar, and window-agg parts.
    block.namedExprsMgr.addNamedExprArray(normalized.aggExprs);
    block.namedExprsMgr.addNamedExprArray(normalized.scalarExprs);
    block.namedExprsMgr.addNamedExprArray(normalized.windowAggExprs);
    windowSpecReferencesList.addAll(normalized.windowSpecs);
  }
}
/**
 * It builds non-from statement (only expressions) like '<code>SELECT 1+3 as plus</code>'.
 * Each target is annotated directly (no child relation to resolve against) and an
 * EvalExprNode carrying the resulting targets is returned.
 */
private EvalExprNode buildPlanForNoneFromStatement(PlanContext context, Stack<Expr> stack, Projection projection)
    throws TajoException {
  QueryBlock block = context.queryBlock;

  List<Target> targets = new ArrayList<>();
  for (NamedExpr namedExpr : projection.getNamedExprs()) {
    EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.RELS_ONLY);
    // Use the user-given alias when present; otherwise generate a unique column name.
    String targetName = namedExpr.hasAlias()
        ? namedExpr.getAlias()
        : context.plan.generateUniqueColumnName(namedExpr.getExpr());
    targets.add(new Target(evalNode, targetName));
  }

  EvalExprNode evalExprNode = context.queryBlock.getNodeFromExpr(projection);
  evalExprNode.setTargets(targets);
  evalExprNode.setOutSchema(PlannerUtil.targetToSchema(targets));

  // it's for debugging or unit testing
  block.setRawTargets(targets);
  return evalExprNode;
}
/**
 * Resolves each reference name into a Target: cached constants are projected via their
 * ConstEval, already-evaluated references reuse the existing target, and anything else is
 * annotated here and marked evaluated.
 */
private List<Target> buildTargets(PlanContext context, String[] referenceNames)
    throws TajoException {
  QueryBlock block = context.queryBlock;

  List<Target> builtTargets = new ArrayList<>();
  for (String name : referenceNames) {
    Target target;
    if (block.isConstReference(name)) {
      // Constants were registered separately; project the cached ConstEval.
      target = new Target(block.getConstByReference(name), name);
    } else if (block.namedExprsMgr.isEvaluated(name)) {
      // Already evaluated by a lower node; just reference the existing target.
      target = block.namedExprsMgr.getTarget(name);
    } else {
      // Not evaluated yet: annotate the raw expression here and mark it evaluated.
      NamedExpr rawExpr = block.namedExprsMgr.getNamedExpr(name);
      EvalNode evalNode = exprAnnotator.createEvalNode(context, rawExpr.getExpr(),
          NameResolvingMode.RELS_AND_SUBEXPRS);
      block.namedExprsMgr.markAsEvaluated(name, evalNode);
      target = new Target(evalNode, name);
    }
    builtTargets.add(target);
  }
  return builtTargets;
}
/**
 * It checks if all targets of Projectable plan node can be evaluated from the child node.
 * It can avoid potential errors which possibly occur in physical operators.
 *
 * @param block QueryBlock which includes the Projectable node
 * @param projectable Projectable node to be valid
 * @throws TajoException
 */
public static void verifyProjectedFields(QueryBlock block, Projectable projectable) throws TajoException {
  if (projectable instanceof GroupbyNode) {
    GroupbyNode groupbyNode = (GroupbyNode) projectable;
    // NOTE: iterates over the first groupingKeyNum TARGETS (grouping keys come first in the
    // target list) rather than over the grouping columns themselves.
    if (!groupbyNode.isEmptyGrouping()) {
      int groupingKeyNum = groupbyNode.getGroupingColumns().length;
      for (int i = 0; i < groupingKeyNum; i++) {
        Target target = groupbyNode.getTargets().get(i);
        // Only plain field references are checked here; computed keys are not verified.
        if (groupbyNode.getTargets().get(i).getEvalTree().getType() == EvalType.FIELD) {
          FieldEval grpKeyEvalNode = target.getEvalTree();
          if (!groupbyNode.getInSchema().contains(grpKeyEvalNode.getColumnRef())) {
            throwCannotEvaluateException(projectable, grpKeyEvalNode.getName());
          }
        }
      }
    }
    if (groupbyNode.hasAggFunctions()) {
      verifyIfEvalNodesCanBeEvaluated(projectable, (List<EvalNode>)(List<?>) groupbyNode.getAggFunctions());
    }
  } else if (projectable instanceof WindowAggNode) {
    WindowAggNode windowAggNode = (WindowAggNode) projectable;
    // Partition keys, window functions, and sort keys must all resolve against the input schema.
    if (windowAggNode.hasPartitionKeys()) {
      verifyIfColumnCanBeEvaluated(projectable.getInSchema(), projectable, windowAggNode.getPartitionKeys());
    }
    if (windowAggNode.hasAggFunctions()) {
      verifyIfEvalNodesCanBeEvaluated(projectable, Arrays.asList(windowAggNode.getWindowFunctions()));
    }
    if (windowAggNode.hasSortSpecs()) {
      Column [] sortKeys = PlannerUtil.sortSpecsToSchema(windowAggNode.getSortSpecs()).toArray();
      verifyIfColumnCanBeEvaluated(projectable.getInSchema(), projectable, sortKeys);
    }
    // verify targets except for function slots
    for (int i = 0; i < windowAggNode.getTargets().size() - windowAggNode.getWindowFunctions().length; i++) {
      Target target = windowAggNode.getTargets().get(i);
      Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
      for (Column c : columns) {
        if (!windowAggNode.getInSchema().contains(c)) {
          throwCannotEvaluateException(projectable, c.getQualifiedName());
        }
      }
    }
  } else if (projectable instanceof RelationNode) {
    // Relations verify against their logical schema, which may differ from getInSchema().
    RelationNode relationNode = (RelationNode) projectable;
    prohibitNestedRecordProjection((Projectable) relationNode);
    verifyIfTargetsCanBeEvaluated(relationNode.getLogicalSchema(), (Projectable) relationNode);
  } else {
    prohibitNestedRecordProjection(projectable);
    verifyIfTargetsCanBeEvaluated(projectable.getInSchema(), projectable);
  }
}
/**
 * Rejects projection of a nested record (struct-typed) field, which is not implemented yet.
 */
public static void prohibitNestedRecordProjection(Projectable projectable)
    throws TajoException {
  for (Target target : projectable.getTargets()) {
    boolean projectsRecord = target.getEvalTree().getValueType().isStruct();
    if (projectsRecord) {
      throw new NotImplementedException("record field projection");
    }
  }
}
/**
 * Verifies that every column referenced by the given eval nodes exists in the
 * projectable node's input schema; throws otherwise.
 */
public static void verifyIfEvalNodesCanBeEvaluated(Projectable projectable, List<EvalNode> evalNodes)
    throws TajoException {
  for (EvalNode evalNode : evalNodes) {
    for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
      if (!projectable.getInSchema().contains(column)) {
        throwCannotEvaluateException(projectable, column.getQualifiedName());
      }
    }
  }
}
/**
 * Verifies that every column referenced by the projectable node's targets exists in the
 * given base schema; throws otherwise.
 */
public static void verifyIfTargetsCanBeEvaluated(Schema baseSchema, Projectable projectable)
    throws TajoException {
  for (Target target : projectable.getTargets()) {
    for (Column column : EvalTreeUtil.findUniqueColumns(target.getEvalTree())) {
      if (!baseSchema.contains(column)) {
        throwCannotEvaluateException(projectable, column.getQualifiedName());
      }
    }
  }
}
/**
 * Verifies that each of the given columns exists in the base schema; throws otherwise.
 */
public static void verifyIfColumnCanBeEvaluated(Schema baseSchema, Projectable projectable, Column [] columns)
    throws TajoException {
  for (Column column : columns) {
    boolean resolvable = baseSchema.contains(column);
    if (!resolvable) {
      throwCannotEvaluateException(projectable, column.getQualifiedName());
    }
  }
}
/**
 * Throws a syntax error for a column that cannot be evaluated at the given node. When the
 * node sits directly above a group-by, the message points at the GROUP BY clause instead.
 */
public static void throwCannotEvaluateException(Projectable projectable, String columnName) throws TajoException {
  boolean directlyAboveGroupby =
      projectable instanceof UnaryNode && ((UnaryNode) projectable).getChild().getType() == NodeType.GROUP_BY;
  if (directlyAboveGroupby) {
    throw makeSyntaxError(columnName
        + " must appear in the GROUP BY clause or be used in an aggregate function at node ("
        + projectable.getPID() + ")");
  }
  throw makeSyntaxError(String.format("Cannot evaluate the field \"%s\" at node (%d)",
      columnName, projectable.getPID()));
}
/**
 * Builds a WindowAggNode for the window functions registered in the current block and
 * splices it into the plan. When the current child is a LIMIT or SORT node, the WindowAgg
 * is inserted BENEATH that node and this method returns null (the caller keeps its original
 * child); otherwise the new WindowAggNode is returned to become the projection's child.
 */
private LogicalNode insertWindowAggNode(PlanContext context, LogicalNode child, Stack<Expr> stack,
                                        String [] referenceNames,
                                        ExprNormalizer.WindowSpecReferences [] windowSpecReferenceses)
    throws TajoException {
  LogicalPlan plan = context.plan;
  QueryBlock block = context.queryBlock;
  WindowAggNode windowAggNode = context.plan.createNode(WindowAggNode.class);
  // Splice the window aggregation beneath an existing limit/sort so those operators apply
  // to the window function's OUTPUT rather than its input.
  if (child.getType() == NodeType.LIMIT) {
    LimitNode limitNode = (LimitNode) child;
    windowAggNode.setChild(limitNode.getChild());
    windowAggNode.setInSchema(limitNode.getChild().getOutSchema());
    limitNode.setChild(windowAggNode);
  } else if (child.getType() == NodeType.SORT) {
    SortNode sortNode = (SortNode) child;
    windowAggNode.setChild(sortNode.getChild());
    windowAggNode.setInSchema(sortNode.getChild().getOutSchema());
    sortNode.setChild(windowAggNode);
  } else {
    windowAggNode.setChild(child);
    windowAggNode.setInSchema(child.getOutSchema());
  }

  // Collect every not-yet-evaluated expression that annotates to a window function.
  List<String> winFuncRefs = new ArrayList<>();
  List<WindowFunctionEval> winFuncs = new ArrayList<>();
  List<WindowSpec> rawWindowSpecs = Lists.newArrayList();
  for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
    NamedExpr rawTarget = it.next();
    try {
      EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
          NameResolvingMode.SUBEXPRS_AND_RELS);
      if (evalNode.getType() == EvalType.WINDOW_FUNCTION) {
        winFuncRefs.add(rawTarget.getAlias());
        winFuncs.add((WindowFunctionEval) evalNode);
        block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);

        // TODO - Later, we also consider the possibility that a window function contains only a window name.
        rawWindowSpecs.add(((WindowFunctionExpr) (rawTarget.getExpr())).getWindowSpec());
      }
    } catch (UndefinedColumnException uc) {
      // Intentionally ignored: expressions that cannot be resolved at this node are
      // skipped here and evaluated at a higher node of the plan.
    }
  }

  // we only consider one window definition.
  if (windowSpecReferenceses[0].hasPartitionKeys()) {
    Column [] partitionKeyColumns = new Column[windowSpecReferenceses[0].getPartitionKeys().length];
    int i = 0;
    for (String partitionKey : windowSpecReferenceses[0].getPartitionKeys()) {
      if (block.namedExprsMgr.isEvaluated(partitionKey)) {
        partitionKeyColumns[i++] = block.namedExprsMgr.getTarget(partitionKey).getNamedColumn();
      } else {
        throw makeSyntaxError("Each grouping column expression must be a scalar expression.");
      }
    }
    windowAggNode.setPartitionKeys(partitionKeyColumns);
  }

  // Resolve each window spec's ORDER BY keys into annotated sort specs (null if no ORDER BY).
  SortSpec [][] sortGroups = new SortSpec[rawWindowSpecs.size()][];
  for (int winSpecIdx = 0; winSpecIdx < rawWindowSpecs.size(); winSpecIdx++) {
    WindowSpec spec = rawWindowSpecs.get(winSpecIdx);
    if (spec.hasOrderBy()) {
      Sort.SortSpec [] sortSpecs = spec.getSortSpecs();
      int sortNum = sortSpecs.length;
      String [] sortKeyRefNames = windowSpecReferenceses[winSpecIdx].getOrderKeys();
      SortSpec [] annotatedSortSpecs = new SortSpec[sortNum];

      Column column;
      for (int i = 0; i < sortNum; i++) {
        if (block.namedExprsMgr.isEvaluated(sortKeyRefNames[i])) {
          column = block.namedExprsMgr.getTarget(sortKeyRefNames[i]).getNamedColumn();
        } else {
          throw new IllegalStateException("Unexpected State: " + StringUtils.join(sortSpecs));
        }
        annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(), sortSpecs[i].isNullsFirst());
      }

      sortGroups[winSpecIdx] = annotatedSortSpecs;
    } else {
      sortGroups[winSpecIdx] = null;
    }
  }

  // Attach the resolved sort specs to their window functions (one spec group per function).
  for (int i = 0; i < winFuncRefs.size(); i++) {
    WindowFunctionEval winFunc = winFuncs.get(i);
    if (sortGroups[i] != null) {
      winFunc.setSortSpecs(sortGroups[i]);
    }
  }

  // Record the positions of window-function targets within the projection's target list,
  // so they can be excluded from the pass-through targets below.
  List<Target> targets = new ArrayList<>();
  List<Integer> windowFuncIndices = Lists.newArrayList();
  Projection projection = (Projection) stack.peek();
  int windowFuncIdx = 0;
  for (NamedExpr expr : projection.getNamedExprs()) {
    if (expr.getExpr().getType() == OpType.WindowFunction) {
      windowFuncIndices.add(windowFuncIdx);
    }
    windowFuncIdx++;
  }
  windowAggNode.setWindowFunctions(winFuncs.toArray(new WindowFunctionEval[winFuncs.size()]));

  // Pass through all non-window-function targets; window-function targets are appended last.
  for (int i = 0; i < referenceNames.length ; i++) {
    if (!windowFuncIndices.contains(i)) {
      if (block.isConstReference(referenceNames[i])) {
        targets.add(new Target(block.getConstByReference(referenceNames[i]), referenceNames[i]));
      } else {
        targets.add(block.namedExprsMgr.getTarget(referenceNames[i]));
      }
    }
  }
  for (String winFuncRef : winFuncRefs) {
    targets.add(block.namedExprsMgr.getTarget(winFuncRef));
  }
  windowAggNode.setTargets(targets);
  verifyProjectedFields(block, windowAggNode);

  block.registerNode(windowAggNode);
  postHook(context, stack, null, windowAggNode);

  // When the WindowAgg was spliced beneath a limit/sort above, fix that operator's schemas
  // and return null so the caller keeps its original child.
  if (child.getType() == NodeType.LIMIT) {
    LimitNode limitNode = (LimitNode) child;
    limitNode.setInSchema(windowAggNode.getOutSchema());
    limitNode.setOutSchema(windowAggNode.getOutSchema());
    return null;
  } else if (child.getType() == NodeType.SORT) {
    SortNode sortNode = (SortNode) child;
    sortNode.setInSchema(windowAggNode.getOutSchema());
    sortNode.setOutSchema(windowAggNode.getOutSchema());
    return null;
  } else {
    return windowAggNode;
  }
}
/**
 * Insert a group-by operator before a sort or a projection operator.
 * It is used only when a group-by clause is not given (implicit aggregation, e.g.
 * <code>SELECT count(*) FROM t</code>), so the node has no grouping keys.
 */
private LogicalNode insertGroupbyNode(PlanContext context, LogicalNode child, Stack<Expr> stack)
    throws TajoException {
  LogicalPlan plan = context.plan;
  QueryBlock block = context.queryBlock;

  // The limit operation must affect to the number of results, not the number of input records.
  // Thus, the aggregation must be carried out before the limit operation.
  if (child.getType() == NodeType.LIMIT) {
    child = ((LimitNode)child).getChild();
  }
  GroupbyNode groupbyNode = context.plan.createNode(GroupbyNode.class);
  groupbyNode.setChild(child);
  groupbyNode.setInSchema(child.getOutSchema());

  // Implicit aggregation: a single group over all rows, i.e. no grouping keys.
  groupbyNode.setGroupingColumns(new Column[] {});

  // Collect every not-yet-evaluated expression that annotates to an aggregation function.
  Set<String> aggEvalNames = new LinkedHashSet<>();
  Set<AggregationFunctionCallEval> aggEvals = new LinkedHashSet<>();
  boolean includeDistinctFunction = false;
  for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
    NamedExpr rawTarget = it.next();
    try {
      // check if at least distinct aggregation function
      includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
      EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
          NameResolvingMode.SUBEXPRS_AND_RELS);
      if (evalNode.getType() == EvalType.AGG_FUNCTION) {
        aggEvalNames.add(rawTarget.getAlias());
        aggEvals.add((AggregationFunctionCallEval) evalNode);
        block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
      }
    } catch (UndefinedColumnException ve) {
      // Intentionally ignored: expressions that cannot be resolved at this node are
      // skipped here and evaluated at a higher node of the plan.
    }
  }

  groupbyNode.setDistinct(includeDistinctFunction);
  groupbyNode.setAggFunctions(new ArrayList<>(aggEvals));

  List<Target> targets = ProjectionPushDownRule.buildGroupByTarget(groupbyNode, null,
      aggEvalNames.toArray(new String[aggEvalNames.size()]));
  groupbyNode.setTargets(targets);

  // this inserted group-by node doesn't pass through preprocessor. So manually added.
  block.registerNode(groupbyNode);
  postHook(context, stack, null, groupbyNode);

  verifyProjectedFields(block, groupbyNode);
  return groupbyNode;
}
/*===============================================================================================
SORT SECTION
===============================================================================================*/
/**
 * Plans a LIMIT clause. The fetch count expression is evaluated eagerly to a constant:
 * a literal is annotated before visiting the child, while a non-literal expression is
 * normalized, registered, and resolved after the child plan is built.
 */
@Override
public LimitNode visitLimit(PlanContext context, Stack<Expr> stack, Limit limit) throws TajoException {
  QueryBlock block = context.queryBlock;

  EvalNode firstFetNum;
  LogicalNode child;
  if (limit.getFetchFirstNum().getType() == OpType.Literal) {
    // Literal fetch count: annotate it immediately; no name resolution beyond relations needed.
    firstFetNum = exprAnnotator.createEvalNode(context, limit.getFetchFirstNum(),
        NameResolvingMode.RELS_ONLY);

    ////////////////////////////////////////////////////////
    // Visit and Build Child Plan
    ////////////////////////////////////////////////////////
    stack.push(limit);
    child = visit(context, stack, limit.getChild());
    stack.pop();
    ////////////////////////////////////////////////////////
  } else {
    // Non-literal fetch count: normalize and register it so it can be resolved after the
    // child plan has been built.
    ExprNormalizedResult normalizedResult = normalizer.normalize(context, limit.getFetchFirstNum());
    String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
    block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
    block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);

    ////////////////////////////////////////////////////////
    // Visit and Build Child Plan
    ////////////////////////////////////////////////////////
    stack.push(limit);
    child = visit(context, stack, limit.getChild());
    stack.pop();
    ////////////////////////////////////////////////////////

    if (block.namedExprsMgr.isEvaluated(referName)) {
      firstFetNum = block.namedExprsMgr.getTarget(referName).getEvalTree();
    } else {
      NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
      firstFetNum = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.SUBEXPRS_AND_RELS);
      block.namedExprsMgr.markAsEvaluated(referName, firstFetNum);
    }
  }
  LimitNode limitNode = block.getNodeFromExpr(limit);
  limitNode.setChild(child);
  limitNode.setInSchema(child.getOutSchema());
  limitNode.setOutSchema(child.getOutSchema());

  // The fetch count must be constant: bind with no schema/tuple and evaluate it right now.
  firstFetNum.bind(null, null);
  limitNode.setFetchFirst(firstFetNum.eval(null).asInt8());

  return limitNode;
}
/**
 * Plans an ORDER BY clause. Sort keys are normalized and registered before the child is
 * visited; the resulting SortNode is dropped entirely when every key turns out to be a
 * constant (sorting by constants is a no-op).
 */
@Override
public LogicalNode visitSort(PlanContext context, Stack<Expr> stack, Sort sort) throws TajoException {
  QueryBlock block = context.queryBlock;

  int sortKeyNum = sort.getSortSpecs().length;
  Sort.SortSpec[] sortSpecs = sort.getSortSpecs();
  String [] referNames = new String[sortKeyNum];

  // Normalize all keys first, THEN register them — see the note in doProjectionPrephase()
  // on why normalization and registration must be separated.
  ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[sortKeyNum];
  for (int i = 0; i < sortKeyNum; i++) {
    normalizedExprList[i] = normalizer.normalize(context, sortSpecs[i].getKey());
  }
  for (int i = 0; i < sortKeyNum; i++) {
    referNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
    block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
    block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
  }

  ////////////////////////////////////////////////////////
  // Visit and Build Child Plan
  ////////////////////////////////////////////////////////
  stack.push(sort);
  LogicalNode child = visit(context, stack, sort.getChild());
  if (block.isAggregationRequired()) {
    child = insertGroupbyNode(context, child, stack);
  }
  stack.pop();
  ////////////////////////////////////////////////////////

  SortNode sortNode = block.getNodeFromExpr(sort);
  sortNode.setChild(child);
  sortNode.setInSchema(child.getOutSchema());
  sortNode.setOutSchema(child.getOutSchema());

  // Building sort keys
  SortSpec[] annotatedSortSpecs = annotateSortSpecs(block, referNames, sortSpecs);
  if (annotatedSortSpecs.length == 0) {
    // Every sort key was a constant reference: the sort is a no-op, return the child directly.
    return child;
  } else {
    sortNode.setSortSpecs(annotatedSortSpecs);
    return sortNode;
  }
}
/**
 * Resolves raw sort specs into annotated {@link SortSpec}s by mapping each
 * reference name to its evaluated column. Constant references are skipped
 * because ordering by a constant has no effect.
 *
 * @throws IllegalStateException if a non-constant sort key has not been evaluated yet
 */
private static SortSpec[] annotateSortSpecs(QueryBlock block, String [] referNames, Sort.SortSpec[] rawSortSpecs) {
  List<SortSpec> resolved = Lists.newArrayList();

  for (int idx = 0; idx < rawSortSpecs.length; idx++) {
    String name = referNames[idx];

    if (block.isConstReference(name)) {
      // Constant sort key: ordering by it is meaningless, so drop it.
      continue;
    }
    if (!block.namedExprsMgr.isEvaluated(name)) {
      throw new IllegalStateException("Unexpected State: " + StringUtils.join(rawSortSpecs));
    }

    Column sortKey = block.namedExprsMgr.getTarget(name).getNamedColumn();
    resolved.add(new SortSpec(sortKey, rawSortSpecs[idx].isAscending(), rawSortSpecs[idx].isNullsFirst()));
  }

  return resolved.toArray(new SortSpec[resolved.size()]);
}
/*===============================================================================================
GROUP BY SECTION
===============================================================================================*/
/**
 * Builds a {@link HavingNode} from a HAVING clause.
 *
 * The qualifier is normalized and registered before the child plan is visited so
 * that its aggregation sub-expressions are evaluated by descendant nodes.
 */
@Override
public LogicalNode visitHaving(PlanContext context, Stack<Expr> stack, Having expr) throws TajoException {
  QueryBlock block = context.queryBlock;

  ExprNormalizedResult normalizedResult = normalizer.normalize(context, expr.getQual());
  String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
  block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
  block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);

  ////////////////////////////////////////////////////////
  // Visit and Build Child Plan
  ////////////////////////////////////////////////////////
  stack.push(expr);
  LogicalNode child = visit(context, stack, expr.getChild());
  stack.pop();
  ////////////////////////////////////////////////////////

  HavingNode having = context.queryBlock.getNodeFromExpr(expr);
  having.setChild(child);
  having.setInSchema(child.getOutSchema());
  having.setOutSchema(child.getOutSchema());

  EvalNode havingCondition;
  if (block.namedExprsMgr.isEvaluated(referName)) {
    // Condition was already evaluated by a descendant node; reuse it.
    havingCondition = block.namedExprsMgr.getTarget(referName).getEvalTree();
  } else {
    NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
    havingCondition = exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
        NameResolvingMode.SUBEXPRS_AND_RELS);
    block.namedExprsMgr.markAsEvaluated(referName, havingCondition);
  }

  // set having condition
  having.setQual(havingCondition);

  return having;
}
/**
 * Builds a {@link GroupbyNode} from a GROUP BY clause.
 *
 * Grouping keys are normalized and registered first, the child plan is built, and then
 * the grouping columns and aggregation function evals are resolved from the block's
 * named-expression manager. Constant grouping keys are dropped; a grouping key that
 * cannot be resolved to a scalar is a syntax error.
 */
@Override
public LogicalNode visitGroupBy(PlanContext context, Stack<Expr> stack, Aggregation aggregation)
    throws TajoException {

  // Initialization Phase:
  LogicalPlan plan = context.plan;
  QueryBlock block = context.queryBlock;

  // Normalize grouping keys and add normalized grouping keys to NamedExprManager
  int groupingKeyNum = aggregation.getGroupSet()[0].getGroupingSets().length;
  ExprNormalizedResult [] normalizedResults = new ExprNormalizedResult[groupingKeyNum];
  for (int i = 0; i < groupingKeyNum; i++) {
    Expr groupingKey = aggregation.getGroupSet()[0].getGroupingSets()[i];
    normalizedResults[i] = normalizer.normalize(context, groupingKey);
  }

  String [] groupingKeyRefNames = new String[groupingKeyNum];
  for (int i = 0; i < groupingKeyNum; i++) {
    groupingKeyRefNames[i] = block.namedExprsMgr.addExpr(normalizedResults[i].baseExpr);
    block.namedExprsMgr.addNamedExprArray(normalizedResults[i].aggExprs);
    block.namedExprsMgr.addNamedExprArray(normalizedResults[i].scalarExprs);
  }

  ////////////////////////////////////////////////////////
  // Visit and Build Child Plan
  ////////////////////////////////////////////////////////
  stack.push(aggregation);
  LogicalNode child = visit(context, stack, aggregation.getChild());
  stack.pop();
  ////////////////////////////////////////////////////////

  GroupbyNode groupingNode = context.queryBlock.getNodeFromExpr(aggregation);
  groupingNode.setChild(child);
  groupingNode.setInSchema(child.getOutSchema());

  // Set grouping sets: drop constant keys, resolve the rest to named columns.
  List<Column> groupingColumns = Lists.newArrayList();
  for (String refName : groupingKeyRefNames) {
    if (context.getQueryBlock().isConstReference(refName)) {
      continue;
    } else if (block.namedExprsMgr.isEvaluated(refName)) {
      groupingColumns.add(block.namedExprsMgr.getTarget(refName).getNamedColumn());
    } else {
      throw makeSyntaxError("Each grouping column expression must be a scalar expression.");
    }
  }

  int effectiveGroupingKeyNum = groupingColumns.size();
  groupingNode.setGroupingColumns(groupingColumns.toArray(new Column[effectiveGroupingKeyNum]));

  ////////////////////////////////////////////////////////
  // Aggregation Function Section
  ////////////////////////////////////////////////////////

  // create EvalNodes and check if each EvalNode can be evaluated here.
  List<String> aggEvalNames = new ArrayList<>();
  List<AggregationFunctionCallEval> aggEvalNodes = new ArrayList<>();
  boolean includeDistinctFunction = false;
  for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); iterator.hasNext();) {
    NamedExpr namedExpr = iterator.next();
    try {
      includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(namedExpr.getExpr());
      EvalNode evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(),
          NameResolvingMode.SUBEXPRS_AND_RELS);
      if (evalNode.getType() == EvalType.AGG_FUNCTION) {
        block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
        aggEvalNames.add(namedExpr.getAlias());
        aggEvalNodes.add((AggregationFunctionCallEval) evalNode);
      }
    } catch (UndefinedColumnException ve) {
      // Intentionally ignored: the expression references columns not visible at
      // this node; it will be evaluated by an upper node.
    }
  }

  // if there is at least one distinct aggregation function
  groupingNode.setDistinct(includeDistinctFunction);
  groupingNode.setAggFunctions(aggEvalNodes);

  List<Target> targets = new ArrayList<>();

  // In target, grouping columns will be followed by aggregation evals.
  //
  // col1, col2, col3, sum(..), agv(..)
  // ^^^^^^^^^^^^^^^   ^^^^^^^^^^^^^^^^^^
  // grouping keys     aggregation evals

  // Build grouping keys
  for (int i = 0; i < effectiveGroupingKeyNum; i++) {
    targets.add(block.namedExprsMgr.getTarget(groupingNode.getGroupingColumns()[i].getQualifiedName()));
  }

  for (int i = 0; i < aggEvalNodes.size(); i++) {
    targets.add(block.namedExprsMgr.getTarget(aggEvalNames.get(i)));
  }

  groupingNode.setTargets(targets);
  block.unsetAggregationRequire();

  verifyProjectedFields(block, groupingNode);
  return groupingNode;
}
// Sentinel cuboid representing the empty grouping set ("ALL").
// Previously built via a raw-typed Lists.newArrayList().toArray(new Column[0]).
private static final Column[] ALL = new Column[0];

/**
 * Generates every cuboid (subset of the grouping columns) for CUBE grouping.
 *
 * Each cuboid id's bit pattern selects which columns belong to that cuboid, so
 * there are 2^n cuboids for n columns, starting with the empty set ({@link #ALL}).
 *
 * @param columns grouping columns; must contain fewer than 31 entries since the
 *                result size is 2^n
 * @return a list of 2^n column arrays
 */
public static List<Column[]> generateCuboids(Column[] columns) {
  int maxBits = columns.length;
  // 2^n computed with an integer shift instead of floating-point Math.pow.
  int numCuboids = 1 << maxBits;

  List<Column[]> cube = Lists.newArrayList();
  cube.add(ALL);

  for (int cuboidId = 1; cuboidId < numCuboids; cuboidId++) {
    List<Column> cuboidCols = Lists.newArrayList();
    for (int j = 0; j < maxBits; j++) {
      int bit = 1 << j;
      if ((cuboidId & bit) == bit) {
        cuboidCols.add(columns[j]);
      }
    }
    cube.add(cuboidCols.toArray(new Column[cuboidCols.size()]));
  }
  return cube;
}
/**
 * Builds a {@link SelectionNode} from a WHERE clause.
 *
 * IN-subqueries contained in the qualifier are planned before the child relation and
 * recorded in {@code context.unplannedExprs}; the remaining condition is annotated and
 * simplified by the eval optimizer.
 */
@Override
public SelectionNode visitFilter(PlanContext context, Stack<Expr> stack, Selection selection)
    throws TajoException {
  QueryBlock block = context.queryBlock;

  ExprNormalizedResult normalizedResult = normalizer.normalize(context, selection.getQual());
  block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
  if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
    throw makeSyntaxError("Filter condition cannot include aggregation function");
  }

  ////////////////////////////////////////////////////////
  // Visit and Build Child Plan
  ////////////////////////////////////////////////////////
  stack.push(selection);

  // Since filter push down will be done later, it is guaranteed that in-subqueries are found at only selection.
  for (Expr eachQual : PlannerUtil.extractInSubquery(selection.getQual())) {
    InPredicate inPredicate = (InPredicate) eachQual;
    visit(context, stack, inPredicate.getInValue());
    context.unplannedExprs.add(inPredicate.getInValue());
  }

  LogicalNode child = visit(context, stack, selection.getChild());
  stack.pop();
  ////////////////////////////////////////////////////////

  SelectionNode selectionNode = context.queryBlock.getNodeFromExpr(selection);
  selectionNode.setChild(child);
  selectionNode.setInSchema(child.getOutSchema());
  selectionNode.setOutSchema(child.getOutSchema());

  // Create EvalNode for a search condition.
  EvalNode searchCondition = exprAnnotator.createEvalNode(context, selection.getQual(),
      NameResolvingMode.RELS_AND_SUBEXPRS);
  EvalNode simplified = context.evalOptimizer.optimize(context, searchCondition);
  // set selection condition
  selectionNode.setQual(simplified);

  return selectionNode;
}
/*===============================================================================================
JOIN SECTION
===============================================================================================*/
/**
 * Builds a {@link JoinNode} from a JOIN clause.
 *
 * The optional join qualifier is normalized and registered first, both children are
 * planned, and then the merged schema, targets, and join condition are determined.
 * For a natural join, the equi-join condition is derived from common column names.
 */
@Override
public LogicalNode visitJoin(PlanContext context, Stack<Expr> stack, Join join)
    throws TajoException {
  // Phase 1: Init
  LogicalPlan plan = context.plan;
  QueryBlock block = context.queryBlock;
  if (join.hasQual()) {
    ExprNormalizedResult normalizedResult = normalizer.normalize(context, join.getQual(), true);
    block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
    if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
      throw makeSyntaxError("Filter condition cannot include aggregation function");
    }
  }

  ////////////////////////////////////////////////////////
  // Visit and Build Child Plan
  ////////////////////////////////////////////////////////
  stack.push(join);
  LogicalNode left = visit(context, stack, join.getLeft());
  LogicalNode right = visit(context, stack, join.getRight());
  stack.pop();
  ////////////////////////////////////////////////////////

  JoinNode joinNode = context.queryBlock.getNodeFromExpr(join);
  joinNode.setJoinType(join.getJoinType());
  joinNode.setLeftChild(left);
  joinNode.setRightChild(right);

  // Set a merged input schema. A natural join collapses commonly-named columns.
  Schema merged;
  if (join.isNatural()) {
    merged = getNaturalJoinSchema(left, right);
  } else {
    merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
  }
  joinNode.setInSchema(merged);

  // Create EvalNode for a search condition.
  EvalNode joinCondition = null;
  if (join.hasQual()) {
    EvalNode evalNode = exprAnnotator.createEvalNode(context, join.getQual(), NameResolvingMode.LEGACY);
    joinCondition = context.evalOptimizer.optimize(context, evalNode);
  }

  // If the query involves a subquery, the stack can be empty.
  // In this case, this join is the top most one within a query block.
  boolean isTopMostJoin = stack.isEmpty() ? true : stack.peek().getType() != OpType.Join;
  List<String> newlyEvaluatedExprs = getNewlyEvaluatedExprsForJoin(context, joinNode, isTopMostJoin);
  List<Target> targets = new ArrayList<>(PlannerUtil.schemaToTargets(merged));

  for (String newAddedExpr : newlyEvaluatedExprs) {
    targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
  }
  joinNode.setTargets(targets);

  // Determine join conditions
  if (join.isNatural()) { // if natural join, it should have the equi-join conditions by common column names
    EvalNode njCond = getNaturalJoinCondition(joinNode);
    joinNode.setJoinQual(njCond);
  } else if (join.hasQual()) { // otherwise, the given join conditions are set
    joinNode.setJoinQual(joinCondition);
  }

  return joinNode;
}
/**
 * Returns the reference names of still-unevaluated expressions that can be evaluated
 * at the given join node, marking each of them as evaluated in the block's
 * named-expression manager.
 */
private List<String> getNewlyEvaluatedExprsForJoin(PlanContext context, JoinNode joinNode, boolean isTopMostJoin)
    throws TajoException {
  QueryBlock block = context.queryBlock;

  EvalNode evalNode;
  List<String> newlyEvaluatedExprs = new ArrayList<>();
  for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
    NamedExpr namedExpr = it.next();
    try {
      evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.LEGACY);
      // the predicates specified in the on clause are already processed in visitJoin()
      if (LogicalPlanner.checkIfBeEvaluatedAtJoin(context.queryBlock, evalNode, joinNode, isTopMostJoin)) {
        block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
        newlyEvaluatedExprs.add(namedExpr.getAlias());
      }
    } catch (UndefinedColumnException ve) {
      // Intentionally ignored: the expression references columns not visible at
      // this join; it will be evaluated by an upper node.
    }
  }

  return newlyEvaluatedExprs;
}
/**
 * Builds the output schema of a natural join: the root columns of both children
 * combined through a unique-name builder, so commonly-named columns appear once.
 */
private static Schema getNaturalJoinSchema(LogicalNode left, LogicalNode right) {
  Schema leftOut = left.getOutSchema();
  Schema rightOut = right.getOutSchema();

  SchemaBuilder merged = SchemaBuilder.uniqueNameBuilder();
  merged.addAll(leftOut.getRootColumns());
  merged.addAll(rightOut.getRootColumns());
  return merged.build();
}
/**
 * Creates the implicit equi-join condition of a natural join: an AND-chain of
 * equality predicates over every column name common to both children's input
 * schemas.
 *
 * @return the conjunctive join qual, or {@code null} when there is no common column
 */
private static EvalNode getNaturalJoinCondition(JoinNode joinNode) {
  Schema leftSchema = joinNode.getLeftChild().getInSchema();
  Schema rightSchema = joinNode.getRightChild().getInSchema();
  Schema commons = SchemaUtil.getNaturalJoinColumns(leftSchema, rightSchema);

  EvalNode qual = null;
  for (Column common : commons.getRootColumns()) {
    Column leftKey = leftSchema.getColumn(common.getQualifiedName());
    Column rightKey = rightSchema.getColumn(common.getQualifiedName());
    EvalNode equality = new BinaryEval(EvalType.EQUAL,
        new FieldEval(leftKey), new FieldEval(rightKey));
    // Fold each equality into a single conjunction.
    qual = (qual == null) ? equality : new BinaryEval(EvalType.AND, qual, equality);
  }
  return qual;
}
/**
 * Creates a CROSS join of two planned relations (used by {@code visitRelationList}
 * when a FROM clause lists multiple relations). Unevaluated expressions that contain
 * neither distinct aggregates nor window functions are evaluated at this join and
 * appended to its target list.
 */
private LogicalNode createCartesianProduct(PlanContext context, LogicalNode left, LogicalNode right)
    throws TajoException {
  LogicalPlan plan = context.plan;
  QueryBlock block = context.queryBlock;

  Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
  JoinNode join = plan.createNode(JoinNode.class);
  join.init(JoinType.CROSS, left, right);
  join.setInSchema(merged);
  block.addJoinType(join.getJoinType());

  EvalNode evalNode;
  List<String> newlyEvaluatedExprs = new ArrayList<>();
  for (Iterator<NamedExpr> it = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); it.hasNext();) {
    NamedExpr namedExpr = it.next();
    try {
      evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.LEGACY);
      if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() == 0
          && EvalTreeUtil.findWindowFunction(evalNode).size() == 0) {
        block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
        newlyEvaluatedExprs.add(namedExpr.getAlias());
      }
    } catch (UndefinedColumnException ve) {
      // Intentionally ignored: columns not visible here will be resolved by an upper node.
    }
  }

  List<Target> targets = new ArrayList<>(PlannerUtil.schemaToTargets(merged));
  for (String newAddedExpr : newlyEvaluatedExprs) {
    targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
  }
  join.setTargets(targets);
  return join;
}
/**
 * Plans a FROM-clause relation list. When more than one relation is listed,
 * they are folded left-to-right into cartesian products; actual join conditions
 * are attached later (see the filter push-down note in {@code visitFilter}).
 */
@Override
public LogicalNode visitRelationList(PlanContext context, Stack<Expr> stack, RelationList relations)
    throws TajoException {
  // Start from the first relation and fold the rest in as cross joins.
  LogicalNode current = visit(context, stack, relations.getRelations()[0]);
  for (int i = 1; i < relations.size(); i++) {
    LogicalNode next = visit(context, stack, relations.getRelations()[i]);
    current = createCartesianProduct(context, current, next);
  }

  context.queryBlock.registerNode(current);
  return current;
}
/**
 * Builds a {@link ScanNode} for a base relation and evaluates at the scan every
 * registered expression that can be computed from this relation's columns alone.
 */
@Override
public ScanNode visitRelation(PlanContext context, Stack<Expr> stack, Relation expr)
    throws TajoException {
  QueryBlock block = context.queryBlock;

  ScanNode scanNode = block.getNodeFromExpr(expr);

  // Find expressions which can be evaluated at this relation node.
  // Except for column references, additional expressions used in select list, where clause, order-by clauses
  // can be evaluated here. Their reference names are kept in newlyEvaluatedExprsReferences.
  Set<String> newlyEvaluatedExprsReferences = new LinkedHashSet<>();
  for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getIteratorForUnevaluatedExprs(); iterator.hasNext();) {
    NamedExpr rawTarget = iterator.next();
    try {
      EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
          NameResolvingMode.RELS_ONLY);
      if (checkIfBeEvaluatedAtRelation(block, evalNode, scanNode)) {
        block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
        newlyEvaluatedExprsReferences.add(rawTarget.getAlias()); // newly added expr
      }
    } catch (UndefinedColumnException ve) {
      // Intentionally ignored: the expression needs columns of other relations
      // and will be evaluated at an upper node (e.g. a join).
    }
  }

  // Assume that each unique expr is evaluated once.
  LinkedHashSet<Target> targets = createFieldTargetsFromRelation(block, scanNode, newlyEvaluatedExprsReferences);

  // The fact that some expr is included in newlyEvaluatedExprsReferences means that it is already evaluated.
  // So, we get a raw expression and then create a target.
  for (String reference : newlyEvaluatedExprsReferences) {
    NamedExpr refrer = block.namedExprsMgr.getNamedExpr(reference);
    EvalNode evalNode = exprAnnotator.createEvalNode(context, refrer.getExpr(), NameResolvingMode.RELS_ONLY);
    targets.add(new Target(evalNode, reference));
  }

  scanNode.setTargets(new ArrayList<>(targets));

  verifyProjectedFields(block, scanNode);
  return scanNode;
}
/**
 * Creates one field target per column of the relation's logical schema.
 * A column the query aliases is emitted under its alias, and that alias is then
 * removed from {@code newlyEvaluatedRefNames} because it no longer needs a
 * separate evaluation step.
 */
private static LinkedHashSet<Target> createFieldTargetsFromRelation(QueryBlock block, RelationNode relationNode,
                                                                    Set<String> newlyEvaluatedRefNames) {
  LinkedHashSet<Target> targets = Sets.newLinkedHashSet();

  for (Column column : relationNode.getLogicalSchema().getAllColumns()) {
    // TODO - Currently, EvalNode has DataType as a return type, so RECORD cannot be used for any target.
    // Skipping RECORD columns here is a workaround until TAJO-1402 is resolved.
    if (column.getTypeDesc().getDataType().getType() == TajoDataTypes.Type.RECORD) {
      continue;
    }

    String aliasName = block.namedExprsMgr.checkAndGetIfAliasedColumn(column.getQualifiedName());
    if (aliasName == null) {
      targets.add(new Target(new FieldEval(column)));
    } else {
      targets.add(new Target(new FieldEval(column), aliasName));
      newlyEvaluatedRefNames.remove(aliasName);
    }
  }

  return targets;
}
/**
 * Plans a table subquery written in the FROM clause.
 * Delegates to {@link #visitCommonTableSubquery}.
 */
@Override
public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<Expr> stack, TablePrimarySubQuery expr)
    throws TajoException {
  return visitCommonTableSubquery(context, stack, expr);
}
/**
 * Plans a simple table subquery.
 * Delegates to {@link #visitCommonTableSubquery}.
 */
@Override
public TableSubQueryNode visitSimpleTableSubquery(PlanContext context, Stack<Expr> stack, SimpleTableSubquery expr)
    throws TajoException {
  return visitCommonTableSubquery(context, stack, expr);
}
/**
 * Common planning routine for table subqueries.
 *
 * The subquery is planned in its own query block (with a fresh expression stack),
 * the child block is connected to the current one as a TableSubQuery edge, and the
 * result is wrapped in a {@link TableSubQueryNode} whose targets are then resolved.
 */
private TableSubQueryNode visitCommonTableSubquery(PlanContext context, Stack<Expr> stack, CommonSubquery expr)
    throws TajoException {
  QueryBlock currentBlock = context.queryBlock;
  QueryBlock childBlock = context.plan.getBlock(context.plan.getBlockNameByExpr(expr.getSubQuery()));

  // Connect the child block to the current block exactly once.
  // (The previous code issued this call twice with identical arguments.)
  context.plan.connectBlocks(childBlock, currentBlock, BlockType.TableSubQuery);

  PlanContext newContext = new PlanContext(context, childBlock);
  LogicalNode child = visit(newContext, new Stack<>(), expr.getSubQuery());

  TableSubQueryNode subQueryNode = currentBlock.getNodeFromExpr(expr);
  subQueryNode.setSubQuery(child);
  setTargetOfTableSubQuery(context, currentBlock, subQueryNode);

  return subQueryNode;
}
/**
 * Determines the target list of a table subquery node: one field target per output
 * column plus any registered expressions that can be evaluated directly over the
 * subquery's output.
 */
private void setTargetOfTableSubQuery (PlanContext context, QueryBlock block, TableSubQueryNode subQueryNode)
    throws TajoException {
  // Add additional expressions required in upper nodes.
  Set<String> newlyEvaluatedExprs = new HashSet<>();
  for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
    try {
      EvalNode evalNode = exprAnnotator.createEvalNode(context, rawTarget.getExpr(),
          NameResolvingMode.RELS_ONLY);
      if (checkIfBeEvaluatedAtRelation(block, evalNode, subQueryNode)) {
        block.namedExprsMgr.markAsEvaluated(rawTarget.getAlias(), evalNode);
        newlyEvaluatedExprs.add(rawTarget.getAlias()); // newly added expr
      }
    } catch (UndefinedColumnException ve) {
      // Intentionally ignored: the expression cannot be resolved against this
      // subquery's columns and will be evaluated at an upper node.
    }
  }

  // Assume that each unique expr is evaluated once.
  LinkedHashSet<Target> targets = createFieldTargetsFromRelation(block, subQueryNode, newlyEvaluatedExprs);

  for (String newAddedExpr : newlyEvaluatedExprs) {
    targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
  }

  subQueryNode.setTargets(new ArrayList<>(targets));
}
/*===============================================================================================
SET OPERATION SECTION
===============================================================================================*/
/**
 * Plans a UNION. For UNION DISTINCT, duplicate rows are removed by stacking a
 * projection and a group-by (keyed on every output column) on top of a table
 * subquery that wraps the union:
 *
 * <pre>
 *                  projection
 *                      |
 *                   group by
 *   union    ==>       |
 *                 table subquery
 *                      |
 *                    union
 * </pre>
 */
@Override
public LogicalNode visitUnion(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
    throws TajoException {
  UnionNode unionNode = (UnionNode) buildSetPlan(context, stack, setOperation);

  if (unionNode.isDistinct()) {
    // UNION DISTINCT: deduplicate via projection + group-by over a table subquery.
    return insertProjectionGroupbyBeforeSetOperation(context, unionNode);
  }
  return unionNode;
}
/**
 * Implements set-operation DISTINCT by wrapping the set operation in a table subquery
 * and stacking a group-by (keyed on all output columns) and a projection on top of it.
 * The query-block graph is re-wired so the set operation lives in its own block.
 *
 * @return the projection node that replaces the set operation as the child of the
 *         parent node
 */
private ProjectionNode insertProjectionGroupbyBeforeSetOperation(PlanContext context,
                                                                 SetOperationNode setOperationNode)
    throws TajoException {
  QueryBlock currentBlock = context.queryBlock;

  // make table subquery node which has set operation as its subquery
  TableSubQueryNode setOpTableSubQueryNode = context.plan.createNode(TableSubQueryNode.class);
  setOpTableSubQueryNode.init(CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE),
      context.generateUniqueSubQueryName()), setOperationNode);
  setTargetOfTableSubQuery(context, currentBlock, setOpTableSubQueryNode);
  currentBlock.registerNode(setOpTableSubQueryNode);
  currentBlock.addRelation(setOpTableSubQueryNode);

  Schema setOpSchema = setOpTableSubQueryNode.getOutSchema();
  List<Target> setOpTarget = setOpTableSubQueryNode.getTargets();

  // make group by node whose grouping keys are all columns of set operation
  GroupbyNode setOpGroupbyNode = context.plan.createNode(GroupbyNode.class);
  setOpGroupbyNode.setInSchema(setOpSchema);
  setOpGroupbyNode.setGroupingColumns(setOpSchema.toArray());
  setOpGroupbyNode.setTargets(setOpTarget);
  setOpGroupbyNode.setChild(setOpTableSubQueryNode);
  currentBlock.registerNode(setOpGroupbyNode);

  // make projection node which projects all the union columns
  ProjectionNode setOpProjectionNode = context.plan.createNode(ProjectionNode.class);
  setOpProjectionNode.setInSchema(setOpSchema);
  setOpProjectionNode.setTargets(setOpTarget);
  setOpProjectionNode.setChild(setOpGroupbyNode);
  currentBlock.registerNode(setOpProjectionNode);

  // changing query block chain: at below, ( ) indicates query block
  // (... + set operation ) - (...) ==> (... + projection + group by + table subquery) - (set operation) - (...)
  QueryBlock setOpBlock = context.plan.newQueryBlock();
  setOpBlock.registerNode(setOperationNode);
  setOpBlock.setRoot(setOperationNode);

  QueryBlock leftBlock = context.plan.getBlock(setOperationNode.getLeftChild());
  QueryBlock rightBlock = context.plan.getBlock(setOperationNode.getRightChild());

  // Re-point the child blocks at the new set-operation block, and hang that
  // block under the current one as a table subquery.
  context.plan.disconnectBlocks(leftBlock, context.queryBlock);
  context.plan.disconnectBlocks(rightBlock, context.queryBlock);
  context.plan.connectBlocks(setOpBlock, context.queryBlock, BlockType.TableSubQuery);
  context.plan.connectBlocks(leftBlock, setOpBlock, BlockType.TableSubQuery);
  context.plan.connectBlocks(rightBlock, setOpBlock, BlockType.TableSubQuery);

  // projection node (not original set operation node) will be a new child of parent node
  return setOpProjectionNode;
}
/**
 * Plans an EXCEPT set operation via {@link #buildSetPlan}.
 */
@Override
public LogicalNode visitExcept(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
    throws TajoException {
  return buildSetPlan(context, stack, setOperation);
}
/**
 * Plans an INTERSECT set operation via {@link #buildSetPlan}.
 */
@Override
public LogicalNode visitIntersect(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
    throws TajoException {
  return buildSetPlan(context, stack, setOperation);
}
/**
 * Common planning routine for UNION, EXCEPT, and INTERSECT.
 *
 * Each side is planned in its own query block (with a fresh stack) and connected to
 * the current block as a table subquery. The left side's stripped targets determine
 * the operation's output schema.
 */
private LogicalNode buildSetPlan(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
    throws TajoException {

  // 1. Init Phase
  LogicalPlan plan = context.plan;
  QueryBlock block = context.queryBlock;

  ////////////////////////////////////////////////////////
  // Visit and Build Left Child Plan
  ////////////////////////////////////////////////////////
  QueryBlock leftBlock = context.plan.getBlockByExpr(setOperation.getLeft());
  PlanContext leftContext = new PlanContext(context, leftBlock);
  stack.push(setOperation);
  LogicalNode leftChild = visit(leftContext, new Stack<>(), setOperation.getLeft());
  stack.pop();
  // Connect left child and current blocks
  context.plan.connectBlocks(leftContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);

  ////////////////////////////////////////////////////////
  // Visit and Build Right Child Plan
  ////////////////////////////////////////////////////////
  QueryBlock rightBlock = context.plan.getBlockByExpr(setOperation.getRight());
  PlanContext rightContext = new PlanContext(context, rightBlock);
  stack.push(setOperation);
  LogicalNode rightChild = visit(rightContext, new Stack<>(), setOperation.getRight());
  stack.pop();
  // Connect right child and current blocks
  context.plan.connectBlocks(rightContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);

  // Retrieve the pre-created node for the corresponding set type; any other
  // type reaching here is a planner bug, not a user error.
  BinaryNode setOp;
  if (setOperation.getType() == OpType.Union) {
    setOp = block.getNodeFromExpr(setOperation);
  } else if (setOperation.getType() == OpType.Except) {
    setOp = block.getNodeFromExpr(setOperation);
  } else if (setOperation.getType() == OpType.Intersect) {
    setOp = block.getNodeFromExpr(setOperation);
  } else {
    throw new TajoInternalError("Unknown set type: " + setOperation.getType());
  }
  setOp.setLeftChild(leftChild);
  setOp.setRightChild(rightChild);

  // A union statement can be derived from two query blocks.
  // For one union statement between both relations, we can ensure that each corresponding data domain of both
  // relations is the same. However, if necessary, the schema of the left query block is used as a base schema.
  List<Target> leftStrippedTargets = PlannerUtil.stripTarget(
      PlannerUtil.schemaToTargets(leftBlock.getRoot().getOutSchema()));

  setOp.setInSchema(leftChild.getOutSchema());
  Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
  setOp.setOutSchema(outSchema);

  return setOp;
}
/*===============================================================================================
INSERT SECTION
===============================================================================================*/
/**
 * Plans an INSERT statement. The child SELECT is planned first, then the insert
 * target is resolved: either a catalog table (INSERT INTO TABLE ...) or a
 * filesystem location (INSERT INTO LOCATION ...).
 */
public LogicalNode visitInsert(PlanContext context, Stack<Expr> stack, Insert expr) throws TajoException {
  stack.push(expr);
  LogicalNode childPlan = super.visitInsert(context, stack, expr);
  stack.pop();

  InsertNode node = context.queryBlock.getNodeFromExpr(expr);
  node.setOverwrite(expr.isOverwrite());
  node.setSubQuery(childPlan);

  if (expr.hasTableName()) {
    // INSERT (OVERWRITE) INTO TABLE ...
    return buildInsertIntoTablePlan(context, node, expr);
  }
  if (expr.hasLocation()) {
    // INSERT (OVERWRITE) INTO LOCATION ...
    return buildInsertIntoLocationPlan(context, node, expr);
  }
  throw new IllegalStateException("Invalid Query");
}
/**
 * Builds an InsertNode with a target table.
 *
 * ex) INSERT OVERWRITE INTO TABLE ...
 * <br />
 *
 * We use the following terms, such as target table and target column:
 * <pre>
 *   INSERT INTO [DATABASE_NAME.]TB_NAME (col1, col2) SELECT c1, c2 FROM ...
 *               ^^^^^^^^^^^^^^  ^^^^^^^ ^^^^^^^^^^^^        ^^^^^^^^^^^^
 *               target database target table target columns (or schema)   projected columns (or schema)
 * </pre>
 */
private InsertNode buildInsertIntoTablePlan(PlanContext context, InsertNode insertNode, Insert expr)
    throws TajoException {
  // Get and set a target table
  String databaseName;
  String tableName;
  if (CatalogUtil.isFQTableName(expr.getTableName())) {
    databaseName = CatalogUtil.extractQualifier(expr.getTableName());
    tableName = CatalogUtil.extractSimpleName(expr.getTableName());
  } else {
    // Unqualified name: fall back to the session's current database.
    databaseName = context.queryContext.get(SessionVars.CURRENT_DATABASE);
    tableName = expr.getTableName();
  }
  TableDesc desc = catalog.getTableDesc(databaseName, tableName);
  insertNode.setTargetTable(desc);

  //
  // When we use 'INSERT (OVERWRITE) INTO TABLE' statements, there are two cases.
  //
  // First, when a user specified target columns
  // INSERT (OVERWRITE)? INTO table_name (col1 type, col2 type) SELECT ...
  //
  // Second, when a user did not specify target columns
  // INSERT (OVERWRITE)? INTO table_name SELECT ...
  //
  // In the former case, target columns' schema and corresponding projected columns' schema
  // must be equivalent or be available to cast implicitly.
  //
  // In the latter case, the target table's schema and projected column's
  // schema of select clause can be different to each other. In this case,
  // we use only a sequence of preceding columns of target table's schema
  // as target columns.
  //
  // For example, consider a target table and an 'insert into' query are given as follows:
  //
  // CREATE TABLE TB1                  (col1 int,  col2 int, col3 long);
  //                                      ||          ||
  // INSERT OVERWRITE INTO TB1 SELECT (order_key, part_num               ) FROM ...
  //
  // In this example, only col1 and col2 are used as target columns.

  if (expr.hasTargetColumns()) { // when a user specified target columns
    if (expr.getTargetColumns().length > insertNode.getChild().getOutSchema().size()) {
      throw makeSyntaxError("Target columns and projected columns are mismatched to each other");
    }
    // See PreLogicalPlanVerifier.visitInsert.
    // It guarantees that the equivalence between the numbers of target and projected columns.
    ColumnReferenceExpr [] targets = expr.getTargetColumns();
    final SchemaBuilder targetColumnsBld = SchemaBuilder.builder();
    for (ColumnReferenceExpr target : targets) {
      // NOTE(review): '.' is replaced with '/' when looking up the column —
      // appears to map nested-field paths to the catalog's separator; confirm.
      Column targetColumn = desc.getLogicalSchema().getColumn(target.getCanonicalName().replace(".", "/"));
      if (targetColumn == null) {
        throw makeSyntaxError("column '" + target + "' of relation '" + desc.getName() + "' does not exist");
      }
      targetColumnsBld.add(targetColumn);
    }
    final Schema targetColumns = targetColumnsBld.build();
    insertNode.setTargetSchema(targetColumns);
    insertNode.setOutSchema(targetColumns);
    buildProjectedInsert(context, insertNode);

  } else { // when a user did not specify target columns
    // The output schema of select clause determines the target columns:
    // take the leading table columns, one per projected column.
    Schema tableSchema = desc.getLogicalSchema();
    Schema projectedSchema = insertNode.getChild().getOutSchema();

    SchemaBuilder targetColumns = SchemaBuilder.builder();
    for (int i = 0; i < projectedSchema.size(); i++) {
      targetColumns.add(tableSchema.getColumn(i));
    }
    insertNode.setTargetSchema(targetColumns.build());
    buildProjectedInsert(context, insertNode);
  }

  if (desc.hasPartition()) {
    insertNode.setPartitionMethod(desc.getPartitionMethod());
  }
  return insertNode;
}
/**
 * Rewrites the projection under an InsertNode so that its target list matches the
 * full table schema: columns present in the insert's target schema keep their
 * projected expressions, and every other table column is filled with a NULL
 * constant (the table appender cannot write a subset of columns).
 *
 * @throws RuntimeException if the insert's child is not {@link Projectable}
 */
private void buildProjectedInsert(PlanContext context, InsertNode insertNode) {
  Schema tableSchema = insertNode.getTableSchema();
  Schema targetColumns = insertNode.getTargetSchema();

  LogicalNode child = insertNode.getChild();

  if (child.getType() == NodeType.UNION) {
    // A union cannot be projected directly; wrap it in a projection over a table subquery.
    child = makeProjectionForInsertUnion(context, insertNode);
  }

  if (child instanceof Projectable) {
    // The cast was missing here (a LogicalNode was assigned to Projectable);
    // use the instance actually verified by the instanceof check above.
    Projectable projectionNode = (Projectable) child;

    // Modifying projected columns by adding NULL constants.
    // It is because the table appender does not support partial target columns to be written.
    List<Target> targets = new ArrayList<>();
    for (Column column : tableSchema.getAllColumns()) {
      int idxInProjectionNode = targetColumns.getIndex(column);

      // record type itself cannot be projected yet.
      if (column.getDataType().getType() == TajoDataTypes.Type.RECORD) {
        continue;
      }

      if (idxInProjectionNode >= 0 && idxInProjectionNode < projectionNode.getTargets().size()) {
        targets.add(projectionNode.getTargets().get(idxInProjectionNode));
      } else {
        // This table column is not an insert target: write NULL into it.
        targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
      }
    }
    projectionNode.setTargets(targets);

    insertNode.setInSchema(projectionNode.getOutSchema());
    insertNode.setOutSchema(projectionNode.getOutSchema());
    insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets));
  } else {
    throw new RuntimeException("Wrong child node type: " + child.getType() + " for insert");
  }
}
/**
 * Wraps a UNION child of an INSERT with a projection so that its target list can
 * later be rewritten, moving the union into its own query block.
 *
 * <p>Side effect: the insert node's child is replaced with the newly created
 * projection node, and the plan's block graph is rewired so that the union's
 * left/right blocks hang off the new union block.</p>
 *
 * @param context plan context holding the logical plan and the current query block
 * @param insertNode insert node whose child is a {@code UnionNode}
 * @return the projection node now sitting between the insert and the union
 */
private ProjectionNode makeProjectionForInsertUnion(PlanContext context, InsertNode insertNode) {
LogicalNode child = insertNode.getChild();
// add (projection - subquery) to RootBlock and create new QueryBlock for UnionNode
TableSubQueryNode subQueryNode = context.plan.createNode(TableSubQueryNode.class);
subQueryNode.init(context.queryBlock.getName(), child);
subQueryNode.setTargets(PlannerUtil.schemaToTargets(subQueryNode.getOutSchema()));
ProjectionNode projectionNode = context.plan.createNode(ProjectionNode.class);
projectionNode.setChild(subQueryNode);
projectionNode.setInSchema(subQueryNode.getInSchema());
projectionNode.setTargets(subQueryNode.getTargets());
context.queryBlock.registerNode(projectionNode);
context.queryBlock.registerNode(subQueryNode);
// add child QueryBlock to the UnionNode's QueryBlock
UnionNode unionNode = (UnionNode)child;
context.queryBlock.unregisterNode(unionNode);
QueryBlock unionBlock = context.plan.newQueryBlock();
unionBlock.registerNode(unionNode);
unionBlock.setRoot(unionNode);
// Rewire the block graph: detach the union's operand blocks from the current
// block and reattach them under the freshly created union block.
QueryBlock leftBlock = context.plan.getBlock(unionNode.getLeftChild());
QueryBlock rightBlock = context.plan.getBlock(unionNode.getRightChild());
context.plan.disconnectBlocks(leftBlock, context.queryBlock);
context.plan.disconnectBlocks(rightBlock, context.queryBlock);
context.plan.connectBlocks(unionBlock, context.queryBlock, BlockType.TableSubQuery);
context.plan.connectBlocks(leftBlock, unionBlock, BlockType.TableSubQuery);
context.plan.connectBlocks(rightBlock, unionBlock, BlockType.TableSubQuery);
// set InsertNode's child with ProjectionNode which is created.
insertNode.setChild(projectionNode);
return projectionNode;
}
/**
 * Build a InsertNode with a location.
 *
 * ex) INSERT OVERWRITE INTO LOCATION 'hdfs://....' ..
 *
 * @param context    plan context of the current query
 * @param insertNode insert node to be configured; its child is the query plan
 * @param expr       the parsed INSERT expression carrying location/format/params
 * @return the configured insert node
 */
private InsertNode buildInsertIntoLocationPlan(PlanContext context, InsertNode insertNode, Insert expr) {
  // INSERT (OVERWRITE)? INTO LOCATION path (USING file_type (param_clause)?)? query_expression
  LogicalNode childNode = insertNode.getChild();
  if (childNode.getType() == NodeType.UNION) {
    childNode = makeProjectionForInsertUnion(context, insertNode);
  }

  // The query output schema is used as-is for in/out/table schemas.
  final Schema outputSchema = childNode.getOutSchema();
  insertNode.setInSchema(outputSchema);
  insertNode.setOutSchema(outputSchema);
  insertNode.setTableSchema(outputSchema);

  // Rewrite: a scheme-less location is resolved under the default tablespace root.
  URI targetUri = URI.create(expr.getLocation());
  if (targetUri.getScheme() == null) {
    targetUri = URI.create(context.getQueryContext().get(QueryVars.DEFAULT_SPACE_ROOT_URI) + "/" + targetUri);
  }
  insertNode.setUri(targetUri);

  if (expr.hasStorageType()) {
    insertNode.setDataFormat(CatalogUtil.getBackwardCompitableDataFormat(expr.getDataFormat()));
  }
  if (expr.hasParams()) {
    KeyValueSet paramSet = new KeyValueSet();
    paramSet.putAll(expr.getParams());
    insertNode.setOptions(paramSet);
  }
  return insertNode;
}
/*===============================================================================================
Data Definition Language (DDL) SECTION
===============================================================================================*/
/** Builds a plan node for CREATE DATABASE. */
@Override
public LogicalNode visitCreateDatabase(PlanContext context, Stack<Expr> stack, CreateDatabase expr)
    throws TajoException {
  // Fetch the pre-created node for this expression and fill in its properties.
  final CreateDatabaseNode node = context.queryBlock.getNodeFromExpr(expr);
  node.init(expr.getDatabaseName(), expr.isIfNotExists());
  return node;
}
/** Builds a plan node for DROP DATABASE. */
@Override
public LogicalNode visitDropDatabase(PlanContext context, Stack<Expr> stack, DropDatabase expr)
    throws TajoException {
  // Fetch the pre-created node for this expression and fill in its properties.
  final DropDatabaseNode node = context.queryBlock.getNodeFromExpr(expr);
  node.init(expr.getDatabaseName(), expr.isIfExists());
  return node;
}
/**
 * Handles {@code CREATE TABLE <tablename> LIKE <parentTable>}: copies the
 * parent table's schema, partition method, data format, properties, and
 * external flag into the node being created.
 *
 * @param context         plan context of the current query
 * @param expr            the parsed CREATE TABLE expression
 * @param createTableNode the node to populate from the parent table
 * @return the populated create-table node
 * @throws UndefinedTableException if the parent table does not exist
 */
public LogicalNode handleCreateTableLike(PlanContext context, CreateTable expr, CreateTableNode createTableNode)
    throws TajoException {
  String parentTableName = expr.getLikeParentTableName();

  // Qualify the parent name with the current database if it is not fully qualified.
  if (!CatalogUtil.isFQTableName(parentTableName)) {
    parentTableName = CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE),
        parentTableName);
  }
  TableDesc baseTable = catalog.getTableDesc(parentTableName);
  if (baseTable == null) {
    throw new UndefinedTableException(parentTableName);
  }

  PartitionMethodDesc partitionDesc = baseTable.getPartitionMethod();
  createTableNode.setTableSchema(baseTable.getSchema());
  createTableNode.setPartitionMethod(partitionDesc);
  createTableNode.setDataFormat(CatalogUtil.getBackwardCompitableDataFormat(baseTable.getMeta().getDataFormat()));
  createTableNode.setOptions(baseTable.getMeta().getPropertySet());
  createTableNode.setExternal(baseTable.isExternal());
  if (baseTable.isExternal()) {
    // Only external tables carry an explicit location to copy.
    createTableNode.setUri(baseTable.getUri());
  }
  return createTableNode;
}
/**
 * Builds a plan node for CREATE TABLE, covering the plain, LIKE, and AS SELECT
 * (CTAS) forms. Table properties are applied in priority order: defaults for
 * the storage format, then session variables, then the WITH clause.
 *
 * @throws TajoException if the partition type is unsupported or the
 *                       partition/column counts are inconsistent
 */
@Override
public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
    throws TajoException {

  CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
  createTableNode.setIfNotExists(expr.isIfNotExists());

  // Set a table name to be created.
  if (CatalogUtil.isFQTableName(expr.getTableName())) {
    createTableNode.setTableName(expr.getTableName());
  } else {
    createTableNode.setTableName(
        CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE), expr.getTableName()));
  }

  // This is CREATE TABLE <tablename> LIKE <parentTable>
  if (expr.getLikeParentTableName() != null) {
    return handleCreateTableLike(context, expr, createTableNode);
  }

  if (expr.hasTableSpaceName()) {
    createTableNode.setTableSpaceName(expr.getTableSpaceName());
  }

  createTableNode.setUri(getCreatedTableURI(context, expr));

  if (expr.hasStorageType()) { // If storage type (using clause) is specified
    createTableNode.setDataFormat(CatalogUtil.getBackwardCompitableDataFormat(expr.getStorageType()));
  } else { // otherwise, default type
    createTableNode.setDataFormat(BuiltinStorages.TEXT);
  }

  // Set default storage properties to table
  createTableNode.setOptions(
      CatalogUtil.newDefaultProperty(createTableNode.getStorageType(), context.getQueryContext().getConf()));

  // Priority to apply table properties
  // 1. Explicit table properties specified in WITH clause
  // 2. Session variables

  // Set session variables to properties
  TablePropertyUtil.setTableProperty(context.queryContext, createTableNode);

  // Set table property specified in WITH clause and it will override all others
  if (expr.hasParams()) {
    createTableNode.getOptions().putAll(expr.getParams());
  }

  if (expr.hasPartition()) {
    if (expr.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
      createTableNode.setPartitionMethod(getPartitionMethod(context, expr.getTableName(), expr.getPartitionMethod()));
    } else {
      // Fix: the original message misspelled "Partition" and passed a
      // non-format string through String.format(), which would throw on a '%'.
      throw ExceptionUtil.makeNotSupported(
          "PartitionType " + expr.getPartitionMethod().getPartitionType());
    }
  }

  if (expr.hasSubQuery()) { // CREATE TABLE .. AS SELECT
    stack.add(expr);
    LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
    stack.pop();
    createTableNode.setChild(subQuery);
    createTableNode.setInSchema(subQuery.getOutSchema());

    // If the table schema is defined
    // ex) CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
    if (expr.hasTableElements()) {
      // Convert the element list once and share it (the original converted twice).
      Schema definedSchema = convertTableElementsSchema(expr.getTableElements());
      createTableNode.setOutSchema(definedSchema);
      createTableNode.setTableSchema(definedSchema);
    } else {
      // if no table definition, the select clause's output schema will be used.
      // ex) CREATE TABLE tbl AS SELECT ...

      if (expr.hasPartition()) {
        PartitionMethodDesc partitionMethod = createTableNode.getPartitionMethod();
        Schema queryOutputSchema = subQuery.getOutSchema();
        Schema partitionExpressionSchema = partitionMethod.getExpressionSchema();
        if (partitionMethod.getPartitionType() == CatalogProtos.PartitionType.COLUMN &&
            queryOutputSchema.size() < partitionExpressionSchema.size()) {
          throw makeSyntaxError("Partition columns cannot be more than table columns.");
        }
        // Partition columns sit at the tail of the query output; the table
        // schema consists of the leading non-partition columns only.
        SchemaBuilder tableSchemaBld = SchemaBuilder.builder();
        for (int i = 0; i < queryOutputSchema.size() - partitionExpressionSchema.size(); i++) {
          tableSchemaBld.add(queryOutputSchema.getColumn(i));
        }
        Schema tableSchema = tableSchemaBld.build();
        createTableNode.setOutSchema(tableSchema);
        createTableNode.setTableSchema(tableSchema);
      } else {
        // Convert the schema of subquery into the target table's one.
        Schema schema = SchemaBuilder.builder().addAll(subQuery.getOutSchema().getRootColumns()).build();
        schema.setQualifier(createTableNode.getTableName());
        createTableNode.setOutSchema(schema);
        createTableNode.setTableSchema(schema);
      }
    }

    return createTableNode;

  } else { // if CREATE AN EMPTY TABLE
    if (!expr.hasSelfDescSchema()) {
      Schema tableSchema = convertColumnsToSchema(expr.getTableElements());
      createTableNode.setTableSchema(tableSchema);
    } else {
      createTableNode.setSelfDescSchema(true);
    }

    if (expr.isExternal()) {
      createTableNode.setExternal(true);
    }

    return createTableNode;
  }
}
/**
 * Return a table uri to be created
 *
 * @param context PlanContext
 * @param createTable An algebral expression for create table
 * @return a Table uri to be created on a given table space
 */
private URI getCreatedTableURI(PlanContext context, CreateTable createTable) {
  if (!createTable.hasLocation()) {
    // No explicit location: derive the URI from tablespace, database, and table name.
    final String tableName = createTable.getTableName();
    final String databaseName = CatalogUtil.isFQTableName(tableName)
        ? CatalogUtil.extractQualifier(tableName)
        : context.queryContext.get(SessionVars.CURRENT_DATABASE);
    return storage.getTableURI(
        createTable.getTableSpaceName(), databaseName, CatalogUtil.extractSimpleName(tableName));
  }

  URI tableUri = URI.create(createTable.getLocation());
  if (tableUri.getScheme() == null) { // if a given table URI is a just path, the default tablespace will be added.
    tableUri = URI.create(context.queryContext.get(QueryVars.DEFAULT_SPACE_ROOT_URI) + createTable.getLocation());
  }
  return tableUri;
}
/**
 * Builds a catalog partition-method descriptor from the parsed partition clause.
 * Only column partitioning is currently supported.
 *
 * @throws NotImplementedException for any partition type other than COLUMN
 */
private PartitionMethodDesc getPartitionMethod(PlanContext context,
                                               String tableName,
                                               CreateTable.PartitionMethodDescExpr expr) throws TajoException {
  if (expr.getPartitionType() != PartitionType.COLUMN) {
    throw new NotImplementedException("partition type '" + expr.getPartitionType() + "'");
  }

  // The partition expression is the comma-joined list of partition column names.
  final CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr;
  final String partitionExpression = Joiner.on(',').join(partition.getColumns());
  return new PartitionMethodDesc(context.queryContext.get(SessionVars.CURRENT_DATABASE), tableName,
      CatalogProtos.PartitionType.COLUMN, partitionExpression, convertColumnsToSchema(partition.getColumns()));
}
/**
 * It transforms table definition elements to schema.
 *
 * <p>Delegates to {@link #convertTableElementsSchema(ColumnDefinition[])}; the
 * two methods previously contained identical copies of the same conversion.</p>
 *
 * @param elements to be transformed
 * @return schema transformed from table definition elements
 */
private Schema convertColumnsToSchema(ColumnDefinition[] elements) {
  return convertTableElementsSchema(elements);
}
/**
 * It transforms table definition elements to schema.
 *
 * <p>Each {@code ColumnDefinition} is mapped to a catalog {@code Column} via
 * {@code convertColumn}, preserving element order.</p>
 *
 * @param elements to be transformed
 * @return schema transformed from table definition elements
 */
private static Schema convertTableElementsSchema(ColumnDefinition[] elements) {
return SchemaBuilder.builder().addAll(elements, new Function<ColumnDefinition, Column>() {
@Override
public Column apply(@Nullable ColumnDefinition input) {
return convertColumn(input);
}
}).build();
}
/**
 * It transforms a ColumnReferenceExpr array into an array of column name strings.
 *
 * @param columnReferenceExprs column references to convert
 * @return the name of each reference, in the same order
 */
private static String[] convertColumnsToStrings(ColumnReferenceExpr[] columnReferenceExprs) {
  final String[] names = new String[columnReferenceExprs.length];
  int idx = 0;
  for (ColumnReferenceExpr ref : columnReferenceExprs) {
    names[idx++] = ref.getName();
  }
  return names;
}
/**
 * It transforms an Expr array of partition values into a String array.
 *
 * <p>NULL literals map to the reserved default-partition name; every other
 * element is treated as a literal value. NOTE(review): a non-literal,
 * non-null expression would raise ClassCastException here — presumably the
 * parser guarantees literals; confirm against callers.</p>
 *
 * @param exprs partition value expressions
 * @return string values, one per expression
 */
private static String[] convertExprsToPartitionTableStringValues(Expr[] exprs) {
  final String[] values = new String[exprs.length];
  for (int i = 0; i < exprs.length; i++) {
    values[i] = exprs[i].getType() == OpType.NullLiteral
        ? StorageConstants.DEFAULT_PARTITION_NAME
        : ((LiteralValue) exprs[i]).getValue();
  }
  return values;
}
/** Converts a parsed column definition into a catalog {@code Column} (name plus converted type). */
private static Column convertColumn(ColumnDefinition columnDefinition) {
return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
}
/**
 * Converts a parsed data type expression into a catalog {@code TypeDesc}.
 *
 * <p>RECORD types are converted recursively through their nested column
 * definitions; all other types become protobuf {@code DataType} messages.
 * CHAR without an explicit length defaults to length 1.</p>
 *
 * @param dataType the parsed type expression
 * @return the corresponding type descriptor
 */
public static TypeDesc convertDataType(DataTypeExpr dataType) {
  final TajoDataTypes.Type type = TajoDataTypes.Type.valueOf(dataType.getTypeName());

  if (type == TajoDataTypes.Type.RECORD) {
    // A record is represented by its nested schema rather than a scalar DataType.
    return new TypeDesc(convertTableElementsSchema(dataType.getNestedRecordTypes()));
  }

  final TajoDataTypes.DataType.Builder builder = TajoDataTypes.DataType.newBuilder();
  builder.setType(type);
  if (dataType.hasLengthOrPrecision()) {
    builder.setLength(dataType.getLengthOrPrecision());
  } else if (type == TajoDataTypes.Type.CHAR) {
    builder.setLength(1); // CHAR defaults to a single character
  }
  return new TypeDesc(builder.build());
}
/** Builds a plan node for DROP TABLE, qualifying the name with the current database if needed. */
@Override
public LogicalNode visitDropTable(PlanContext context, Stack<Expr> stack, DropTable dropTable) {
  final DropTableNode node = context.queryBlock.getNodeFromExpr(dropTable);
  // Qualify the table name with the current database unless already fully qualified.
  final String qualifiedName = CatalogUtil.isFQTableName(dropTable.getTableName())
      ? dropTable.getTableName()
      : CatalogUtil.buildFQName(
          context.queryContext.get(SessionVars.CURRENT_DATABASE), dropTable.getTableName());
  node.init(qualifiedName, dropTable.isIfExists(), dropTable.isPurge());
  return node;
}
/**
 * Builds a plan node for ALTER TABLESPACE ... LOCATION.
 *
 * <p>Fix: added the {@code @Override} annotation for consistency with every
 * other visitor method in this class.</p>
 */
@Override
public LogicalNode visitAlterTablespace(PlanContext context, Stack<Expr> stack, AlterTablespace alterTablespace) {
  AlterTablespaceNode alter = context.queryBlock.getNodeFromExpr(alterTablespace);
  alter.setTablespaceName(alterTablespace.getTablespaceName());
  alter.setLocation(alterTablespace.getLocation());
  return alter;
}
/**
 * Builds a plan node for ALTER TABLE. The single node carries the fields for
 * every ALTER variant (rename table/column, add column, add/drop partition,
 * set properties); which fields are meaningful is determined by the
 * AlterTableOpType copied at the end.
 */
@Override
public LogicalNode visitAlterTable(PlanContext context, Stack<Expr> stack, AlterTable alterTable) {
AlterTableNode alterTableNode = context.queryBlock.getNodeFromExpr(alterTable);
alterTableNode.setTableName(alterTable.getTableName());
alterTableNode.setNewTableName(alterTable.getNewTableName());
alterTableNode.setColumnName(alterTable.getColumnName());
alterTableNode.setNewColumnName(alterTable.getNewColumnName());
alterTableNode.setProperties(new KeyValueSet(alterTable.getParams()));
// Optional fields: copied only when present in the parsed expression.
if (null != alterTable.getAddNewColumn()) {
alterTableNode.setAddNewColumn(convertColumn(alterTable.getAddNewColumn()));
}
if (alterTable.getColumns() != null) {
alterTableNode.setPartitionColumns(convertColumnsToStrings(alterTable.getColumns()));
}
if (alterTable.getValues() != null) {
alterTableNode.setPartitionValues(convertExprsToPartitionTableStringValues(alterTable.getValues()));
}
if (alterTable.getLocation() != null) {
alterTableNode.setLocation(alterTable.getLocation());
}
alterTableNode.setPurge(alterTable.isPurge());
alterTableNode.setIfNotExists(alterTable.isIfNotExists());
alterTableNode.setIfExists(alterTable.isIfExists());
alterTableNode.setAlterTableOpType(alterTable.getAlterTableOpType());
return alterTableNode;
}
/** Returns the default index location: {@code <warehouse>/<databaseName>/<indexName>/}. */
private static URI getIndexPath(PlanContext context, String databaseName, String indexName) {
  final Path warehouseDir = TajoConf.getWarehouseDir(context.queryContext.getConf());
  return new Path(warehouseDir, databaseName + "/" + indexName + "/").toUri();
}
/**
 * Builds a plan node for CREATE INDEX. Visits the child query first, normalizes
 * every sort-key expression, and registers them in the block's named-expression
 * manager so the later sort-spec annotation can resolve them by reference name.
 * Only a single base relation (a plain scan) may be indexed.
 *
 * @throws UnsupportedException for multi-relation or subquery indexing
 * @throws TajoInternalError if no relation is present in the block
 */
@Override
public LogicalNode visitCreateIndex(PlanContext context, Stack<Expr> stack, CreateIndex createIndex)
throws TajoException {
stack.push(createIndex);
LogicalNode child = visit(context, stack, createIndex.getChild());
stack.pop();
QueryBlock block = context.queryBlock;
CreateIndexNode createIndexNode = block.getNodeFromExpr(createIndex);
// Qualify the index name with the current database if necessary.
if (CatalogUtil.isFQTableName(createIndex.getIndexName())) {
createIndexNode.setIndexName(createIndex.getIndexName());
} else {
createIndexNode.setIndexName(
CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE), createIndex.getIndexName()));
}
createIndexNode.setUnique(createIndex.isUnique());
Sort.SortSpec[] sortSpecs = createIndex.getSortSpecs();
int sortKeyNum = sortSpecs.length;
String[] referNames = new String[sortKeyNum];
// First pass: normalize all sort-key expressions.
ExprNormalizedResult[] normalizedExprList = new ExprNormalizedResult[sortKeyNum];
for (int i = 0; i < sortKeyNum; i++) {
normalizedExprList[i] = normalizer.normalize(context, sortSpecs[i].getKey());
}
// Second pass: register the normalized expressions after all keys are normalized.
for (int i = 0; i < sortKeyNum; i++) {
// even if base expressions don't have their name,
// reference names should be identifiable for the later sort spec creation.
referNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
}
// An index may be created on exactly one base relation, and only on a scan.
Collection<RelationNode> relations = block.getRelations();
if (relations.size() > 1) {
throw new UnsupportedException("Index on multiple relations");
} else if (relations.size() == 0) {
throw new TajoInternalError("No relation for indexing");
}
RelationNode relationNode = relations.iterator().next();
if (!(relationNode instanceof ScanNode)) {
throw new UnsupportedException("Index on subquery");
}
createIndexNode.setExternal(createIndex.isExternal());
createIndexNode.setKeySortSpecs(relationNode.getLogicalSchema(), annotateSortSpecs(block, referNames, sortSpecs));
createIndexNode.setIndexMethod(IndexMethod.valueOf(createIndex.getMethodSpec().getName().toUpperCase()));
// External indexes use the user-supplied path; otherwise the warehouse default.
if (createIndex.isExternal()) {
createIndexNode.setIndexPath(new Path(createIndex.getIndexPath()).toUri());
} else {
createIndexNode.setIndexPath(
getIndexPath(context, context.queryContext.get(SessionVars.CURRENT_DATABASE), createIndex.getIndexName()));
}
if (createIndex.getParams() != null) {
KeyValueSet keyValueSet = new KeyValueSet();
keyValueSet.putAll(createIndex.getParams());
createIndexNode.setOptions(keyValueSet);
}
createIndexNode.setChild(child);
return createIndexNode;
}
/** Builds a plan node for DROP INDEX. */
@Override
public LogicalNode visitDropIndex(PlanContext context, Stack<Expr> stack, DropIndex dropIndex) {
  // Copy the index name into the pre-created node for this expression.
  final DropIndexNode node = context.queryBlock.getNodeFromExpr(dropIndex);
  node.setIndexName(dropIndex.getIndexName());
  return node;
}
/** Builds a plan node for TRUNCATE TABLE (possibly over several tables). */
@Override
public LogicalNode visitTruncateTable(PlanContext context, Stack<Expr> stack, TruncateTable truncateTable)
    throws TajoException {
  final TruncateTableNode node = context.queryBlock.getNodeFromExpr(truncateTable);
  node.setTableNames(truncateTable.getTableNames());
  return node;
}
/*===============================================================================================
Util SECTION
===============================================================================================*/
/**
 * Checks whether an expression can be evaluated by the given window-aggregation
 * node: every referenced column must be available in the node's input schema,
 * and the expression must not contain a distinct aggregation function.
 */
public static boolean checkIfBeEvaluatedAtWindowAgg(EvalNode evalNode, WindowAggNode node) {
  final Set<Column> referenced = EvalTreeUtil.findUniqueColumns(evalNode);
  final boolean columnsAvailable = referenced.isEmpty() || node.getInSchema().containsAll(referenced);
  return columnsAvailable && EvalTreeUtil.findDistinctAggFunction(evalNode).isEmpty();
}
/**
 * Checks whether an expression can be evaluated by the given group-by node:
 * every referenced column must be available in the node's input schema, and
 * the expression must not contain a window function.
 */
public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode node) {
  final Set<Column> referenced = EvalTreeUtil.findUniqueColumns(evalNode);
  if (!referenced.isEmpty() && !node.getInSchema().containsAll(referenced)) {
    return false;
  }
  // Window functions are evaluated by a window-agg node, never inside a group-by.
  return EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).isEmpty();
}
/**
 * Decides whether a predicate may be evaluated during the given join. The
 * predicate must be evaluatable at the join at all, must not be a non-equi
 * theta-join qualifier, and must respect the outer/inner join rules below.
 */
public static boolean isEvaluatableJoinQual(QueryBlock block, EvalNode evalNode, JoinNode node,
                                            boolean isOnPredicate, boolean isTopMostJoin) {
  if (!checkIfBeEvaluatedAtJoin(block, evalNode, node, isTopMostJoin)) {
    return false;
  }
  if (isNonEquiThetaJoinQual(block, node, evalNode)) {
    return false;
  }

  if (PlannerUtil.isOuterJoinType(node.getJoinType())) {
    /*
     * For outer joins, only predicates which are specified at the on clause can be evaluated during processing join.
     * Other predicates from the where clause must be evaluated after the join.
     * The below code will be modified after improving join operators to keep join filters by themselves (TAJO-1310).
     */
    return isOnPredicate;
  }

  /*
   * Only join predicates should be evaluated at join if the join type is inner or cross. (TAJO-1445)
   */
  return EvalTreeUtil.isJoinQual(block, node.getLeftChild().getOutSchema(), node.getRightChild().getOutSchema(),
      evalNode, false);
}
/**
 * Returns true if the given predicate is a join qualifier between the two
 * children of the join node but is not an equality comparison (i.e. a
 * non-equi theta-join qualifier).
 */
public static boolean isNonEquiThetaJoinQual(final LogicalPlan.QueryBlock block,
                                             final JoinNode joinNode,
                                             final EvalNode evalNode) {
  // Idiom fix: return the boolean expression directly instead of
  // 'if (cond) return true; else return false;'.
  return EvalTreeUtil.isJoinQual(block, joinNode.getLeftChild().getOutSchema(),
      joinNode.getRightChild().getOutSchema(), evalNode, true) &&
      evalNode.getType() != EvalType.EQUAL;
}
/**
 * Checks whether an expression can be evaluated by the given join node. The
 * expression must be free of distinct aggregates and window functions, all
 * referenced columns must be in the join's input schema, and outer-join
 * sensitive expressions (e.g. case-when) may only run at the topmost join.
 */
public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode node,
                                               boolean isTopMostJoin) {
  // Distinct aggregates and window functions can never run inside a join.
  if (!EvalTreeUtil.findDistinctAggFunction(evalNode).isEmpty()) {
    return false;
  }
  if (!EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).isEmpty()) {
    return false;
  }

  final Set<Column> referenced = EvalTreeUtil.findUniqueColumns(evalNode);
  if (!referenced.isEmpty() && !node.getInSchema().containsAll(referenced)) {
    return false;
  }

  // When a 'case-when' is used with outer join, the case-when expression must be evaluated
  // at the topmost join operator.
  // TODO - It's also valid that case-when is evaluated at the topmost outer operator.
  // But, how can we know there is no further outer join operator after this node?
  if (containsOuterJoin(block) && !isTopMostJoin
      && !EvalTreeUtil.findOuterJoinSensitiveEvals(evalNode).isEmpty()) {
    return false;
  }

  return true;
}
/** Returns whether the query block contains any outer join (left, right, or full). */
public static boolean containsOuterJoin(QueryBlock block) {
return block.containsJoinType(JoinType.LEFT_OUTER) || block.containsJoinType(JoinType.RIGHT_OUTER) ||
block.containsJoinType(JoinType.FULL_OUTER);
}
/**
 * It checks if evalNode can be evaluated at this @{link RelationNode}.
 */
public static boolean checkIfBeEvaluatedAtRelation(QueryBlock block, EvalNode evalNode, RelationNode node) {
Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
// aggregation functions cannot be evaluated in scan node
if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
return false;
}
// window functions cannot be evaluated in scan node either
if (EvalTreeUtil.findEvalsByType(evalNode, EvalType.WINDOW_FUNCTION).size() > 0) {
return false;
}
// every referenced column must be provided by the relation's logical schema
if (columnRefs.size() > 0 && !node.getLogicalSchema().containsAll(columnRefs)) {
return false;
}
// Why? - When a {case when} is used with outer join, case when must be evaluated at topmost outer join.
if (containsOuterJoin(block)) {
Collection<EvalNode> found = EvalTreeUtil.findOuterJoinSensitiveEvals(evalNode);
if (found.size() > 0) {
return false;
}
}
return true;
}
/**
 * Returns true if every column referenced by the expression is present in the
 * node's input schema; expressions referencing no columns always qualify.
 */
public static boolean checkIfBeEvaluatedAtThis(EvalNode evalNode, LogicalNode node) {
  final Set<Column> referenced = EvalTreeUtil.findUniqueColumns(evalNode);
  return referenced.isEmpty() || node.getInSchema().containsAll(referenced);
}
}