[FLINK-28861][table] Fix bug in UID format for future migrations and make it configurable

Before this commit, the UID format was not future-proof for migrations. The ExecNode version
should not be part of the UID; otherwise, operator migration won't be possible once plan migration
has been executed. See the FLIP-190 example, which drops a version from the plan after operator
migration has been performed. Given that the plan feature is marked as @Experimental, this change
should still be possible without providing backwards compatibility.

However, the config option table.exec.uid.format allows restoring the old format and solves
other UID-related issues along the way.

This closes #20555.
diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index d6292ed..d837fd4 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -161,6 +161,12 @@
             <td>Specifies a minimum time interval for how long idle state (i.e. state which was not updated), will be retained. State will never be cleared until it was idle for less than the minimum time, and will be cleared at some time after it was idle. Default is never clean-up the state. NOTE: Cleaning up state requires additional overhead for bookkeeping. Default value is 0, which means that it will never clean up state.</td>
         </tr>
         <tr>
+            <td><h5>table.exec.uid.format</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">"&lt;id&gt;_&lt;transformation&gt;"</td>
+            <td>String</td>
+            <td>Defines the format pattern for generating the UID of an ExecNode streaming transformation. The pattern can be defined globally or per-ExecNode in the compiled plan. Supported arguments are: &lt;id&gt; (from static counter), &lt;type&gt; (e.g. 'stream-exec-sink'), &lt;version&gt;, and &lt;transformation&gt; (e.g. 'constraint-validator' for a sink). In Flink 1.15.x the pattern was wrongly defined as '&lt;id&gt;_&lt;type&gt;_&lt;version&gt;_&lt;transformation&gt;' which would prevent migrations in the future.</td>
+        </tr>
+        <tr>
             <td><h5>table.exec.uid.generation</h5><br> <span class="label label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">PLAN_ONLY</td>
             <td><p>Enum</p></td>
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 2f449df..c14546c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -529,6 +529,19 @@
                                                     + "affecting the stable UIDs.")
                                     .build());
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<String> TABLE_EXEC_UID_FORMAT =
+            key("table.exec.uid.format")
+                    .stringType()
+                    .defaultValue("<id>_<transformation>")
+                    .withDescription(
+                            "Defines the format pattern for generating the UID of an ExecNode streaming transformation. "
+                                    + "The pattern can be defined globally or per-ExecNode in the compiled plan. "
+                                    + "Supported arguments are: <id> (from static counter), <type> (e.g. 'stream-exec-sink'), "
+                                    + "<version>, and <transformation> (e.g. 'constraint-validator' for a sink). "
+                                    + "In Flink 1.15.x the pattern was wrongly defined as '<id>_<type>_<version>_<transformation>' "
+                                    + "which would prevent migrations in the future.");
+
     // ------------------------------------------------------------------------------------------
     // Enum option types
     // ------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 0e949aa..2c7a330 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -207,8 +207,8 @@
         return getClass().getSimpleName().replace("StreamExec", "").replace("BatchExec", "");
     }
 
-    protected String createTransformationUid(String operatorName) {
-        return context.generateUid(operatorName);
+    protected String createTransformationUid(String operatorName, ExecNodeConfig config) {
+        return context.generateUid(operatorName, config);
     }
 
     protected String createTransformationName(ReadableConfig config) {
@@ -226,7 +226,7 @@
                     createTransformationName(config), createTransformationDescription(config));
         } else {
             return new TransformationMetadata(
-                    createTransformationUid(operatorName),
+                    createTransformationUid(operatorName, config),
                     createTransformationName(config),
                     createTransformationDescription(config));
         }
@@ -239,7 +239,8 @@
         if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) {
             return new TransformationMetadata(name, desc);
         } else {
-            return new TransformationMetadata(createTransformationUid(operatorName), name, desc);
+            return new TransformationMetadata(
+                    createTransformationUid(operatorName, config), name, desc);
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
index 0abb680..cc34369 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 
@@ -30,6 +31,8 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
 
+import org.apache.commons.lang3.StringUtils;
+
 import javax.annotation.Nullable;
 
 import java.util.List;
@@ -122,7 +125,7 @@
     }
 
     /** Returns a new {@code uid} for transformations. */
-    public String generateUid(String transformationName) {
+    public String generateUid(String transformationName, ExecNodeConfig config) {
         if (!transformationNamePattern.matcher(transformationName).matches()) {
             throw new TableException(
                     "Invalid transformation name '"
@@ -130,7 +133,20 @@
                             + "'. "
                             + "This is a bug, please file an issue.");
         }
-        return String.format("%s_%s_%s", getId(), getTypeAsString(), transformationName);
+        final String uidPattern = config.get(ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT);
+        // Note: name and version are not included in the UID by default as they would prevent
+        // migration.
+        // No version because: An operator can change its state layout and bump up the ExecNode
+        // version, in this case the UID should still be able to map state even after plan
+        // migration to the new version.
+        // No name because: We might fuse operators in the future, and a new operator might
+        // subscribe to multiple old UIDs.
+        return StringUtils.replaceEach(
+                uidPattern,
+                new String[] {"<id>", "<type>", "<version>", "<transformation>"},
+                new String[] {
+                    String.valueOf(id), name, String.valueOf(version), transformationName
+                });
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 5831bd3..71f8184 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -532,7 +532,7 @@
     private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
             if (this instanceof StreamExecNode && config.shouldSetUid()) {
-                return Optional.of(createTransformationUid(name));
+                return Optional.of(createTransformationUid(name, config));
             }
             return Optional.empty();
         };
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index baebd1a..a008f88 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -158,7 +158,7 @@
     private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
             if (this instanceof StreamExecNode && config.shouldSetUid()) {
-                return Optional.of(createTransformationUid(name));
+                return Optional.of(createTransformationUid(name, config));
             }
             return Optional.empty();
         };
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index 3472036..70288dd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -236,7 +236,7 @@
             int leftArity,
             int rightArity,
             InternalTypeInfo<RowData> returnTypeInfo,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         boolean shouldCreateUid =
                 config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);
 
@@ -260,7 +260,7 @@
                         returnTypeInfo,
                         leftParallelism);
         if (shouldCreateUid) {
-            filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION));
+            filterAllLeftStream.setUid(createTransformationUid(FILTER_LEFT_TRANSFORMATION, config));
         }
         filterAllLeftStream.setDescription(
                 createFormattedTransformationDescription(
@@ -277,7 +277,8 @@
                         returnTypeInfo,
                         rightParallelism);
         if (shouldCreateUid) {
-            filterAllRightStream.setUid(createTransformationUid(FILTER_RIGHT_TRANSFORMATION));
+            filterAllRightStream.setUid(
+                    createTransformationUid(FILTER_RIGHT_TRANSFORMATION, config));
         }
         filterAllRightStream.setDescription(
                 createFormattedTransformationDescription(
@@ -294,7 +295,7 @@
                         returnTypeInfo,
                         leftParallelism);
         if (shouldCreateUid) {
-            padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION));
+            padLeftStream.setUid(createTransformationUid(PAD_LEFT_TRANSFORMATION, config));
         }
         padLeftStream.setDescription(
                 createFormattedTransformationDescription("pad left input transformation", config));
@@ -310,7 +311,7 @@
                         returnTypeInfo,
                         rightParallelism);
         if (shouldCreateUid) {
-            padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION));
+            padRightStream.setUid(createTransformationUid(PAD_RIGHT_TRANSFORMATION, config));
         }
         padRightStream.setDescription(
                 createFormattedTransformationDescription("pad right input transformation", config));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index 20cf508..4fa88b2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -31,19 +31,26 @@
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
+import org.apache.flink.table.planner.utils.JsonTestUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_FORMAT;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.ALWAYS;
 import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.DISABLED;
@@ -192,10 +199,69 @@
         }
     }
 
+    @Test
+    public void testUidDefaults() throws IOException {
+        checkUidModification(
+                config -> {}, json -> {}, "\\d+_sink", "\\d+_constraint-validator", "\\d+_values");
+    }
+
+    @Test
+    public void testUidFlink1_15() throws IOException {
+        checkUidModification(
+                config ->
+                        config.set(TABLE_EXEC_UID_FORMAT, "<id>_<type>_<version>_<transformation>"),
+                json -> {},
+                "\\d+_stream-exec-sink_1_sink",
+                "\\d+_stream-exec-sink_1_constraint-validator",
+                "\\d+_stream-exec-values_1_values");
+    }
+
+    @Test
+    public void testPerNodeCustomUid() throws IOException {
+        checkUidModification(
+                config -> {},
+                json ->
+                        JsonTestUtils.setExecNodeConfig(
+                                json,
+                                "stream-exec-sink_1",
+                                TABLE_EXEC_UID_FORMAT.key(),
+                                "my_custom_<transformation>_<id>"),
+                "my_custom_sink_\\d+",
+                "my_custom_constraint-validator_\\d+",
+                "\\d+_values");
+    }
+
+    private static void checkUidModification(
+            Consumer<TableConfig> configModifier,
+            Consumer<JsonNode> jsonModifier,
+            String... expectedUidPatterns)
+            throws IOException {
+        final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        configModifier.accept(env.getConfig());
+        final String plan = minimalPlan(env).asJsonString();
+        final JsonNode json = JsonTestUtils.readFromString(plan);
+        jsonModifier.accept(json);
+        final List<String> planUids =
+                CompiledPlanUtils.toTransformations(
+                                env, env.loadPlan(PlanReference.fromJsonString(json.toString())))
+                        .get(0).getTransitivePredecessors().stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(planUids).hasSize(expectedUidPatterns.length);
+        IntStream.range(0, expectedUidPatterns.length)
+                .forEach(i -> assertThat(planUids.get(i)).matches(expectedUidPatterns[i]));
+    }
+
     // --------------------------------------------------------------------------------------------
     // Helper methods
     // --------------------------------------------------------------------------------------------
 
+    private static CompiledPlan minimalPlan(TableEnvironment env) {
+        return env.fromValues(1, 2, 3)
+                .insertInto(TableDescriptor.forConnector("blackhole").build())
+                .compilePlan();
+    }
+
     private static LegacySourceTransformation<?> toLegacySourceTransformation(
             StreamTableEnvironment env, Table table) {
         Transformation<?> transform = env.toChangelogStream(table).getTransformation();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
index 6aa9f96..c4595a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/JsonTestUtils.java
@@ -39,8 +39,28 @@
         return OBJECT_MAPPER_INSTANCE.readTree(JsonTestUtils.class.getResource(path));
     }
 
+    public static JsonNode readFromString(String path) throws IOException {
+        return OBJECT_MAPPER_INSTANCE.readTree(path);
+    }
+
     public static JsonNode setFlinkVersion(JsonNode target, FlinkVersion flinkVersion) {
         return ((ObjectNode) target)
                 .set("flinkVersion", OBJECT_MAPPER_INSTANCE.valueToTree(flinkVersion.toString()));
     }
+
+    public static JsonNode setExecNodeConfig(
+            JsonNode target, String type, String key, String value) {
+        target.get("nodes")
+                .elements()
+                .forEachRemaining(
+                        n -> {
+                            if (n.get("type").asText().equals(type)) {
+                                final ObjectNode configNode =
+                                        OBJECT_MAPPER_INSTANCE.createObjectNode();
+                                configNode.put(key, value);
+                                ((ObjectNode) n).set("configuration", configNode);
+                            }
+                        });
+        return target;
+    }
 }