[Fix] Fix SaveModeHandler not be closed (#5843)

diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 195ffbb..862c0a5 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -47,6 +48,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -116,7 +118,13 @@
             if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
                 SupportSaveMode saveModeSink = (SupportSaveMode) sink;
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
-                saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+                if (saveModeHandler.isPresent()) {
+                    try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.handleSaveMode();
+                    } catch (Exception e) {
+                        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                    }
+                }
             }
             DataStreamSink<Row> dataStreamSink =
                     stream.getDataStream()
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 310c0e4..6d0055b 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -30,6 +30,7 @@
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -48,6 +49,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
 public class SinkExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -117,7 +119,13 @@
             if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
                 SupportSaveMode saveModeSink = (SupportSaveMode) sink;
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
-                saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+                if (saveModeHandler.isPresent()) {
+                    try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.handleSaveMode();
+                    } catch (Exception e) {
+                        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                    }
+                }
             }
             DataStreamSink<Row> dataStreamSink =
                     stream.getDataStream()
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index cbcac81..48baa7f 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -31,6 +31,7 @@
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -49,6 +50,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
 public class SinkExecuteProcessor
         extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -138,7 +140,13 @@
             if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
                 SupportSaveMode saveModeSink = (SupportSaveMode) sink;
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
-                saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+                if (saveModeHandler.isPresent()) {
+                    try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.handleSaveMode();
+                    } catch (Exception e) {
+                        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                    }
+                }
             }
             SparkSinkInjector.inject(dataset.write(), sink, datasetTableInfo.getCatalogTable())
                     .option("checkpointLocation", "/tmp")
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 84d0f0b..fc5eade 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -31,6 +31,7 @@
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -50,6 +51,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 
 public class SinkExecuteProcessor
         extends SparkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {
@@ -139,7 +141,13 @@
             if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
                 SupportSaveMode saveModeSink = (SupportSaveMode) sink;
                 Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
-                saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+                if (saveModeHandler.isPresent()) {
+                    try (SaveModeHandler handler = saveModeHandler.get()) {
+                        handler.handleSaveMode();
+                    } catch (Exception e) {
+                        throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                    }
+                }
             }
             SparkSinkInjector.inject(dataset.write(), sink, datasetTableInfo.getCatalogTable())
                     .option("checkpointLocation", "/tmp")
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 7c835e1..0569117 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -40,6 +40,7 @@
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -86,6 +87,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
 import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
 import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getFactoryId;
 import static org.apache.seatunnel.engine.core.parse.ConfigParserUtil.getInputIds;
@@ -654,7 +656,13 @@
         if (SupportSaveMode.class.isAssignableFrom(sink.getClass())) {
             SupportSaveMode saveModeSink = (SupportSaveMode) sink;
             Optional<SaveModeHandler> saveModeHandler = saveModeSink.getSaveModeHandler();
-            saveModeHandler.ifPresent(SaveModeHandler::handleSaveMode);
+            if (saveModeHandler.isPresent()) {
+                try (SaveModeHandler handler = saveModeHandler.get()) {
+                    handler.handleSaveMode();
+                } catch (Exception e) {
+                    throw new SeaTunnelRuntimeException(HANDLE_SAVE_MODE_FAILED, e);
+                }
+            }
         }
     }