[FLINK-25076][table-planner] Improve vertex name for sql job
This closes #18042
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 0cd9ff2..1d2b4f5 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -69,6 +69,7 @@
import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
+import static org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStageId;
import static org.apache.flink.table.planner.utils.TableTestUtil.replaceStreamNodeId;
import static org.junit.Assert.assertEquals;
@@ -131,8 +132,8 @@
final String expected = readFromResource(expectedResourceFileName);
assertEquals(
- replaceStreamNodeId(replaceStageId(expected)),
- replaceStreamNodeId(replaceStageId(actual)));
+ replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(expected))),
+ replaceNodeIdInOperator(replaceStreamNodeId(replaceStageId(actual))));
tEnv.executeSql("drop database db1 cascade");
}
diff --git a/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out b/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
index 1dac561..5858300 100644
--- a/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
+++ b/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInBatch.out
@@ -16,43 +16,43 @@
== Physical Execution Plan ==
{
"nodes" : [ {
- "id" : 1,
- "type" : "Source: Values(tuples=[[{ 0 }]], values=[ZERO])",
+ "id" : ,
+ "type" : "Source: Values[]",
"pact" : "Data Source",
- "contents" : "Source: Values(tuples=[[{ 0 }]], values=[ZERO])",
+ "contents" : "[]:Values(tuples=[[{ 0 }]], values=[ZERO])",
"parallelism" : 1
}, {
- "id" : 2,
- "type" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
+ "id" : ,
+ "type" : "Calc[]",
"pact" : "Operator",
- "contents" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
+ "contents" : "[]:Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
"parallelism" : 1,
"predecessors" : [ {
- "id" : 1,
+ "id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
- "id" : 3,
+ "id" : ,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 1,
"predecessors" : [ {
- "id" : 2,
+ "id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
- "id" : 4,
+ "id" : ,
"type" : "Sink: Unnamed",
"pact" : "Data Sink",
"contents" : "Sink: Unnamed",
"parallelism" : 8,
"predecessors" : [ {
- "id" : 3,
+ "id" : ,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
} ]
-}
+}
\ No newline at end of file
diff --git a/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInStreaming.out b/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInStreaming.out
index 7e42154..e072969 100644
--- a/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInStreaming.out
+++ b/flink-connector-hive/src/test/resources/explain/testHiveTableSinkWithParallelismInStreaming.out
@@ -16,43 +16,43 @@
== Physical Execution Plan ==
{
"nodes" : [ {
- "id" : 1,
- "type" : "Source: Values(tuples=[[{ 0 }]])",
+ "id" : ,
+ "type" : "Source: Values[]",
"pact" : "Data Source",
- "contents" : "Source: Values(tuples=[[{ 0 }]])",
+ "contents" : "[]:Values(tuples=[[{ 0 }]])",
"parallelism" : 1
}, {
- "id" : 2,
- "type" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
+ "id" : ,
+ "type" : "Calc[]",
"pact" : "Operator",
- "contents" : "Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
+ "contents" : "[]:Calc(select=[1 AS EXPR$0, 1 AS EXPR$1])",
"parallelism" : 1,
"predecessors" : [ {
- "id" : 1,
+ "id" : ,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
- "id" : 3,
+ "id" : ,
"type" : "StreamingFileWriter",
"pact" : "Operator",
"contents" : "StreamingFileWriter",
"parallelism" : 8,
"predecessors" : [ {
- "id" : 2,
+ "id" : ,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
}, {
- "id" : 4,
+ "id" : ,
"type" : "Sink: end",
"pact" : "Data Sink",
"contents" : "Sink: end",
"parallelism" : 1,
"predecessors" : [ {
- "id" : 3,
+ "id" : ,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
} ]
-}
+}
\ No newline at end of file