blob: 4ba8da6a5d658d9b29c9c21fe5a6739376e43fde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.metadata
import org.apache.flink.table.plan.metadata.FlinkMetadata.ColumnNullCount
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSnapshot
import org.apache.flink.table.plan.schema.FlinkRelOptTable
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, FlinkRexUtil}
import org.apache.flink.util.Preconditions
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.metadata._
import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.Util
import java.lang.{Double => JDouble}
import java.util
import scala.collection.JavaConversions._
/**
* FlinkRelMdColumnNullCount supplies a default implementation of
* [[FlinkRelMetadataQuery.getColumnNullCount]] for the standard logical algebra.
*/
class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {
override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF
/**
* Gets the null count of the given column in TableScan.
*
* @param ts TableScan RelNode
* @param mq RelMetadataQuery instance
* @param index the index of the given column
* @return the null count of the given column in TableScan
*/
def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {
Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])
val relOptTable = ts.getTable.asInstanceOf[FlinkRelOptTable]
val fieldNames = relOptTable.getRowType.getFieldNames
Preconditions.checkArgument(index >= 0 && index < fieldNames.size())
val fieldName = fieldNames.get(index)
val statistic = relOptTable.getFlinkStatistic
val colStats = statistic.getColumnStats(fieldName)
if (colStats != null && colStats.nullCount != null) {
colStats.nullCount.toDouble
} else {
null
}
}
/**
* Gets the null count of the given column in FlinkLogicalSnapshot.
* TODO implements it.
*
* @param snapshot Snapshot RelNode
* @param mq RelMetadataQuery instance
* @param index the index of the given column
* @return the null count of the given column in TableScan
*/
def getColumnNullCount(
snapshot: FlinkLogicalSnapshot,
mq: RelMetadataQuery,
index: Int): JDouble = null
/**
* Gets the null count of the given column in Project.
*
* @param project Project RelNode
* @param mq RelMetadataQuery instance
* @param index the index of the given column
* @return the null count of the given column in Project
*/
def getColumnNullCount(project: Project, mq: RelMetadataQuery, index: Int): JDouble = {
val (nullCountOfInput, _) = getColumnNullAfterProjects(
project.getInput, project.getProjects, mq, index)
nullCountOfInput
}
/**
* Gets the null count of the given column in Filter.
*
* @param filter Filter RelNode
* @param mq RelMetadataQuery instance
* @param index the index of the given column
* @return the null count of the given column in Filter
*/
def getColumnNullCount(filter: Filter, mq: RelMetadataQuery, index: Int): JDouble = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
val nullCountOfInput = fmq.getColumnNullCount(filter.getInput, index)
val predicate = filter.getCondition
getColumnNullAfterPredicate(predicate, nullCountOfInput, filter, index)
}
/**
* Gets the null count of the given column in Calc.
*
* @param calc Calc RelNode
* @param mq RelMetadataQuery instance
* @param index the index of the given column
* @return the null count of the given column in Calc
*/
def getColumnNullCount(calc: Calc, mq: RelMetadataQuery, index: Int): JDouble = {
val program = calc.getProgram
val projects = program.getProjectList.map(program.expandLocalRef)
val (nullCountOfInput, mappingFieldInInput) = getColumnNullAfterProjects(
calc.getInput, projects, mq, index)
if (program.getCondition == null) {
nullCountOfInput
} else {
val predicate = program.expandLocalRef(program.getCondition)
getColumnNullAfterPredicate(predicate, nullCountOfInput, calc, mappingFieldInInput)
}
}
private def getColumnNullAfterProjects(
input: RelNode,
projects: util.List[RexNode],
mq: RelMetadataQuery,
index: Int): (JDouble, Int) = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
projects.get(index) match {
case inputRef: RexInputRef => (
fmq.getColumnNullCount(input, inputRef.getIndex),
inputRef.getIndex)
case literal: RexLiteral => if (literal.isNull) (1D, -1) else (0D, -1)
case _ => (null, -1)
}
}
private def getColumnNullAfterPredicate(
predicate: RexNode,
nullCountOfInput: JDouble,
relNode: RelNode,
mappingFieldInInput: Int): JDouble = {
if (predicate == null) {
nullCountOfInput
} else {
// if null count of input is already 0, then null count must be 0 after predicate.
if (nullCountOfInput != null && Math.abs(nullCountOfInput) < RelOptUtil.EPSILON) {
0D
} else if (mappingFieldInInput == -1) {
null
} else {
// If predicate has $index is not null, null count of index is must be 0 after predicate.
val rexBuilder = relNode.getCluster.getRexBuilder
val maxCnfNodeCount = FlinkRelOptUtil.getMaxCnfNodeCount(relNode)
val cnf = FlinkRexUtil.toCnf(rexBuilder, maxCnfNodeCount, predicate)
val conjunctions = RelOptUtil.conjunctions(cnf)
val notNullPredicatesAtIndexField = conjunctions.exists {
c => c match {
case call: RexCall if call.getOperator == SqlStdOperatorTable.IS_NOT_NULL =>
call.getOperands.head match {
case i: RexInputRef => i.getIndex == mappingFieldInInput
case _ => false
}
case _ => false
}
}
if (notNullPredicatesAtIndexField) 0D else null
}
}
}
/**
* Gets the null count of the given column in Join.
*
* @param rel Join RelNode
* @param mq RelMetadataQuery instance
* @param index the index of the given column
* @return the null count of the given column in Join.
*/
def getColumnNullCount(rel: Join, mq: RelMetadataQuery, index: Int): JDouble = {
def isUniqueOnJoinKeys(): Boolean = {
val joinInfo = rel.analyzeCondition()
if (joinInfo.leftKeys.isEmpty) {
return false
}
val isLeftJoinKeysUnique = mq.areColumnsUnique(rel.getLeft, joinInfo.leftSet())
if (isLeftJoinKeysUnique == null || !isLeftJoinKeysUnique) {
return false
}
val isRightJoinKeysUnique = mq.areColumnsUnique(rel.getRight, joinInfo.rightSet())
if (isRightJoinKeysUnique == null || !isRightJoinKeysUnique) {
return false
}
return true
}
val isLojOrRoj = rel.getJoinType match {
case JoinRelType.LEFT | JoinRelType.RIGHT => true
case _ => false
}
// Only handle LOJ OR ROJ, and both left joinKeys and right joinKeys are uniqueness.
if (!isLojOrRoj || !isUniqueOnJoinKeys()) {
return null
}
def calRowCountOfNewInnerJoin(): JDouble = {
val newInnerJoin = rel.copy(
rel.getTraitSet, rel.getCondition, rel.getLeft, rel.getRight,
JoinRelType.INNER, rel.isSemiJoinDone)
mq.getRowCount(newInnerJoin)
}
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
val nFieldsOnLeft = rel.getLeft.getRowType.getFieldCount
rel.getJoinType match {
case JoinRelType.LEFT =>
if (index < nFieldsOnLeft) {
// If column comes from left child of LOJ, nullCount is not changed.
fmq.getColumnNullCount(rel.getLeft, index)
} else {
// If column comes from right child of LOJ, and both left joinKeys and right joinKeys are
// Unique in each child, nullCount is
// RowCount(LeftChild) - RowCount(LeftChild InnerJoin RightChild) + OldNullCount
val totalRowCount = mq.getRowCount(rel.getLeft)
val rowCntOfInnerJoin = calRowCountOfNewInnerJoin()
val oldNullCnt = fmq.getColumnNullCount(rel.getRight, index - nFieldsOnLeft)
if (totalRowCount == null || rowCntOfInnerJoin == null || oldNullCnt == null) {
return null
} else {
Math.max(totalRowCount - rowCntOfInnerJoin, 0D) + oldNullCnt
}
}
case JoinRelType.RIGHT =>
if (index >= nFieldsOnLeft) {
// If column comes from right child of ROJ, nullCount is not changed.
fmq.getColumnNullCount(rel.getRight, index - nFieldsOnLeft)
} else {
// If column comes from left child of ROJ, and both left joinKeys and right joinKeys are
// Unique in each child, nullCount is
// RowCount(RightChild) - RowCount(LeftChild InnerJoin RightChild) + OldNullCount
val totalRowCount = mq.getRowCount(rel.getRight)
val rowCntOfInnerJoin = calRowCountOfNewInnerJoin()
val oldNullCnt = fmq.getColumnNullCount(rel.getLeft, index)
if (totalRowCount == null || rowCntOfInnerJoin == null || oldNullCnt == null) {
return null
} else {
Math.max(totalRowCount - rowCntOfInnerJoin, 0D) + oldNullCnt
}
}
}
}
def getColumnNullCount(rel: Exchange, mq: RelMetadataQuery, index: Int): JDouble = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
fmq.getColumnNullCount(rel.getInput, index)
}
def getColumnNullCount(rel: Sort, mq: RelMetadataQuery, index: Int): JDouble = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
fmq.getColumnNullCount(rel.getInput, index)
}
def getColumnNullCount(subset: RelSubset, mq: RelMetadataQuery, index: Int): JDouble = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
fmq.getColumnNullCount(Util.first(subset.getBest, subset.getOriginal), index)
}
/**
* Catches-all rule when none of the others apply.
*
* @param rel RelNode to analyze
* @param mq RelMetadataQuery instance
* @param index the index of the given column
* @return Always returns null
*/
def getColumnNullCount(rel: RelNode, mq: RelMetadataQuery, index: Int): JDouble = null
}
object FlinkRelMdColumnNullCount {
private val INSTANCE = new FlinkRelMdColumnNullCount
val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
FlinkMetadata.ColumnNullCount.METHOD, INSTANCE)
}