/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.nodes.common

import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.operators.ResourceSpec
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
import org.apache.flink.table.api.types._
import org.apache.flink.table.api.{TableConfig, TableConfigOptions}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.CodeGeneratorContext._
import org.apache.flink.table.codegen._
import org.apache.flink.table.codegen.operator.OperatorCodeGenerator
import org.apache.flink.table.codegen.operator.OperatorCodeGenerator.{ELEMENT, STREAM_RECORD, generatorCollect}
import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
import org.apache.flink.table.runtime.OneInputSubstituteStreamOperator
import org.apache.flink.table.runtime.conversion.DataStructureConverters.genToInternal
import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.BaseRowTypeInfo

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexNode

import java.util.{List => JList}

import scala.collection.JavaConversions._

/**
 * Common trait for batch and stream table scans.
 */
trait CommonScan[T] {
// The source ResourceSpec may not be convertible to a {@link NodeResource},
// so the source and conversion resources are tracked separately here.
// TODO: conversionResSpec will be used instead of the conversion heap setting.
private[flink] var sourceResSpec: ResourceSpec = _
private[flink] var conversionResSpec: ResourceSpec = _
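
/**
 * Sets the resource specifications for the source and for the conversion operator.
 */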
def setResForSourceAndConversion(
sourceResSpec: ResourceSpec,
conversionResSpec: ResourceSpec): Unit = {
this.sourceResSpec = sourceResSpec
this.conversionResSpec = conversionResSpec
}

/**
 * Returns the transformation that produces the raw records of the table source.
 */
private[flink] def getSourceTransformation(
streamEnv: StreamExecutionEnvironment): StreamTransformation[_]
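
/**
 * Returns true if the source records must be converted before they can be
 * consumed as internal rows: conversion is unnecessary when the produced
 * class is already the internal representation of the row type, or when the
 * type is already described by a [[BaseRowTypeInfo]].
 */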
private[flink] def needsConversion(dataType: DataType, clz: Class[_]): Boolean = dataType match {
case r: RowType => !CodeGenUtils.isInternalClass(clz, r)
case t: TypeInfoWrappedDataType if t.getTypeInfo.isInstanceOf[BaseRowTypeInfo] => false
case _ => true
}
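
/**
 * Extracts the class of the records produced by the given [[TableSource]];
 * delegates to [[CommonScan.extractTableSourceTypeClass]].
 */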
private[flink] def extractTableSourceTypeClass(source: TableSource): Class[_] =
CommonScan.extractTableSourceTypeClass(source)

/**
 * Returns the operator name for the source conversion.
 *
 * @param qualifiedName the qualified name of the table
 * @param rowType the output row type of the scan
 */
private[flink] def getOperatorName(qualifiedName: JList[String], rowType: RelDataType): String = {
val s = s"table:$qualifiedName, fields:(${rowType.getFieldNames.mkString(", ")})"
s"SourceConversion($s)"
}
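
/**
 * Returns true if any field index is one of the time attribute markers, i.e.
 * the field is a rowtime or proctime attribute rather than a physical column.
 */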
private[flink] def hasTimeAttributeField(indexes: Array[Int]) =
indexes.contains(DataTypes.ROWTIME_STREAM_MARKER) ||
indexes.contains(DataTypes.ROWTIME_BATCH_MARKER) ||
indexes.contains(DataTypes.PROCTIME_STREAM_MARKER) ||
indexes.contains(DataTypes.PROCTIME_BATCH_MARKER)
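
/**
 * Whether this scan needs to convert the source records into the internal
 * [[BaseRow]] format.
 */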
def needInternalConversion: Boolean
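
/**
 * Generates a conversion operator that turns the records produced by the source
 * into internal [[BaseRow]]s, applying the field mapping and extracting time
 * attributes where necessary.
 *
 * A minimal sketch of how a concrete scan node might call this method; the
 * `sourceTransform` and `fieldIdx` names are illustrative and assume a Calcite
 * scan node context, they are not part of this trait:
 * {{{
 *   val converted = convertToInternalRow(
 *     CodeGeneratorContext(config),
 *     sourceTransform.asInstanceOf[StreamTransformation[Any]],
 *     fieldIdx,
 *     sourceType,
 *     getRowType,
 *     getTable.getQualifiedName,
 *     config)
 * }}}
 *
 * @param ctx the code generator context
 * @param input the transformation that produces the source records
 * @param fieldIndexes mapping from output fields to input fields; negative
 *                     values are time attribute markers
 * @param inputType the data type of the source records
 * @param outRowType the output row type of the scan
 * @param qualifiedName the qualified name of the table
 * @param config the table configuration
 * @param rowtimeExpr an optional expression that computes the rowtime
 */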
private[flink] def convertToInternalRow(
ctx: CodeGeneratorContext,
input: StreamTransformation[Any],
fieldIndexes: Array[Int],
inputType: DataType,
outRowType: RelDataType,
qualifiedName: JList[String],
config: TableConfig,
rowtimeExpr: Option[RexNode] = None): StreamTransformation[BaseRow] = {
val outputRowType = FlinkTypeFactory.toInternalRowType(outRowType)
// name of the generated conversion operator
val convertName = "SourceConversion"
// term that refers to the input record in the generated code
val inputTerm = DEFAULT_INPUT1_TERM
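// Build the converter from the external record to the internal format. If the
// input is not a row type, the converted value is wrapped into a single-field
// GenericRow.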
val (inputTermConverter, internalInputType: InternalType) = {
val convertFunc = genToInternal(ctx, inputType)
if (inputType.toInternalType.isInstanceOf[RowType]) {
(convertFunc, inputType.toInternalType)
} else {
(
(record: String) =>
s"${classOf[GenericRow].getCanonicalName}.wrap(${convertFunc(record)})",
new RowType(inputType.toInternalType)
)
}
}
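// The generated conversion code may grow too large for a single JVM method,
// so it can be split into several generated methods below.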
var codeSplit = GeneratedSplittableExpression.UNSPLIT_EXPRESSION
val (inputTypes, inputNames) = inputType.toInternalType match {
case rowType: RowType => (rowType.getFieldInternalTypes, rowType.getFieldNames)
case t => (Array(t), Array("f0"))
}
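// Fast path: if the input already matches the output row type exactly and no
// time attributes have to be extracted, forward the record as-is.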
val processCode =
if ((inputTypes sameElements outputRowType.getFieldInternalTypes) &&
(inputNames sameElements outputRowType.getFieldNames) &&
!hasTimeAttributeField(fieldIndexes)) {
s"${generatorCollect(inputTerm)}"
} else {
// fields may be projected or reordered (e.g. for POJO sources), so generate a
// converter expression with the given field mapping
val resultGenerator = new ExprCodeGenerator(ctx, false, config.getNullCheck)
.bindInput(
internalInputType, // this must be a RowType
inputTerm = inputTerm,
inputFieldMapping = Some(fieldIndexes))
val inputTypeTerm = boxedTypeTermForType(internalInputType)
val conversion = resultGenerator.generateConverterResultExpression(
outputRowType, classOf[GenericRow], rowtimeExpression = rowtimeExpr)
codeSplit = CodeGenUtils.generateSplitFunctionCalls(
conversion.codeBuffer,
config.getConf.getInteger(TableConfigOptions.SQL_CODEGEN_LENGTH_MAX),
"SourceConversionApply",
"private final void",
ctx.reuseFieldCode().length,
defineParams = s"$inputTypeTerm $inputTerm, $STREAM_RECORD $ELEMENT",
callingParams = s"$inputTerm, $ELEMENT"
)
val conversionCode = if (codeSplit.isSplit) {
codeSplit.callings.mkString("\n")
} else {
conversion.code
}
// make the current StreamRecord available to the generated code when time
// attribute markers are present, and clear the reference afterwards
val (extractElement, resetElement) =
if (hasTimeAttributeField(fieldIndexes)) {
(s"ctx.$ELEMENT = $ELEMENT;", s"ctx.$ELEMENT = null;")
} else {
("", "")
}
s"""
|$extractElement
|$conversionCode
|${generatorCollect(conversion.resultTerm)}
|$resetElement
|""".stripMargin
}
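// generate the one-input operator that applies the conversion to every record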
val generatedOperator = OperatorCodeGenerator.generateOneInputStreamOperator[Any, BaseRow](
ctx,
convertName,
processCode,
"",
outputRowType,
config,
splitFunc = codeSplit,
converter = inputTermConverter)
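// wrap the generated code into a substitute operator; the concrete operator
// class is compiled and instantiated at runtime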
val substituteStreamOperator = new OneInputSubstituteStreamOperator[Any, BaseRow](
generatedOperator.name,
generatedOperator.code,
references = ctx.references)
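// the conversion uses the same parallelism as the input so that it can be
// chained to the source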
new OneInputTransformation(
input,
getOperatorName(qualifiedName, outRowType),
substituteStreamOperator,
TypeConverters.toBaseRowTypeInfo(outputRowType),
input.getParallelism)
}
}

object CommonScan {
/**
 * [[TableSource]] itself has no generic type parameter, so the concrete
 * XxxTableSource subtypes must be matched to extract the produced type class.
 */
private[flink] def extractTableSourceTypeClass(source: TableSource): Class[_] = {
try {
source match {
case _: BatchTableSource[_] =>
TypeExtractor.createTypeInfo(source, classOf[BatchTableSource[_]], source.getClass, 0)
.getTypeClass.asInstanceOf[Class[_]]
case _: StreamTableSource[_] =>
TypeExtractor.createTypeInfo(source, classOf[StreamTableSource[_]], source.getClass, 0)
.getTypeClass.asInstanceOf[Class[_]]
case _: LookupableTableSource[_] =>
TypeExtractor.createTypeInfo(
source, classOf[LookupableTableSource[_]], source.getClass, 0)
.getTypeClass.asInstanceOf[Class[_]]
case _ => throw new RuntimeException(s"Unknown source: $source")
}
} catch {
case _: InvalidTypesException =>
classOf[Object]
}
}
}