blob: d1d2548a48981820ea64649206bd9a16778b9500 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.table.plan.metadata
import org.apache.flink.table.functions.sql.internal.{SqlRuntimeFilterBuilderFunction, SqlRuntimeFilterFunction}
import org.apache.flink.table.plan.metadata.SelectivityEstimator._
import org.apache.flink.table.plan.schema.{FlinkTable, TableSourceTable}
import org.apache.flink.table.plan.stats._
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, FlinkRexUtil, PartitionPredicateExtractor, RexNodeExtractor}
import org.apache.flink.table.sources.PartitionableTableSource
import org.apache.calcite.avatica.util.DateTimeUtils
import org.apache.calcite.plan.{RelOptPredicateList, RelOptUtil}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFamily}
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.metadata.RelMdUtil
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.{SqlKind, SqlOperator}
import org.apache.calcite.util.{ImmutableBitSet, TimeString}
import java.lang.{Double => JDouble}
import scala.collection.JavaConversions._
import scala.collection.mutable
* Estimates selectivity of rows meeting a filter predicate on a RelNode.
* <p>For unsupported cases, we fall back to the default selectivity values
* taken from [[org.apache.calcite.rel.metadata.RelMdUtil.guessSelectivity]].
* <p><strong>NOTES:</strong>
* At the current stage, we don't have advanced statistics such as sketches or histograms,
* so we have to assume a uniform distribution.
* We should update the estimation methods once advanced statistics are available.
* <p>Part of the code is adapted from Apache Spark.
* @param rel RelNode
* @param mq Metadata query
class SelectivityEstimator(rel: RelNode, mq: FlinkRelMetadataQuery)
extends RexVisitorImpl[Option[Double]](true) {
// Builder used by this estimator to compose RexNode expressions and to drive simplification.
private val rexBuilder = rel.getCluster.getRexBuilder
// Node-count limit handed to FlinkRexUtil.toCnf whenever a predicate is converted to CNF.
private val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(rel)
// These default values are taken from RelMdUtil#guessSelectivity.
// Default for range comparisons when no statistics can be applied.
private[flink] val defaultComparisonSelectivity = Some(0.5d)
// Default for equality comparisons when no statistics can be applied.
private[flink] val defaultEqualsSelectivity = Some(0.15d)
// Default for IS NULL when no null-count statistics are available.
private[flink] val defaultIsNullSelectivity = Some(0.1d)
// Complement of the IS NULL default, so both defaults always add up to 1.0.
private[flink] val defaultIsNotNullSelectivity = Some(1.0 - defaultIsNullSelectivity.get)
// Default for LIKE predicates.
private[flink] val defaultLikeSelectivity = Some(0.05d)
// Generic default for any other predicate.
private[flink] val defaultSelectivity = Some(0.25d)
* Returns a percentage of rows meeting a filter predicate on TableScan node.
* @param predicate predicate whose selectivity is to be estimated against scan's output.
* @return estimated selectivity (between 0.0 and 1.0),
* or None if no reliable estimate can be determined.
// Entry point: estimates the selectivity of `predicate` against `rel`.
// The predicate is simplified with RexSimplify first; a null predicate and predicates that
// simplify to constant TRUE or constant FALSE are handled by dedicated branches before the
// general case.
def evaluate(predicate: RexNode): Option[Double] = {
try {
// A null predicate is never simplified or visited.
if (predicate == null) {
} else {
// Simplify without any known predicates (RelOptPredicateList.EMPTY).
val rexSimplify = new RexSimplify(
rexBuilder, RelOptPredicateList.EMPTY, true, RexUtil.EXECUTOR)
val simplifiedPredicate = rexSimplify.simplify(predicate)
if (simplifiedPredicate.isAlwaysTrue) {
} else if (simplifiedPredicate.isAlwaysFalse) {
} else {
} catch {
// Estimation is best-effort: if an unsupported operation is found, fall back to None.
// NOTE(review): catching Throwable also swallows fatal JVM errors (e.g. OutOfMemoryError);
// consider narrowing this to scala.util.control.NonFatal.
case _: Throwable => None
* Get partition field names from PartitionableTableSource.
* @return partition field names if the TableSource in scan is PartitionableTableSource,
* otherwise None.
// Looks up the partition field names of the table scanned by `scan`.
// Only a TableSourceTable whose table source is a PartitionableTableSource can provide them;
// every other table or source kind yields None.
private def getPartitionFieldNames(scan: TableScan): Option[Array[String]] = {
val table = scan.getTable.unwrap(classOf[TableSourceTable])
// The outer fallback case also covers an unwrap result that is not a TableSourceTable.
table match {
case t: TableSourceTable =>
t.tableSource match {
case p: PartitionableTableSource =>
case _ => None
case _ => None
// Visitor callback: dispatches on the call's operator.
// AND / OR calls are first regrouped by splitAndPredicate / splitOrPredicate so that conditions
// on the same input ref can be estimated together; NOT is estimated as the complement of its
// operand.
override def visitCall(call: RexCall): Option[Double] = {
call.getOperator match {
case AND =>
val predicates = splitAndPredicate(call)
// predicates.size == 1 indicates that
// the RexInputRef of each sub-condition in this AND call are the same.
if (predicates.size == 1) {
} else {
val selectivity =
case OR =>
val predicates = splitOrPredicate(call)
if (predicates.size == 1) {
// A single regrouped predicate is either an IN built from equalities on one input ref
// (estimated like any single predicate) or an OR of conditions on one input ref.
predicates.head match {
case c: RexCall if c.getOperator == IN => estimateSinglePredicate(c)
case _ => estimateOrPredicate(call)
} else {
val selectivity =
// Combine the per-group estimates and cap the result at 1.0.
// NOTE(review): `sum - product` equals the union probability of independent events only
// for two operands; with more operands it is an approximation - confirm this is intended.
Some(math.min(1.0, selectivity.sum - selectivity.product))
case NOT =>
// NOT p is estimated as 1 - selectivity(p); estimateOperand substitutes 1.0 when the
// operand has no estimate.
val operands = call.getOperands
val selectivity = estimateOperand(operands.head)
Some(1.0 - selectivity)
case _ =>
// Estimates a single operand by visiting it with this estimator.
// When the visit produces no estimate (a null result or None), the operand is treated as
// non-selective and 1.0 is returned.
private def estimateOperand(operand: RexNode): Double =
  operand.accept(this) match {
    case Some(estimate) => estimate
    // Covers both None and a null visitor result.
    case _ => 1.0
  }
* Returns a percentage of rows meeting a single condition in Filter node.
* We will do partition pruning with those supported partition-pruning predicates,
* and the rowCount in [[org.apache.flink.table.plan.stats.TableStats]] should exclude the
* rows of the pruned partitions. So we should ignore partition predicate here.
* @param singlePredicate predicate whose selectivity is to be estimated against scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
// Estimates one non-composite predicate by dispatching on its operator.
// Predicates that qualify as partition-pruning predicates are handed to
// estimatePartitionPredicate before any operator-specific estimation takes place.
private def estimateSinglePredicate(singlePredicate: RexCall): Option[Double] = {
if (isSupportedPartitionPredicate(singlePredicate)) {
return estimatePartitionPredicate(singlePredicate)
val operands = singlePredicate.getOperands
singlePredicate.getOperator match {
// Binary comparisons take the first and last operand as left and right side.
case EQUALS =>
estimateComparison(EQUALS, operands.head, operands.last)
case NOT_EQUALS =>
// `a <> b` is the complement of `a = b`. When the equality estimate is None it is
// replaced by 1.0, so the complement becomes 0.0.
val selectivity = estimateComparison(EQUALS, operands.head, operands.last)
Some(1.0 - selectivity.getOrElse(1.0))
estimateComparison(GREATER_THAN, operands.head, operands.last)
estimateComparison(GREATER_THAN_OR_EQUAL, operands.head, operands.last)
case LESS_THAN =>
estimateComparison(LESS_THAN, operands.head, operands.last)
estimateComparison(LESS_THAN_OR_EQUAL, operands.head, operands.last)
case IS_NULL =>
case IS_NOT_NULL =>
case IN =>
// The first operand is the tested expression, the remaining operands form the IN set.
estimateIn(operands.head, operands.slice(1, operands.size))
case LIKE =>
// Runtime filters must not distort the cost estimate:
// they may turn out to be useless and only pre-filter rows ahead of the join.
case _: SqlRuntimeFilterBuilderFunction | _: SqlRuntimeFilterFunction =>
case _ =>
* Decomposes a predicate into a list of expressions that are AND'ed together,
* and the expressions with same RexInputRefs will be converted into an AND.
* e.g.
* input predicate: `a > 10 AND b < 40.0 AND a < 20 AND CAST(b AS INTEGER) > 30`
* output expressions: `a > 10 AND a < 20` and `b < 40.0` and `CAST(b AS INTEGER) > 30`
// Splits an AND call into conjuncts and regroups them: conjuncts that compare one input ref
// with a literal are collected per input ref, all other conjuncts are kept as they are.
private def splitAndPredicate(call: RexCall): Seq[RexNode] = {
// Normalize to CNF (bounded by maxCnfNodeCount) and split on the top-level ANDs.
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, call)
val conjunctions = RelOptUtil.conjunctions(cnf)
// Conjuncts of the form `ref op literal` / `literal op ref`, keyed by the ref's string form.
val combinations = new mutable.HashMap[String, mutable.ListBuffer[RexNode]]()
// Conjuncts that cannot be grouped by input ref.
val nonCombinations = new mutable.ListBuffer[RexNode]()
// Classifies each conjunct into one of the two buffers above.
val visitor = new RexVisitorImpl[Unit](true) {
override def visitCall(call: RexCall): Unit = {
call.getOperator match {
// The input was already split on AND, so a nested AND is unexpected here.
case AND => throw new RuntimeException("This should not happen.")
// TODO: support more operators
val (left, right) = (call.operands.get(0), call.operands.get(1))
(left, right) match {
case (i: RexInputRef, _: RexLiteral) =>
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += call
case (_: RexLiteral, i: RexInputRef) =>
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += call
case _ => nonCombinations += call
case _ => nonCombinations += call
// Each group of conjuncts on the same input ref is composed back into one conjunction.
val combined = { r =>
RexUtil.composeConjunction(rexBuilder, r, false)
// Grouped conjunctions come first, followed by the conjuncts that could not be grouped.
combined ++ nonCombinations
* Decomposes a predicate into a list of expressions that are OR'ed together,
* and the expressions with same RexInputRefs connected with EQUALS will be converted into an IN.
* e.g.
* input predicate: `a = 10 or a = 20 OR a > 40.0 OR a < 0 OR b > 30`
* output expressions: `a IN (10, 20)` and `(a > 40 OR a < 0)` and `b > 30`
// Splits an OR call into disjuncts and regroups them per input ref: several equalities on the
// same input ref are merged into one IN call, the remaining comparisons on that input ref are
// composed into one disjunction, and all other disjuncts are kept as they are.
private def splitOrPredicate(call: RexCall): Seq[RexNode] = {
// Disjuncts of the form `ref op literal` / `literal op ref`, keyed by the ref's string form.
val combinations = new mutable.HashMap[String, mutable.ListBuffer[RexNode]]()
// Disjuncts that cannot be grouped by input ref.
val nonCombinations = new mutable.ListBuffer[RexNode]()
// Classifies each disjunct into one of the two buffers above.
val visitor = new RexVisitorImpl[Unit](true) {
override def visitCall(call: RexCall): Unit = {
call.getOperator match {
// TODO: support more operators
val (left, right) = (call.operands.get(0), call.operands.get(1))
(left, right) match {
case (i: RexInputRef, _: RexLiteral) =>
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += call
case (l: RexLiteral, i: RexInputRef) =>
// For `literal = ref` the operands are swapped to `ref = literal`, so the IN
// construction below can read the input ref from the head operand and the value from
// the last operand.
val newCall = if (call.getOperator == EQUALS) {
call.clone(call.getType, List(i, l))
} else {
combinations.getOrElseUpdate(i.toString, mutable.ListBuffer[RexNode]()) += newCall
case _ => nonCombinations += call
case _ => nonCombinations += call
val disjunctions = RelOptUtil.disjunctions(call)
val combined = combinations.values.flatMap { r =>
// Separate the equalities from the other comparisons on this input ref.
val (equalsNodes, nonEqualsNodes) = r.partition(_.asInstanceOf[RexCall].getOperator == EQUALS)
// An IN is only built when there are at least two equalities on the input ref.
if (equalsNodes.size > 1) {
val inputRef = equalsNodes.head.asInstanceOf[RexCall].getOperands.head
val valuesNode =[RexCall].getOperands.last)
val inNode = rexBuilder.makeCall(IN, List(inputRef) ++ valuesNode)
if (nonEqualsNodes.isEmpty) {
} else {
// Keep the non-equality comparisons as a separate disjunction next to the IN node.
Seq(inNode, RexUtil.composeDisjunction(rexBuilder, nonEqualsNodes, false))
} else {
Seq(RexUtil.composeDisjunction(rexBuilder, r, false))
// Grouped expressions come first, followed by the disjuncts that could not be grouped.
combined ++ nonCombinations
* Returns a percentage of rows meeting an AND condition with same RexInputRef in Filter node.
* Each sub-condition in AND is a binary comparison condition,
* and only binary comparison operator <, <=, >, >= are supported now.
* The condition is like: `a > 10 AND a < 20`
* @param andPredicate predicate whose selectivity is to be estimated against scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
// Estimates an AND whose conjuncts all compare the same input ref with a literal.
// When every conjunct is supported, each one is turned into a ValueInterval and the intervals
// are handed to estimateAndNumericComparison together with the input ref index.
private def estimateAndPredicate(andPredicate: RexCall): Option[Double] = {
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, andPredicate)
val conjunctions = RelOptUtil.conjunctions(cnf)
// Indexes of the input refs referenced by the conjuncts; exactly one is required below.
val inputRefs = mutable.HashSet[Int]()
// First pass: record the referenced input ref and check that every conjunct is supported.
// Any conjunct shape other than `ref op literal` / `literal op ref` is treated as a bug.
val supported = conjunctions.forall {
case c: RexCall =>
c.getOperator match {
val (left, right) = (c.operands.get(0), c.operands.get(1))
(left, right) match {
case (i: RexInputRef, _: RexLiteral) => inputRefs += i.getIndex
case (_: RexLiteral, i: RexInputRef) => inputRefs += i.getIndex
case _ => throw new RuntimeException("This should not happen.")
// TODO: support partition predicates and non-numeric types
!isSupportedPartitionPredicate(c) &&
canConvertToNumericType(left.getType) &&
case _ => throw new RuntimeException("This should not happen.")
case _ => throw new RuntimeException("This should not happen.")
// Callers are expected to pass conjuncts that all reference one and the same input ref.
require(inputRefs.size == 1)
// Unsupported conjuncts: fall back to multiplying per-conjunct estimates.
if (!supported) {
val selectivity =
return Some(selectivity.product)
// Second pass: translate each comparison into the interval of values that satisfies it.
// A null bound stands for "unbounded" on that side; for `literal op ref` the bound lands on
// the opposite side compared with `ref op literal`.
val predicateIntervals = {
case c: RexCall =>
val (left, right) = (c.operands.get(0), c.operands.get(1))
c.getOperator match {
(left, right) match {
case (i: RexInputRef, l: RexLiteral) =>
ValueInterval(literalToDouble(l), null, includeLower = false, includeUpper = false)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = false)
case _ => throw new RuntimeException("This should not happen.")
(left, right) match {
case (_: RexInputRef, l: RexLiteral) =>
ValueInterval(literalToDouble(l), null, includeLower = true, includeUpper = false)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = true)
case _ => throw new RuntimeException("This should not happen.")
case LESS_THAN =>
(left, right) match {
case (_: RexInputRef, l: RexLiteral) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = false)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(literalToDouble(l), null, includeLower = false, includeUpper = false)
case _ => throw new RuntimeException("This should not happen.")
(left, right) match {
case (_: RexInputRef, l: RexLiteral) =>
ValueInterval(null, literalToDouble(l), includeLower = false, includeUpper = true)
case (l: RexLiteral, _: RexInputRef) =>
ValueInterval(literalToDouble(l), null, includeLower = true, includeUpper = false)
case _ => throw new RuntimeException("This should not happen.")
case _ => throw new RuntimeException("This should not happen.")
case _ => throw new RuntimeException("This should not happen.")
// inputRefs holds exactly one index at this point (enforced by the require above).
estimateAndNumericComparison(inputRefs.head, predicateIntervals)
* Returns a percentage of rows meeting an AND condition and
* each sub-condition is a binary numeric comparison expression.
* This method evaluates expressions for Numeric/Boolean/Date/Time/Timestamp columns.
// Estimates the selectivity of several range conditions on the same numeric column, each
// given as the interval of values that satisfies it.
//
// inputRefIndex: index of the column all conditions refer to.
// intervals:     one interval per condition; they are intersected with the column interval.
//
// Returns Some(0.0) when the column interval or the intersection is empty, the product of
// default comparison selectivities when the column interval is unknown or unbounded on either
// side, and otherwise the fraction of the column's value range covered by the intersection
// (uniform distribution assumed).
private def estimateAndNumericComparison(
    inputRefIndex: Int,
    intervals: Seq[ValueInterval]): Option[Double] = {
  // Without usable statistics every condition contributes the default comparison selectivity.
  val default = Some(math.pow(defaultComparisonSelectivity.get, intervals.size))
  val columnInterval = mq.getColumnInterval(rel, inputRefIndex)
  if (columnInterval == null) {
    default
  } else {
    // convert values in columnInterval to double type
    val convertedInterval = convertValueInterval(columnInterval, SqlTypeFamily.NUMERIC)
    // Narrow the column's interval by every predicate interval.
    val andInterval = intervals.foldLeft(convertedInterval)(ValueInterval.intersect)
    columnInterval match {
      case ValueInterval.empty => Some(0.0)
      case ValueInterval.infinite => default
      case _: LeftSemiInfiniteValueInterval => default
      case _: RightSemiInfiniteValueInterval => default
      case v: FiniteValueInterval =>
        andInterval match {
          case ValueInterval.empty => Some(0.0)
          case lv: FiniteValueInterval =>
            // TODO: the include-min/include-max flags are not taken into consideration yet
            val (min, max) = (comparableToDouble(v.lower), comparableToDouble(v.upper))
            val (litMin, litMax) = (comparableToDouble(lv.lower), comparableToDouble(lv.upper))
            val columnWidth = max - min
            if (columnWidth == 0.0) {
              // The column holds a single value and the intersection is not empty, so that
              // value satisfies every condition. Dividing here would produce NaN (0.0 / 0.0).
              Some(1.0)
            } else {
              Some((litMax - litMin) / columnWidth)
            }
          case _ =>
            // predicate like `a > 10 AND a > 20` had been simplified in `evaluate` method
            throw new RuntimeException("This should not happen.")
        }
    }
  }
}
* Returns a percentage of rows meeting an OR condition with same RexInputRef in Filter node.
* Each sub-condition in OR is a binary comparison condition,
* and only binary comparison operator <, <=, >, >= are supported now.
* The condition is like: `a > 20 OR a < 10`
* @param orPredicate predicate whose selectivity is to be estimated against scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
// Estimates an OR whose disjuncts all refer to the same input ref.
// The per-disjunct estimates are added up and the sum is capped at 1.0.
private def estimateOrPredicate(orPredicate: RexCall): Option[Double] = {
val disjunctions = RelOptUtil.disjunctions(orPredicate)
val result =
Some(math.min(1.0, result.sum))
* Returns whether a predicate is supported partition-pruning predicate.
* @param singlePredicate predicate whose selectivity is to be estimated against scan's output.
* @return true if the TableSource in scan is PartitionableTableSource and the columns in the
* predicate are all partition columns, otherwise false.
// Checks whether `singlePredicate` can be served entirely by partition pruning.
// This can only be true when `rel` is a TableScan over a table that reports at least one
// partition field name.
private def isSupportedPartitionPredicate(singlePredicate: RexCall): Boolean = {
rel match {
case ts: TableScan =>
val partitionFieldNames = getPartitionFieldNames(ts)
partitionFieldNames match {
// partitionable table
case Some(names) if names.nonEmpty =>
val (convertedPredicates, unconvertedRexNodes) =
val (_, nonPartitionPredicates) =
PartitionPredicateExtractor.extractPartitionPredicates(convertedPredicates, names)
// Supported only if the whole predicate could be converted and none of the converted
// parts was classified as a non-partition predicate.
unconvertedRexNodes.isEmpty && nonPartitionPredicates.isEmpty
// non-partitionable table
case _ => false
case _ => false
* Returns a percentage of rows meeting a partition expression.
* @param partitionPredicate partition predicate whose selectivity is to be estimated against
* scan's output.
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if the condition is not supported.
// Estimates a predicate that only involves partition columns.
// Expects `rel` to be a TableScan over a PartitionableTableSource (the cast and the matches
// below fail otherwise); isSupportedPartitionPredicate performs that check.
private def estimatePartitionPredicate(partitionPredicate: RexCall): Option[Double] = {
val table = rel.asInstanceOf[TableScan].getTable.unwrap(classOf[TableSourceTable])
val partitionableTableSource = table match {
case t: TableSourceTable =>
t.tableSource match {
case p: PartitionableTableSource => p
case _ => throw new RuntimeException("This should not happen.")
case _ => throw new RuntimeException("This should not happen.")
// TODO: deal with partial filter push-down
if (partitionableTableSource.isFilterPushedDown) {
// Ignore any predicates on partition columns because we have already
// accounted for these in the Table row count.
} else {
// We do not use PartitionableTableSource#getAllPartitions to get the number of partitions,
// because that method may involve heavy work (e.g. reading metadata using I/O).
// We assume the number of partitions is 100 here.
val numOfPartitions = 100
partitionPredicate.getOperator match {
// Equality selects one of the assumed partitions, inequality all but one.
case EQUALS => Some(1.0 / numOfPartitions)
case NOT_EQUALS => Some(1.0 - 1.0 / numOfPartitions)
case IS_NULL => Some(0.0)
case IS_NOT_NULL => Some(1.0)
case IN =>
// Operand 0 is the tested column; every remaining operand selects one assumed partition.
val inSet = partitionPredicate.getOperands.subList(1, partitionPredicate.getOperands.size)
val sizeOfInSet = inSet.size()
Some(Math.min(1.0, sizeOfInSet * 1.0 / numOfPartitions))
// Any other operator is assumed to select five of the assumed partitions.
case _ => Some(5 * (1.0 / numOfPartitions))
* Returns a percentage of rows meeting a binary comparison expression whose operands are
* an input column and a literal, or two input columns.
* We will use the default selectivity (0.15 for `=`, 0.5 otherwise) in the following cases:
* 1. the type is not supported
* 2. the operator is not supported
* 3. the specified column's stats is empty
* @param op a binary comparison operator, including =, <=>, <, <=, >, >=
* @param left the left operand
* @param right the right operand
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
// Generic entry point for binary comparisons: inspects the operand shapes and forwards to the
// specialised estimators (column vs. literal, literal vs. column, column vs. column).
// A comparison written as `literal op column` is mirrored into `column op' literal`.
// Unsupported operand types, operand shapes or operators fall back to the default equality
// selectivity for `=` and to the default comparison selectivity for everything else.
private def estimateComparison(op: SqlOperator, left: RexNode, right: RexNode): Option[Double] = {
  val typesSupported =
    isSupportedComparisonType(left.getType) && isSupportedComparisonType(right.getType)

  // Shared dispatch for the four range operators; `mirrored` is the operator to use once a
  // leading literal has been moved to the right-hand side.
  def rangeComparison(same: SqlOperator, mirrored: SqlOperator): Option[Double] =
    (left, right) match {
      case (ref: RexInputRef, lit: RexLiteral) => estimateComparison(same, ref, lit)
      case (lit: RexLiteral, ref: RexInputRef) => estimateComparison(mirrored, ref, lit)
      case (lhs: RexInputRef, rhs: RexInputRef) => estimateComparison(same, lhs, rhs)
      case _ => defaultComparisonSelectivity
    }

  if (!typesSupported) {
    op match {
      case EQUALS => defaultEqualsSelectivity
      case _ => defaultComparisonSelectivity
    }
  } else {
    op match {
      case EQUALS =>
        (left, right) match {
          case (ref: RexInputRef, lit: RexLiteral) => estimateEquals(ref, lit)
          case (lit: RexLiteral, ref: RexInputRef) => estimateEquals(ref, lit)
          case (lhs: RexInputRef, rhs: RexInputRef) => estimateComparison(EQUALS, lhs, rhs)
          case _ => defaultEqualsSelectivity
        }
      case LESS_THAN => rangeComparison(LESS_THAN, GREATER_THAN)
      case LESS_THAN_OR_EQUAL => rangeComparison(LESS_THAN_OR_EQUAL, GREATER_THAN_OR_EQUAL)
      case GREATER_THAN => rangeComparison(GREATER_THAN, LESS_THAN)
      case GREATER_THAN_OR_EQUAL => rangeComparison(GREATER_THAN_OR_EQUAL, LESS_THAN_OR_EQUAL)
      case _ => defaultComparisonSelectivity
    }
  }
}
* Returns a percentage of rows meeting an equality (=) expression.
* @param inputRef a RexInputRef
* @param literal a literal value (or constant)
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
// Estimates `inputRef = literal`.
// A null literal is estimated like IS NULL; otherwise the literal is checked against the
// column's value interval and, when it lies inside, the estimate is based on the column's
// number of distinct values (1 / ndv).
private def estimateEquals(inputRef: RexInputRef, literal: RexLiteral): Option[Double] = {
if (literal.isNull) {
return estimateIsNull(inputRef)
val columnInterval = mq.getColumnInterval(rel, inputRef.getIndex)
// No interval statistics for the column: use the default equality selectivity.
if (columnInterval == null) {
return defaultEqualsSelectivity
// Convert the interval bounds according to the column's type before comparing.
val convertedInterval = convertValueInterval(columnInterval, inputRef.getType)
convertedInterval match {
case ValueInterval.infinite => defaultEqualsSelectivity
case ValueInterval.empty => Some(0.0)
case _ =>
if (ValueInterval.contains(convertedInterval, literalToComparable(literal))) {
val bitSetOfInputRef = ImmutableBitSet.of(inputRef.getIndex)
val ndv = mq.getDistinctRowCount(rel, bitSetOfInputRef, null)
if (ndv == null) {
} else {
// ndv cannot be below 1.0 because of the protection in RelMetadataQuery.
Some(1.0 / ndv)
} else {
* Returns a percentage of rows meeting a binary comparison expression.
* @param op a binary comparison operator, including <, <=, >, >=
* @param inputRef a RexInputRef
* @param literal a literal value (or constant)
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
// Estimates `inputRef op literal` for a range operator.
// Columns whose type can be converted to a numeric type are estimated from their value
// interval by estimateNumericComparison; other column types take the fallback branch.
// Throws IllegalArgumentException when the literal is null.
private def estimateComparison(
op: SqlOperator,
inputRef: RexInputRef,
literal: RexLiteral): Option[Double] = {
if (literal.isNull) {
throw new IllegalArgumentException("Numeric comparison does not support null literal here.")
if (canConvertToNumericType(inputRef.getType)) {
estimateNumericComparison(op, inputRef, literal)
} else {
// TODO: It is difficult to support binary comparisons for non-numeric types
// without advanced statistics like histograms.
* Returns a percentage of rows meeting a binary numeric comparison expression.
* This method evaluates expressions for Numeric/Boolean/Date/Time/Timestamp columns.
* @param op a binary comparison operator, including <, <=, >, >=
* @param inputRef a RexInputRef
* @param literal a literal value (or constant)
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
// Estimates `inputRef op literal` for a numeric-convertible column from the column's value
// interval. The literal is compared with the interval bounds to detect the "no row matches"
// and "every row matches" cases; in between, the estimate is the covered fraction of the
// [min, max] range (uniform distribution assumed).
private def estimateNumericComparison(
op: SqlOperator,
inputRef: RexInputRef,
literal: RexLiteral): Option[Double] = {
val columnInterval = mq.getColumnInterval(rel, inputRef.getIndex)
// No interval statistics for the column: use the default comparison selectivity.
if (columnInterval == null) {
return defaultComparisonSelectivity
columnInterval match {
case ValueInterval.infinite => defaultComparisonSelectivity
case ValueInterval.empty => Some(0.0)
case _ =>
// A missing bound is represented as null and treated as inclusive.
val (min, includeMin) = columnInterval match {
case hasLower: WithLower => (comparableToDouble(hasLower.lower), hasLower.includeLower)
case _ => (null, true)
val (max, includeMax) = columnInterval match {
case hasUpper: WithUpper => (comparableToDouble(hasUpper.upper), hasUpper.includeUpper)
case _ => (null, true)
val lit = literalToDouble(literal)
// noOverlap: the literal excludes the whole column interval.
// completeOverlap: the literal admits the whole column interval.
val (noOverlap, completeOverlap) = op match {
case LESS_THAN => (
greaterThanOrEqualTo(min, lit),
if (includeMax) lessThan(max, lit) else lessThanOrEqualTo(max, lit))
if (includeMin) greaterThan(min, lit) else greaterThanOrEqualTo(min, lit),
lessThanOrEqualTo(max, lit))
case GREATER_THAN => (
lessThanOrEqualTo(max, lit),
if (includeMin) greaterThan(min, lit) else greaterThanOrEqualTo(min, lit))
if (includeMax) lessThan(max, lit) else lessThanOrEqualTo(max, lit),
greaterThanOrEqualTo(min, lit))
val selectivity = if (noOverlap) {
} else if (completeOverlap) {
// Partial overlap can only be quantified when both bounds are known.
} else if (min != null && max != null) {
val bitSetOfInputRef = ImmutableBitSet.of(inputRef.getIndex)
val ndv = mq.getDistinctRowCount(rel, bitSetOfInputRef, null)
op match {
// ndv cannot be below 1.0 because of the protection in RelMetadataQuery.
case LESS_THAN =>
if (doubleEquals(lit, max)) {
if (includeMax) {
// Everything except the boundary value itself satisfies the predicate.
// NOTE(review): this branch yields an Option, while the sibling boundary branches
// unwrap it with getOrElse(defaultComparisonSelectivity.get) - confirm intended.
Option(ndv).map(v => 1.0 - (1.0 / v))
} else {
} else {
// TODO: includeMin is not taken into consideration yet
(lit - min) / (max - min)
if (doubleEquals(lit, min)) {
if (includeMin) {
// The boundary value is the only satisfying value.
Option(ndv).map(1.0 / _).getOrElse(defaultComparisonSelectivity.get)
} else {
} else {
// TODO: includeMax is not taken into consideration yet
(lit - min) / (max - min)
if (doubleEquals(lit, min)) {
if (includeMin) {
// Everything except the boundary value itself satisfies the predicate.
// NOTE(review): same Option-vs-Double mismatch as noted above - confirm intended.
Option(ndv).map(v => 1.0 - (1.0 / v))
} else {
} else {
// TODO: includeMin is not taken into consideration yet
(max - lit) / (max - min)
if (doubleEquals(lit, max)) {
if (includeMax) {
// The boundary value is the only satisfying value.
Option(ndv).map(1.0 / _).getOrElse(defaultComparisonSelectivity.get)
} else {
} else {
// TODO: includeMax is not taken into consideration yet
(max - lit) / (max - min)
} else {
* Returns a percentage of rows meeting a binary comparison expression containing two columns.
* In SQL queries, we also see predicate expressions involving two columns
* such as "column-1 (op) column-2" where column-1 and column-2 belong to same table.
* Note that, if column-1 and column-2 belong to different tables, then it is a join
* operator's work, NOT a filter operator's work.
* @param op a binary comparison operator, including =, <, <=, >, >=
* @param left the left RexInputRef
* @param right the right RexInputRef
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
// Estimates `left op right` where both sides are columns of the same input.
// Statistics are only consulted when both column types can be converted to a numeric type;
// otherwise the operator's default selectivity is returned.
private def estimateComparison(
    op: SqlOperator,
    left: RexInputRef,
    right: RexInputRef): Option[Double] = {
  val bothNumeric =
    canConvertToNumericType(left.getType) && canConvertToNumericType(right.getType)
  if (!bothNumeric) {
    // TODO: It is difficult to support binary comparisons for non-numeric types
    // without advanced statistics like histograms.
    op match {
      case EQUALS => defaultEqualsSelectivity
      case _ => defaultComparisonSelectivity
    }
  } else {
    estimateNumericComparison(op, left, right)
  }
}
* Returns a percentage of rows meeting a binary numeric comparison expression
* containing two columns.
* This method evaluates expressions for Numeric/Boolean/Date/Time/Timestamp columns.
* @param op a binary comparison operator, including =, <, <=, >, >=
* @param left the left RexInputRef
* @param right the right RexInputRef
* @return an optional double value to show the percentage of rows meeting a given condition.
* It returns None if no statistics collected for a given column.
// Estimates `left op right` for two numeric-convertible columns by comparing their value
// intervals. The intervals either do not overlap with respect to `op`, overlap completely,
// or overlap partially; partial overlap uses the empirical value 1/3.
private def estimateNumericComparison(
op: SqlOperator,
left: RexInputRef,
right: RexInputRef): Option[Double] = {
// Fallback used whenever the required statistics are missing.
val selectivityWithoutStats = op match {
case EQUALS => defaultEqualsSelectivity
case _ => defaultComparisonSelectivity
val leftInterval = mq.getColumnInterval(this.rel, left.getIndex)
val rightInterval = mq.getColumnInterval(this.rel, right.getIndex)
// Unknown or unbounded intervals carry no information; an empty interval means no rows.
if (leftInterval == null ||
leftInterval == ValueInterval.infinite ||
rightInterval == null ||
rightInterval == ValueInterval.infinite) {
return selectivityWithoutStats
} else if (leftInterval == ValueInterval.empty || rightInterval == ValueInterval.empty) {
return Some(0.0)
// Null counts are needed below to decide whether a complete overlap really matches all rows.
val leftNullCount = mq.getColumnNullCount(this.rel, left.getIndex)
val rightNullCount = mq.getColumnNullCount(this.rel, right.getIndex)
if (leftNullCount == null || rightNullCount == null) {
return selectivityWithoutStats
val bitSetOfLeftInputRef = ImmutableBitSet.of(left.getIndex)
val leftNdv = mq.getDistinctRowCount(this.rel, bitSetOfLeftInputRef, null)
val bitSetOfRightInputRef = ImmutableBitSet.of(right.getIndex)
val rightNdv = mq.getDistinctRowCount(this.rel, bitSetOfRightInputRef, null)
// NOTE(review): when an NDV is missing this returns the equality default regardless of `op`;
// selectivityWithoutStats may be what is intended for the range operators - confirm.
if (op.equals(EQUALS) &&
&& rightInterval.isInstanceOf[FiniteValueInterval])) ||
leftNdv == null || rightNdv == null) {
return defaultEqualsSelectivity
// left interval
// A missing bound is represented as null and treated as inclusive.
val (leftMin, leftIncludeMin) = leftInterval match {
case hasLower: WithLower => (comparableToDouble(hasLower.lower), hasLower.includeLower)
case _ => (null, true)
val (leftMax, leftIncludeMax) = leftInterval match {
case hasUpper: WithUpper => (comparableToDouble(hasUpper.upper), hasUpper.includeUpper)
case _ => (null, true)
// right interval
val (rightMin, rightIncludeMin) = rightInterval match {
case hasLower: WithLower => (comparableToDouble(hasLower.lower), hasLower.includeLower)
case _ => (null, true)
val (rightMax, rightIncludeMax) = rightInterval match {
case hasUpper: WithUpper => (comparableToDouble(hasUpper.upper), hasUpper.includeUpper)
case _ => (null, true)
// determine the overlapping degree between predicate interval and column's interval
val (noOverlap, completeOverlap) = op match {
// Left < Right or Left <= Right
// - no overlap:
// rightMin rightMax leftMin leftMax
// --------+------------------+------------+-------------+------->
// - complete overlap: (If null values exists, we set it to partial overlap.)
// leftMin leftMax rightMin rightMax
// --------+------------------+------------+-------------+------->
case LESS_THAN =>
greaterThanOrEqualTo(leftMin, rightMax),
if (leftIncludeMax && rightIncludeMin) {
lessThan(leftMax, rightMin)
} else {
lessThanOrEqualTo(leftMax, rightMin)
if (leftIncludeMin && rightIncludeMax) {
greaterThan(leftMin, rightMax)
} else {
greaterThanOrEqualTo(leftMin, rightMax)
lessThanOrEqualTo(leftMax, rightMin))
// Left > Right or Left >= Right
// - no overlap:
// leftMin leftMax rightMin rightMax
// --------+------------------+------------+-------------+------->
// - complete overlap: (If null values exists, we set it to partial overlap.)
// rightMin rightMax leftMin leftMax
// --------+------------------+------------+-------------+------->
lessThanOrEqualTo(leftMax, rightMin),
if (leftIncludeMin && rightIncludeMax) {
greaterThan(leftMin, rightMax)
} else {
greaterThanOrEqualTo(leftMin, rightMax)
if (leftIncludeMax && rightIncludeMin) {
lessThan(leftMax, rightMin)
} else {
lessThanOrEqualTo(leftMax, rightMin)
greaterThanOrEqualTo(leftMin, rightMax))
// Left = Right
// - no overlap:
// leftMin leftMax rightMin rightMax
// --------+------------------+------------+-------------+------->
// rightMin rightMax leftMin leftMax
// --------+------------------+------------+-------------+------->
// - complete overlap:
// leftMin leftMax
// rightMin rightMax
// --------+------------------+------->
case EQUALS =>
// now, min/max/ndv is not null and ndv >= 0
// When both touching bounds are inclusive the intervals still share that boundary value,
// so only a strict gap counts as "no overlap".
val isLeftSmallerThanRight = if (leftIncludeMax && rightIncludeMin) {
leftMax < rightMin
} else {
leftMax <= rightMin
val isLeftBiggerThanRight = if (rightIncludeMax && leftIncludeMin) {
rightMax < leftMin
} else {
rightMax <= leftMin
isLeftSmallerThanRight || isLeftBiggerThanRight,
// Complete overlap for equality requires identical bounds, identical inclusiveness and
// the same number of distinct values on both sides.
(leftMin == rightMin) && (leftMax == rightMax)
&& (leftIncludeMin == rightIncludeMin)
&& (leftIncludeMax == rightIncludeMax)
&& (leftNdv == rightNdv))
// Rows with a null on either side never satisfy the comparison, so a complete overlap is
// only treated as such when neither column contains nulls.
val allNotNull = (leftNullCount == 0) && (rightNullCount == 0)
val selectivity = if (noOverlap) {
} else if (completeOverlap && allNotNull) {
} else {
// For partial overlap, we use an empirical value 1/3 as suggested by the book
// "Database Systems, the complete book".
1.0 / 3.0
* Returns a percentage of rows meeting "IS NULL" condition.
* We will use default is_null selectivity(0.1), if the column stats is empty.
* @param input a RexNode
* @return an optional double value to show the percentage of rows meeting a given condition
* It returns None if no statistics collected for a given column.
// Estimates `input IS NULL` as nullCount / rowCount, capped at 1.0.
// Falls back to the default IS NULL selectivity when the column has no null-count statistics.
private def estimateIsNull(input: RexNode): Option[Double] = {
val inputRef = convertToRexInputRef(input)
val nullCount = mq.getColumnNullCount(this.rel, inputRef.getIndex)
if (nullCount == null) {
return defaultIsNullSelectivity
val rowCount = mq.getRowCount(this.rel)
// A missing row count is handled separately from the ratio computation below.
if (rowCount == null) {
} else {
// rowCount cannot be 0 because of the protection in RelMetadataQuery.
Some(Math.min(nullCount / rowCount, 1.0))
* Returns the percentage of rows meeting the "IS NOT NULL" condition.
* The default is_not_null selectivity (0.9) is used if the column stats are empty.
* @param input a RexNode
* @return an optional double value giving the percentage of rows meeting the given condition.
* It returns None if no statistics were collected for the given column.
// Estimates the fraction of rows for which `input` IS NOT NULL.
// `input` must be a RexInputRef; convertToRexInputRef throws IllegalArgumentException otherwise.
private def estimateIsNotNull(input: RexNode): Option[Double] = {
val inputRef = convertToRexInputRef(input)
// Null count of the referenced column as reported by the metadata query; it may be null.
val nullCount = mq.getColumnNullCount(this.rel, inputRef.getIndex)
if (nullCount == null) {
// No null-count statistic is available: fall back to the default IS NOT NULL selectivity.
return defaultIsNotNullSelectivity
val rowCount = mq.getRowCount(this.rel)
// NOTE(review): estimateIsNull checks rowCount against null, while this method checks against 0;
// if getRowCount can return null here the division below would fail -- confirm which guard is intended.
val selectivity = if (rowCount == 0) {
// NOTE(review): the value used for an empty input is not visible in this chunk.
} else {
// Complement of the null fraction. NOTE(review): not clamped to [0, 1], unlike estimateIsNull.
1.0 - (nullCount / rowCount)
* Returns a percentage of rows meeting "IN" operator expression.
* @param input a RexNode
* @param inSet a set of literal values
* @return an optional double value to show the percentage of rows meeting a given condition
* It returns None if no statistics exists for a given column.
// Estimates the selectivity of `input IN (inSet)`.
// `input` must be a RexInputRef; convertToRexInputRef throws IllegalArgumentException otherwise.
private def estimateIn(input: RexNode, inSet: Seq[RexNode]): Option[Double] = {
val inputRef = convertToRexInputRef(input)
// Fallback estimate: default equality selectivity times the number of IN-list entries, capped at 1.0.
val defaultInSelectivity = Some(Math.min(1.0, defaultEqualsSelectivity.get * inSet.size))
if (!isSupportedComparisonType(input.getType)) {
// The column type is not one of the supported comparison types, so use the fallback.
return defaultInSelectivity
val columnInterval = mq.getColumnInterval(this.rel, inputRef.getIndex)
if (columnInterval == null) {
// No value interval is known for the column: estimate from the distinct-value count alone.
val ndv = mq.getDistinctRowCount(rel, ImmutableBitSet.of(inputRef.getIndex), null)
return if (ndv == null) {
// NOTE(review): the value returned when ndv is null is not visible in this chunk.
} else {
// NOTE(review): this ratio is not capped at 1.0, unlike the interval-based estimate below -- confirm.
Some(inSet.size / ndv)
// Normalise the interval bounds so they can be compared with the converted literals.
val interval = convertValueInterval(columnInterval, inputRef.getType)
interval match {
// An unbounded interval cannot rule out any literal, so use the fallback.
case ValueInterval.infinite => defaultInSelectivity
// An empty interval means no row can match.
case ValueInterval.empty => Some(0.0)
case _ =>
// Keep only the IN-list entries that fall inside the column's value interval.
// Every entry is cast to RexLiteral, so a non-literal entry fails with ClassCastException here.
val validInSet = inSet.filter(
v => ValueInterval.contains(interval, literalToComparable(v.asInstanceOf[RexLiteral])))
if (validInSet.isEmpty) {
// NOTE(review): the result for "no literal inside the interval" is not visible in this chunk.
} else {
val bitSetOfInputRef = ImmutableBitSet.of(inputRef.getIndex)
val ndv = mq.getDistinctRowCount(rel, bitSetOfInputRef, null)
if (ndv == null) {
// NOTE(review): the result when ndv is unknown is not visible in this chunk.
} else {
// ndv cannot be less than 1.0 here because RelMetadataQuery protects against it.
// newNdv must not be greater than the old ndv. For example, the column has only 2 values,
// 1 and 6, and the predicate is column IN (1, 2, 3, 4, 5): validInSet.size is 5.
val newNdv = Math.min(ndv, validInSet.size)
// Return the filter selectivity. Without advanced statistics such as histograms,
// we have to assume a uniform distribution.
Some(Math.min(1.0, newNdv * 1.0 / ndv))
* Checks whether all values are literals. If the set contains non-literal values, an exception is thrown.
// Validates that every element of `inSet` is a literal.
// Throws IllegalArgumentException as soon as one element is not of kind SqlKind.LITERAL.
private def checkInSet(inSet: Seq[RexNode]): Unit = {
if (inSet.exists(!_.isA(SqlKind.LITERAL))) {
// NOTE(review): the expression interpolated into the message is not visible in this chunk.
throw new IllegalArgumentException(s"${
} contain some non-literal values.")
* If a rexNode is RexInputRef type, convert it to RexInputRef; else throw an exception.
// Narrows `rexNode` to a RexInputRef.
// Any other node (including null) is rejected with an IllegalArgumentException.
private def convertToRexInputRef(rexNode: RexNode): RexInputRef = {
  val maybeInputRef = Option(rexNode).collect { case ref: RexInputRef => ref }
  maybeInputRef.getOrElse(
    throw new IllegalArgumentException(s"$rexNode is not a RexInputRef."))
}
// Approximate equality for doubles: true when the absolute difference is below 1e-6.
// NaN never compares equal to anything, including itself.
private def doubleEquals(v1: Double, v2: Double): Boolean = {
  val tolerance = 1e-6
  val distance = scala.math.abs(v1 - v2)
  distance < tolerance
}
object SelectivityEstimator {
* Convert ValueInterval,
* for Numeric/Boolean/Date/Time/Timestamp, min/max will be converted to Double,
* for CHARACTER, min/max will be converted to String.
// Convenience overload: resolves the type family of `dataType` and delegates to the
// family-based conversion. Both arguments must be non-null.
def convertValueInterval(interval: ValueInterval, dataType: RelDataType): ValueInterval = {
  require(!(interval == null || dataType == null))
  val family: RelDataTypeFamily = dataType.getFamily
  convertValueInterval(interval, family)
}
* Convert ValueInterval,
* for Numeric/Boolean/Date/Time/Timestamp, min/max will be converted to Double,
* for CHARACTER, min/max will be converted to String.
// Converts the bounds of `interval` according to `typeFamily`.
// Both arguments must be non-null (enforced by `require`).
// Throws UnsupportedOperationException for type families other than
// NUMERIC/BOOLEAN/DATE/TIME/TIMESTAMP/CHARACTER.
def convertValueInterval(
interval: ValueInterval,
typeFamily: RelDataTypeFamily): ValueInterval = {
require(interval != null && typeFamily != null)
interval match {
// Empty and infinite intervals carry no bounds, so they are returned unchanged.
case ValueInterval.empty | ValueInterval.infinite => interval
case _ =>
// Extract the lower bound if the interval has one; (null, false) stands for "no lower bound".
val (lower, includeLower) = interval match {
case li: WithLower => (li.lower, li.includeLower)
case _ => (null, false)
// Extract the upper bound if the interval has one; (null, false) stands for "no upper bound".
val (upper, includeUpper) = interval match {
case ui: WithUpper => (ui.upper, ui.includeUpper)
case _ => (null, false)
typeFamily match {
// NOTE(review): the bodies of the next two cases are not visible in this chunk.
case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN | SqlTypeFamily.DATE |
SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP =>
case SqlTypeFamily.CHARACTER =>
case _ => throw new UnsupportedOperationException(s"Unsupported typeFamily: $typeFamily")
* Get literal value as Comparable.
* The class may be different between literal value and min/max value,
* and different classes can't be compared. So
* for Numeric/Boolean/Date/Time/Timestamp, literal will be converted to Double uniformly,
* for CHARACTER, literal will be converted to String.
* e.g. min/max values are Double: min=10.0 and max=100.0,
* however the literal values are Integer: select * from tb where a in (20, 30, 40)
// Converts a literal to a Comparable, choosing the representation by the literal's type family.
// Throws UnsupportedOperationException for non-null literals of any other type family.
def literalToComparable(literal: RexLiteral): Comparable[_] = {
if (!literal.isNull) {
literal.getType.getFamily match {
// Numeric-like families are converted through literalToDouble.
case SqlTypeFamily.NUMERIC | SqlTypeFamily.BOOLEAN | SqlTypeFamily.DATE |
SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP => literalToDouble(literal)
// Character literals are read as String.
case SqlTypeFamily.CHARACTER => literal.getValueAs(classOf[String])
case _ => throw new UnsupportedOperationException(
s"Can't get value as comparable from literal type: ${literal.getType}")
// NOTE(review): the result for a null literal is not visible in this chunk.
} else {
// Maps a column value onto a boxed Double:
//  - null stays null
//  - numbers use their double value, booleans become 1.0 / 0.0
//  - java.sql.Date      -> days since 1970-01-01
//  - java.sql.Timestamp -> milliseconds since 1970-01-01 00:00:00
//  - java.sql.Time      -> milliseconds since midnight
// Any other runtime class is rejected with an UnsupportedOperationException.
def comparableToDouble(value: Any): JDouble = value match {
  case null => null
  case number: Number => JDouble.valueOf(number.doubleValue())
  case flag: Boolean => JDouble.valueOf(if (flag) 1.0 else 0.0)
  case date: java.sql.Date =>
    // Whole-number (Long) division, exactly as before: partial days are dropped.
    val daysSinceEpoch = date.getTime / DateTimeUtils.MILLIS_PER_DAY
    JDouble.valueOf(daysSinceEpoch)
  case timestamp: java.sql.Timestamp =>
    val millisSinceEpoch = timestamp.getTime
    JDouble.valueOf(millisSinceEpoch)
  case time: java.sql.Time =>
    val millisOfDay = new TimeString(time.toString).getMillisOfDay
    JDouble.valueOf(millisOfDay)
  case _ =>
    throw new UnsupportedOperationException(
      s"Can't convert comparable type: ${value.getClass} to Double")
}
// Converts a literal to a boxed Double, dispatching on the literal's type family.
// Throws UnsupportedOperationException for non-null literals outside
// NUMERIC/BOOLEAN/DATE/TIME/TIMESTAMP.
def literalToDouble(literal: RexLiteral): JDouble = {
if (!literal.isNull) {
literal.getType.getFamily match {
// NOTE(review): the body of the NUMERIC case is not visible in this chunk.
case SqlTypeFamily.NUMERIC =>
case SqlTypeFamily.BOOLEAN =>
// true maps to 1.0 and false to 0.0.
if (RexLiteral.booleanValue(literal)) 1.0 else 0.0
// NOTE(review): the body of the DATE/TIME/TIMESTAMP case is not visible in this chunk.
case SqlTypeFamily.DATE | SqlTypeFamily.TIME | SqlTypeFamily.TIMESTAMP =>
case _ =>
throw new UnsupportedOperationException(
s"Can't get value as Double from literal type: ${literal.getType}")
// NOTE(review): the result for a null literal is not visible in this chunk.
} else {
// Tests `left < right` via compareTo, with null operands handled in a separate branch.
def lessThan(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
// NOTE(review): the result when either operand is null is not visible in this chunk.
} else {
// Both sides are cast to Comparable[Any]; presumably callers pass values of the same
// runtime class (e.g. both Double or both String) -- confirm.
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) < 0
// Tests `left <= right` via compareTo, with null operands handled in a separate branch.
def lessThanOrEqualTo(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
// NOTE(review): the result when either operand is null is not visible in this chunk.
} else {
// Both sides are cast to Comparable[Any]; presumably callers pass values of the same
// runtime class -- confirm.
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) <= 0
// Tests `left > right` via compareTo, with null operands handled in a separate branch.
def greaterThan(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
// NOTE(review): the result when either operand is null is not visible in this chunk.
} else {
// Both sides are cast to Comparable[Any]; presumably callers pass values of the same
// runtime class -- confirm.
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) > 0
// Tests `left >= right` via compareTo, with null operands handled in a separate branch.
def greaterThanOrEqualTo(left: Comparable[_], right: Comparable[_]): Boolean = {
if (left == null || right == null) {
// NOTE(review): the result when either operand is null is not visible in this chunk.
} else {
// Both sides are cast to Comparable[Any]; presumably callers pass values of the same
// runtime class -- confirm.
left.asInstanceOf[Comparable[Any]].compareTo(right.asInstanceOf[Comparable[Any]]) >= 0
* Currently, Numeric/Boolean/String/Date/Time/Timestamp are supported.
// Tells whether the type family of `relType` is one the estimator can compare:
// NUMERIC, BOOLEAN, CHARACTER, DATE, TIME or TIMESTAMP.
def isSupportedComparisonType(relType: RelDataType): Boolean = {
  val comparableFamilies = Seq[RelDataTypeFamily](
    SqlTypeFamily.NUMERIC,
    SqlTypeFamily.BOOLEAN,
    SqlTypeFamily.CHARACTER,
    SqlTypeFamily.DATE,
    SqlTypeFamily.TIME,
    SqlTypeFamily.TIMESTAMP)
  comparableFamilies.contains(relType.getFamily)
}
* Currently, value of Numeric/Boolean/Date/Time/Timestamp column can be converted to a number.
// Tells whether the type family of `relType` is one whose values can be turned into a number:
// NUMERIC, BOOLEAN, DATE, TIME or TIMESTAMP.
def canConvertToNumericType(relType: RelDataType): Boolean = {
  val family = relType.getFamily
  val numericLikeFamilies = Seq[RelDataTypeFamily](
    SqlTypeFamily.NUMERIC,
    SqlTypeFamily.BOOLEAN,
    SqlTypeFamily.DATE,
    SqlTypeFamily.TIME,
    SqlTypeFamily.TIMESTAMP)
  numericLikeFamilies.exists(_ == family)
}