blob: 01c4357d0ab0823fd846141c97dbfc5ba7f9ce52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.metadata
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.plan.FlinkJoinRelType
import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.table.plan.nodes.calcite.{Expand, LogicalWindowAggregate, Rank}
import org.apache.flink.table.plan.nodes.logical._
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.nodes.physical.stream._
import org.apache.flink.table.plan.schema.FlinkRelOptTable
import org.apache.flink.table.plan.util.FlinkRelMdUtil.splitColumnsIntoLeftAndRight
import org.apache.flink.table.plan.util.{FlinkRelMdUtil, TemporalJoinUtil}
import org.apache.flink.table.sources.{IndexKey, TableSource}
import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.Converter
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.logical.LogicalSnapshot
import org.apache.calcite.rel.metadata._
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
import org.apache.calcite.sql.SemiJoinType._
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
import java.lang.{Boolean => JBool}
import java.util
import scala.collection.JavaConversions._
class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata.ColumnUniqueness] {
def getDef: MetadataDef[BuiltInMetadata.ColumnUniqueness] = BuiltInMetadata.ColumnUniqueness.DEF
private def areTableColumnsUnique(
rel: TableScan,
tableSource: TableSource,
relOptTable: RelOptTable,
columns: ImmutableBitSet): JBool = {
if (columns.cardinality == 0) {
return false
}
if (tableSource != null) {
val indexes = TemporalJoinUtil.getTableIndexKeys(tableSource)
if (null != indexes &&
indexes.exists(index => index.isUnique && index.isIndex(columns.toArray))) {
return true
}
}
relOptTable match {
case flinkRelOptTable: FlinkRelOptTable => flinkRelOptTable.uniqueKeysSet match {
case Some(keysSet) if keysSet.isEmpty => false
case Some(keysSet) => keysSet.exists(columns.contains)
case _ => null
}
case _ => rel.getTable.isKey(columns)
}
}
def areColumnsUnique(
rel: TableScan,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areTableColumnsUnique(rel, null, rel.getTable, columns)
}
def areColumnsUnique(
rel: Window,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = areColumnsUniqueOfOverWindow(rel, mq, columns, ignoreNulls)
def areColumnsUnique(
rel: BatchExecOverAggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = areColumnsUniqueOfOverWindow(rel, mq, columns, ignoreNulls)
private def areColumnsUniqueOfOverWindow(
overWindow: SingleRel,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
val inputFieldLength = overWindow.getInput.getRowType.getFieldCount
val columnsBelongsToInput = ImmutableBitSet.of(columns.filter(_ < inputFieldLength).toList)
val isSubColumnsUnique = mq.areColumnsUnique(
overWindow.getInput,
columnsBelongsToInput,
ignoreNulls)
if (isSubColumnsUnique != null && isSubColumnsUnique) {
true
} else if (columnsBelongsToInput.cardinality() < columns.cardinality()) {
// We are not sure whether not belongs to input are unique or not
null
} else {
isSubColumnsUnique
}
}
def areColumnsUnique(
snapshot: LogicalSnapshot,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
mq.areColumnsUnique(snapshot.getInput, columns, ignoreNulls)
}
def areColumnsUnique(
rel: FlinkLogicalTableSourceScan,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areTableColumnsUnique(rel, rel.tableSource, rel.getTable, columns)
}
def areColumnsUnique(
rel: FlinkLogicalLastRow,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
columns != null && columns.toArray.sameElements(rel.getUniqueKeys)
}
def areColumnsUnique(
rel: Expand,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
// values of expand_is are unique in rows expanded from a row,
// and a input unique key combined with expand_id are also unique
val expandIdIndex = rel.expandIdIndex
if (!columns.get(expandIdIndex)) {
return false
}
val columnsSkipExpandId = ImmutableBitSet.builder().addAll(columns).clear(expandIdIndex).build()
if (columnsSkipExpandId.cardinality == 0) {
return false
}
val inputRefColumns = columnsSkipExpandId.flatMap {
column =>
val inputRefs = FlinkRelMdUtil.getInputRefIndices(column, rel)
if (inputRefs.size() == 1 && inputRefs.head >= 0) {
Array(inputRefs.head)
} else {
Array.empty[Int]
}
}.toSeq
if (inputRefColumns.isEmpty) {
return false
}
mq.areColumnsUnique(rel.getInput, ImmutableBitSet.of(inputRefColumns: _*), ignoreNulls)
}
/**
* Determines whether a specified set of columns from a Calc relational expression are unique.
*
* @param rel the Calc relational expression
* @param mq metadata query instance
* @param columns column mask representing the subset of columns for which
* uniqueness will be determined
* @param ignoreNulls if true, ignore null values when determining column
* uniqueness
* @return whether the columns are unique, or
* null if not enough information is available to make that determination
*/
def areColumnsUnique(
rel: Calc,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
val program = rel.getProgram
// Calc is composed by projects and conditions. conditions does no change unique property;
// while projects maps a set of rows to a different set.
// Without knowledge of the mapping function(whether it
// preserves uniqueness), it is only safe to derive uniqueness
// info from the child of a project when the mapping is f(a) => a.
//
// Also need to map the input column set to the corresponding child
// references
areColumnsUniqueOfProject(
program.getProjectList.map(program.expandLocalRef),
mq, columns, ignoreNulls, rel)
}
/**
* Determines whether a specified set of columns from a RelSubSet relational expression are
* unique.
*
* FIX BUG in <a href="https://issues.apache.org/jira/browse/CALCITE-2134">[CALCITE-2134] </a>
*
* @param rel the RelSubSet relational expression
* @param mq metadata query instance
* @param columns column mask representing the subset of columns for which
* uniqueness will be determined
* @param ignoreNulls if true, ignore null values when determining column
* uniqueness
* @return whether the columns are unique, or
* null if not enough information is available to make that determination
*/
def areColumnsUnique(
rel: RelSubset,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
if (!Bug.CALCITE_1048_FIXED) {
return mq.areColumnsUnique(Util.first(rel.getBest, rel.getOriginal), columns, ignoreNulls)
}
var nullCount = 0
for (rel2 <- rel.getRels) {
rel2 match {
// NOTE: If add estimation uniqueness for new RelNode type e.g. Rank / Expand,
// add the RelNode to pattern matching in RelSubset.
case _: Aggregate | _: Filter | _: Values | _: TableScan | _: Project | _: Correlate |
_: Join | _: Exchange | _: Sort | _: SetOp | _: Calc | _: Converter | _: Window |
_: Expand | _: Rank | _: FlinkRelNode =>
try {
val unique = mq.areColumnsUnique(rel2, columns, ignoreNulls)
if (unique != null) {
if (unique) {
return true
}
} else {
nullCount += 1
}
}
catch {
case _: CyclicMetadataException =>
// Ignore this relational expression; there will be non-cyclic ones in this set.
}
case _ => // skip
}
}
if (nullCount == 0) false else null
}
def areColumnsUnique(
rel: Filter,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = mq.areColumnsUnique(rel.getInput, columns, ignoreNulls)
def areColumnsUnique(
rel: SetOp,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
// If not ALL then the rows are distinct.
// Therefore the set of all columns is a key.
!rel.all && columns.nextClearBit(0) >= rel.getRowType.getFieldCount
}
def areColumnsUnique(
rel: Intersect,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
if (areColumnsUnique(rel.asInstanceOf[SetOp], mq, columns, ignoreNulls)) {
return true
}
rel.getInputs foreach { input =>
val b = mq.areColumnsUnique(input, columns, ignoreNulls)
if (b != null && b) {
return true
}
}
false
}
def areColumnsUnique(
rel: Minus,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
if (areColumnsUnique(rel.asInstanceOf[SetOp], mq, columns, ignoreNulls)) {
true
} else {
mq.areColumnsUnique(rel.getInput(0), columns, ignoreNulls)
}
}
def areColumnsUnique(
rel: Sort,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = mq.areColumnsUnique(rel.getInput, columns, ignoreNulls)
def areColumnsUnique(
rel: Exchange,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = mq.areColumnsUnique(rel.getInput, columns, ignoreNulls)
def areColumnsUnique(
rel: Correlate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
rel.getJoinType match {
case ANTI | SEMI => mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
case LEFT | INNER =>
val leftFieldCount = rel.getLeft.getRowType.getFieldCount
val (leftColumns, rightColumns) = splitColumnsIntoLeftAndRight(leftFieldCount, columns)
val left = rel.getLeft
val right = rel.getRight
if (leftColumns.cardinality > 0 && rightColumns.cardinality > 0) {
val leftUnique = mq.areColumnsUnique(left, leftColumns, ignoreNulls)
val rightUnique = mq.areColumnsUnique(right, rightColumns, ignoreNulls)
if (leftUnique == null || rightUnique == null) null else leftUnique && rightUnique
}
else {
null
}
case _ => throw new IllegalStateException(
s"Unknown join type ${rel.getJoinType} for correlate relation $rel")
}
}
def areColumnsUnique(
rel: BatchExecCorrelate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = null
def areColumnsUnique(
rel: Project,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
// LogicalProject maps a set of rows to a different set;
// Without knowledge of the mapping function(whether it
// preserves uniqueness), it is only safe to derive uniqueness
// info from the child of a project when the mapping is f(a) => a.
//
// Also need to map the input column set to the corresponding child
// references
areColumnsUniqueOfProject(rel.getProjects, mq, columns, ignoreNulls, rel)
}
private def areColumnsUniqueOfProject(
projExprs: util.List[RexNode],
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean,
originalNode: SingleRel): JBool = {
val childColumns = ImmutableBitSet.builder
columns.foreach { bit =>
val projExpr = projExprs.get(bit)
projExpr match {
case inputRef: RexInputRef => childColumns.set(inputRef.getIndex)
case a: RexCall if a.getKind.equals(SqlKind.AS) &&
a.getOperands.get(0).isInstanceOf[RexInputRef] =>
childColumns.set(a.getOperands.get(0).asInstanceOf[RexInputRef].getIndex)
case call: RexCall if ignoreNulls =>
// If the expression is a cast such that the types are the same
// except for the nullability, then if we're ignoring nulls,
// it doesn't matter whether the underlying column reference
// is nullable. Check that the types are the same by making a
// nullable copy of both types and then comparing them.
if (call.getOperator eq SqlStdOperatorTable.CAST) {
val castOperand = call.getOperands.get(0)
castOperand match {
case castRef: RexInputRef =>
val typeFactory = originalNode.getCluster.getTypeFactory
val castType = typeFactory.createTypeWithNullability(projExpr.getType, true)
val origType = typeFactory.createTypeWithNullability(castOperand.getType, true)
if (castType == origType) {
childColumns.set(castRef.getIndex)
}
case _ => // ignore
}
}
case _ =>
// If the expression will not influence uniqueness of the
// projection, then skip it.
}
}
// If no columns can affect uniqueness, then return unknown
if (childColumns.cardinality == 0) {
null
} else {
mq.areColumnsUnique(originalNode.getInput(), childColumns.build, ignoreNulls)
}
}
def areJoinColumnsUnique(
joinInfo: JoinInfo,
joinRelType: JoinRelType,
leftType: RelDataType,
isleftUnique: (ImmutableBitSet) => JBool,
isRightUnique: (ImmutableBitSet) => JBool,
mq: RelMetadataQuery,
columns: ImmutableBitSet): JBool = {
if (columns.cardinality == 0) {
return false
}
// Divide up the input column mask into column masks for the left and
// right sides of the join
val (leftColumns, rightColumns) = splitColumnsIntoLeftAndRight(leftType.getFieldCount, columns)
// If the original column mask contains columns from both the left and
// right hand side, then the columns are unique if and only if they're
// unique for their respective join inputs
val leftUnique = isleftUnique(leftColumns)
val rightUnique = isRightUnique(rightColumns)
if ((leftColumns.cardinality > 0) && (rightColumns.cardinality > 0)) {
if ((leftUnique == null) || (rightUnique == null)) {
return null
}
else {
return leftUnique && rightUnique
}
}
// If we're only trying to determine uniqueness for columns that
// originate from one join input, then determine if the equijoin
// columns from the other join input are unique. If they are, then
// the columns are unique for the entire join if they're unique for
// the corresponding join input, provided that input is not null
// generating.
if (leftColumns.cardinality > 0) {
if (joinRelType.generatesNullsOnLeft) {
false
} else {
val rightJoinColsUnique = isRightUnique(joinInfo.rightSet)
if ((rightJoinColsUnique == null) || (leftUnique == null)) {
null
} else {
rightJoinColsUnique && leftUnique
}
}
} else if (rightColumns.cardinality > 0) {
if (joinRelType.generatesNullsOnRight) {
false
} else {
val leftJoinColsUnique = isleftUnique(joinInfo.leftSet)
if ((leftJoinColsUnique == null) || (rightUnique == null)) {
null
} else {
leftJoinColsUnique && rightUnique
}
}
} else {
throw new AssertionError
}
}
def areColumnsUnique(
rel: StreamExecWindowJoin,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
val joinInfo = JoinInfo.of(rel.getLeft, rel.getRight, rel.joinCondition)
areJoinColumnsUnique(
joinInfo, rel.joinType, rel.getLeft.getRowType,
(leftSet: ImmutableBitSet) => mq.areColumnsUnique(rel.getLeft, leftSet, ignoreNulls),
(rightSet: ImmutableBitSet) => mq.areColumnsUnique(rel.getRight, rightSet, ignoreNulls),
mq, columns
)
}
def areColumnsUnique(
rel: StreamExecJoin,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
rel.joinType match {
case FlinkJoinRelType.ANTI | FlinkJoinRelType.SEMI =>
mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
case _ =>
val joinInfo = JoinInfo.of(rel.getLeft, rel.getRight, rel.joinCondition)
areJoinColumnsUnique(
joinInfo, FlinkJoinRelType.toJoinRelType(rel.joinType), rel.getLeft.getRowType,
(leftSet: ImmutableBitSet) => mq.areColumnsUnique(rel.getLeft, leftSet, ignoreNulls),
(rightSet: ImmutableBitSet) => mq.areColumnsUnique(rel.getRight, rightSet, ignoreNulls),
mq, columns)
}
}
def areColumnsUnique(
rel: StreamExecLastRow,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
columns != null && columns.toArray.equals(rel.getUniqueKeys)
}
def areColumnsUnique(
rel: Join,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areJoinColumnsUnique(
rel.analyzeCondition(), rel.getJoinType, rel.getLeft.getRowType,
(leftSet: ImmutableBitSet) => mq.areColumnsUnique(rel.getLeft, leftSet, ignoreNulls),
(rightSet: ImmutableBitSet) => mq.areColumnsUnique(rel.getRight, rightSet, ignoreNulls),
mq, columns
)
}
def areColumnsUnique(
rel: SemiJoin,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
// only return the unique keys from the LHS since a semijoin only
// returns the LHS
mq.areColumnsUnique(rel.getLeft, columns, ignoreNulls)
}
def areColumnsUnique(
rel: Aggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
// group by keys form a unique key
val groupKey = ImmutableBitSet.range(rel.getGroupCount)
columns.contains(groupKey)
}
def areColumnsUnique(
rel: BatchExecGroupAggregateBase,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
if (rel.isFinal) {
columns.contains(ImmutableBitSet.of(rel.getGrouping: _*))
} else {
null
}
}
def areColumnsUnique(
rel: StreamExecGroupAggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
columns.contains(ImmutableBitSet.of(rel.getGroupings: _*))
}
def areColumnsUnique(
rel: StreamExecGlobalGroupAggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
columns.contains(ImmutableBitSet.of(rel.getGroupings.indices.toArray: _*))
}
def areColumnsUnique(
rel: StreamExecLocalGroupAggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = null
def areColumnsUnique(
rel: StreamExecGroupWindowAggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areGroupWindowColumnsUnique(
columns,
rel.getRowType.getFieldCount,
rel.getWindowProperties,
rel.getGroupings,
mq,
ignoreNulls)
}
def areColumnsUnique(
rel: FlinkLogicalWindowAggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areGroupWindowColumnsUnique(
columns,
rel.getRowType.getFieldCount,
rel.getNamedProperties,
rel.getGroupSet.toArray,
mq,
ignoreNulls)
}
def areColumnsUnique(
rel: LogicalWindowAggregate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areGroupWindowColumnsUnique(
columns,
rel.getRowType.getFieldCount,
rel.getNamedProperties,
rel.getGroupSet.toArray,
mq,
ignoreNulls)
}
def areColumnsUnique(
rel: BatchExecWindowAggregateBase,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
if (rel.isFinal) {
areGroupWindowColumnsUnique(
columns,
rel.getRowType.getFieldCount,
rel.getNamedProperties,
rel.getGrouping,
mq,
ignoreNulls)
} else {
null
}
}
def areColumnsUnique(
rel: StreamExecCorrelate,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = null
private def areGroupWindowColumnsUnique(
columns: ImmutableBitSet,
fieldCount: Int,
namedProperties: Seq[NamedWindowProperty],
grouping: Array[Int],
mq: RelMetadataQuery,
ignoreNulls: Boolean): JBool = {
if (namedProperties.nonEmpty) {
val begin = fieldCount - namedProperties.size
val end = fieldCount - 1
val keys = ImmutableBitSet.of(grouping: _*)
(begin to end).map {
i => keys.union(ImmutableBitSet.of(i))
}.exists(columns.contains)
} else {
false
}
}
def areJoinTableColumnsUnique(
joinInfo: JoinInfo,
joinRelType: JoinRelType,
left: RelNode,
joinedIndex: Option[IndexKey],
columns: ImmutableBitSet,
mq: RelMetadataQuery,
ignoreNulls: Boolean): JBool = {
areJoinColumnsUnique(
joinInfo, joinRelType, left.getRowType,
(leftSet: ImmutableBitSet) => mq.areColumnsUnique(left, leftSet, ignoreNulls),
(rightSet: ImmutableBitSet) => {
null != joinedIndex &&
joinedIndex.exists(index => index.isUnique && index.isIndex(rightSet.toArray))
},
mq, columns
)
}
def areColumnsUnique(
join: BatchExecTemporalTableJoin,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areJoinTableColumnsUnique(
join.joinInfo,
join.joinType,
join.getInput,
join.joinedIndex,
columns,
mq,
ignoreNulls
)
}
def areColumnsUnique(
join: StreamExecTemporalTableJoin,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
areJoinTableColumnsUnique(
join.joinInfo,
join.joinType,
join.getInput,
join.joinedIndex,
columns,
mq,
ignoreNulls
)
}
def areColumnsUnique(
rank: Rank,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
val rankFunColumnIndex = FlinkRelMdUtil.getRankFunColumnIndex(rank)
if (rankFunColumnIndex < 0) {
mq.areColumnsUnique(rank.getInput, columns, ignoreNulls)
} else {
val childColumns = columns.clear(rankFunColumnIndex)
val isChildColumnsUnique = mq.areColumnsUnique(rank.getInput, childColumns, ignoreNulls)
if (isChildColumnsUnique != null && isChildColumnsUnique) {
true
} else {
rank.rankFunction.getKind match {
case SqlKind.ROW_NUMBER =>
val fields = columns.toArray
(rank.partitionKey.toArray :+ rankFunColumnIndex).forall(fields.contains(_))
case _ => false
}
}
}
}
def areColumnsUnique(
rel: Values,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
if (rel.tuples.size < 2) {
return true
}
columns foreach { bit =>
val columnValues = rel.tuples map { tuple =>
val literal = tuple.get(bit)
if (literal.isNull) {
NullSentinel.INSTANCE
} else {
literal.getValueAs(classOf[Comparable[_]])
}
}
if (columnValues.toSet.size == columnValues.size) {
return true
}
}
false
}
def areColumnsUnique(
rel: Converter,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = {
mq.areColumnsUnique(rel.getInput, columns, ignoreNulls)
}
/**
* Catch-all implementation for
* [[BuiltInMetadata.ColumnUniqueness#areColumnsUnique(ImmutableBitSet, boolean)]],
* invoked using reflection, for any relational expression not
* handled by a more specific method.
*
* @param rel Relational expression
* @param mq Metadata query
* @param columns column mask representing the subset of columns for which
* uniqueness will be determined
* @param ignoreNulls if true, ignore null values when determining column
* uniqueness
* @return whether the columns are unique, or
* null if not enough information is available to make that determination
* @see org.apache.calcite.rel.metadata.RelMetadataQuery#areColumnsUnique(
* RelNode, ImmutableBitSet, boolean)
*/
def areColumnsUnique(
rel: RelNode,
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBool = null
}
object FlinkRelMdColumnUniqueness {
private val INSTANCE = new FlinkRelMdColumnUniqueness
val SOURCE: RelMetadataProvider = ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.COLUMN_UNIQUENESS.method, INSTANCE)
}