[Fix] lookup join close executor service (#162)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
index 4d42cd6..73141a3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -42,7 +42,7 @@
 
     @Override
     public Connection getOrEstablishConnection() throws ClassNotFoundException, SQLException {
-        if (connection != null && !connection.isClosed()) {
+        if (connection != null && !connection.isClosed() && connection.isValid(10000)) {
             return connection;
         }
         try {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
index 9090783..d935a8d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
@@ -129,4 +129,11 @@
             throw new IOException(e);
         }
     }
+
+    @Override
+    public void close() throws IOException {
+        if(this.pool != null){
+            this.pool.close();
+        }
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
index b6c5073..7c8663e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
@@ -19,11 +19,12 @@
 
 import org.apache.flink.table.data.RowData;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public abstract class DorisLookupReader {
+public abstract class DorisLookupReader implements Closeable {
 
     public abstract CompletableFuture<List<RowData>> asyncGet(RowData record) throws IOException;
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
index c9638f5..9b930ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
@@ -103,9 +103,12 @@
     @Override
     public void close() throws IOException {
         if (started.compareAndSet(true, false)) {
-            actionWatcherExecutorService.shutdown();
+            LOG.info("close executorService");
+            actionWatcherExecutorService.shutdownNow();
             workerStated.set(false);
-            workerExecutorService.shutdown();
+            workerExecutorService.shutdownNow();
+            this.actionWatcherExecutorService = null;
+            this.workerExecutorService = null;
             this.semaphore = null;
         }
     }
@@ -160,6 +163,7 @@
                     if (firstGet != null) {
                         recordList.add(firstGet);
                         queue.drainTo(recordList, batchSize - 1);
+                        LOG.debug("fetch {} records from queue", recordList.size());
                         Map<String, List<Get>> getsByTable = new HashMap<>();
                         for (Get get : recordList) {
                             List<Get> list = getsByTable.computeIfAbsent(get.getRecord().getTableIdentifier(), (s) -> new ArrayList<>());
@@ -181,6 +185,7 @@
                     }
                 }
             }
+            LOG.info("action watcher stop");
         }
 
         @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
index e29bfd1..c015921 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -153,6 +153,7 @@
         for (int retry = 0; retry <= maxRetryTimes; retry++) {
             resultRecordMap = new HashMap<>();
             try {
+                long start = System.currentTimeMillis();
                 Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
                 try (PreparedStatement ps = conn.prepareStatement(sql)) {
                     int paramIndex = 0;
@@ -174,6 +175,7 @@
                         }
                     }
                 }
+                LOG.debug("query cost {}ms, batch {} records, sql is {}", System.currentTimeMillis()-start, recordList.size(), sql);
                 return resultRecordMap;
             } catch (Exception e) {
                 LOG.error(String.format("query doris error, retry times = %d", retry), e);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
index e6b2166..da168ba 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
@@ -68,6 +68,8 @@
     @Override
     public void open(FunctionContext context) throws Exception {
         super.open(context);
+        LOG.info("lookup options: threadSize {}, batchSize {}, queueSize {}",
+                lookupOptions.getJdbcReadThreadSize(), lookupOptions.getJdbcReadBatchSize(), lookupOptions.getJdbcReadBatchQueueSize());
         this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
                 ? null
                 : CacheBuilder.newBuilder()
@@ -119,6 +121,9 @@
     @Override
     public void close() throws Exception {
         super.close();
+        if(lookupReader != null){
+            lookupReader.close();
+        }
     }
 
     @VisibleForTesting
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
index 93a0059..00a6c77 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -111,6 +111,9 @@
     @Override
     public void close() throws Exception {
         super.close();
+        if(lookupReader != null){
+            lookupReader.close();
+        }
     }
 
     @VisibleForTesting