blob: b4c7d514e1dac60f8ec87901ec82c1ecb0a171de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.expressions
import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.table.api.UnresolvedException
import org.apache.flink.table.api.types.{DataTypes, InternalType, RowType}
import org.apache.flink.table.plan.logical.LogicalExprVisitor
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
/**
 * Flattening of composite types. All flattenings are resolved into
 * `GetCompositeField` expressions.
 *
 * An instance that is still present when `resultType` or `validateInput()` is
 * called is treated as unresolved: the former throws, the latter reports a
 * validation failure.
 *
 * @param child the expression to be flattened
 */
case class Flattening(child: Expression) extends UnaryExpression {

  override def toString: String = s"$child.flatten()"

  /** Always throws [[UnresolvedException]]; this node carries no result type of its own. */
  override private[flink] def resultType: InternalType =
    throw UnresolvedException(s"Invalid call to resultType on ${this.getClass}.")

  /** Always fails, naming the child whose flattening was left unresolved. */
  override private[flink] def validateInput(): ValidationResult =
    ValidationFailure(s"Unresolved flattening of $child")

  override def accept[T](logicalExprVisitor: LogicalExprVisitor[T]): T =
    logicalExprVisitor.visit(this)
}
/**
 * Access to a single field of a composite (row-typed) expression. The field is
 * addressed either by name (`String` key) or by position (`Int` key).
 *
 * The position of the accessed field is determined by `validateInput()` and cached
 * in `fieldIndex`. `resultType` and `toRexNode` read that cached value via
 * `Option.get`, so they fail if no successful validation has happened on this
 * instance.
 *
 * @param child expression whose result type has to be a `RowType`
 * @param key   field name (`String`) or field index (`Int`); anything else is rejected
 *              by `validateInput()`
 */
case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {

  // Position of the accessed field; only assigned by a successful validateInput().
  private var fieldIndex: Option[Int] = None

  override def toString: String = s"$child.get($key)"

  /**
   * Checks that the child is of a composite type and that `key` designates one of its
   * fields. On success the field position is cached in `fieldIndex`.
   *
   * @return `ValidationSuccess` if the field could be located, otherwise a
   *         `ValidationFailure` describing the problem
   */
  override private[flink] def validateInput(): ValidationResult = child.resultType match {
    case compositeType: RowType =>
      key match {
        case name: String =>
          val index = compositeType.getFieldIndex(name)
          if (index < 0) {
            ValidationFailure(s"Field name '$name' could not be found.")
          } else {
            fieldIndex = Some(index)
            ValidationSuccess
          }

        // A negative index must be rejected here as well; otherwise it would be cached
        // and only blow up later when the type or the RexNode is requested.
        case index: Int if index < 0 =>
          ValidationFailure(s"Field index '$index' must not be negative.")

        case index: Int =>
          if (index >= compositeType.getArity) {
            ValidationFailure(s"Field index '$index' exceeds arity.")
          } else {
            fieldIndex = Some(index)
            ValidationSuccess
          }

        case _ =>
          ValidationFailure(s"Invalid key '$key'.")
      }

    case other =>
      ValidationFailure(s"Cannot access field of non-composite type '$other'.")
  }

  /** Type of the accessed field; relies on `fieldIndex` having been set by validation. */
  override private[flink] def resultType: InternalType =
    child.resultType.asInstanceOf[RowType].getInternalTypeAt(fieldIndex.get)

  /** Builds a field access on the child's RexNode at the cached field position. */
  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    relBuilder
      .getRexBuilder
      .makeFieldAccess(child.toRexNode, fieldIndex.get)
  }

  /**
   * Copies this expression with a new child taken from the head of `anyRefs`; the key is
   * kept. `fieldIndex` is not a constructor parameter, so the copy starts without a
   * cached field position.
   */
  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
    val child: Expression = anyRefs.head.asInstanceOf[Expression]
    copy(child, key).asInstanceOf[this.type]
  }

  /**
   * Gives a meaningful alias if possible (e.g. a$mypojo$field).
   *
   * An alias exists only when the chain of children consists of nested
   * `GetCompositeField`s that ends in a `ResolvedFieldReference`.
   */
  private[flink] def aliasName(): Option[String] = child match {
    case gcf: GetCompositeField =>
      gcf.aliasName().map(prefix => s"${prefix}$$$key")
    case c: ResolvedFieldReference => Some(s"${c.name}$$$key")
    case _ => None
  }

  override def accept[T](logicalExprVisitor: LogicalExprVisitor[T]): T =
    logicalExprVisitor.visit(this)
}