Fix some compile error test with release-1.13
diff --git a/pom.xml b/pom.xml
index f92f166..c9d2ce0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
 
 	<properties>
 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<flink.version>1.15-SNAPSHOT</flink.version>
+		<flink.version>1.13.2</flink.version>
 		<flink.shaded.version>14.0</flink.shaded.version>
 		<netty.tcnative.version>2.0.39.Final</netty.tcnative.version>
 		<java.version>1.8</java.version>
diff --git a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
index 3a0af48..d4285cc 100644
--- a/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
+++ b/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java
@@ -44,7 +44,6 @@
 import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.SplittableIterator;
 
 import org.junit.Assert;
@@ -345,9 +344,6 @@
 
         @Override
         public void setKeyContextElement(StreamRecord<Integer> record) {}
-
-        @Override
-        public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {}
     }
 
     private static class InputGenerator extends SplittableIterator<Integer> {
diff --git a/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java b/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java
index 0746193..43d1fba 100644
--- a/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java
+++ b/src/main/java/org/apache/flink/benchmark/StreamGraphUtils.java
@@ -23,7 +23,7 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
+import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 /**
@@ -37,7 +37,7 @@
 
 		StreamGraph streamGraph = env.getStreamGraph();
 		streamGraph.setChaining(false);
-		streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
+		streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
 		streamGraph.setJobType(JobType.BATCH);
 
 		return streamGraph;
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java
index 89a46f7..4f9df53 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingDownstreamTasksInBatchJobBenchmarkExecutor.java
@@ -61,6 +61,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java
index bbabaf4..0c05a7d 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/deploying/DeployingTasksInStreamingJobBenchmarkExecutor.java
@@ -61,6 +61,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java
index b14a6df..8191d00 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/CreateSchedulerBenchmarkExecutor.java
@@ -60,6 +60,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java
index 153c3d8..04d25ad 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmarkExecutor.java
@@ -59,6 +59,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java
index c8504b9..bfcca07 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInBatchJobBenchmarkExecutor.java
@@ -60,6 +60,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java
index 2b1543e..aa8bdd3 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/failover/RegionToRestartInStreamingJobBenchmarkExecutor.java
@@ -60,6 +60,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java
index 3657874..d355087 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/partitionrelease/PartitionReleaseInBatchJobBenchmarkExecutor.java
@@ -60,6 +60,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java
index 21c66d8..9c9ce1f 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/InitSchedulingStrategyBenchmarkExecutor.java
@@ -61,6 +61,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
index 7968ff9..4661666 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/scheduling/SchedulingDownstreamTasksInBatchJobBenchmarkExecutor.java
@@ -60,6 +60,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }
diff --git a/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java b/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java
index cf97809..104e3b4 100644
--- a/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java
+++ b/src/main/java/org/apache/flink/scheduler/benchmark/topology/BuildExecutionGraphBenchmarkExecutor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.scheduler.benchmark.topology;
 
 import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
-import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils;
 import org.apache.flink.runtime.scheduler.benchmark.topology.BuildExecutionGraphBenchmark;
 import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
 
@@ -60,6 +59,6 @@
 
 	@TearDown(Level.Trial)
 	public void teardown() {
-		benchmark.teardown();
+		// benchmark.teardown();
 	}
 }