[FLINK-23599] Remove JobVertex#connectIdInput and its related unit tests

This closes #16686.
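
For reference, the removed connectIdInput wired a consumer vertex to an already
existing IntermediateDataSetID. The usual way to connect two vertices, which the
tests in this diff also use, is connectNewDataSetAsInput. A minimal sketch
(vertex names and parallelism values are chosen only for illustration):

    JobVertex producer = new JobVertex("producer");
    JobVertex consumer = new JobVertex("consumer");

    producer.setParallelism(2);
    consumer.setParallelism(2);

    producer.setInvokableClass(AbstractInvokable.class);
    consumer.setInvokableClass(AbstractInvokable.class);

    // Creates the intermediate data set on the producer and the JobEdge to the
    // consumer in one call, rather than passing a dataset id via connectIdInput.
    consumer.connectNewDataSetAsInput(
            producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);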
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index a3dc3b5..ac214cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -500,11 +500,6 @@
         return edge;
     }
 
-    public void connectIdInput(IntermediateDataSetID dataSetId, DistributionPattern distPattern) {
-        JobEdge edge = new JobEdge(dataSetId, this, distPattern);
-        this.inputs.add(edge);
-    }
-
     // --------------------------------------------------------------------------------------------
 
     public boolean isInputVertex() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
index 59f9488..44b4bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
@@ -27,7 +27,6 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -223,71 +222,6 @@
         verifyTestGraph(eg, v1, v2, v3, v4, v5);
     }
 
-    @Test
-    public void testAttachViaIds() throws Exception {
-        // construct part one of the execution graph
-        JobVertex v1 = new JobVertex("vertex1");
-        JobVertex v2 = new JobVertex("vertex2");
-        JobVertex v3 = new JobVertex("vertex3");
-
-        v1.setParallelism(5);
-        v2.setParallelism(7);
-        v3.setParallelism(2);
-
-        v1.setInvokableClass(AbstractInvokable.class);
-        v2.setInvokableClass(AbstractInvokable.class);
-        v3.setInvokableClass(AbstractInvokable.class);
-
-        // this creates an intermediate result for v1
-        v2.connectNewDataSetAsInput(
-                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
-        // create results for v2 and v3
-        IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
-        IntermediateDataSet v3result1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
-        IntermediateDataSet v3result2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
-
-        // construct part two of the execution graph
-        JobVertex v4 = new JobVertex("vertex4");
-        JobVertex v5 = new JobVertex("vertex5");
-        v4.setParallelism(11);
-        v5.setParallelism(4);
-
-        v4.setInvokableClass(AbstractInvokable.class);
-        v5.setInvokableClass(AbstractInvokable.class);
-
-        v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL);
-        v4.connectIdInput(v3result1.getId(), DistributionPattern.ALL_TO_ALL);
-        v5.connectNewDataSetAsInput(
-                v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-        v5.connectIdInput(v3result2.getId(), DistributionPattern.ALL_TO_ALL);
-
-        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
-        List<JobVertex> ordered2 = Arrays.asList(v4, v5);
-
-        ExecutionGraph eg =
-                createDefaultExecutionGraph(
-                        Stream.concat(ordered.stream(), ordered2.stream())
-                                .collect(Collectors.toList()));
-        try {
-            eg.attachJobGraph(ordered);
-        } catch (JobException e) {
-            e.printStackTrace();
-            fail("Job failed with exception: " + e.getMessage());
-        }
-
-        // attach the second part of the graph
-        try {
-            eg.attachJobGraph(ordered2);
-        } catch (JobException e) {
-            e.printStackTrace();
-            fail("Job failed with exception: " + e.getMessage());
-        }
-
-        // verify
-        verifyTestGraph(eg, v1, v2, v3, v4, v5);
-    }
-
     private void verifyTestGraph(
             ExecutionGraph eg,
             JobVertex v1,
@@ -308,41 +242,6 @@
     }
 
     @Test
-    public void testCannotConnectMissingId() throws Exception {
-        // construct part one of the execution graph
-        JobVertex v1 = new JobVertex("vertex1");
-        v1.setParallelism(7);
-        v1.setInvokableClass(AbstractInvokable.class);
-
-        // construct part two of the execution graph
-        JobVertex v2 = new JobVertex("vertex2");
-        v2.setInvokableClass(AbstractInvokable.class);
-        v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL);
-
-        List<JobVertex> ordered = Arrays.asList(v1);
-        List<JobVertex> ordered2 = Arrays.asList(v2);
-
-        ExecutionGraph eg =
-                createDefaultExecutionGraph(
-                        Stream.concat(ordered.stream(), ordered2.stream())
-                                .collect(Collectors.toList()));
-        try {
-            eg.attachJobGraph(ordered);
-        } catch (JobException e) {
-            e.printStackTrace();
-            fail("Job failed with exception: " + e.getMessage());
-        }
-
-        // attach the second part of the graph
-        try {
-            eg.attachJobGraph(ordered2);
-            fail("Attached wrong jobgraph");
-        } catch (JobException e) {
-            // expected
-        }
-    }
-
-    @Test
     public void testCannotConnectWrongOrder() throws Exception {
         JobVertex v1 = new JobVertex("vertex1");
         JobVertex v2 = new JobVertex("vertex2");