/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree.Operator;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.PrunerOperatorFactory.FilterPruner;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import com.google.common.base.Preconditions;
/**
* Fixed bucket pruning optimizer goes through all the table scans and annotates them
* with a bucketing inclusion bit-set.
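*
* For example (illustrative only): given a table bucketed into 16 buckets on a single
* column {@code id}, a predicate such as {@code WHERE id = 1} lets the scan keep only
* the bucket whose number is derived from the hash of {@code 1}; the remaining 15
* buckets are left out of the inclusion bit-set and can be skipped downstream.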
*/
public class FixedBucketPruningOptimizer extends Transform {
private final boolean compat;
public FixedBucketPruningOptimizer(boolean compat) {
this.compat = compat;
}
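/**
 * No-op node processor handed to the operator-graph walker for nodes that are not part
 * of the filter-over-table-scan pattern handled by {@link BucketBitsetGenerator}.
 */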
public class NoopWalker implements SemanticNodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
// do nothing
return null;
}
}
public static class BucketBitsetGenerator extends FilterPruner {
@Override
protected void generatePredicate(NodeProcessorCtx procCtx,
FilterOperator fop, TableScanOperator top) throws SemanticException {
FixedBucketPruningOptimizerCtxt ctxt = ((FixedBucketPruningOptimizerCtxt) procCtx);
Table tbl = top.getConf().getTableMetadata();
int numBuckets = tbl.getNumBuckets();
if (numBuckets <= 0 || tbl.getBucketCols().size() != 1) {
// bucketing isn't consistent or there is more than one bucket column;
// the optimizer does not extract multi-column predicates for this case
return;
}
if (tbl.isPartitioned()) {
// Make sure all the partitions have same bucket count.
PrunedPartitionList prunedPartList =
PartitionPruner.prune(top, ctxt.pctx, top.getConf().getAlias());
if (prunedPartList != null) {
for (Partition p : prunedPartList.getPartitions()) {
if (numBuckets != p.getBucketCount()) {
// disable feature
return;
}
}
}
}
ExprNodeGenericFuncDesc filter = top.getConf().getFilterExpr();
if (filter == null) {
return;
}
// the sargs are closely tied to hive.optimize.index.filter
SearchArgument sarg = ConvertAstToSearchArg.create(ctxt.pctx.getConf(), filter);
if (sarg == null) {
return;
}
final String bucketCol = tbl.getBucketCols().get(0);
StructField bucketField = null;
for (StructField fs : tbl.getFields()) {
if (fs.getFieldName().equals(bucketCol)) {
bucketField = fs;
break;
}
}
Preconditions.checkArgument(bucketField != null);
List<Object> literals = new ArrayList<Object>();
List<PredicateLeaf> leaves = sarg.getLeaves();
Set<PredicateLeaf> bucketLeaves = new HashSet<PredicateLeaf>();
for (PredicateLeaf l : leaves) {
if (bucketCol.equals(l.getColumnName())) {
switch (l.getOperator()) {
case EQUALS:
case IN:
// supported
break;
case IS_NULL:
// TODO: (a = 1) and NOT (a is NULL) could potentially be folded earlier into a no-op
// fall through
case BETWEEN:
// TODO: for ordinal types you can produce a range (BETWEEN 1444442100 1444442107)
// fall through
default:
// cannot optimize any others
return;
}
bucketLeaves.add(l);
}
}
if (bucketLeaves.isEmpty()) {
return;
}
// TODO: Add support for AND clauses under OR clauses
// The first cut handles only a known set of minimal trees and no others:
//   $expr = (a=1)
//           (a=1 or a=2)
//           (a in (1,2))
//           ($expr and *)
//           (* and $expr)
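// Illustrative mapping, assuming 'a' is the bucket column and using the textual form
// the SearchArgument prints (leaf-0, leaf-1, ...):
//   a = 1              ->  leaf-0                  handled by the LEAF branch below
//   a IN (1, 2)        ->  leaf-0                  a single IN leaf
//   a = 1 AND b > 10   ->  (and leaf-0 leaf-1)     handled by the AND branch
//   a = 1 OR a = 2     ->  (or leaf-0 leaf-1)      handled by the OR branch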
ExpressionTree expr = sarg.getExpression();
if (expr.getOperator() == Operator.LEAF) {
PredicateLeaf l = leaves.get(expr.getLeaf());
if (!addLiteral(literals, l)) {
return;
}
} else if (expr.getOperator() == Operator.AND) {
boolean found = false;
for (ExpressionTree subExpr : expr.getChildren()) {
if (subExpr.getOperator() == Operator.LEAF) {
// one of the branches is definitely a bucket-leaf
PredicateLeaf l = leaves.get(subExpr.getLeaf());
if (bucketLeaves.contains(l)) {
if (!addLiteral(literals, l)) {
return;
}
found = true;
}
}
}
if (!found) {
return;
}
} else if (expr.getOperator() == Operator.OR) {
for (ExpressionTree subExpr : expr.getChildren()) {
if (subExpr.getOperator() != Operator.LEAF) {
return;
}
PredicateLeaf l = leaves.get(subExpr.getLeaf());
if (bucketLeaves.contains(l)) {
if (!addLiteral(literals, l)) {
return;
}
} else {
// all of the OR branches need to be bucket-leaves
return;
}
}
} else if (expr.getOperator() == Operator.NOT) {
// TODO: we could probably handle NOT IS_NULL here
return;
}
// invariant: bucket-col IN literals of type bucketField
BitSet bs = new BitSet(numBuckets);
bs.clear();
PrimitiveObjectInspector bucketOI = (PrimitiveObjectInspector)bucketField.getFieldObjectInspector();
PrimitiveObjectInspector constOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(bucketOI.getTypeInfo());
// Fetch the bucketing version from table scan operator
int bucketingVersion = top.getConf().getTableMetadata().getBucketingVersion();
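// Bucketing version 2 (the getBucketNumber path) hashes with Murmur3, while version 1
// (getBucketNumberOld) uses the legacy Java-hashCode-style scheme; both then reduce the
// hash to a bucket id, which for current writers is (hash & Integer.MAX_VALUE) % numBuckets.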
for (Object literal: literals) {
PrimitiveObjectInspector origOI = PrimitiveObjectInspectorFactory.getPrimitiveObjectInspectorFromClass(literal.getClass());
Converter conv = ObjectInspectorConverters.getConverter(origOI, constOI);
// exact type conversion or get out
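// (the literal has to be hashed as the bucket column's declared type; hashing it as a
// different primitive type could land in a different bucket than the one the writer used)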
if (conv == null) {
return;
}
Object[] convCols = new Object[] {conv.convert(literal)};
int n = bucketingVersion == 2 ?
ObjectInspectorUtils.getBucketNumber(convCols, new ObjectInspector[]{constOI}, numBuckets) :
ObjectInspectorUtils.getBucketNumberOld(convCols, new ObjectInspector[]{constOI}, numBuckets);
bs.set(n);
if (bucketingVersion == 1 && ctxt.isCompat()) {
int h = ObjectInspectorUtils.getBucketHashCodeOld(convCols, new ObjectInspector[]{constOI});
// negative hash codes were converted to positive bucket numbers in different ways in the past;
// abs() is now obsolete and all inserts now use (hash & Integer.MAX_VALUE)
// compat mode assumes that old data could have been loaded using the other conversion
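// e.g. if the old bucket hash code is -7 with 4 buckets: the non-compat path above keeps
// bucket (-7 & Integer.MAX_VALUE) % 4 = 1, and the abs() path below adds bucket 7 % 4 = 3,
// so both candidate buckets stay in the inclusion bit-set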
n = ObjectInspectorUtils.getBucketNumber(Math.abs(h), numBuckets);
bs.set(n);
}
}
if (bs.cardinality() < numBuckets) {
// there is a valid bucket pruning filter
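// the inclusion bit-set is stored on the table scan so that downstream split generation
// can skip the excluded bucket files entirely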
top.getConf().setIncludedBuckets(bs);
top.getConf().setNumBuckets(numBuckets);
}
}
private boolean addLiteral(List<Object> literals, PredicateLeaf leaf) {
switch (leaf.getOperator()) {
case EQUALS:
return literals.add(
convertLiteral(leaf.getLiteral()));
case IN:
return literals.addAll(
leaf.getLiteralList().stream().map(l -> convertLiteral(l)).collect(Collectors.toList()));
default:
return false;
}
}
private Object convertLiteral(Object o) {
// This is a bit hackish; it fixes the mismatch between SARG and Hive types
// for Timestamp and Date. TODO: Move those types to storage-api.
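// e.g. a java.sql.Date produced by the SARG layer becomes the equivalent
// org.apache.hadoop.hive.common.type.Date, so the literal's class lines up with the
// primitive object inspectors used in generatePredicate above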
if (o instanceof java.sql.Date) {
java.sql.Date sqlDate = (java.sql.Date)o;
return Date.ofEpochDay(
DateWritable.millisToDays(sqlDate.getTime()));
} else if (o instanceof java.sql.Timestamp) {
java.sql.Timestamp sqlTimestamp = (java.sql.Timestamp)o;
return Timestamp.ofEpochMilli(sqlTimestamp.getTime(), sqlTimestamp.getNanos());
}
return o;
}
}
public final class FixedBucketPruningOptimizerCtxt implements
NodeProcessorCtx {
public final ParseContext pctx;
private final boolean compat;
private int numBuckets;
private PrunedPartitionList partitions;
private List<String> bucketCols;
private List<StructField> schema;
public FixedBucketPruningOptimizerCtxt(boolean compat, ParseContext pctx) {
this.compat = compat;
this.pctx = pctx;
}
public void setSchema(ArrayList<StructField> fields) {
this.schema = fields;
}
public List<StructField> getSchema() {
return this.schema;
}
public void setBucketCols(List<String> bucketCols) {
this.bucketCols = bucketCols;
}
public List<String> getBucketCols() {
return this.bucketCols;
}
public void setPartitions(PrunedPartitionList partitions) {
this.partitions = partitions;
}
public PrunedPartitionList getPartitions() {
return this.partitions;
}
public int getNumBuckets() {
return numBuckets;
}
public void setNumBuckets(int numBuckets) {
this.numBuckets = numBuckets;
}
// compatibility mode enabled
public boolean isCompat() {
return this.compat;
}
}
@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
// create the context for walking operators
FixedBucketPruningOptimizerCtxt opPartWalkerCtx = new FixedBucketPruningOptimizerCtxt(compat,
pctx);
// walk the operator tree and annotate each filtered table scan with its bucket bit-set
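// BucketBitsetGenerator fires on the filter-over-table-scan pattern; the NoopWalker is
// invoked for every other operator node and leaves it untouched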
PrunerUtils.walkOperatorTree(pctx, opPartWalkerCtx,
new BucketBitsetGenerator(), new NoopWalker());
return pctx;
}
}