[FLINK-33000][sql-gateway] ResultFetcherTest should utilize TestExecutorExtension instead of using a ThreadFactory
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
index c5b8bf3..e95d9fd 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -31,6 +31,7 @@
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -38,6 +39,7 @@
 import org.apache.commons.collections.iterators.IteratorChain;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.math.BigDecimal;
 import java.sql.Timestamp;
@@ -51,7 +53,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -68,8 +71,14 @@
     private static ResolvedSchema schema;
     private static List<RowData> data;
 
-    private final ThreadFactory threadFactory =
-            new ExecutorThreadFactory("Result Fetcher Test Pool", IgnoreExceptionHandler.INSTANCE);
+    @RegisterExtension
+    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
+                    () ->
+                            Executors.newCachedThreadPool(
+                                    new ExecutorThreadFactory(
+                                            "Result Fetcher Test Pool",
+                                            IgnoreExceptionHandler.INSTANCE)));
 
     @BeforeAll
     static void setUp() {
@@ -243,8 +252,9 @@
 
         AtomicReference<Boolean> payloadHasData = new AtomicReference<>(true);
         for (int i = 0; i < fetchThreadNum; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () -> {
                                 ResultSet resultSet =
                                         fetcher.fetchResults(FetchOrientation.FETCH_NEXT, 1);
@@ -253,10 +263,19 @@
                                     payloadHasData.set(false);
                                 }
 
-                                rows.put(Thread.currentThread().getId(), resultSet.getData());
+                                rows.compute(
+                                        Thread.currentThread().getId(),
+                                        (k, v) -> {
+                                            if (v == null) {
+                                                return resultSet.getData();
+                                            } else {
+                                                v.addAll(resultSet.getData());
+                                                return v;
+                                            }
+                                        });
+
                                 latch.countDown();
-                            })
-                    .start();
+                            });
         }
 
         latch.await();
@@ -290,8 +309,9 @@
 
         long testToken = token;
         AtomicReference<Boolean> meetEnd = new AtomicReference<>(false);
-        threadFactory
-                .newThread(
+        EXECUTOR_EXTENSION
+                .getExecutor()
+                .submit(
                         () -> {
                             // Should meet EOS in the end.
                             long nextToken = testToken;
@@ -304,8 +324,7 @@
                                 nextToken = checkNotNull(resultSet.getNextToken());
                             }
                             meetEnd.set(true);
-                        })
-                .start();
+                        });
 
         CommonTestUtils.waitUtil(
                 meetEnd::get,
@@ -436,8 +455,9 @@
 
         List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData();
         for (int i = 0; i < fetchThreadNum; i++) {
-            threadFactory
-                    .newThread(
+            EXECUTOR_EXTENSION
+                    .getExecutor()
+                    .submit(
                             () -> {
                                 ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
 
@@ -445,8 +465,7 @@
                                     isEqual.set(false);
                                 }
                                 latch.countDown();
-                            })
-                    .start();
+                            });
         }
 
         latch.await();