[FLINK-31351][sql-gateway] Don't stop the stuck thread by force
This closes #22127
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index 8b239ab..37a9363 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -30,7 +30,11 @@
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.result.NotReadyResult;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlCancelException;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +48,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -183,13 +188,14 @@
/** Closes the {@link OperationManager} and all operations. */
public void close() {
stateLock.writeLock().lock();
+ Exception closeException = null;
try {
isRunning = false;
- for (Operation operation : submittedOperations.values()) {
- operation.close();
- }
- submittedOperations.clear();
+ IOUtils.closeAll(submittedOperations.values(), Throwable.class);
+ } catch (Exception e) {
+ closeException = e;
} finally {
+ submittedOperations.clear();
stateLock.writeLock().unlock();
}
// wait all operations closed
@@ -201,13 +207,19 @@
operationLock.release();
}
LOG.debug("Closes the Operation Manager.");
+ if (closeException != null) {
+ throw new SqlExecutionException(
+ "Failed to close the OperationManager.", closeException);
+ }
}
// -------------------------------------------------------------------------------------------
/** Operation to manage the execution, results and so on. */
@VisibleForTesting
- public class Operation {
+ public class Operation implements AutoCloseable {
+
+ private static final long WAIT_CLEAN_UP_MILLISECONDS = 5_000;
private final OperationHandle operationHandle;
@@ -387,7 +399,7 @@
private void closeResources() {
if (invocation != null && !invocation.isDone()) {
invocation.cancel(true);
- stopExecutionByForce(invocation);
+ waitTaskCleanup(invocation);
LOG.debug(String.format("Cancel the operation %s.", operationHandle));
}
@@ -405,32 +417,23 @@
updateState(OperationStatus.ERROR);
}
- private void stopExecutionByForce(FutureTask<?> invocation) {
+ private void waitTaskCleanup(FutureTask<?> invocation) {
// thread is cleaned async, waiting for a while
- Deadline deadline = Deadline.fromNow(Duration.ofSeconds(1));
+ Deadline deadline = Deadline.fromNow(Duration.ofMillis(WAIT_CLEAN_UP_MILLISECONDS));
while (deadline.hasTimeLeft()) {
Optional<Thread> threadOptional = getThreadInFuture(invocation);
if (!threadOptional.isPresent()) {
// thread has been cleaned up
return;
}
+ // try to release the use of the processor to let the task finish its cleanup.
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
}
Optional<Thread> threadOptional = getThreadInFuture(invocation);
- if (threadOptional.isPresent()) {
- // we have to use Thread.stop() here, because this can
- // guarantee thread to be stopped, even there is some
- // potential consistent problem, we are fine with it.
- Thread thread = threadOptional.get();
- LOG.info(
- "\"Future.cancel(true)\" can't cleanup current thread {}, using \"Thread.stop()\" instead.",
- thread.getName());
- try {
- thread.stop();
- } catch (Throwable e) {
- // catch all errors to project the sqlserver
- LOG.error("Failed to stop thread: " + thread.getName(), e);
- }
- }
+ // Currently, the SQL Gateway has neither a health reporter to notify users of the
+ // resource leak nor HA to restart the running process. So we just dump the thread's
+ // stack trace and throw an exception to notify the users.
+ threadOptional.ifPresent(this::throwExceptionWithThreadStackTrace);
}
private Optional<Thread> getThreadInFuture(FutureTask<?> invocation) {
@@ -445,6 +448,26 @@
return Optional.empty();
}
}
+
+ private void throwExceptionWithThreadStackTrace(Thread thread) {
+ StackTraceElement[] stack = thread.getStackTrace();
+ StringBuilder stackTraceStr = new StringBuilder();
+ for (StackTraceElement e : stack) {
+ stackTraceStr.append("\tat ").append(e).append("\n");
+ }
+
+ String msg =
+ String.format(
+ "Operation '%s' did not react to \"Future.cancel(true)\" and "
+ + "is stuck for %s seconds in method.\n"
+ + "Thread name: %s, thread state: %s, thread stacktrace:\n%s",
+ operationHandle,
+ WAIT_CLEAN_UP_MILLISECONDS / 1000,
+ thread.getName(),
+ thread.getState(),
+ stackTraceStr);
+ throw new SqlCancelException(msg);
+ }
}
// -------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java
new file mode 100644
index 0000000..2e92a90
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/SqlCancelException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.utils;
+
+/** Thrown when a canceled operation's thread does not stop within the cleanup timeout. */
+public class SqlCancelException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SqlCancelException(String msg) {
+ super(msg);
+ }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
index e4d95f1..6ccdd0b 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/operation/OperationManagerTest.java
@@ -32,11 +32,13 @@
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.api.utils.ThreadUtils;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
+import org.apache.flink.table.gateway.service.utils.SqlCancelException;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -52,7 +54,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link OperationManager}. */
-public class OperationManagerTest {
+class OperationManagerTest {
private static final ExecutorService EXECUTOR_SERVICE =
ThreadUtils.newThreadPool(5, 500, 60_0000, "operation-manager-test");
@@ -64,8 +66,8 @@
new ExecutorThreadFactory(
"SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);
- @BeforeAll
- public static void setUp() {
+ @BeforeEach
+ void setUp() {
operationManager = new OperationManager(EXECUTOR_SERVICE);
defaultResultSet =
new ResultSetImpl(
@@ -79,14 +81,18 @@
ResultKind.SUCCESS_WITH_CONTENT);
}
- @AfterAll
- public static void cleanUp() {
- EXECUTOR_SERVICE.shutdown();
+ @AfterEach
+ void cleanEach() {
operationManager.close();
}
+ @AfterAll
+ static void cleanUp() {
+ EXECUTOR_SERVICE.shutdown();
+ }
+
@Test
- public void testRunOperationAsynchronously() throws Exception {
+ void testRunOperationAsynchronously() throws Exception {
OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
@@ -100,7 +106,7 @@
}
@Test
- public void testRunOperationSynchronously() throws Exception {
+ void testRunOperationSynchronously() throws Exception {
OperationHandle operationHandle = operationManager.submitOperation(() -> defaultResultSet);
operationManager.awaitOperationTermination(operationHandle);
@@ -112,7 +118,7 @@
}
@Test
- public void testCancelOperation() throws Exception {
+ void testCancelOperation() throws Exception {
CountDownLatch endRunningLatch = new CountDownLatch(1);
OperationHandle operationHandle =
operationManager.submitOperation(
@@ -129,33 +135,58 @@
}
@Test
- public void testCancelOperationByForce() throws Exception {
- AtomicReference<Throwable> exception = new AtomicReference<>(null);
+ void testCancelUninterruptedOperation() throws Exception {
+ AtomicReference<Boolean> isRunning = new AtomicReference<>(false);
OperationHandle operationHandle =
operationManager.submitOperation(
() -> {
- try {
- // mock cpu busy task that doesn't interrupt system call
- while (true) {}
- } catch (Throwable t) {
- exception.set(t);
- throw t;
+ // mock CPU-busy task that never reacts to interruption
+ while (true) {
+ isRunning.compareAndSet(false, true);
}
});
-
- threadFactory.newThread(() -> operationManager.cancelOperation(operationHandle)).start();
- operationManager.awaitOperationTermination(operationHandle);
+ CommonTestUtils.waitUtil(
+ isRunning::get, Duration.ofSeconds(10), "Failed to start up the task.");
+ assertThatThrownBy(() -> operationManager.cancelOperation(operationHandle))
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ SqlCancelException.class,
+ String.format(
+ "Operation '%s' did not react to \"Future.cancel(true)\" and "
+ + "is stuck for %s seconds in method.\n",
+ operationHandle, 5)));
assertThat(operationManager.getOperationInfo(operationHandle).getStatus())
.isEqualTo(OperationStatus.CANCELED);
- CommonTestUtils.waitUtil(
- () -> exception.get() != null,
- Duration.ofSeconds(10),
- "Failed to kill the task with infinite loop.");
}
@Test
- public void testCloseOperation() throws Exception {
+ void testCloseUninterruptedOperation() throws Exception {
+ AtomicReference<Boolean> isRunning = new AtomicReference<>(false);
+ for (int i = 0; i < 10; i++) {
+ threadFactory
+ .newThread(
+ () -> {
+ operationManager.submitOperation(
+ () -> {
+ // mock CPU-busy task that never reacts to interruption
+ while (true) {
+ isRunning.compareAndSet(false, true);
+ }
+ });
+ })
+ .start();
+ }
+ CommonTestUtils.waitUtil(
+ isRunning::get, Duration.ofSeconds(10), "Failed to start up the task.");
+
+ assertThatThrownBy(() -> operationManager.close())
+ .satisfies(FlinkAssertions.anyCauseMatches(SqlCancelException.class));
+ assertThat(operationManager.getOperationCount()).isEqualTo(0);
+ }
+
+ @Test
+ void testCloseOperation() throws Exception {
CountDownLatch endRunningLatch = new CountDownLatch(1);
OperationHandle operationHandle =
operationManager.submitOperation(
@@ -177,7 +208,7 @@
}
@Test
- public void testRunOperationSynchronouslyWithError() {
+ void testRunOperationSynchronouslyWithError() {
OperationHandle operationHandle =
operationManager.submitOperation(
() -> {