[FLINK-15669] [sql-client] Fix SQL client being unable to cancel Flink jobs
Before, we stored the result under a randomly generated unique ID. When
cancelling, LocalExecutor.cancelQueryInternal() used that ID to try and
cancel the running Flink job, which failed because the ID does not
correspond to any JobID known to the cluster. Now we store the result
under the JobID (which was also the behaviour before Flink 1.10) and can
therefore cancel the job.
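
For illustration, a minimal sketch of the cancellation path this enables
(simplified; "resultId" and "clusterClient" stand in for the actual
parameter and field used by LocalExecutor.cancelQueryInternal()):

    // The result ID handed to cancelQueryInternal() is now a JobID string,
    // so it can be parsed back into a JobID and passed to the cluster client.
    JobID jobId = JobID.fromHexString(resultId);
    clusterClient.cancel(jobId).get();

With the previous randomly generated UUID, the result ID did not map to
any running job, so the cancel request could never reach the cluster.
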
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 8493bee..3665735 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -79,7 +79,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
@@ -645,10 +644,6 @@
 			});
 		}
 
-		// store the result with a unique id
-		final String resultId = UUID.randomUUID().toString();
-		resultStore.storeResult(resultId, result);
-
 		// create a copy so that we can change settings without affecting the original config
 		Configuration configuration = new Configuration(context.getFlinkConfig());
 		// for queries we wait for the job result, so run in attached mode
@@ -660,11 +655,23 @@
 		final ProgramDeployer deployer = new ProgramDeployer(
 				configuration, jobName, pipeline);
 
+		JobClient jobClient;
+		// blocking deployment
+		try {
+			jobClient = deployer.deploy().get();
+		} catch (Exception e) {
+			throw new SqlExecutionException("Error while submitting job.", e);
+		}
+
+		String jobId = jobClient.getJobID().toString();
+		// store the result under the JobID
+		resultStore.storeResult(jobId, result);
+
 		// start result retrieval
-		result.startRetrieval(deployer);
+		result.startRetrieval(jobClient);
 
 		return new ResultDescriptor(
-				resultId,
+				jobId,
 				removeTimeAttributes(table.getSchema()),
 				result.isMaterialized());
 	}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
index 04aef6b..6d5b427 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.experimental.SocketStreamIterator;
@@ -32,7 +33,6 @@
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.local.CollectStreamTableSink;
-import org.apache.flink.table.client.gateway.local.ProgramDeployer;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 
@@ -86,18 +86,16 @@
 	}
 
 	@Override
-	public void startRetrieval(ProgramDeployer deployer) {
+	public void startRetrieval(JobClient jobClient) {
 		// start listener thread
 		retrievalThread.start();
 
-		jobExecutionResultFuture = deployer
-				.deploy()
-				.thenCompose(jobClient -> jobClient.getJobExecutionResult(classLoader))
+		jobExecutionResultFuture = jobClient.getJobExecutionResult(classLoader)
 				.whenComplete((unused, throwable) -> {
 					if (throwable != null) {
 						executionException.compareAndSet(
 								null,
-								new SqlExecutionException("Error while submitting job.", throwable));
+								new SqlExecutionException("Error while retrieving result.", throwable));
 					}
 				});
 	}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/DynamicResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/DynamicResult.java
index 22e0c8c..d59a663 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/DynamicResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/DynamicResult.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.client.gateway.local.result;
 
-import org.apache.flink.table.client.gateway.local.ProgramDeployer;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.sinks.TableSink;
 
 /**
@@ -37,9 +37,9 @@
 	boolean isMaterialized();
 
 	/**
-	 * Starts the table program using the given deployer and monitors it's execution.
+	 * Starts retrieving the result using the given {@link JobClient} and monitors its execution.
 	 */
-	void startRetrieval(ProgramDeployer deployer);
+	void startRetrieval(JobClient jobClient);
 
 	/**
 	 * Returns the table sink required by this result type.
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
index c2d9fd86..cd60076 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
@@ -22,11 +22,11 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.local.CollectBatchTableSink;
-import org.apache.flink.table.client.gateway.local.ProgramDeployer;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.AbstractID;
@@ -76,16 +76,14 @@
 	}
 
 	@Override
-	public void startRetrieval(ProgramDeployer deployer) {
-		deployer
-				.deploy()
-				.thenCompose(jobClient -> jobClient.getJobExecutionResult(classLoader))
+	public void startRetrieval(JobClient jobClient) {
+		jobClient.getJobExecutionResult(classLoader)
 				.thenAccept(new ResultRetrievalHandler())
 				.whenComplete((unused, throwable) -> {
 					if (throwable != null) {
 						executionException.compareAndSet(null,
 								new SqlExecutionException(
-										"Error while submitting job.",
+										"Error while retrieving result.",
 										throwable));
 					}
 				});
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index c89746e..956503f 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
@@ -84,6 +85,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -484,6 +486,51 @@
 		}
 	}
 
+	@Test(timeout = 30_000L)
+	public void testStreamQueryCancel() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
+		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+		replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
+		replaceVars.put("$VAR_RESULT_MODE", "changelog");
+		replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+		replaceVars.put("$VAR_MAX_ROWS", "100");
+
+		final LocalExecutor executor = createModifiedExecutor(clusterClient, replaceVars);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+		String sessionId = executor.openSession(session);
+		assertEquals("test-session", sessionId);
+
+		try {
+			final ResultDescriptor desc = executor.executeQuery(sessionId, "SELECT * FROM TestView1");
+			final JobID jobId = JobID.fromHexString(desc.getResultId());
+
+			assertFalse(desc.isMaterialized());
+
+			JobStatus jobStatus1 = getJobStatus(executor, sessionId, jobId);
+
+			assertNotEquals(JobStatus.CANCELED, jobStatus1);
+
+			executor.cancelQuery(sessionId, desc.getResultId());
+
+			JobStatus jobStatus2 = null;
+			// wait up to 30 seconds
+			for (int i = 0; i < 300; ++i) {
+				jobStatus2 = getJobStatus(executor, sessionId, jobId);
+				if (jobStatus2 != JobStatus.CANCELED) {
+					Thread.sleep(100);
+				} else {
+					break;
+				}
+			}
+			assertEquals(JobStatus.CANCELED, jobStatus2);
+		} finally {
+			executor.closeSession(sessionId);
+		}
+	}
+
 	@Test(timeout = 90_000L)
 	public void testStreamQueryExecutionChangelogMultipleTimes() throws Exception {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
@@ -650,6 +697,50 @@
 		}
 	}
 
+	@Test(timeout = 30_000L)
+	public void testBatchQueryCancel() throws Exception {
+		final URL url = getClass().getClassLoader().getResource("test-data.csv");
+		Objects.requireNonNull(url);
+		final Map<String, String> replaceVars = new HashMap<>();
+		replaceVars.put("$VAR_PLANNER", planner);
+		replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+		replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
+		replaceVars.put("$VAR_RESULT_MODE", "table");
+		replaceVars.put("$VAR_UPDATE_MODE", "");
+		replaceVars.put("$VAR_MAX_ROWS", "100");
+
+		final LocalExecutor executor = createModifiedExecutor(clusterClient, replaceVars);
+		final SessionContext session = new SessionContext("test-session", new Environment());
+		String sessionId = executor.openSession(session);
+		assertEquals("test-session", sessionId);
+
+		try {
+			final ResultDescriptor desc = executor.executeQuery(sessionId, "SELECT * FROM TestView1");
+			final JobID jobId = JobID.fromHexString(desc.getResultId());
+			assertTrue(desc.isMaterialized());
+
+			JobStatus jobStatus1 = getJobStatus(executor, sessionId, jobId);
+
+			assertNotEquals(JobStatus.CANCELED, jobStatus1);
+
+			executor.cancelQuery(sessionId, desc.getResultId());
+
+			JobStatus jobStatus2 = null;
+			// wait up to 30 seconds
+			for (int i = 0; i < 300; ++i) {
+				jobStatus2 = getJobStatus(executor, sessionId, jobId);
+				if (jobStatus2 != JobStatus.CANCELED) {
+					Thread.sleep(100);
+				} else {
+					break;
+				}
+			}
+			assertEquals(JobStatus.CANCELED, jobStatus2);
+		} finally {
+			executor.closeSession(sessionId);
+		}
+	}
+
 	@Test(timeout = 90_000L)
 	public void testBatchQueryExecutionMultipleTimes() throws Exception {
 		final URL url = getClass().getClassLoader().getResource("test-data.csv");
@@ -1304,4 +1395,21 @@
 		}
 		return actualResults;
 	}
+
+	private JobStatus getJobStatus(LocalExecutor executor, String sessionId, JobID jobId) {
+		final ExecutionContext<?> context = executor.getExecutionContext(sessionId);
+		return getJobStatusInternal(context, jobId);
+	}
+
+	private <T> JobStatus getJobStatusInternal(ExecutionContext<T> context, JobID jobId) {
+		try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
+			// retrieve existing cluster
+			ClusterClient<T> clusterClient = clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
+			return clusterClient.getJobStatus(jobId).get();
+		} catch (SqlExecutionException e) {
+			throw e;
+		} catch (Exception e) {
+			throw new SqlExecutionException("Could not locate a cluster.", e);
+		}
+	}
 }