blob: 6b5b14261093bfdf96f3a45b1d8641b0e45efb2e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.expressions
import org.apache.calcite.rel.RelFieldCollation.{Direction, NullDirection}
import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.table.api.types.InternalType
import org.apache.flink.table.plan.logical.LogicalExprVisitor
import org.apache.flink.table.runtime.aggregate.RelFieldCollations
import org.apache.flink.table.validate._
abstract class Ordering extends UnaryExpression {
override private[flink] def validateInput(): ValidationResult = {
if (!child.isInstanceOf[NamedExpression]) {
ValidationFailure(s"Sort should only based on field reference")
} else {
ValidationSuccess
}
}
}
case class Asc(child: Expression) extends Ordering {
override def toString: String = s"($child).asc"
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
toRexNode(relBuilder, useDefaultNullCollation = true)
}
private[flink] def toRexNode(
implicit relBuilder: RelBuilder, useDefaultNullCollation: Boolean): RexNode = {
val node = child.toRexNode
if (useDefaultNullCollation) {
if (RelFieldCollations.defaultNullDirection(Direction.ASCENDING) == NullDirection.FIRST) {
relBuilder.nullsFirst(node)
} else {
relBuilder.nullsLast(node)
}
} else {
node
}
}
override private[flink] def resultType: InternalType = child.resultType
override def accept[T](logicalExprVisitor: LogicalExprVisitor[T]): T =
logicalExprVisitor.visit(this)
}
case class Desc(child: Expression) extends Ordering {
override def toString: String = s"($child).desc"
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
toRexNode(relBuilder, useDefaultNullCollation = true)
}
private[flink] def toRexNode(
implicit relBuilder: RelBuilder, useDefaultNullCollation: Boolean): RexNode = {
val node = relBuilder.desc(child.toRexNode)
if (useDefaultNullCollation) {
if (RelFieldCollations.defaultNullDirection(Direction.DESCENDING) == NullDirection.FIRST) {
relBuilder.nullsFirst(node)
} else {
relBuilder.nullsLast(node)
}
} else {
node
}
}
override private[flink] def resultType: InternalType = child.resultType
override def accept[T](logicalExprVisitor: LogicalExprVisitor[T]): T =
logicalExprVisitor.visit(this)
}
abstract class NullsOrdering extends UnaryExpression {
override private[flink] def validateInput(): ValidationResult = {
if (child.isInstanceOf[NamedExpression] ||
child.isInstanceOf[Asc] || child.isInstanceOf[Desc]) {
ValidationSuccess
} else {
ValidationFailure(s"Nulls first/last should only based on field reference, asc or desc")
}
}
}
case class NullsFirst(child: Expression) extends NullsOrdering {
override def toString: String = s"($child).nulls_first"
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
child match {
case c: Asc =>
relBuilder.nullsFirst(
c.toRexNode(relBuilder, useDefaultNullCollation = false))
case c: Desc =>
relBuilder.nullsFirst(
c.toRexNode(relBuilder, useDefaultNullCollation = false))
case _ =>
relBuilder.nullsFirst(child.toRexNode(relBuilder))
}
}
override private[flink] def resultType: InternalType = child.resultType
override def accept[T](logicalExprVisitor: LogicalExprVisitor[T]): T =
logicalExprVisitor.visit(this)
}
case class NullsLast(child: Expression) extends Ordering {
override def toString: String = s"($child).nulls_last"
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
child match {
case c: Asc =>
relBuilder.nullsLast(
c.toRexNode(relBuilder, useDefaultNullCollation = false))
case c: Desc =>
relBuilder.nullsLast(
c.toRexNode(relBuilder, useDefaultNullCollation = false))
case _ =>
relBuilder.nullsLast(child.toRexNode(relBuilder))
}
}
override private[flink] def resultType: InternalType = child.resultType
override def accept[T](logicalExprVisitor: LogicalExprVisitor[T]): T =
logicalExprVisitor.visit(this)
}