blob: d1d2548a48981820ea64649206bd9a16778b9500 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.metadata
import org.apache.flink.table.functions.sql.internal.{SqlRuntimeFilterBuilderFunction, SqlRuntimeFilterFunction}
import org.apache.flink.table.plan.metadata.SelectivityEstimator._
import org.apache.flink.table.plan.schema.{FlinkTable, TableSourceTable}
import org.apache.flink.table.plan.stats._
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, FlinkRexUtil, PartitionPredicateExtractor, RexNodeExtractor}
import org.apache.flink.table.sources.PartitionableTableSource
import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.plan.{RelOptPredicateList, RelOptUtil}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFamily}
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.metadata.RelMdUtil
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.calcite.sql.{SqlKind, SqlOperator}
import org.apache.calcite.util.{ImmutableBitSet, TimeString}
import java.lang.{Double => JDouble}
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
* Estimates selectivity of rows meeting a filter predicate on a RelNode.
*
* <p>For some unsupported cases, we use the default selectivity values
* referred to [[org.apache.calcite.rel.metadata.RelMdUtil.guessSelectivity]].
*
* <p><strong>NOTES:</strong>
* In current stage, we don't have advanced statistics such as sketches or histograms,
* we have to assume uniform distribution.
* We should update estimation methods in the future if advanced statistics is ready.
*
* <p>Part of the code is referred from Apache Spark.
*
* @param rel RelNode
* @param mq Metadata query
*/
class SelectivityEstimator(rel: RelNode, mq: FlinkRelMetadataQuery)
extends RexVisitorImpl[Option[Double]](true) {
private val rexBuilder = rel.getCluster.getRexBuilder
private val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(rel)
// these default values is referred to RelMdUtil#guessSelectivity
private[flink] val defaultComparisonSelectivity = Some(0.5d)
private[flink] val defaultEqualsSelectivity = Some(0.15d)
private[flink] val defaultIsNullSelectivity = Some(0.1d)
private[flink] val defaultIsNotNullSelectivity = Some(1.0 - defaultIsNullSelectivity.get)
private[flink] val defaultLikeSelectivity = Some(0.05d)
private[flink] val defaultSelectivity = Some(0.25d)
/**
* Returns a percentage of rows meeting a filter predicate on TableScan node.
*
* @param predicate predicate whose selectivity is to be estimated against scan's output.
* @return estimated selectivity (between 0.0 and 1.0),
* or None if no reliable estimate can be determined.
*/
def evaluate(predicate: RexNode): Option[Double] = {
try {
if (predicate == null) {
Some(1.0)
} else {
val rexSimplify = new RexSimplify(
rexBuilder, RelOptPredicateList.EMPTY, true, RexUtil.EXECUTOR)
val simplifiedPredicate = rexSimplify.simplify(predicate)
if (simplifiedPredicate.isAlwaysTrue) {
Some(1.0)
} else if (simplifiedPredicate.isAlwaysFalse) {
Some(0.0)
} else {
simplifiedPredicate.accept(this)
}
}
} catch {
// if found unsupported operations, fallback
case _: Throwable => None
}
}
/**
* Get partition field names from PartitionableTableSource.
*
* @return partition field names if the TableSource in scan is PartitionableTableSource,
* otherwise None.
*/
private def getPartitionFieldNames(scan: TableScan): Option[Array[String]] = {
val table = scan.getTable.unwrap(classOf[TableSourceTable])
table match {
case t: TableSourceTable =>
t.tableSource match {
case p: PartitionableTableSource =>
Some(p.getPartitionFieldNames)
case _ => None
}
case _ => None
}
}
override def visitCall(call: RexCall): Option[Double] = {
call.getOperator match {
case AND =>
val predicates = splitAndPredicate(call)
// predicates.size == 1 indicates that
// the RexInputRef of each sub-condition in this AND call are the same.
if (predicates.size == 1) {
estimateAndPredicate(call)
} else {
val selectivity = predicates.map(estimateOperand)
Some(selectivity.product)
}
case OR =>
val predicates = splitOrPredicate(call)
if (predicates.size == 1) {
predicates.head match {
case c: RexCall if c.getOperator == IN => estimateSinglePredicate(c)
case _ => estimateOrPredicate(call)
}
} else {
val selectivity = predicates.map(estimateOperand)
Some(math.min(1.0, selectivity.sum - selectivity.product))
}
case NOT =>
val operands = call.getOperands
val selectivity = estimateOperand(operands.head)
Some(1.0 - selectivity)
case _ =>
estimateSinglePredicate(call)
}
}
private def estimateOperand(operand: RexNode): Double = {
val subSelectivity = operand.accept(this)
if (subSelectivity != null) subSelectivity.getOrElse(1.0) else 1.0
}
/**
* Returns a percentage of rows meeting a single condition in Filter node.
*
* We will do partition pruning with those supported partition-pruning predicates,
* and the rowCount in [[org.apache.flink.table.plan.stats.TableStats]] should exclude the
* rows of the pruned partitions. So we should ignore partition predicate here.
*
* @param singlePredicate predicate whose selectivity is to be estimated against scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
*/
private def estimateSinglePredicate(singlePredicate: RexCall): Option[Double] = {
if (isSupportedPartitionPredicate(singlePredicate)) {
return estimatePartitionPredicate(singlePredicate)
}
val operands = singlePredicate.getOperands
singlePredicate.getOperator match {
case EQUALS =>
estimateComparison(EQUALS, operands.head, operands.last)
case NOT_EQUALS =>
val selectivity = estimateComparison(EQUALS, operands.head, operands.last)
Some(1.0 - selectivity.getOrElse(1.0))
case GREATER_THAN =>
estimateComparison(GREATER_THAN, operands.head, operands.last)
case GREATER_THAN_OR_EQUAL =>
estimateComparison(GREATER_THAN_OR_EQUAL, operands.head, operands.last)
case LESS_THAN =>
estimateComparison(LESS_THAN, operands.head, operands.last)
case LESS_THAN_OR_EQUAL =>
estimateComparison(LESS_THAN_OR_EQUAL, operands.head, operands.last)
case IS_NULL =>
estimateIsNull(operands.head)
case IS_NOT_NULL =>
estimateIsNotNull(operands.head)
case IN =>
estimateIn(operands.head, operands.slice(1, operands.size))
case RelMdUtil.ARTIFICIAL_SELECTIVITY_FUNC =>
Option(RelMdUtil.getSelectivityValue(singlePredicate))
case LIKE =>
defaultLikeSelectivity
// RuntimeFilter should not break Cost's estimate.
// (Because it may be useless, just advance Join's filtering only).
case _: SqlRuntimeFilterBuilderFunction | _: SqlRuntimeFilterFunction =>
Some(1.0D)
case _ =>
defaultSelectivity
}
}
/**
* Decomposes a predicate into a list of expressions that are AND'ed together,
* and the expressions with same RexInputRefs will be converted into an AND.
* e.g.
* input predicate: `a > 10 AND b < 40.0 AND a < 20 AND CAST(b AS INTEGER) > 30`
* output expressions: `a > 10 AND a < 20` and `b < 40.0` and `CAST(b AS INTEGER) > 30`
*/
private def splitAndPredicate(call: RexCall): Seq[RexNode] = {
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, call)
val conjunctions = RelOptUtil.conjunctions(cnf)
val combinations = new mutable.HashMap[String, mutable.ListBuffer[RexNode]]()
val nonCombinations = new mutable.ListBuffer[RexNode]()
val visitor = new RexVisitorImpl[Unit](true) {
override def visitCall(call: RexCall): Unit = {
call.getOperator match {
case AND => throw new RuntimeException("This should not happen.")
// TODO supports more operators
case GREATER_THAN | GREATER_THAN_OR_EQUAL | LESS_THAN | LESS_THAN_OR_EQUAL =>
val (left, right) = (call.operands.get(0), call.operands.get(1))
(left, right) match {
case (i: RexInputRef, _: RexLiteral) =>
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += call
case (_: RexLiteral, i: RexInputRef) =>
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += call
case _ => nonCombinations += call
}
case _ => nonCombinations += call
}
}
}
conjunctions.foreach(_.accept(visitor))
val combined = combinations.values.map { r =>
RexUtil.composeConjunction(rexBuilder, r, false)
}.toSeq
combined ++ nonCombinations
}
/**
* Decomposes a predicate into a list of expressions that are OR'ed together,
* and the expressions with same RexInputRefs connected with EQUALS will be converted into an IN.
* e.g.
* input predicate: `a = 10 or a = 20 OR a > 40.0 OR a < 0 OR b > 30`
* output expressions: `a IN (10, 20)` and `(a > 40 OR a < 0)` and `b > 30`
*/
private def splitOrPredicate(call: RexCall): Seq[RexNode] = {
val combinations = new mutable.HashMap[String, mutable.ListBuffer[RexNode]]()
val nonCombinations = new mutable.ListBuffer[RexNode]()
val visitor = new RexVisitorImpl[Unit](true) {
override def visitCall(call: RexCall): Unit = {
call.getOperator match {
// TODO supports more operators
case GREATER_THAN | GREATER_THAN_OR_EQUAL | LESS_THAN | LESS_THAN_OR_EQUAL | EQUALS =>
val (left, right) = (call.operands.get(0), call.operands.get(1))
(left, right) match {
case (i: RexInputRef, _: RexLiteral) =>
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += call
case (l: RexLiteral, i: RexInputRef) =>
val newCall = if (call.getOperator == EQUALS) {
call.clone(call.getType, List(i, l))
} else {
call
}
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += newCall
case _ => nonCombinations += call
}
case _ => nonCombinations += call
}
}
}
val disjunctions = RelOptUtil.disjunctions(call)
disjunctions.foreach(_.accept(visitor))
val combined = combinations.values.flatMap { r =>
val (equalsNodes, nonEqualsNodes) = r.partition(_.asInstanceOf[RexCall].getOperator == EQUALS)
if (equalsNodes.size > 1) {
val inputRef = equalsNodes.head.asInstanceOf[RexCall].getOperands.head
val valuesNode = equalsNodes.map(_.asInstanceOf[RexCall].getOperands.last)
val inNode = rexBuilder.makeCall(IN, List(inputRef) ++ valuesNode)
if (nonEqualsNodes.isEmpty) {
Seq(inNode)
} else {
Seq(inNode, RexUtil.composeDisjunction(rexBuilder, nonEqualsNodes, false))
}
} else {
Seq(RexUtil.composeDisjunction(rexBuilder, r, false))
}
}.toSeq
combined ++ nonCombinations
}
/**
* Returns a percentage of rows meeting an AND condition with same RexInputRef in Filter node.
* Each sub-condition in AND is a binary comparison condition,
* and only binary comparison operator <, <=, >, >= are supported now.
*
* The condition is like: `a > 10 AND a < 20`
*
* @param andPredicate predicate whose selectivity is to be estimated against scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
*/
private def estimateAndPredicate(andPredicate: RexCall): Option[Double] = {
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, andPredicate)
val conjunctions = RelOptUtil.conjunctions(cnf)
val inputRefs = mutable.HashSet[Int]()
val supported = conjunctions.forall {
case c: RexCall =>
c.getOperator match {
case GREATER_THAN | GREATER_THAN_OR_EQUAL | LESS_THAN | LESS_THAN_OR_EQUAL =>
val (left, right) = (c.operands.get(0), c.operands.get(1))
(left, right) match {
case (i: RexInputRef, _: RexLiteral) => inputRefs += i.getIndex
case (_: RexLiteral, i: RexInputRef) => inputRefs += i.getIndex
case _ => throw new RuntimeException("This should not happen.")
}
// TODO support partition predicate and non-numeric type
!isSupportedPartitionPredicate(c) &&
canConvertToNumericType(left.getType) &&
canConvertToNumericType(right.getType)
case _ => throw new RuntimeException("This should not happen.")
}
case _ => throw new RuntimeException("This should not happen.")
}
require(inputRefs.size == 1)
if (!supported) {
val selectivity = conjunctions.map(estimateOperand)
return Some(selectivity.product)
}
val predicateIntervals = conjunctions.map {
case c: RexCall =>
val (left, right) = (c.operands.get(0), c.operands.get(1))
c.getOperator match {
case GREATER_THAN =>
(left, right) match {
case (i: RexInputRef, l: RexLiteral) =>
ValueInterval(literalToDouble(l), null, includeLower = false, includeUpper = false)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = false)
case _ => throw new RuntimeException("This should not happen.")
}
case GREATER_THAN_OR_EQUAL =>
(left, right) match {
case (_: RexInputRef, l: RexLiteral) =>
ValueInterval(literalToDouble(l), null, includeLower = true, includeUpper = false)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = true)
case _ => throw new RuntimeException("This should not happen.")
}
case LESS_THAN =>
(left, right) match {
case (_: RexInputRef, l: RexLiteral) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = false)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(literalToDouble(l), null, includeLower = false, includeUpper = false)
case _ => throw new RuntimeException("This should not happen.")
}
case LESS_THAN_OR_EQUAL =>
(left, right) match {
case (_: RexInputRef, l: RexLiteral) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = true)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(literalToDouble(l), null, includeLower = true, includeUpper = false)
case _ => throw new RuntimeException("This should not happen.")
}
case _ => throw new RuntimeException("This should not happen.")
}
case _ => throw new RuntimeException("This should not happen.")
}
estimateAndNumericComparison(inputRefs.head, predicateIntervals)
}
/**
* Returns a percentage of rows meeting an AND condition and
* each sub-condition is a binary numeric comparison expression.
* This method evaluate expression for Numeric/Boolean/Date/Time/Timestamp columns.
*/
private def estimateAndNumericComparison(
inputRefIndex: Int,
intervals: Seq[ValueInterval]): Option[Double] = {
require(intervals.nonEmpty)
val default = Some(math.pow(defaultComparisonSelectivity.get, intervals.size))
val columnInterval = mq.getColumnInterval(rel, inputRefIndex)
if (columnInterval == null) {
return default
}
// convert values in columnInterval to double type
val convertedInterval = convertValueInterval(columnInterval, SqlTypeFamily.NUMERIC)
val andInterval = intervals.foldLeft(convertedInterval)(ValueInterval.intersect)
columnInterval match {
case ValueInterval.empty => Some(0.0)
case ValueInterval.infinite => default
case _: LeftSemiInfiniteValueInterval => default
case _: RightSemiInfiniteValueInterval => default
case v: FiniteValueInterval =>
andInterval match {
case ValueInterval.empty => Some(0.0)
case lv: FiniteValueInterval =>
// TODO not take include min/max into consideration now
val (min, max) = (comparableToDouble(v.lower), comparableToDouble(v.upper))
val (litMin, litMax) = (comparableToDouble(lv.lower), comparableToDouble(lv.upper))
Some((litMax - litMin) / (max - min))
case _ =>
// predicate like `a > 10 AND a > 20` had been simplified in `evaluate` method
throw new RuntimeException("This should not happen.")
}
}
}
/**
* Returns a percentage of rows meeting an OR condition with same RexInputRef in Filter node.
* Each sub-condition in OR is a binary comparison condition,
* and only binary comparison operator <, <=, >, >= are supported now.
*
* The condition is like: `a > 20 OR a < 10`
*
* @param orPredicate predicate whose selectivity is to be estimated against scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
*/
private def estimateOrPredicate(orPredicate: RexCall): Option[Double] = {
val disjunctions = RelOptUtil.disjunctions(orPredicate)
val result = disjunctions.map(estimateOperand)
Some(math.min(1.0, result.sum))
}
/**
* Returns whether a predicate is supported partition-pruning predicate.
*
* @param singlePredicate predicate whose selectivity is to be estimated against scan's output.
* @return true if the TableSource in scan is PartitionableTableSource and the columns in the
* predicate are all partition columns, otherwise false.
*/
private def isSupportedPartitionPredicate(singlePredicate: RexCall): Boolean = {
rel match {
case ts: TableScan =>
val partitionFieldNames = getPartitionFieldNames(ts)
partitionFieldNames match {
// partitionable table
case Some(names) if names.nonEmpty =>
val (convertedPredicates, unconvertedRexNodes) =
RexNodeExtractor.extractConjunctiveConditions(
singlePredicate,
maxCnfNodeCount,
ts.getRowType.getFieldNames,
ts.getCluster.getRexBuilder,
FlinkRelOptUtil.getFunctionCatalog(ts))
val (_, nonPartitionPredicates) =
PartitionPredicateExtractor.extractPartitionPredicates(convertedPredicates, names)
unconvertedRexNodes.isEmpty && nonPartitionPredicates.isEmpty
// non-partitionable table
case _ => false
}
case _ => false
}
}
/**
* Returns a percentage of rows meeting a partition expression.
*
* @param partitionPredicate partition predicate whose selectivity is to be estimated against
* scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
*/
private def estimatePartitionPredicate(partitionPredicate: RexCall): Option[Double] = {
assert(this.rel.isInstanceOf[TableScan])
val table = rel.asInstanceOf[TableScan].getTable.unwrap(classOf[TableSourceTable])
val partitionableTableSource = table match {
case t: TableSourceTable =>
t.tableSource match {
case p: PartitionableTableSource => p
case _ => throw new RuntimeException("This should not happen.")
}
case _ => throw new RuntimeException("This should not happen.")
}
// TODO deal with filter partial push-down
if (partitionableTableSource.isFilterPushedDown) {
// Ignore any predicates on partition columns because we have already
// accounted for these in the Table row count.
Some(1.0)
} else {
// We do not use PartitionableTableSource#getAllPartitions method to get the number of
// partitions, because this method may involve some heavy work(e.g. reads metadata using I/O).
// We assume the number of partitions is 100 here.
val numOfPartitions = 100
partitionPredicate.getOperator match {
case EQUALS => Some(1.0 / numOfPartitions)
case NOT_EQUALS => Some(1.0 - 1.0 / numOfPartitions)
case IS_NULL => Some(0.0)
case IS_NOT_NULL => Some(1.0)
case IN =>
val inSet = partitionPredicate.getOperands.subList(1, partitionPredicate.getOperands.size)
checkInSet(inSet)
val sizeOfInSet = inSet.size()
Some(Math.min(1.0, sizeOfInSet * 1.0 / numOfPartitions))
case _ => Some(5 * (1.0 / numOfPartitions))
}
}
}
/**
* Returns a percentage of rows meeting a binary comparison expression containing two columns.
*
* We will use default comparison selectivity(0.5), in the following cases:
* 1. the type is not supported
* 2. the operator is not supported
* 3. the specified column's stats is empty
*
* @param op a binary comparison operator, including =, <=>, <, <=, >, >=
* @param left the left RexInputRef
* @param right the right RexInputRef
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
*/
private def estimateComparison(op: SqlOperator, left: RexNode, right: RexNode): Option[Double] = {
if (!isSupportedComparisonType(left.getType) || !isSupportedComparisonType(right.getType)) {
val default = op match {
case EQUALS => defaultEqualsSelectivity
case _ => defaultComparisonSelectivity
}
return default
}
op match {
case EQUALS => (left, right) match {
case (i: RexInputRef, l: RexLiteral) => estimateEquals(i, l)
case (l: RexLiteral, i: RexInputRef) => estimateEquals(i, l)
case (l: RexInputRef, r: RexInputRef) => estimateComparison(EQUALS, l, r)
case _ => defaultEqualsSelectivity
}
case LESS_THAN => (left, right) match {
case (i: RexInputRef, l: RexLiteral) => estimateComparison(LESS_THAN, i, l)
case (l: RexLiteral, i: RexInputRef) => estimateComparison(GREATER_THAN, i, l)
case (l: RexInputRef, r: RexInputRef) => estimateComparison(LESS_THAN, l, r)
case _ => defaultComparisonSelectivity
}
case LESS_THAN_OR_EQUAL => (left, right) match {
case (i: RexInputRef, l: RexLiteral) => estimateComparison(LESS_THAN_OR_EQUAL, i, l)
case (l: RexLiteral, i: RexInputRef) => estimateComparison(GREATER_THAN_OR_EQUAL, i, l)
case (l: RexInputRef, r: RexInputRef) => estimateComparison(LESS_THAN_OR_EQUAL, l, r)
case _ => defaultComparisonSelectivity
}
case GREATER_THAN => (left, right) match {
case (i: RexInputRef, l: RexLiteral) => estimateComparison(GREATER_THAN, i, l)
case (l: RexLiteral, i: RexInputRef) => estimateComparison(LESS_THAN, i, l)
case (l: RexInputRef, r: RexInputRef) => estimateComparison(GREATER_THAN, l, r)
case _ => defaultComparisonSelectivity
}
case GREATER_THAN_OR_EQUAL => (left, right) match {
case (i: RexInputRef, l: RexLiteral) => estimateComparison(GREATER_THAN_OR_EQUAL, i, l)
case (l: RexLiteral, i: RexInputRef) => estimateComparison(LESS_THAN_OR_EQUAL, i, l)
case (l: RexInputRef, r: RexInputRef) => estimateComparison(GREATER_THAN_OR_EQUAL, l, r)
case _ => defaultComparisonSelectivity
}
case _ => defaultComparisonSelectivity
}
}
/**
* Returns a percentage of rows meeting an equality (=) expression.
*
* @param inputRef a RexInputRef
* @param literal a literal value (or constant)
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
*/
private def estimateEquals(inputRef: RexInputRef, literal: RexLiteral): Option[Double] = {
if (literal.isNull) {
return estimateIsNull(inputRef)
}
val columnInterval = mq.getColumnInterval(rel, inputRef.getIndex)
if (columnInterval == null) {
return defaultEqualsSelectivity
}
val convertedInterval = convertValueInterval(columnInterval, inputRef.getType)
convertedInterval match {
case ValueInterval.infinite => defaultEqualsSelectivity
case ValueInterval.empty => Some(0.0)
case _ =>
if (ValueInterval.contains(convertedInterval, literalToComparable(literal))) {
val bitSetOfInputRef = ImmutableBitSet.of(inputRef.getIndex)
val ndv = mq.getDistinctRowCount(rel, bitSetOfInputRef, null)
if (ndv == null) {
defaultEqualsSelectivity
} else {
// there is no possible that ndv < 1.0 because of the protection in RelMetadataQuery.
Some(1.0 / ndv)
}
} else {
Some(0.0)
}
}
}
/**
* Returns a percentage of rows meeting a binary comparison expression.
*
* @param op a binary comparison operator, including <, <=, >, >=
* @param inputRef a RexInputRef
* @param literal a literal value (or constant)
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
*/
private def estimateComparison(
op: SqlOperator,
inputRef: RexInputRef,
literal: RexLiteral): Option[Double] = {
if (literal.isNull) {
throw new IllegalArgumentException("Numeric comparison does not support null literal here.")
}
if (canConvertToNumericType(inputRef.getType)) {
estimateNumericComparison(op, inputRef, literal)
} else {
// TODO: It is difficult to support binary comparisons for non-numeric type
// without advanced statistics like histogram.
defaultComparisonSelectivity
}
}
/**
* Returns a percentage of rows meeting a binary numeric comparison expression.
* This method evaluate expression for Numeric/Boolean/Date/Time/Timestamp columns.
*
* @param op a binary comparison operator, including <, <=, >, >=
* @param inputRef a RexInputRef
* @param literal a literal value (or constant)
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
*/
private def estimateNumericComparison(
op: SqlOperator,
inputRef: RexInputRef,
literal: RexLiteral): Option[Double] = {
val columnInterval = mq.getColumnInterval(rel, inputRef.getIndex)
if (columnInterval == null) {
return defaultComparisonSelectivity
}
columnInterval match {
case ValueInterval.infinite => defaultComparisonSelectivity
case ValueInterval.empty => Some(0.0)
case _ =>
val (min, includeMin) = columnInterval match {
case hasLower: WithLower => (comparableToDouble(hasLower.lower), hasLower.includeLower)
case _ => (null, true)
}
val (max, includeMax) = columnInterval match {
case hasUpper: WithUpper => (comparableToDouble(hasUpper.upper), hasUpper.includeUpper)
case _ => (null, true)
}
val lit = literalToDouble(literal)
val (noOverlap, completeOverlap) = op match {
case LESS_THAN => (
greaterThanOrEqualTo(min, lit),
if (includeMax) lessThan(max, lit) else lessThanOrEqualTo(max, lit))
case LESS_THAN_OR_EQUAL => (
if (includeMin) greaterThan(min, lit) else greaterThanOrEqualTo(min, lit),
lessThanOrEqualTo(max, lit))
case GREATER_THAN => (
lessThanOrEqualTo(max, lit),
if (includeMin) greaterThan(min, lit) else greaterThanOrEqualTo(min, lit))
case GREATER_THAN_OR_EQUAL => (
if (includeMax) lessThan(max, lit) else lessThanOrEqualTo(max, lit),
greaterThanOrEqualTo(min, lit))
}
val selectivity = if (noOverlap) {
0.0
} else if (completeOverlap) {
1.0
} else if (min != null && max != null) {
val bitSetOfInputRef = ImmutableBitSet.of(inputRef.getIndex)
val ndv = mq.getDistinctRowCount(rel, bitSetOfInputRef, null)
op match {
// there is no possible that ndv < 1.0 because of the protection in RelMetadataQuery.
case LESS_THAN =>
if (doubleEquals(lit, max)) {
if (includeMax) {
Option(ndv).map(v => 1.0 - (1.0 / v))
.getOrElse(defaultComparisonSelectivity.get)
} else {
1.0
}
} else {
// TODO not take includeMin into consideration now
(lit - min) / (max - min)
}
case LESS_THAN_OR_EQUAL =>
if (doubleEquals(lit, min)) {
if (includeMin) {
// The boundary value is the only satisfying value.
Option(ndv).map(1.0 / _).getOrElse(defaultComparisonSelectivity.get)
} else {
0.0
}
} else {
// TODO not take includeMax into consideration now
(lit - min) / (max - min)
}
case GREATER_THAN =>
if (doubleEquals(lit, min)) {
if (includeMin) {
Option(ndv).map(v => 1.0 - (1.0 / v))
.getOrElse(defaultComparisonSelectivity.get)
} else {
1.0
}
} else {
// TODO not take includeMin into consideration now
(max - lit) / (max - min)
}
case GREATER_THAN_OR_EQUAL =>
if (doubleEquals(lit, max)) {
if (includeMax) {
Option(ndv).map(1.0 / _).getOrElse(defaultComparisonSelectivity.get)
} else {
0.0
}
} else {
// TODO not take includeMax into consideration now
(max - lit) / (max - min)
}
}
} else {
defaultComparisonSelectivity.get
}
Some(selectivity)
}
}
/**
* Returns a percentage of rows meeting a binary comparison expression containing two columns.
* In SQL queries, we also see predicate expressions involving two columns
* such as "column-1 (op) column-2" where column-1 and column-2 belong to same table.
* Note that, if column-1 and column-2 belong to different tables, then it is a join
* operator's work, NOT a filter operator's work.
*
* @param op a binary comparison operator, including =, <, <=, >, >=
* @param left the left RexInputRef
* @param right the right RexInputRef
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
*/
private def estimateComparison(
op: SqlOperator,
left: RexInputRef,
right: RexInputRef): Option[Double] = {
if (canConvertToNumericType(left.getType) && canConvertToNumericType(right.getType)) {
estimateNumericComparison(op, left, right)
} else {
// TODO: It is difficult to support binary comparisons for non-numeric type
// without advanced statistics like histogram.
op match {
case EQUALS => defaultEqualsSelectivity
case _ => defaultComparisonSelectivity
}
}
}
/**
* Returns a percentage of rows meeting a binary numeric comparison expression
* containing two columns.
* This method evaluate expression for Numeric/Boolean/Date/Time/Timestamp columns.
*
* @param op a binary comparison operator, including =, <, <=, >, >=
* @param left the left RexInputRef
* @param right the right RexInputRef
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
*/
private def estimateNumericComparison(
op: SqlOperator,
left: RexInputRef,
right: RexInputRef): Option[Double] = {
val selectivityWithoutStats = op match {
case EQUALS => defaultEqualsSelectivity
case _ => defaultComparisonSelectivity
}
val leftInterval = mq.getColumnInterval(this.rel, left.getIndex)
val rightInterval = mq.getColumnInterval(this.rel, right.getIndex)
if (leftInterval == null ||
leftInterval == ValueInterval.infinite ||
rightInterval == null ||
rightInterval == ValueInterval.infinite) {
return selectivityWithoutStats
} else if (leftInterval == ValueInterval.empty || rightInterval == ValueInterval.empty) {
return Some(0.0)
}
val leftNullCount = mq.getColumnNullCount(this.rel, left.getIndex)
val rightNullCount = mq.getColumnNullCount(this.rel, right.getIndex)
if (leftNullCount == null || rightNullCount == null) {
return selectivityWithoutStats
}
val bitSetOfLeftInputRef = ImmutableBitSet.of(left.getIndex)
val leftNdv = mq.getDistinctRowCount(this.rel, bitSetOfLeftInputRef, null)
val bitSetOfRightInputRef = ImmutableBitSet.of(right.getIndex)
val rightNdv = mq.getDistinctRowCount(this.rel, bitSetOfRightInputRef, null)
if (op.equals(EQUALS) &&
(!(leftInterval.isInstanceOf[FiniteValueInterval]
&& rightInterval.isInstanceOf[FiniteValueInterval])) ||
leftNdv == null || rightNdv == null) {
return defaultEqualsSelectivity
}
// left interval
val (leftMin, leftIncludeMin) = leftInterval match {
case hasLower: WithLower => (comparableToDouble(hasLower.lower), hasLower.includeLower)
case _ => (null, true)
}
val (leftMax, leftIncludeMax) = leftInterval match {
case hasUpper: WithUpper => (comparableToDouble(hasUpper.upper), hasUpper.includeUpper)
case _ => (null, true)
}
val (rightMin, rightIncludeMin) = rightInterval match {
case hasLower: WithLower => (comparableToDouble(hasLower.lower), hasLower.includeLower)
case _ => (null, true)
}
val (rightMax, rightIncludeMax) = rightInterval match {
case hasUpper: WithUpper => (comparableToDouble(hasUpper.upper), hasUpper.includeUpper)
case _ => (null, true)
}
// determine the overlapping degree between predicate interval and column's interval
val (noOverlap, completeOverlap) = op match {
// Left < Right or Left <= Right
// - no overlap:
// rightMin rightMax leftMin leftMax
// --------+------------------+------------+-------------+------->
// - complete overlap: (If null values exists, we set it to partial overlap.)
// leftMin leftMax rightMin rightMax
// --------+------------------+------------+-------------+------->
case LESS_THAN =>
(
greaterThanOrEqualTo(leftMin, rightMax),
if (leftIncludeMax && rightIncludeMin) {
lessThan(leftMax, rightMin)
} else {
lessThanOrEqualTo(leftMax, rightMin)
})
case LESS_THAN_OR_EQUAL =>
(
if (leftIncludeMin && rightIncludeMax) {
greaterThan(leftMin, rightMax)
} else {
greaterThanOrEqualTo(leftMin, rightMax)
},
lessThanOrEqualTo(leftMax, rightMin))
// Left > Right or Left >= Right
// - no overlap:
// leftMin leftMax rightMin rightMax
// --------+------------------+------------+-------------+------->
// - complete overlap: (If null values exists, we set it to partial overlap.)
// rightMin rightMax leftMin leftMax
// --------+------------------+------------+-------------+------->
case GREATER_THAN =>
(
lessThanOrEqualTo(leftMax, rightMin),
if (leftIncludeMin && rightIncludeMax) {
greaterThan(leftMin, rightMax)
} else {
greaterThanOrEqualTo(leftMin, rightMax)
})
case GREATER_THAN_OR_EQUAL =>
(
if (leftIncludeMax && rightIncludeMin) {
lessThan(leftMax, rightMin)
} else {
lessThanOrEqualTo(leftMax, rightMin)
},
greaterThanOrEqualTo(leftMin, rightMax))
// Left = Right
// - no overlap:
// leftMin leftMax rightMin rightMax
// --------+------------------+------------+-------------+------->
// rightMin rightMax leftMin leftMax
// --------+------------------+------------+-------------+------->
// - complete overlap:
// leftMin leftMin
// rightMin rightMax
// --------+------------------+------->
case EQUALS =>
// now, min/max/ndv is not null and ndv >= 0
val isLeftSmallerThanRight = if (leftIncludeMax && rightIncludeMin) {
leftMax < rightMin
} else {
leftMax <= rightMin
}
val isLeftBiggerThanRight = if (rightIncludeMax && leftIncludeMin) {
rightMax < leftMin
} else {
rightMax <= leftMin
}
(
isLeftSmallerThanRight || isLeftBiggerThanRight,
(leftMin == rightMin) && (leftMax == rightMax)
&& (leftIncludeMin == rightIncludeMin)
&& (leftIncludeMax == rightIncludeMax)
&& (leftNdv == rightNdv))
}
val allNotNull = (leftNullCount == 0) && (rightNullCount == 0)
val selectivity = if (noOverlap) {
0.0
} else if (completeOverlap && allNotNull) {
1.0
} else {
// For partial overlap, we use an empirical value 1/3 as suggested by the book
// "Database Systems, the complete book".
1.0 / 3.0
}
Some(selectivity)
}
/**
* Returns a percentage of rows meeting "IS NULL" condition.
*
* We will use default is_null selectivity(0.1), if the column stats is empty.
*
* @param input a RexNode
* @return an optional double value to show the percentage of rows meeting a given condition
* It returns None if no statistics collected for a given column.
*/
private def estimateIsNull(input: RexNode): Option[Double] = {
val inputRef = convertToRexInputRef(input)
val nullCount = mq.getColumnNullCount(this.rel, inputRef.getIndex)
if (nullCount == null) {
return defaultIsNullSelectivity
}
val rowCount = mq.getRowCount(this.rel)
if (rowCount == null) {
defaultIsNullSelectivity
} else {
// there is no possible that rowCount is 0 because of the protection in RelMetadataQuery.
Some(Math.min(nullCount / rowCount, 1.0))
}
}
/**
* Returns a percentage of rows meeting "IS NOT NULL" condition.
*
* We will use default is_not_null selectivity(0.9), if the column stats is empty.
*
* @param input a RexNode
* @return an optional double value to show the percentage of rows meeting a given condition
* It returns None if no statistics collected for a given column.
*/
private def estimateIsNotNull(input: RexNode): Option[Double] = {
val inputRef = convertToRexInputRef(input)
val nullCount = mq.getColumnNullCount(this.rel, inputRef.getIndex)
if (nullCount == null) {
return defaultIsNotNullSelectivity
}
val rowCount = mq.getRowCount(this.rel)
val selectivity = if (rowCount == 0) {
0.0
} else {
1.0 - (nullCount / rowCount)
}
Some(selectivity)
}
/**
* Returns a percentage of rows meeting "IN" operator expression.
*
* @param input a RexNode
* @param inSet a set of literal values
* @return an optional double value to show the percentage of rows meeting a given condition
* It returns None if no statistics exists for a given column.
*/
private def estimateIn(input: RexNode, inSet: Seq[RexNode]): Option[Double] = {
val inputRef = convertToRexInputRef(input)
checkInSet(inSet)
val defaultInSelectivity = Some(Math.min(1.0, defaultEqualsSelectivity.get * inSet.size))
if (!isSupportedComparisonType(input.getType)) {
return defaultInSelectivity
}
val columnInterval = mq.getColumnInterval(this.rel, inputRef.getIndex)
if (columnInterval == null) {
val ndv = mq.getDistinctRowCount(rel, ImmutableBitSet.of(inputRef.getIndex), null)
return if (ndv == null) {
defaultInSelectivity
} else {
Some(inSet.size / ndv)
}
}
val interval = convertValueInterval(columnInterval, inputRef.getType)
interval match {
case ValueInterval.infinite => defaultInSelectivity
case ValueInterval.empty => Some(0.0)
case _ =>
val validInSet = inSet.filter(
v => ValueInterval.contains(interval, literalToComparable(v.asInstanceOf[RexLiteral])))
if (validInSet.isEmpty) {
Some(0.0)
} else {
val bitSetOfInputRef = ImmutableBitSet.of(inputRef.getIndex)
val ndv = mq.getDistinctRowCount(rel, bitSetOfInputRef, null)
if (ndv == null) {
defaultInSelectivity
} else {
// there is no possible that ndv < 1.0 because of the protection in RelMetadataQuery.
// newNdv should not be greater than the old ndv. For example, column has only 2 values
// 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validSet.size is 5.
val newNdv = Math.min(ndv, validInSet.size)
// return the filter selectivity. Without advanced statistics such as histograms,
// we have to assume uniform distribution.
Some(Math.min(1.0, newNdv * 1.0 / ndv))
}
}
}
}
/**
* Checks whether all values are literal. If contains non-literal values, throw an exception.
*/
private def checkInSet(inSet: Seq[RexNode]): Unit = {
if (inSet.exists(!_.isA(SqlKind.LITERAL))) {
throw new IllegalArgumentException(s"${
inSet.mkString(",")
} contain some non-literal values.")
}
}
/**
* If a rexNode is RexInputRef type, convert it to RexInputRef; else throw an exception.
*/
private def convertToRexInputRef(rexNode: RexNode): RexInputRef = {
rexNode match {
case i: RexInputRef => i
case _ => throw new IllegalArgumentException(s"$rexNode is not a RexInputRef.")
}
}
private def doubleEquals(v1: Double, v2: Double): Boolean = Math.abs(v1 - v2) < 1e-6
}
object SelectivityEstimator {
/**
* Convert ValueInterval,
* for Numeric/Boolean/Date/Time/Timestamp, min/max will be converted to Double,
* for CHARACTER, min/max will be converted to String.
*/
def convertValueInterval(interval: ValueInterval, dataType: RelDataType): ValueInterval = {
require(interval != null && dataType != null)
convertValueInterval(interval, dataType.getFamily)
}
/**
* Convert ValueInterval,
* for Numeric/Boolean/Date/Time/Timestamp, min/max will be converted to Double,
* for CHARACTER, min/max will be converted to String.
*/
def convertValueInterval(
interval: ValueInterval,
typeFamily: RelDataTypeFamily): ValueInterval = {
require(interval != null && typeFamily != null)
interval match {
case ValueInterval.empty | ValueInterval.infinite => interval
case _ =>
val (lower, includeLower) = interval match {
case li: WithLower => (li.lower, li.includeLower)
case _ => (null, false)
}
val (upper, includeUpper) = interval match {
case ui: WithUpper => (ui.upper, ui.includeUpper)
case _ => (null, false)
}
typeFamily match {
case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN | SqlTypeFamily.DATE |
SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP =>
ValueInterval(
comparableToDouble(lower),
comparableToDouble(upper),
includeLower,
includeUpper)
case SqlTypeFamily.CHARACTER =>
ValueInterval(
lower.toString,
upper.toString,
includeLower,
includeUpper)
case _ => throw new UnsupportedOperationException(s"Unsupported typeFamily: $typeFamily")
}
}
}
/**
* Get literal value as Comparable.
*
* The class may be different between literal value and min/max value,
* and different classes can't be compared. So
* for Numeric/Boolean/Date/Time/Timestamp, literal will be converted to Double uniformly,
* for CHARACTER, literal will be converted to String.
*
* e.g. min/max values are Double: min=10.0 and max=100.0,
* however the literal values are Integer: select * from tb where a in (20, 30, 40)
*/
def literalToComparable(literal: RexLiteral): Comparable[_] = {
if (!literal.isNull) {
literal.getType.getFamily match {
case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN | SqlTypeFamily.DATE |
SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP => literalToDouble(literal)
case SqlTypeFamily.CHARACTER => literal.getValueAs(classOf[String])
case _ => throw new UnsupportedOperationException(
s"Can't get value as comparable from literal type: ${literal.getType}")
}
} else {
null
}
}
def comparableToDouble(value: Any): JDouble = {
value match {
case null => null
case v: Number => v.doubleValue()
case v: Boolean => if (v) 1.0 else 0.0
// Returns days since 1970-01-01 for Date type value
case v: java.sql.Date => JDouble.valueOf(v.getTime / DateTimeUtils.MILLIS_PER_DAY)
// Returns milliseconds since 1970-01-01 00:00:00 for Timestamp type value
case v: java.sql.Timestamp => JDouble.valueOf(v.getTime)
// Returns milliseconds since midnight for Time type value
case v: java.sql.Time => JDouble.valueOf(new TimeString(v.toString).getMillisOfDay)
case _ =>
throw new UnsupportedOperationException(
s"Can't convert comparable type: ${value.getClass} to Double")
}
}
def literalToDouble(literal: RexLiteral): JDouble = {
if (!literal.isNull) {
literal.getType.getFamily match {
case SqlTypeFamily.NUMERIC =>
literal.getValue3.toString.toDouble
case SqlTypeFamily.BOOLEAN =>
if (RexLiteral.booleanValue(literal)) 1.0 else 0.0
case SqlTypeFamily.DATE | SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP =>
literal.getValue2.toString.toDouble
case _ =>
throw new UnsupportedOperationException(
s"Can't get value as Double from literal type: ${literal.getType}")
}
} else {
null
}
}
def lessThan(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
false
} else {
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) < 0
}
}
def lessThanOrEqualTo(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
false
} else {
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) <= 0
}
}
def greaterThan(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
false
} else {
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) > 0
}
}
def greaterThanOrEqualTo(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
false
} else {
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) >= 0
}
}
/**
* Currently, Numeric/Boolean/String/Date/Time/Timestamp are supported.
*/
def isSupportedComparisonType(relType: RelDataType): Boolean = {
relType.getFamily match {
case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN | SqlTypeFamily.CHARACTER |
SqlTypeFamily.DATE | SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP => true
case _ => false
}
}
/**
* Currently, value of Numeric/Boolean/Date/Time/Timestamp column can be converted to a number.
*/
def canConvertToNumericType(relType: RelDataType): Boolean = {
relType.getFamily match {
case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN |
SqlTypeFamily.DATE | SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP => true
case _ => false
}
}
}