blob: be675998b3c9849eae6097a5ed83654ba46b5c94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.calcite
import java.lang.{String => JString}
import java.util
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.{Contexts, RelOptCluster, RelOptTable, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core._
import org.apache.calcite.rel.{RelCollation, RelNode}
import org.apache.calcite.rel.core.RelFactories._
import org.apache.calcite.rel.logical._
import org.apache.calcite.rex._
import org.apache.calcite.sql.{SemiJoinType, SqlKind}
import org.apache.calcite.sql.SqlKind.{EXCEPT, INTERSECT, UNION}
import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
import org.apache.calcite.util.ImmutableBitSet
import org.apache.flink.table.api.QueryConfig
import org.apache.flink.table.calcite.FlinkRelFactories.{ExpandFactory, SinkFactory}
import org.apache.flink.table.plan.nodes.calcite.LogicalExpand
import org.apache.flink.table.plan.nodes.logical._
import org.apache.flink.table.plan.schema.FlinkRelOptTable
import org.apache.flink.table.sinks.TableSink
import scala.collection.JavaConversions._
/**
* Contains factory interface and default implementation for creating various
* flink logical rel nodes.
*/
object FlinkLogicalRelFactories {
val FLINK_LOGICAL_PROJECT_FACTORY = new ProjectFactoryImpl
val FLINK_LOGICAL_FILTER_FACTORY = new FilterFactoryImpl
val FLINK_LOGICAL_JOIN_FACTORY = new JoinFactoryImpl
val FLINK_LOGICAL_CORRELATE_FACTORY = new CorrelateFactoryImpl
val FLINK_LOGICAL_SEMI_JOIN_FACTORY = new SemiJoinFactoryImpl
val FLINK_LOGICAL_SORT_FACTORY = new SortFactoryImpl
val FLINK_LOGICAL_AGGREGATE_FACTORY = new AggregateFactoryImpl
val FLINK_LOGICAL_MATCH_FACTORY = new MatchFactoryImpl
val FLINK_LOGICAL_SET_OP_FACTORY = new SetOpFactoryImpl
val FLINK_LOGICAL_VALUES_FACTORY = new ValuesFactoryImpl
val FLINK_LOGICAL_TABLE_SCAN_FACTORY = new TableScanFactoryImpl
val FLINK_LOGICAL_EXPAND_FACTORY = new ExpandFactoryImpl
/** A [[RelBuilderFactory]] that creates a [[RelBuilder]] that will
* create logical relational expressions for everything. */
val FLINK_LOGICAL_REL_BUILDER: RelBuilderFactory = FlinkRelBuilder.proto(
Contexts.of(
FLINK_LOGICAL_PROJECT_FACTORY,
FLINK_LOGICAL_FILTER_FACTORY,
FLINK_LOGICAL_JOIN_FACTORY,
FLINK_LOGICAL_SEMI_JOIN_FACTORY,
FLINK_LOGICAL_SORT_FACTORY,
FLINK_LOGICAL_AGGREGATE_FACTORY,
FLINK_LOGICAL_MATCH_FACTORY,
FLINK_LOGICAL_SET_OP_FACTORY,
FLINK_LOGICAL_VALUES_FACTORY,
FLINK_LOGICAL_TABLE_SCAN_FACTORY,
FLINK_LOGICAL_EXPAND_FACTORY))
/**
* Implementation of [[ProjectFactory]] that returns a [[FlinkLogicalCalc]].
*/
class ProjectFactoryImpl extends ProjectFactory {
def createProject(
input: RelNode,
childExprs: util.List[_ <: RexNode],
fieldNames: util.List[JString]): RelNode = {
val rexBuilder = input.getCluster.getRexBuilder
val inputRowType = input.getRowType
val programBuilder = new RexProgramBuilder(inputRowType, rexBuilder)
childExprs.zip(fieldNames).foreach { case (childExpr, fieldName) =>
programBuilder.addProject(childExpr, fieldName)
}
val program = programBuilder.getProgram
FlinkLogicalCalc.create(input, program)
}
}
/**
* Implementation of [[SortFactory]] that returns a [[FlinkLogicalSort]].
*/
class SortFactoryImpl extends SortFactory {
def createSort(
input: RelNode,
collation: RelCollation,
offset: RexNode,
fetch: RexNode): RelNode = {
FlinkLogicalSort.create(input, collation, offset, fetch)
}
@deprecated // to be removed before 2.0
def createSort(
traits: RelTraitSet, input: RelNode,
collation: RelCollation, offset: RexNode, fetch: RexNode): RelNode = {
createSort(input, collation, offset, fetch)
}
}
/**
* Implementation of [[SetOpFactory]] that
* returns a [[org.apache.calcite.rel.core.SetOp]] for the particular kind of set
* operation (UNION, EXCEPT, INTERSECT).
*/
class SetOpFactoryImpl extends SetOpFactory {
def createSetOp(kind: SqlKind, inputs: util.List[RelNode], all: Boolean): RelNode = {
kind match {
case UNION =>
FlinkLogicalUnion.create(inputs, all)
case EXCEPT =>
FlinkLogicalMinus.create(inputs, all)
case INTERSECT =>
FlinkLogicalIntersect.create(inputs, all)
case _ =>
throw new AssertionError("not a set op: " + kind)
}
}
}
/**
* Implementation of [[AggregateFactory]] that returns a [[FlinkLogicalAggregate]].
*/
class AggregateFactoryImpl extends AggregateFactory {
@SuppressWarnings(Array("deprecation"))
def createAggregate(
input: RelNode,
indicator: Boolean,
groupSet: ImmutableBitSet,
groupSets: ImmutableList[ImmutableBitSet],
aggCalls: util.List[AggregateCall]): RelNode = {
FlinkLogicalAggregate.create(input, indicator, groupSet, groupSets, aggCalls)
}
}
/**
* Implementation of [[FilterFactory]] that returns a [[FlinkLogicalCalc]].
*/
class FilterFactoryImpl extends FilterFactory {
def createFilter(input: RelNode, condition: RexNode): RelNode = {
// Create a program containing a filter.
val rexBuilder = input.getCluster.getRexBuilder
val inputRowType = input.getRowType
val programBuilder = new RexProgramBuilder(inputRowType, rexBuilder)
programBuilder.addIdentity()
programBuilder.addCondition(condition)
val program = programBuilder.getProgram
FlinkLogicalCalc.create(input, program)
}
}
/**
* Implementation of [[JoinFactory]] that returns a vanilla
* [[org.apache.calcite.rel.logical.LogicalJoin]].
*/
class JoinFactoryImpl extends JoinFactory {
def createJoin(
left: RelNode,
right: RelNode,
condition: RexNode,
variablesSet: util.Set[CorrelationId],
joinType: JoinRelType,
semiJoinDone: Boolean): RelNode = {
FlinkLogicalJoin.create(left, right, condition, joinType)
}
def createJoin(
left: RelNode,
right: RelNode,
condition: RexNode,
joinType: JoinRelType,
variablesStopped: util.Set[JString],
semiJoinDone: Boolean): RelNode = {
createJoin(
left,
right,
condition,
CorrelationId.setOf(variablesStopped),
joinType,
semiJoinDone)
}
}
/**
* Implementation of [[CorrelateFactory]] that returns a [[FlinkLogicalCorrelate]].
*/
class CorrelateFactoryImpl extends CorrelateFactory {
def createCorrelate(
left: RelNode,
right: RelNode,
correlationId: CorrelationId,
requiredColumns: ImmutableBitSet,
joinType: SemiJoinType): RelNode = {
FlinkLogicalCorrelate.create(left, right, correlationId, requiredColumns, joinType)
}
}
/**
* Implementation of [[SemiJoinFactory} that returns a [[FlinkLogicalSemiJoin]].
*/
class SemiJoinFactoryImpl extends SemiJoinFactory {
def createSemiJoin(left: RelNode, right: RelNode, condition: RexNode): RelNode = {
val joinInfo: JoinInfo = JoinInfo.of(left, right, condition)
FlinkLogicalSemiJoin.create(
left, right, condition, joinInfo.leftKeys, joinInfo.rightKeys, isAntiJoin = false)
}
def createAntiJoin(left: RelNode, right: RelNode, condition: RexNode): RelNode = {
val joinInfo: JoinInfo = JoinInfo.of(left, right, condition)
FlinkLogicalSemiJoin.create(
left, right, condition, joinInfo.leftKeys, joinInfo.rightKeys, isAntiJoin = true)
}
}
/**
* Implementation of [[ValuesFactory]] that returns a [[FlinkLogicalValues]].
*/
class ValuesFactoryImpl extends ValuesFactory {
def createValues(
cluster: RelOptCluster,
rowType: RelDataType,
tuples: util.List[ImmutableList[RexLiteral]]): RelNode = {
FlinkLogicalValues.create(
cluster, rowType, ImmutableList.copyOf[ImmutableList[RexLiteral]](tuples))
}
}
/**
* Implementation of [[TableScanFactory]] that returns a
* [[FlinkLogicalTableSourceScan]] or [[FlinkLogicalNativeTableScan]].
*/
class TableScanFactoryImpl extends TableScanFactory {
def createScan(cluster: RelOptCluster, table: RelOptTable): RelNode = {
val tableScan = LogicalTableScan.create(cluster, table)
tableScan match {
case tss: LogicalTableScan if FlinkLogicalTableSourceScan.isLogicalTableSourceScan(tss) =>
FlinkLogicalTableSourceScan.create(cluster, tss.getTable.asInstanceOf[FlinkRelOptTable])
case nts: LogicalTableScan if FlinkLogicalNativeTableScan.isLogicalNativeTableScan(nts) =>
FlinkLogicalNativeTableScan.create(cluster, nts.getTable.asInstanceOf[FlinkRelOptTable])
}
}
}
/**
* Implementation of [[MatchFactory]] that returns a [[LogicalMatch]].
*/
class MatchFactoryImpl extends MatchFactory {
def createMatch(
input: RelNode,
pattern: RexNode,
rowType: RelDataType,
strictStart: Boolean,
strictEnd: Boolean,
patternDefinitions: util.Map[JString, RexNode],
measures: util.Map[JString, RexNode],
after: RexNode,
subsets: util.Map[JString, _ <: util.SortedSet[JString]],
rowsPerMatch: RexNode,
partitionKeys: util.List[RexNode],
orderKeys: RelCollation,
interval: RexNode): RelNode = {
FlinkLogicalMatch.create(
input,
pattern,
strictStart,
strictEnd,
patternDefinitions,
measures,
after,
subsets,
rowsPerMatch,
partitionKeys,
orderKeys,
interval,
rowType)
}
}
/**
* Implementation of [[FlinkRelFactories#ExpandFactory]] that returns a
* [[LogicalExpand]].
*/
class ExpandFactoryImpl extends ExpandFactory {
def createExpand(
input: RelNode,
rowType: RelDataType,
projects: util.List[util.List[RexNode]],
expandIdIndex: Int): RelNode = {
FlinkLogicalExpand.create(input, rowType, projects, expandIdIndex)
}
}
/**
* Implementation of [[FlinkRelFactories#SinkFactory]] that returns a [[FlinkLogicalSink]].
*/
class SinkFactoryImpl extends SinkFactory {
def createSink(
input: RelNode,
sink: TableSink[_],
sinkName: String): RelNode = {
FlinkLogicalSink.create(input, sink, sinkName)
}
}
}