blob: 8d91628bf18fcc27bbaa50e8d1ec70a596fcd4a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.physical.visitor;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.planner.physical.LeafPrel;
import org.apache.drill.exec.planner.physical.MetadataControllerPrel;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.ProjectAllowDupPrel;
import org.apache.drill.exec.planner.physical.ProjectPrel;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.planner.physical.ScreenPrel;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.planner.physical.UnnestPrel;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.util.Pair;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public class StarColumnConverter extends BasePrelVisitor<Prel, Void, RuntimeException> {
private static final AtomicLong tableNumber = new AtomicLong(0);
private boolean prefixedForStar;
private boolean prefixedForWriter;
private StarColumnConverter() {
prefixedForStar = false;
prefixedForWriter = false;
}
public static Prel insertRenameProject(Prel root) {
// Prefixing columns for columns expanded from star column. There are two cases.
// 1. star is used in SELECT only.
// - Insert project under screen (PUS) to remove prefix,
// - Insert project above scan (PAS) to add prefix.
// 2. star is used in CTAS
// - Insert project under Writer (PUW) to remove prefix
// - Insert project above scan (PAS) to add prefix.
// - DO NOT insert any project for prefix handling under Screen.
// The following condtions should apply when prefix is required
// Any non-SCAN prel produces regular column / expression AND star column, multiple star columns.
// This is because we have to use prefix to distinguish columns expanded
// from star column, from those regular column referenced in the query.
return root.accept(new StarColumnConverter(), null);
}
@Override
public Prel visitScreen(ScreenPrel prel, Void value) throws RuntimeException {
Prel child = ((Prel) prel.getInput(0)).accept(this, null);
if (prefixedForStar) {
if (prefixedForWriter) {
// Prefix is added under CTAS Writer. We need create a new Screen with the converted child.
return prel.copy(prel.getTraitSet(), Collections.singletonList(child));
} else {
// Prefix is added for SELECT only, not for CTAS writer.
return insertProjUnderScreenOrWriter(prel, prel.getInput().getRowType(), child);
}
} else {
// No prefix is
return prel;
}
}
// Note: the logic of handling * column for Writer is moved to ProjectForWriterVisitor.
@Override
public Prel visitWriter(WriterPrel prel, Void value) throws RuntimeException {
RelNode child = ((Prel) prel.getInput(0)).accept(this, null);
if (prefixedForStar) {
prefixedForWriter = true;
// return insertProjUnderScreenOrWriter(prel, prel.getInput().getRowType(), child);
return prel.copy(prel.getTraitSet(), Collections.singletonList(child));
} else {
return prel;
}
}
// insert PUS or PUW: Project Under Screen/Writer, when necessary.
private Prel insertProjUnderScreenOrWriter(Prel prel, RelDataType origRowType, Prel child) {
ProjectPrel proj;
List<RelNode> children = Lists.newArrayList();
List<RexNode> exprs = Lists.newArrayList();
for (int i = 0; i < origRowType.getFieldCount(); i++) {
RexNode expr = child.getCluster().getRexBuilder().makeInputRef(origRowType.getFieldList().get(i).getType(), i);
exprs.add(expr);
}
RelDataType newRowType = RexUtil.createStructType(child.getCluster().getTypeFactory(),
exprs, origRowType.getFieldNames(), null);
int fieldCount = prel.getRowType().isStruct() ? prel.getRowType().getFieldCount() : 1;
// Insert PUS/PUW : remove the prefix and keep the original field name.
if (fieldCount > 1) { // no point in allowing duplicates if we only have one column
proj = new ProjectAllowDupPrel(child.getCluster(),
child.getTraitSet(),
child,
exprs,
newRowType,
true); //outputProj = true : will allow to build the schema for PUS Project, see ProjectRecordBatch#handleNullInput()
} else {
proj = new ProjectPrel(child.getCluster(),
child.getTraitSet(),
child,
exprs,
newRowType,
true); //outputProj = true : will allow to build the schema for PUS Project, see ProjectRecordBatch#handleNullInput()
}
children.add(proj);
return (Prel) prel.copy(prel.getTraitSet(), children);
}
@Override
public Prel visitProject(ProjectPrel prel, Void value) throws RuntimeException {
// Require prefix rename : there exists other expression, in addition to a star column.
if (!prefixedForStar // not set yet.
&& StarColumnHelper.containsStarColumnInProject(prel.getInput().getRowType(), prel.getProjects())
&& prel.getRowType().getFieldNames().size() > 1) {
prefixedForStar = true;
}
// For project, we need make sure that the project's field name is same as the input,
// when the project expression is RexInPutRef, since we may insert a PAS which will
// rename the projected fields.
Prel child = ((Prel) prel.getInput(0)).accept(this, null);
List<String> fieldNames = Lists.newArrayList();
for (Pair<String, RexNode> pair : Pair.zip(prel.getRowType().getFieldNames(), prel.getProjects())) {
if (pair.right instanceof RexInputRef) {
String name = child.getRowType().getFieldNames().get(((RexInputRef) pair.right).getIndex());
fieldNames.add(name);
} else {
fieldNames.add(pair.left);
}
}
// Make sure the field names are unique : no allow of duplicate field names in a rowType.
fieldNames = makeUniqueNames(fieldNames);
RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(),
prel.getProjects(), fieldNames, null);
ProjectPrel newProj = (ProjectPrel) prel.copy(prel.getTraitSet(), child, prel.getProjects(), rowType);
if (ProjectRemoveRule.isTrivial(newProj)) {
return child;
} else {
return newProj;
}
}
@Override
public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
if (prel instanceof MetadataControllerPrel) {
// disallow renaming projections for analyze command
return prel;
}
// Require prefix rename : there exists other expression, in addition to a star column.
if (!prefixedForStar // not set yet.
&& StarColumnHelper.containsStarColumn(prel.getRowType())
&& prel.getRowType().getFieldNames().size() > 1) {
prefixedForStar = true;
}
List<RelNode> children = Lists.newArrayList();
for (Prel child : prel) {
child = child.accept(this, null);
children.add(child);
}
return (Prel) prel.copy(prel.getTraitSet(), children);
}
private Prel prefixTabNameToStar(Prel prel, Void value) throws RuntimeException {
if (StarColumnHelper.containsStarColumn(prel.getRowType()) && prefixedForStar) {
List<RexNode> exprs = Lists.newArrayList();
for (RelDataTypeField field : prel.getRowType().getFieldList()) {
RexNode expr = prel.getCluster().getRexBuilder().makeInputRef(field.getType(), field.getIndex());
exprs.add(expr);
}
List<String> fieldNames = Lists.newArrayList();
long tableId = tableNumber.getAndIncrement();
for (String name : prel.getRowType().getFieldNames()) {
if (StarColumnHelper.isNonPrefixedStarColumn(name)) {
fieldNames.add("T" + tableId + StarColumnHelper.PREFIX_DELIMITER + name); // Add prefix to * column.
} else {
fieldNames.add(name); // Keep regular column as it is.
}
}
RelDataType rowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(),
exprs, fieldNames, null);
// insert a PAS.
ProjectPrel proj = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, exprs, rowType);
return proj;
} else {
return visitPrel(prel, value);
}
}
@Override
public Prel visitScan(ScanPrel scanPrel, Void value) throws RuntimeException {
return visitLeaf(scanPrel, value);
}
@Override
public Prel visitLeaf(LeafPrel prel, Void value) throws RuntimeException {
return prefixTabNameToStar(prel, value);
}
@Override
public Prel visitUnnest(UnnestPrel unnestPrel, Void value) throws RuntimeException {
return visitLeaf(unnestPrel, value);
}
private List<String> makeUniqueNames(List<String> names) {
// We have to search the set of original names, plus the set of unique names that will be used finally .
// Eg : the original names : ( C1, C1, C10 )
// There are two C1, we may rename C1 to C10, however, this new name will conflict with the original C10.
// That means we should pick a different name that does not conflict with the original names, in additional
// to make sure it's unique in the set of unique names.
HashSet<String> uniqueNames = new HashSet<>();
HashSet<String> origNames = new HashSet<>(names);
List<String> newNames = Lists.newArrayList();
for (String s : names) {
if (uniqueNames.contains(s)) {
for (int i = 0;; i++) {
s = s + i;
if (! origNames.contains(s) && ! uniqueNames.contains(s)) {
break;
}
}
}
uniqueNames.add(s);
newNames.add(s);
}
return newNames;
}
}