/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.common;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Optional;

/**
* Base {@link ExecNode} to read data from an external source defined by a {@link ScanTableSource}.
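 *
 * <p>The actual translation depends on the {@link ScanTableSource.ScanRuntimeProvider} returned
 * by the table source; see {@code translateToPlanInternal} for the supported provider types.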
*/
public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
        implements MultipleTransformationTranslator<RowData> {

    public static final String SOURCE_TRANSFORMATION = "source";
    public static final String FIELD_NAME_SCAN_TABLE_SOURCE = "scanTableSource";

    @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE)
    private final DynamicTableSourceSpec tableSourceSpec;

protected CommonExecTableSourceScan(
int id,
ExecNodeContext context,
ReadableConfig persistedConfig,
DynamicTableSourceSpec tableSourceSpec,
List<InputProperty> inputProperties,
LogicalType outputType,
String description) {
super(id, context, persistedConfig, inputProperties, outputType, description);
this.tableSourceSpec = tableSourceSpec;
    }

@Override
public String getSimplifiedName() {
return tableSourceSpec.getContextResolvedTable().getIdentifier().getObjectName();
    }

public DynamicTableSourceSpec getTableSourceSpec() {
return tableSourceSpec;
    }

@Override
protected Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
final StreamExecutionEnvironment env = planner.getExecEnv();
final TransformationMetadata meta = createTransformationMeta(SOURCE_TRANSFORMATION, config);
final InternalTypeInfo<RowData> outputTypeInfo =
InternalTypeInfo.of((RowType) getOutputType());
final ScanTableSource tableSource =
tableSourceSpec.getScanTableSource(
planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner));
ScanTableSource.ScanRuntimeProvider provider =
tableSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
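        // translate the given runtime provider into a Transformation; each branch
        // below corresponds to one of the supported source APIs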
if (provider instanceof SourceFunctionProvider) {
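            // legacy SourceFunction API: wrapped into a StreamSource operator by the
            // helper below, which also derives parallelism and boundedness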
final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
final SourceFunction<RowData> function = sourceFunctionProvider.createSourceFunction();
final Transformation<RowData> transformation =
createSourceFunctionTransformation(
env,
function,
sourceFunctionProvider.isBounded(),
meta.getName(),
outputTypeInfo);
return meta.fill(transformation);
} else if (provider instanceof InputFormatProvider) {
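            // InputFormat-based source: the translation differs between batch and
            // streaming mode, so it is delegated to the concrete subclass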
final InputFormat<RowData, ?> inputFormat =
((InputFormatProvider) provider).createInputFormat();
final Transformation<RowData> transformation =
createInputFormatTransformation(
env, inputFormat, outputTypeInfo, meta.getName());
return meta.fill(transformation);
} else if (provider instanceof SourceProvider) {
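            // unified Source API (FLIP-27)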
final Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
// TODO: Push down watermark strategy to source scan
final Transformation<RowData> transformation =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
meta.getName(),
outputTypeInfo)
.getTransformation();
return meta.fill(transformation);
} else if (provider instanceof DataStreamScanProvider) {
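            // the connector builds the DataStream itself; the output type is
            // overwritten with the planner's internal type info afterwards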
Transformation<RowData> transformation =
((DataStreamScanProvider) provider)
.produceDataStream(createProviderContext(config), env)
.getTransformation();
meta.fill(transformation);
transformation.setOutputType(outputTypeInfo);
return transformation;
} else if (provider instanceof TransformationScanProvider) {
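            // planner-internal provider that creates the Transformation directly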
final Transformation<RowData> transformation =
((TransformationScanProvider) provider)
.createTransformation(createProviderContext(config));
meta.fill(transformation);
transformation.setOutputType(outputTypeInfo);
return transformation;
} else {
throw new UnsupportedOperationException(
                    provider.getClass().getSimpleName() + " is not supported yet.");
}
    }

private ProviderContext createProviderContext(ExecNodeConfig config) {
return name -> {
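            // generate a transformation UID only for streaming nodes and only if
            // the configuration requests UIDs to be set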
if (this instanceof StreamExecNode && config.shouldSetUid()) {
return Optional.of(createTransformationUid(name, config));
}
return Optional.empty();
};
    }

/**
 * Adapted from {@link StreamExecutionEnvironment#addSource(SourceFunction, String,
* TypeInformation)} but with custom {@link Boundedness}.
*/
protected Transformation<RowData> createSourceFunctionTransformation(
StreamExecutionEnvironment env,
SourceFunction<RowData> function,
boolean isBounded,
String operatorName,
TypeInformation<RowData> outputTypeInfo) {
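        // clean the closure of the given function to make sure it is serializable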
env.clean(function);
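        // only a ParallelSourceFunction may run with a parallelism larger than one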
final int parallelism;
if (function instanceof ParallelSourceFunction) {
parallelism = env.getParallelism();
} else {
parallelism = 1;
}
final Boundedness boundedness;
if (isBounded) {
boundedness = Boundedness.BOUNDED;
} else {
boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
}
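        // the second flag disables emitting progressive watermarks for bounded sources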
final StreamSource<RowData, ?> sourceOperator = new StreamSource<>(function, !isBounded);
return new LegacySourceTransformation<>(
operatorName, sourceOperator, outputTypeInfo, parallelism, boundedness);
    }

/**
* Creates a {@link Transformation} based on the given {@link InputFormat}. The implementation
* is different for streaming mode and batch mode.
*/
protected abstract Transformation<RowData> createInputFormatTransformation(
StreamExecutionEnvironment env,
InputFormat<RowData, ?> inputFormat,
InternalTypeInfo<RowData> outputTypeInfo,
String operatorName);
}