fix interrupt issue (#140)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 8ba2050..711f765 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -182,19 +182,25 @@
LOG.debug("not loading, skip timer checker");
return;
}
- // TODO: introduce cache for reload instead of throwing exceptions.
- String errorMsg;
- try {
- RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
- errorMsg = content.getMessage();
- } catch (Exception e) {
- errorMsg = e.getMessage();
- }
- loadException = new StreamLoadException(errorMsg);
- LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
- // set the executor thread interrupted in case blocking in write data.
- executorThread.interrupt();
+ // double check to interrupt when loading is true and dorisStreamLoad.getPendingLoadFuture().isDone
+ // fix issue #139
+ if (dorisStreamLoad.getPendingLoadFuture() != null
+ && dorisStreamLoad.getPendingLoadFuture().isDone()) {
+ // TODO: introduce cache for reload instead of throwing exceptions.
+ String errorMsg;
+ try {
+ RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
+ errorMsg = content.getMessage();
+ } catch (Exception e) {
+ errorMsg = e.getMessage();
+ }
+
+ loadException = new StreamLoadException(errorMsg);
+ LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
+ // set the executor thread interrupted in case blocking in write data.
+ executorThread.interrupt();
+ }
}
}