blob: e98f14f0ec91813af4298715065255ff65047fad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.logical
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.table.plan.TreeNode
import org.apache.flink.table.api.{TableEnvironment, ValidationException}
import org.apache.flink.table.expressions._
import org.apache.flink.table.typeutils.TypeCoercion
import org.apache.flink.table.validate._
import scala.util.Random
/**
* A LogicalNode is created and validated as we construct a query plan using the Table API.
*
* The main validation procedure is separated into two phases:
*
* Expressions' resolution and transformation ([[resolveExpressions]]):
*
* - translate [[UnresolvedFieldReference]] into [[ResolvedFieldReference]]
* using child operator's output
* - translate [[Call]](UnresolvedFunction) into solid Expression
* - generate alias names for query output
* - ....
*
* LogicalNode validation ([[validate]]):
*
* - check that no [[UnresolvedFieldReference]] exists any more
* - check that all expressions have children of the needed type
* - check that each logical operator has the desired input
*
* Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode.
*/
abstract class LogicalNode extends TreeNode[LogicalNode] with LogicalNodeVisitable {

  /**
   * The attributes exposed by this node. Field references of a parent node are resolved
   * against the output of its children (see [[resolveReference]]).
   */
  def output: Seq[Attribute]

  /**
   * Resolves the expressions of this node in two passes.
   *
   * The first pass replaces every [[UnresolvedFieldReference]] with the result of
   * [[resolveReference]] (the reference is left untouched if nothing matches) and every
   * [[Call]] whose children are valid with the function found in the function catalog.
   *
   * The second pass visits every [[InputTypeSpec]] whose children are valid and wraps each
   * child in a [[Cast]] when its result type differs from the expected type but can be
   * safely cast to it.
   *
   * @param tableEnv the environment used to look up tables and functions.
   * @return this node if no expression changed, otherwise a copy with the new expressions.
   */
  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    // resolve references and function calls
    val exprResolved = expressionPostOrderTransform {
      case unresolved @ UnresolvedFieldReference(name) =>
        // keep the unresolved reference if neither a field nor a table matches;
        // validate() reports it later
        resolveReference(tableEnv, name).getOrElse(unresolved)
      case call @ Call(name, args) if call.childrenValid =>
        tableEnv.getFunctionCatalog.lookupFunction(name, args)
    }

    // insert implicit casts where the expected input type differs from the actual one
    exprResolved.expressionPostOrderTransform {
      case ips: InputTypeSpec if ips.childrenValid =>
        // tracks whether at least one child got wrapped, so that an unchanged
        // expression is returned as-is instead of being copied
        var changed: Boolean = false
        val newChildren = ips.expectedTypes.zip(ips.children).map { case (tpe, child) =>
          val childType = child.resultType
          if (childType != tpe && TypeCoercion.canSafelyCast(childType, tpe)) {
            changed = true
            Cast(child, tpe)
          } else {
            child
          }
        }.toArray[AnyRef]
        if (changed) ips.makeCopy(newChildren) else ips
    }
  }

  /**
   * Converts this node into a Calcite [[RelNode]] by running [[construct]] on the given
   * builder and building the result.
   */
  final def toRelNode(relBuilder: RelBuilder): RelNode = construct(relBuilder).build()

  /**
   * Node-specific part of the conversion used by [[toRelNode]]. Implementations return the
   * [[RelBuilder]] on which `build()` is finally called.
   */
  protected[logical] def construct(relBuilder: RelBuilder): RelBuilder

  /**
   * Resolves the expressions of this node (see [[resolveExpressions]]) and checks the result.
   *
   * @param tableEnv the environment used to look up tables and functions.
   * @return the resolved node.
   * @throws ValidationException if an attribute is still invalid after resolution or if the
   *                             input check of an expression fails.
   */
  def validate(tableEnv: TableEnvironment): LogicalNode = {
    // Matches expressions whose input check failed and extracts the failure message.
    // The check is evaluated once per match and no cast is needed to get the message.
    object InputCheckFailure {
      def unapply(e: Expression): Option[String] = {
        val result = e.validateInput()
        if (result.isFailure) {
          result match {
            case failure: ValidationFailure => Some(failure.message)
            // not expected; still report the failure instead of failing on a cast
            case other => Some(other.toString)
          }
        } else {
          None
        }
      }
    }

    val resolvedNode = resolveExpressions(tableEnv)
    resolvedNode.expressionPostOrderTransform {
      case a: Attribute if !a.valid =>
        val from = children.flatMap(_.output).map(_.name).mkString(", ")
        failValidation(s"Cannot resolve [${a.name}] given input [$from].")

      case e @ InputCheckFailure(message) =>
        failValidation(s"Expression $e failed on input check: $message")
    }
  }

  /**
   * Resolves the given name to a [[NamedExpression]] using the output of all child
   * nodes of this LogicalNode. If no child attribute has that name, the name is looked up
   * as a table in the given environment.
   *
   * @param tableEnv the environment used for the table lookup.
   * @param name     the name to resolve.
   * @return the matching attribute or table reference, or `None` if nothing matches.
   * @throws ValidationException if more than one child attribute has the given name.
   */
  def resolveReference(tableEnv: TableEnvironment, name: String): Option[NamedExpression] = {
    // fields of the children are tried before tables
    val fieldCandidates = children.flatMap(_.output).filter(_.name.equals(name))

    fieldCandidates match {
      case Seq() =>
        // no field with that name: try to resolve a table
        tableEnv.scanInternal(Array(name)).map(table => TableReference(name, table))
      case Seq(field) =>
        Some(field.withName(name))
      case _ =>
        failValidation(s"Reference $name is ambiguous.")
    }
  }

  /**
   * Runs [[postOrderTransform]] with `rule` on all expressions present in this logical node.
   *
   * Expressions are looked for in the constructor arguments of this node: as a plain
   * argument, inside a `Some`, as elements of a `Traversable`, or inside a [[Resolvable]].
   *
   * @param rule the rule to be applied to every expression in this logical node.
   * @return this node if no expression changed, otherwise a copy built from the new arguments.
   */
  def expressionPostOrderTransform(rule: PartialFunction[Expression, Expression]): LogicalNode = {
    // set as soon as one expression differs from its original
    var changed = false

    // applies the rule to a single expression and records whether it was modified
    def transform(e: Expression): Expression = {
      val newExpr = e.postOrderTransform(rule)
      if (newExpr.fastEquals(e)) {
        // keep the original instance when nothing changed
        e
      } else {
        changed = true
        newExpr
      }
    }

    val newArgs = productIterator.map {
      case e: Expression => transform(e)
      case Some(e: Expression) => Some(transform(e))
      case seq: Traversable[_] => seq.map {
        case e: Expression => transform(e)
        case other => other
      }
      case r: Resolvable[_] => r.resolveExpressions(e => transform(e))
      case other: AnyRef => other
    }.toArray

    if (changed) makeCopy(newArgs) else this
  }

  /**
   * Aborts validation by throwing a [[ValidationException]] with the given message.
   * The result type `Nothing` allows the call to be used in any expression position.
   */
  protected def failValidation(msg: String): Nothing = {
    throw new ValidationException(msg)
  }
}
/** A [[LogicalNode]] without inputs: its list of children is always empty. */
abstract class LeafNode extends LogicalNode {
  override def children: Seq[LogicalNode] = List.empty[LogicalNode]
}
/** A [[LogicalNode]] with exactly one input, exposed as [[child]]. */
abstract class UnaryNode extends LogicalNode {

  /** The single input of this node. */
  def child: LogicalNode

  /** The children of a unary node consist of [[child]] only. */
  override def children: Seq[LogicalNode] = List(child)
}
/** A [[LogicalNode]] with exactly two inputs, exposed as [[left]] and [[right]]. */
abstract class BinaryNode extends LogicalNode {

  /** The first input of this node. */
  def left: LogicalNode

  /** The second input of this node. */
  def right: LogicalNode

  /** The children of a binary node are [[left]] followed by [[right]]. */
  override def children: Seq[LogicalNode] = List(left, right)
}