blob: 6289d0a9c01176b5c415e9b117f4c79903893793 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.metadata
import org.apache.flink.table.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, RelModifiedMonotonicity}
import org.apache.flink.table.plan.metadata.FlinkMetadata.{ColumnInterval, ColumnNullCount, ColumnOriginNullCount, FilteredColumnInterval, FlinkDistribution, ModifiedMonotonicityMeta, SkewInfoMeta, UniqueGroups}
import org.apache.flink.table.plan.stats.{SkewInfoInternal, ValueInterval}
import org.apache.calcite.plan.RelTraitSet
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQuery}
import org.apache.calcite.util.ImmutableBitSet
import java.lang.{Double => JDouble}
import java.util.function.Supplier
/**
* RelMetadataQuery provides a strongly-typed facade on top of
* [[org.apache.calcite.rel.metadata.RelMetadataProvider]]
* for the set of relational expression metadata queries defined as standard within Calcite.
* The FlinkRelMetadataQuery class extends it with Flink-specific metadata queries.
*
* @param metadataProvider provider which provides metadata
* @param prototype the prototype which provides metadata handlers
*/
class FlinkRelMetadataQuery private(
    metadataProvider: JaninoRelMetadataProvider,
    prototype: RelMetadataQuery) extends RelMetadataQuery(metadataProvider, prototype) {

  // One handler slot per Flink-specific metadata kind. The slots are `var`s because every
  // query method below swaps in a revised handler when the current one raises `NoHandler`.
  // The primary constructor leaves them at their default (null); the private no-arg
  // constructor is the one that fills them in.
  private[this] var columnIntervalHandler: ColumnInterval.Handler = _
  private[this] var filteredColumnIntervalHandler: FilteredColumnInterval.Handler = _
  private[this] var distributionHandler: FlinkDistribution.Handler = _
  private[this] var columnNullCountHandler: ColumnNullCount.Handler = _
  private[this] var columnOriginNullCountHandler: ColumnOriginNullCount.Handler = _
  private[this] var skewInfoHandler: SkewInfoMeta.Handler = _
  private[this] var modifiedMonotonicityHandler: ModifiedMonotonicityMeta.Handler = _
  private[this] var uniqueGroupsHandler: UniqueGroups.Handler = _

  /**
    * Builds a query bound to the provider currently stored in
    * `RelMetadataQuery.THREAD_PROVIDERS`, using `RelMetadataQuery.EMPTY` as prototype, and
    * installs the initial handler for each Flink-specific metadata kind.
    */
  private def this() = {
    this(RelMetadataQuery.THREAD_PROVIDERS.get, RelMetadataQuery.EMPTY)
    columnIntervalHandler = RelMetadataQuery.initialHandler(classOf[ColumnInterval.Handler])
    filteredColumnIntervalHandler =
      RelMetadataQuery.initialHandler(classOf[FilteredColumnInterval.Handler])
    distributionHandler = RelMetadataQuery.initialHandler(classOf[FlinkDistribution.Handler])
    columnNullCountHandler = RelMetadataQuery.initialHandler(classOf[ColumnNullCount.Handler])
    columnOriginNullCountHandler =
      RelMetadataQuery.initialHandler(classOf[ColumnOriginNullCount.Handler])
    skewInfoHandler = RelMetadataQuery.initialHandler(classOf[SkewInfoMeta.Handler])
    modifiedMonotonicityHandler =
      RelMetadataQuery.initialHandler(classOf[ModifiedMonotonicityMeta.Handler])
    uniqueGroupsHandler = RelMetadataQuery.initialHandler(classOf[UniqueGroups.Handler])
  }

  /**
    * Returns the [[ColumnInterval]] statistic.
    *
    * @param rel   the relational expression
    * @param index the index of the column to inspect
    * @return the value interval of the given column of the relational expression.
    *         Returns null if the interval cannot be estimated, and
    *         [[org.apache.flink.table.plan.stats.EmptyValueInterval]] if the column
    *         does not contain any value except for null.
    */
  def getColumnInterval(rel: RelNode, index: Int): ValueInterval =
    try columnIntervalHandler.getColumnInterval(rel, this, index)
    catch {
      case noHandler: JaninoRelMetadataProvider.NoHandler =>
        // Swap in a handler revised for the reported rel class, then ask again.
        columnIntervalHandler = revise(noHandler.relClass, FlinkMetadata.ColumnInterval.DEF)
        getColumnInterval(rel, index)
    }

  /**
    * Returns the [[ColumnInterval]] of the given column under the given filter argument.
    *
    * @param rel         the relational expression
    * @param columnIndex the index of the column to inspect
    * @param filterArg   the index of the filter argument
    * @return the value interval of the given column of the relational expression.
    *         Returns null if the interval cannot be estimated, and
    *         [[org.apache.flink.table.plan.stats.EmptyValueInterval]] if the column
    *         does not contain any value except for null.
    */
  def getFilteredColumnInterval(rel: RelNode, columnIndex: Int, filterArg: Int): ValueInterval =
    try filteredColumnIntervalHandler.getFilteredColumnInterval(rel, this, columnIndex, filterArg)
    catch {
      case noHandler: JaninoRelMetadataProvider.NoHandler =>
        // Swap in a handler revised for the reported rel class, then ask again.
        filteredColumnIntervalHandler =
          revise(noHandler.relClass, FlinkMetadata.FilteredColumnInterval.DEF)
        getFilteredColumnInterval(rel, columnIndex, filterArg)
    }

  /**
    * Returns the null count of the given column.
    *
    * @param rel   the relational expression
    * @param index the index of the column to inspect
    * @return the null count of the given column if it can be estimated, otherwise null.
    */
  def getColumnNullCount(rel: RelNode, index: Int): JDouble =
    try columnNullCountHandler.getColumnNullCount(rel, this, index)
    catch {
      case noHandler: JaninoRelMetadataProvider.NoHandler =>
        // Swap in a handler revised for the reported rel class, then ask again.
        columnNullCountHandler = revise(noHandler.relClass, FlinkMetadata.ColumnNullCount.DEF)
        getColumnNullCount(rel, index)
    }

  /**
    * Returns the origin null count of the given column.
    *
    * @param rel   the relational expression
    * @param index the index of the column to inspect
    * @return the null count of the given column if it can be estimated, otherwise null.
    */
  def getColumnOriginNullCount(rel: RelNode, index: Int): JDouble =
    try columnOriginNullCountHandler.getColumnOriginNullCount(rel, this, index)
    catch {
      case noHandler: JaninoRelMetadataProvider.NoHandler =>
        // Swap in a handler revised for the reported rel class, then ask again.
        columnOriginNullCountHandler =
          revise(noHandler.relClass, FlinkMetadata.ColumnOriginNullCount.DEF)
        getColumnOriginNullCount(rel, index)
    }

  /**
    * Returns the [[FlinkRelDistribution]] statistic.
    *
    * @param rel the relational expression
    * @return a description of how the rows of the relational expression are
    *         physically distributed
    */
  def flinkDistribution(rel: RelNode): FlinkRelDistribution =
    try distributionHandler.flinkDistribution(rel, this)
    catch {
      case noHandler: JaninoRelMetadataProvider.NoHandler =>
        // Swap in a handler revised for the reported rel class, then ask again.
        distributionHandler = revise(noHandler.relClass, FlinkMetadata.FlinkDistribution.DEF)
        flinkDistribution(rel)
    }

  /**
    * Returns the [[SkewInfoInternal]] that the [[SkewInfoMeta]] handler reports
    * for the given relational expression.
    *
    * @param rel the relational expression
    * @return the value produced by the skew-info handler for `rel`
    */
  def getSkewInfo(rel: RelNode): SkewInfoInternal =
    try skewInfoHandler.getSkewInfo(rel, this)
    catch {
      case noHandler: JaninoRelMetadataProvider.NoHandler =>
        // Swap in a handler revised for the reported rel class, then ask again.
        skewInfoHandler = revise(noHandler.relClass, FlinkMetadata.SkewInfoMeta.DEF)
        getSkewInfo(rel)
    }

  /**
    * Returns the [[RelModifiedMonotonicity]] statistic.
    *
    * @param rel the relational expression
    * @return the monotonicity for the corresponding rel node
    */
  def getRelModifiedMonotonicity(rel: RelNode): RelModifiedMonotonicity =
    try modifiedMonotonicityHandler.getRelModifiedMonotonicity(rel, this)
    catch {
      case noHandler: JaninoRelMetadataProvider.NoHandler =>
        // Swap in a handler revised for the reported rel class, then ask again.
        modifiedMonotonicityHandler =
          revise(noHandler.relClass, FlinkMetadata.ModifiedMonotonicityMeta.DEF)
        getRelModifiedMonotonicity(rel)
    }

  /**
    * Returns the (minimum) unique groups of the given columns.
    *
    * @param rel     the relational expression
    * @param columns the columns of the relational expression to inspect; must not be null.
    *                An empty set is returned as-is without consulting the handler.
    * @return the (minimum) unique columns, which are a sub-collection of the given columns
    *         and are neither null nor empty. If no unique columns can be found, the given
    *         columns are returned.
    * @throws IllegalArgumentException if `columns` is null, or if the handler's answer is
    *                                  null, empty, or not contained in `columns`
    */
  def getUniqueGroups(rel: RelNode, columns: ImmutableBitSet): ImmutableBitSet = {
    require(columns != null)
    if (columns.isEmpty) {
      columns
    } else {
      try {
        val groups = uniqueGroupsHandler.getUniqueGroups(rel, this, columns)
        // Sanity-check the handler's answer against the documented contract.
        require(groups != null && !groups.isEmpty)
        require(columns.contains(groups))
        groups
      } catch {
        case noHandler: JaninoRelMetadataProvider.NoHandler =>
          // Swap in a handler revised for the reported rel class, then ask again.
          uniqueGroupsHandler = revise(noHandler.relClass, FlinkMetadata.UniqueGroups.DEF)
          getUniqueGroups(rel, columns)
      }
    }
  }
}
object FlinkRelMetadataQuery {

  /** Creates a new [[FlinkRelMetadataQuery]] instance. */
  def instance(): FlinkRelMetadataQuery = new FlinkRelMetadataQuery()

  /**
    * Returns the trait set of the given relational expression after calling `replaceIf`
    * for the [[FlinkRelDistribution]] trait definition.
    *
    * The replacement distribution is handed over as a supplier, so the metadata lookup
    * only runs if `replaceIf` actually asks for the value.
    *
    * @param rel the relational expression
    * @return the trait set produced by `replaceIf`
    */
  def traitSet(rel: RelNode): RelTraitSet = {
    val distributionSupplier: Supplier[FlinkRelDistribution] =
      new Supplier[FlinkRelDistribution] {
        override def get(): FlinkRelDistribution = {
          val flinkMq = reuseOrCreate(rel.getCluster.getMetadataQuery)
          flinkMq.flinkDistribution(rel)
        }
      }
    rel.getTraitSet.replaceIf(FlinkRelDistributionTraitDef.INSTANCE, distributionSupplier)
  }

  /**
    * Hands back the given metadata query unchanged when it already is a
    * [[FlinkRelMetadataQuery]]; otherwise creates a new instance.
    *
    * @param mq the metadata query to reuse if possible
    * @return a FlinkRelMetadataQuery instance
    */
  def reuseOrCreate(mq: RelMetadataQuery): FlinkRelMetadataQuery = mq match {
    case flinkMq: FlinkRelMetadataQuery => flinkMq
    case _ => instance()
  }
}