blob: e8f9ff4efdb70044edc42e93e80291eb2d14b535 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator
import org.apache.flink.table.api.{StreamQueryConfig, TableConfig}
import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.CRowKeySelector
import org.apache.flink.table.runtime.join._
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
class DataStreamJoinToCoProcessTranslator(
config: TableConfig,
returnType: TypeInformation[Row],
leftSchema: RowSchema,
rightSchema: RowSchema,
joinInfo: JoinInfo,
rexBuilder: RexBuilder) {
val nonEquiJoinPredicates: Option[RexNode] = if (joinInfo.isEqui) {
None
}
else {
Some(joinInfo.getRemaining(rexBuilder))
}
def getLeftKeySelector(): CRowKeySelector = {
new CRowKeySelector(
joinInfo.leftKeys.toIntArray,
leftSchema.projectedTypeInfo(joinInfo.leftKeys.toIntArray))
}
def getRightKeySelector(): CRowKeySelector = {
new CRowKeySelector(
joinInfo.rightKeys.toIntArray,
rightSchema.projectedTypeInfo(joinInfo.rightKeys.toIntArray))
}
def getJoinOperator(
joinType: JoinRelType,
returnFieldNames: Seq[String],
ruleDescription: String,
queryConfig: StreamQueryConfig): TwoInputStreamOperator[CRow, CRow, CRow] = {
// input must not be nullable, because the runtime join function will make sure
// the code-generated function won't process null inputs
val generator = new FunctionCodeGenerator(
config,
nullableInput = false,
leftSchema.typeInfo,
Some(rightSchema.typeInfo))
val conversion = generator.generateConverterResultExpression(
returnType,
returnFieldNames)
val body = if (nonEquiJoinPredicates.isEmpty) {
// only equality condition
s"""
|${conversion.code}
|${generator.collectorTerm}.collect(${conversion.resultTerm});
|""".stripMargin
} else {
val condition = generator.generateExpression(nonEquiJoinPredicates.get)
s"""
|${condition.code}
|if (${condition.resultTerm}) {
| ${conversion.code}
| ${generator.collectorTerm}.collect(${conversion.resultTerm});
|}
|""".stripMargin
}
val genFunction = generator.generateFunction(
ruleDescription,
classOf[FlatJoinFunction[Row, Row, Row]],
body,
returnType)
createJoinOperator(joinType, queryConfig, genFunction)
}
protected def createJoinOperator(
joinType: JoinRelType,
queryConfig: StreamQueryConfig,
genFunction: GeneratedFunction[FlatJoinFunction[Row, Row, Row], Row])
: TwoInputStreamOperator[CRow, CRow, CRow] = {
val joinFunction = joinType match {
case JoinRelType.INNER =>
new NonWindowInnerJoin(
leftSchema.typeInfo,
rightSchema.typeInfo,
genFunction.name,
genFunction.code,
queryConfig)
case JoinRelType.LEFT | JoinRelType.RIGHT if joinInfo.isEqui =>
new NonWindowLeftRightJoin(
leftSchema.typeInfo,
rightSchema.typeInfo,
genFunction.name,
genFunction.code,
joinType == JoinRelType.LEFT,
queryConfig)
case JoinRelType.LEFT | JoinRelType.RIGHT =>
new NonWindowLeftRightJoinWithNonEquiPredicates(
leftSchema.typeInfo,
rightSchema.typeInfo,
genFunction.name,
genFunction.code,
joinType == JoinRelType.LEFT,
queryConfig)
case JoinRelType.FULL if joinInfo.isEqui =>
new NonWindowFullJoin(
leftSchema.typeInfo,
rightSchema.typeInfo,
genFunction.name,
genFunction.code,
queryConfig)
case JoinRelType.FULL =>
new NonWindowFullJoinWithNonEquiPredicates(
leftSchema.typeInfo,
rightSchema.typeInfo,
genFunction.name,
genFunction.code,
queryConfig)
}
new KeyedCoProcessOperator(joinFunction)
}
}