| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.table.planner.plan.nodes.exec; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.configuration.ReadableConfig; |
| import org.apache.flink.table.api.TableConfig; |
| import org.apache.flink.table.api.TableException; |
| import org.apache.flink.table.api.config.ExecutionConfigOptions; |
| import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil; |
| import org.apache.flink.table.types.logical.LogicalType; |
| |
| import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; |
| import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue; |
| import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext; |
| |
| import org.apache.commons.lang3.StringUtils; |
| |
| import javax.annotation.Nullable; |
| |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.regex.Pattern; |
| import java.util.stream.Stream; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * Helper class that holds the necessary identifier fields that are used for JSON plan serialization |
| * and deserialization. It is instantiated using {@link ExecNodeContext#newContext(Class)} when |
| * creating a new instance of an {@link ExecNode}, so that is contains the info from the {@link |
| * ExecNodeMetadata} annotation of the class with the latest {@link ExecNodeMetadata#version()}. It |
| * can also be instantiated with {@link ExecNodeContext#ExecNodeContext(String)} automatically when |
| * the {@link ExecNode} is deserialized from a JSON Plan, and in this case the {@link |
| * ExecNodeContext} contains the version that is read from the JSON Plan and not the latest one. The |
| * serialization format is {@code <name>_<version>}, see {@link ExecNodeContext#getTypeAsString()}. |
| */ |
| @Internal |
| public final class ExecNodeContext { |
| |
| private static final Pattern transformationNamePattern = Pattern.compile("[a-z\\-]+"); |
| |
| /** This is used to assign a unique ID to every ExecNode. */ |
| private static final AtomicInteger idCounter = new AtomicInteger(0); |
| |
| /** Generate an unique ID for ExecNode. */ |
| public static int newNodeId() { |
| return idCounter.incrementAndGet(); |
| } |
| |
| /** Reset the id counter to 0. */ |
| @VisibleForTesting |
| public static void resetIdCounter() { |
| idCounter.set(0); |
| } |
| |
| private final Integer id; |
| private final String name; |
| private final Integer version; |
| |
| private ExecNodeContext() { |
| this(null, null, null); |
| } |
| |
| private ExecNodeContext(String name, Integer version) { |
| this(null, name, version); |
| } |
| |
| /** |
| * @param id The unique id of the {@link ExecNode}. See {@link ExecNode#getId()}. It can be null |
| * initially and then later set by using {@link #withId(int)} which creates a new instance |
| * of {@link ExecNodeContext} since it's immutable. This way we can satisfy both the {@link |
| * ExecNodeBase#ExecNodeBase(int, ExecNodeContext, ReadableConfig, List, LogicalType, |
| * String)} ctor, which is used for the {@link JsonCreator} ctors, where the {@code id} and |
| * the {@code context} are read separately, and the {@link |
| * ExecNodeBase#getContextFromAnnotation()} which creates a new context with a new id |
| * provided by: {@link #newNodeId()}. |
| * @param name The name of the {@link ExecNode}. See {@link ExecNodeMetadata#name()}. |
| * @param version The version of the {@link ExecNode}. See {@link ExecNodeMetadata#version()}. |
| */ |
| private ExecNodeContext(@Nullable Integer id, String name, Integer version) { |
| this.id = id; |
| this.name = name; |
| this.version = version; |
| } |
| |
| @JsonCreator |
| public ExecNodeContext(String value) { |
| this.id = null; |
| String[] split = value.split("_"); |
| this.name = split[0]; |
| this.version = Integer.valueOf(split[1]); |
| } |
| |
| /** The unique identifier for each ExecNode in the JSON plan. */ |
| int getId() { |
| return checkNotNull(id); |
| } |
| |
| /** The type identifying an ExecNode in the JSON plan. See {@link ExecNodeMetadata#name()}. */ |
| public String getName() { |
| return name; |
| } |
| |
| /** The version of the ExecNode in the JSON plan. See {@link ExecNodeMetadata#version()}. */ |
| public Integer getVersion() { |
| return version; |
| } |
| |
| /** Returns a new {@code uid} for transformations. */ |
| public String generateUid(String transformationName, ExecNodeConfig config) { |
| if (!transformationNamePattern.matcher(transformationName).matches()) { |
| throw new TableException( |
| "Invalid transformation name '" |
| + transformationName |
| + "'. " |
| + "This is a bug, please file an issue."); |
| } |
| final String uidPattern = config.get(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT); |
| // Note: name and version are not included in the UID by default as they would prevent |
| // migration. |
| // No version because: An operator can change its state layout and bump up the ExecNode |
| // version, in this case the UID should still be able to map state even after plan |
| // migration to the new version. |
| // No name because: We might fuse operators in the future, and a new operator might |
| // subscribe to multiple old UIDs. |
| return StringUtils.replaceEach( |
| uidPattern, |
| new String[] {"<id>", "<type>", "<version>", "<transformation>"}, |
| new String[] { |
| String.valueOf(id), name, String.valueOf(version), transformationName |
| }); |
| } |
| |
| /** |
| * Set the unique ID of the node, so that the {@link ExecNodeContext}, together with the type |
| * related {@link #name} and {@link #version}, stores all the necessary info to uniquely |
| * reconstruct the {@link ExecNode}, and avoid storing the {@link #id} independently as a field |
| * in {@link ExecNodeBase}. |
| */ |
| public ExecNodeContext withId(int id) { |
| return new ExecNodeContext(id, this.name, this.version); |
| } |
| |
| /** |
| * Returns the {@link #name} and {@link #version}, to be serialized into the JSON plan as one |
| * string, which in turn will be parsed by {@link ExecNodeContext#ExecNodeContext(String)} when |
| * deserialized from a JSON plan or when needed by {@link |
| * ExecNodeTypeIdResolver#typeFromId(DatabindContext, String)}. |
| */ |
| @JsonValue |
| public String getTypeAsString() { |
| return name + "_" + version; |
| } |
| |
| @Override |
| public String toString() { |
| return getId() + "_" + getTypeAsString(); |
| } |
| |
| public static <T extends ExecNode<?>> ExecNodeContext newContext(Class<T> execNodeClass) { |
| ExecNodeMetadata metadata = ExecNodeMetadataUtil.latestAnnotation(execNodeClass); |
| if (metadata == null) { |
| if (!ExecNodeMetadataUtil.isUnsupported(execNodeClass)) { |
| throw new IllegalStateException( |
| String.format( |
| "ExecNode: %s is not listed in the unsupported classes since it is not annotated with: %s.", |
| execNodeClass.getCanonicalName(), |
| ExecNodeMetadata.class.getSimpleName())); |
| } |
| return new ExecNodeContext(); |
| } |
| if (!ExecNodeMetadataUtil.execNodes().contains(execNodeClass)) { |
| throw new IllegalStateException( |
| String.format( |
| "ExecNode: %s is not listed in the supported classes and yet is annotated with: %s.", |
| execNodeClass.getCanonicalName(), |
| ExecNodeMetadata.class.getSimpleName())); |
| } |
| return new ExecNodeContext(metadata.name(), metadata.version()); |
| } |
| |
| /** |
| * Create a configuration for the {@link ExecNode}, ready to be persisted to a JSON plan. |
| * |
| * @param execNodeClass The {@link ExecNode} class. |
| * @param tableConfig The planner configuration (include the {@link TableConfig}). |
| * @return The {@link ExecNode} configuration, which contains the consumed options for the node, |
| * defined by {@link ExecNodeMetadata#consumedOptions()}, along with their values. |
| */ |
| public static <T extends ExecNode<?>> ReadableConfig newPersistedConfig( |
| Class<T> execNodeClass, ReadableConfig tableConfig) { |
| return ExecNodeMetadataUtil.newPersistedConfig( |
| execNodeClass, |
| tableConfig, |
| Stream.concat( |
| ExecNodeMetadataUtil.TABLE_CONFIG_OPTIONS.stream(), |
| ExecNodeMetadataUtil.EXECUTION_CONFIG_OPTIONS.stream())); |
| } |
| } |