blob: 2c7a33042a0bc9c4cfe92c4b864ac2416b44532e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.planner.plan.nodes.exec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.serde.ConfigurationJsonSerializerFilter;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JacksonInject;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base class for {@link ExecNode}.
*
* @param <T> The type of the elements that result from this node.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class ExecNodeBase<T> implements ExecNode<T> {
/**
* The default value of this flag is false. Other cases must set this flag accordingly via
* {@link #setCompiled(boolean)}. It is not exposed via a constructor arg to avoid complex
* constructor overloading for all {@link ExecNode}s. However, during deserialization this flag
* will always be set to true.
*/
@JacksonInject("isDeserialize")
private boolean isCompiled;
private final String description;
private final LogicalType outputType;
private final List<InputProperty> inputProperties;
private List<ExecEdge> inputEdges;
private transient Transformation<T> transformation;
/** Holds the context information (id, name, version) as deserialized from a JSON plan. */
@JsonProperty(value = FIELD_NAME_TYPE, access = JsonProperty.Access.WRITE_ONLY)
private final ExecNodeContext context;
/**
* Retrieves the default context from the {@link ExecNodeMetadata} annotation to be serialized
* into the JSON plan.
*/
@JsonProperty(value = FIELD_NAME_TYPE, access = JsonProperty.Access.READ_ONLY, index = 1)
protected final ExecNodeContext getContextFromAnnotation() {
return ExecNodeContext.newContext(this.getClass()).withId(getId());
}
@JsonProperty(value = FIELD_NAME_CONFIGURATION, access = JsonProperty.Access.WRITE_ONLY)
private final ReadableConfig persistedConfig;
@JsonProperty(
value = FIELD_NAME_CONFIGURATION,
access = JsonProperty.Access.READ_ONLY,
index = 2)
// Custom filter to exclude node configuration if no consumed options are used
@JsonInclude(
value = JsonInclude.Include.CUSTOM,
valueFilter = ConfigurationJsonSerializerFilter.class)
public ReadableConfig getPersistedConfig() {
return persistedConfig;
}
protected ExecNodeBase(
int id,
ExecNodeContext context,
ReadableConfig persistedConfig,
List<InputProperty> inputProperties,
LogicalType outputType,
String description) {
this.context = checkNotNull(context).withId(id);
this.persistedConfig = persistedConfig == null ? new Configuration() : persistedConfig;
this.inputProperties = checkNotNull(inputProperties);
this.outputType = checkNotNull(outputType);
this.description = checkNotNull(description);
}
@Override
public final int getId() {
return context.getId();
}
@Override
public String getDescription() {
return description;
}
@Override
public LogicalType getOutputType() {
return outputType;
}
@Override
public List<InputProperty> getInputProperties() {
return inputProperties;
}
@Override
public List<ExecEdge> getInputEdges() {
return checkNotNull(
inputEdges,
"inputEdges should not null, please call `setInputEdges(List<ExecEdge>)` first.");
}
@Override
public void setInputEdges(List<ExecEdge> inputEdges) {
checkNotNull(inputEdges, "inputEdges should not be null.");
this.inputEdges = new ArrayList<>(inputEdges);
}
@Override
public void replaceInputEdge(int index, ExecEdge newInputEdge) {
List<ExecEdge> edges = getInputEdges();
checkArgument(index >= 0 && index < edges.size());
edges.set(index, newInputEdge);
}
@Override
public final Transformation<T> translateToPlan(Planner planner) {
if (transformation == null) {
transformation =
translateToPlanInternal(
(PlannerBase) planner,
ExecNodeConfig.of(
((PlannerBase) planner).getTableConfig(),
persistedConfig,
isCompiled));
if (this instanceof SingleTransformationTranslator) {
if (inputsContainSingleton()) {
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
}
}
}
return transformation;
}
@Override
public void accept(ExecNodeVisitor visitor) {
visitor.visit(this);
}
@Override
public void setCompiled(boolean compiled) {
isCompiled = compiled;
}
/**
* Internal method, translates this node into a Flink operator.
*
* @param planner The planner.
* @param config per-{@link ExecNode} configuration that contains the merged configuration from
* various layers which all the nodes implementing this method should use, instead of
* retrieving configuration from the {@code planner}. For more details check {@link
* ExecNodeConfig}.
*/
protected abstract Transformation<T> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config);
/** Whether singleton distribution is required. */
protected boolean inputsContainSingleton() {
return getInputProperties().stream()
.anyMatch(
p ->
p.getRequiredDistribution().getType()
== InputProperty.DistributionType.SINGLETON);
}
@JsonIgnore
protected String getSimplifiedName() {
return getClass().getSimpleName().replace("StreamExec", "").replace("BatchExec", "");
}
protected String createTransformationUid(String operatorName, ExecNodeConfig config) {
return context.generateUid(operatorName, config);
}
protected String createTransformationName(ReadableConfig config) {
return createFormattedTransformationName(getDescription(), getSimplifiedName(), config);
}
protected String createTransformationDescription(ReadableConfig config) {
return createFormattedTransformationDescription(getDescription(), config);
}
protected TransformationMetadata createTransformationMeta(
String operatorName, ExecNodeConfig config) {
if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) {
return new TransformationMetadata(
createTransformationName(config), createTransformationDescription(config));
} else {
return new TransformationMetadata(
createTransformationUid(operatorName, config),
createTransformationName(config),
createTransformationDescription(config));
}
}
protected TransformationMetadata createTransformationMeta(
String operatorName, String detailName, String simplifiedName, ExecNodeConfig config) {
final String name = createFormattedTransformationName(detailName, simplifiedName, config);
final String desc = createFormattedTransformationDescription(detailName, config);
if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) {
return new TransformationMetadata(name, desc);
} else {
return new TransformationMetadata(
createTransformationUid(operatorName, config), name, desc);
}
}
protected String createFormattedTransformationDescription(
String description, ReadableConfig config) {
if (config.get(ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED)) {
return String.format("[%d]:%s", getId(), description);
}
return description;
}
protected String createFormattedTransformationName(
String detailName, String simplifiedName, ReadableConfig config) {
if (config.get(ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED)) {
return String.format("%s[%d]", simplifiedName, getId());
}
return detailName;
}
public void resetTransformation() {
this.transformation = null;
}
}