[FLINK-33000][sql-gateway] ResultFetcherTest should utilize TestExecutorExtension instead of using a ThreadFactory
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
index c5b8bf3..e95d9fd 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -31,6 +31,7 @@
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -38,6 +39,7 @@
import org.apache.commons.collections.iterators.IteratorChain;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import java.math.BigDecimal;
import java.sql.Timestamp;
@@ -51,7 +53,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -68,8 +71,14 @@
private static ResolvedSchema schema;
private static List<RowData> data;
- private final ThreadFactory threadFactory =
- new ExecutorThreadFactory("Result Fetcher Test Pool", IgnoreExceptionHandler.INSTANCE);
+ @RegisterExtension
+ private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+ new TestExecutorExtension<>(
+ () ->
+ Executors.newCachedThreadPool(
+ new ExecutorThreadFactory(
+ "Result Fetcher Test Pool",
+ IgnoreExceptionHandler.INSTANCE)));
@BeforeAll
static void setUp() {
@@ -243,8 +252,9 @@
AtomicReference<Boolean> payloadHasData = new AtomicReference<>(true);
for (int i = 0; i < fetchThreadNum; i++) {
- threadFactory
- .newThread(
+ EXECUTOR_EXTENSION
+ .getExecutor()
+ .submit(
() -> {
ResultSet resultSet =
fetcher.fetchResults(FetchOrientation.FETCH_NEXT, 1);
@@ -253,10 +263,19 @@
payloadHasData.set(false);
}
- rows.put(Thread.currentThread().getId(), resultSet.getData());
+ rows.compute(
+ Thread.currentThread().getId(),
+ (k, v) -> {
+ if (v == null) {
+ return resultSet.getData();
+ } else {
+ v.addAll(resultSet.getData());
+ return v;
+ }
+ });
+
latch.countDown();
- })
- .start();
+ });
}
latch.await();
@@ -290,8 +309,9 @@
long testToken = token;
AtomicReference<Boolean> meetEnd = new AtomicReference<>(false);
- threadFactory
- .newThread(
+ EXECUTOR_EXTENSION
+ .getExecutor()
+ .submit(
() -> {
// Should meet EOS in the end.
long nextToken = testToken;
@@ -304,8 +324,7 @@
nextToken = checkNotNull(resultSet.getNextToken());
}
meetEnd.set(true);
- })
- .start();
+ });
CommonTestUtils.waitUtil(
meetEnd::get,
@@ -436,8 +455,9 @@
List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
for (int i = 0; i < fetchThreadNum; i++) {
- threadFactory
- .newThread(
+ EXECUTOR_EXTENSION
+ .getExecutor()
+ .submit(
() -> {
ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
@@ -445,8 +465,7 @@
isEqual.set(false);
}
latch.countDown();
- })
- .start();
+ });
}
latch.await();