[Fix] lookup join close executor service (#162)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
index 4d42cd6..73141a3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -42,7 +42,7 @@
@Override
public Connection getOrEstablishConnection() throws ClassNotFoundException, SQLException {
- if (connection != null && !connection.isClosed()) {
+ if (connection != null && !connection.isClosed() && connection.isValid(10000)) {
return connection;
}
try {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
index 9090783..d935a8d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
@@ -129,4 +129,11 @@
throw new IOException(e);
}
}
+
+ @Override
+ public void close() throws IOException {
+ if(this.pool != null){
+ this.pool.close();
+ }
+ }
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
index b6c5073..7c8663e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
@@ -19,11 +19,12 @@
import org.apache.flink.table.data.RowData;
+import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-public abstract class DorisLookupReader {
+public abstract class DorisLookupReader implements Closeable {
public abstract CompletableFuture<List<RowData>> asyncGet(RowData record) throws IOException;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
index c9638f5..9b930ff 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
@@ -103,9 +103,12 @@
@Override
public void close() throws IOException {
if (started.compareAndSet(true, false)) {
- actionWatcherExecutorService.shutdown();
+ LOG.info("close executorService");
+ actionWatcherExecutorService.shutdownNow();
workerStated.set(false);
- workerExecutorService.shutdown();
+ workerExecutorService.shutdownNow();
+ this.actionWatcherExecutorService = null;
+ this.workerExecutorService = null;
this.semaphore = null;
}
}
@@ -160,6 +163,7 @@
if (firstGet != null) {
recordList.add(firstGet);
queue.drainTo(recordList, batchSize - 1);
+ LOG.debug("fetch {} records from queue", recordList.size());
Map<String, List<Get>> getsByTable = new HashMap<>();
for (Get get : recordList) {
List<Get> list = getsByTable.computeIfAbsent(get.getRecord().getTableIdentifier(), (s) -> new ArrayList<>());
@@ -181,6 +185,7 @@
}
}
}
+ LOG.info("action watcher stop");
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
index e29bfd1..c015921 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -153,6 +153,7 @@
for (int retry = 0; retry <= maxRetryTimes; retry++) {
resultRecordMap = new HashMap<>();
try {
+ long start = System.currentTimeMillis();
Connection conn = jdbcConnectionProvider.getOrEstablishConnection();
try (PreparedStatement ps = conn.prepareStatement(sql)) {
int paramIndex = 0;
@@ -174,6 +175,7 @@
}
}
}
+ LOG.debug("query cost {}ms, batch {} records, sql is {}", System.currentTimeMillis()-start, recordList.size(), sql);
return resultRecordMap;
} catch (Exception e) {
LOG.error(String.format("query doris error, retry times = %d", retry), e);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
index e6b2166..da168ba 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
@@ -68,6 +68,8 @@
@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
+ LOG.info("lookup options: threadSize {}, batchSize {}, queueSize {}",
+ lookupOptions.getJdbcReadThreadSize(), lookupOptions.getJdbcReadBatchSize(), lookupOptions.getJdbcReadBatchQueueSize());
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
? null
: CacheBuilder.newBuilder()
@@ -119,6 +121,9 @@
@Override
public void close() throws Exception {
super.close();
+ if(lookupReader != null){
+ lookupReader.close();
+ }
}
@VisibleForTesting
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
index 93a0059..00a6c77 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -111,6 +111,9 @@
@Override
public void close() throws Exception {
super.close();
+ if(lookupReader != null){
+ lookupReader.close();
+ }
}
@VisibleForTesting