[Fix] Fix SaveModeHandler not be closed (#5843)
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 195ffbb..862c0a5 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -47,6 +48,7 @@
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -116,7 +118,13 @@
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
- saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.handleSaveMode();
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
+ }
}
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 310c0e4..6d0055b 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -48,6 +49,7 @@
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -117,7 +119,13 @@
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
- saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.handleSaveMode();
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
+ }
}
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index cbcac81..48baa7f 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -31,6 +31,7 @@
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -49,6 +50,7 @@
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -138,7 +140,13 @@
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
- saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.handleSaveMode();
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
+ }
}
SparkSinkInjector.inject(dataset.write(), sink, datasetTableInfo.getCatalogTable())
.option("checkpointLocation", "/tmp")
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 84d0f0b..fc5eade 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -31,6 +31,7 @@
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -50,6 +51,7 @@
import java.util.stream.Collectors;
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
public class SinkExecuteProcessor
extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -139,7 +141,13 @@
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
- saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.handleSaveMode();
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
+ }
}
SparkSinkInjector.inject(dataset.write(), sink, datasetTableInfo.getCatalogTable())
.option("checkpointLocation", "/tmp")
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 7c835e1..0569117 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -40,6 +40,7 @@
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -86,6 +87,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputIds;
@@ -654,7 +656,13 @@
if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
SupportSaveMode saveModeSink = (SupportSaveMode) sink;
Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
- saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+ if (saveModeHandler.isPresent()) {
+ try (SaveModeHandler handler = saveModeHandler.get()) {
+ handler.handleSaveMode();
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+ }
+ }
}
}