Pipe: dynamically adjust connection timeout to handle SocketTimeoutException & Avoid resource cleaning when load task is in process (#12485)

* Pipe: dynamically adjust connection timeout to handle SocketTimeoutException

* Load: dynamically adjust connection timeout to handle SocketTimeoutException & Avoid resource cleaning when load task is in process
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 56ad0b9..02c02b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -291,7 +291,7 @@
     AsyncPipeDataTransferServiceClient client = null;
     try {
       client = clientManager.borrowClient();
-      pipeTransferTsFileInsertionEventHandler.transfer(client);
+      pipeTransferTsFileInsertionEventHandler.transfer(clientManager, client);
     } catch (final Exception ex) {
       logOnClientException(client, ex);
       pipeTransferTsFileInsertionEventHandler.onError(ex);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index bcafce0..9e426f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@
 import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
@@ -68,6 +69,7 @@
 
   private final AtomicBoolean isSealSignalSent;
 
+  private IoTDBDataNodeAsyncClientManager clientManager;
   private AsyncPipeDataTransferServiceClient client;
 
   public PipeTransferTsFileInsertionEventHandler(
@@ -93,10 +95,15 @@
     isSealSignalSent = new AtomicBoolean(false);
   }
 
-  public void transfer(final AsyncPipeDataTransferServiceClient client)
+  public void transfer(
+      IoTDBDataNodeAsyncClientManager clientManager,
+      final AsyncPipeDataTransferServiceClient client)
       throws TException, IOException {
+    this.clientManager = clientManager;
     this.client = client;
+
     client.setShouldReturnSelf(false);
+    client.setTimeout(clientManager.getConnectionTimeout());
 
     final int readLength = reader.read(readBuffer);
 
@@ -110,7 +117,7 @@
           LOGGER.warn("Failed to close file reader when successfully transferred mod file.", e);
         }
         reader = new RandomAccessFile(tsFile, "r");
-        transfer(client);
+        transfer(clientManager, client);
       } else if (currentFile == tsFile) {
         isSealSignalSent.set(true);
         client.pipeTransfer(
@@ -205,7 +212,7 @@
         }
       }
 
-      transfer(client);
+      transfer(clientManager, client);
     } catch (final Exception e) {
       onError(e);
     }
@@ -221,6 +228,12 @@
         exception);
 
     try {
+      clientManager.adjustTimeoutIfNecessary(exception);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
+    }
+
+    try {
       if (reader != null) {
         reader.close();
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 9919d954..f873cc2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -350,6 +350,7 @@
                         modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
+        clientManager.adjustTimeoutIfNecessary(e);
         throw new PipeConnectionException(
             String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
             e);
@@ -366,6 +367,7 @@
                         tsFile.getName(), tsFile.length()));
       } catch (final Exception e) {
         clientAndStatus.setRight(false);
+        clientManager.adjustTimeoutIfNecessary(e);
         throw new PipeConnectionException(
             String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
             e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index 90c6a2f..a24b6c4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,7 +208,7 @@
   private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
     return new IoTDBSyncClient(
         new ThriftClientProperty.Builder()
-            .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+            .setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
             .setRpcThriftCompressionEnabled(
                 PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
             .build(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 99d454d..4cd97d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -68,7 +68,7 @@
 
   @Override
   public void runMayThrow() throws Throwable {
-    socket.setSoTimeout((int) PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+    socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
     socket.setKeepAlive(true);
 
     LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index ef4ed7f..b568b88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -54,6 +54,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 
@@ -104,6 +105,13 @@
 
         final CleanupTask cleanupTask = cleanupTaskQueue.peek();
         if (cleanupTask.scheduledTime <= System.currentTimeMillis()) {
+          if (cleanupTask.isLoadTaskRunning) {
+            cleanupTaskQueue.poll();
+            cleanupTask.resetScheduledTime();
+            cleanupTaskQueue.add(cleanupTask);
+            continue;
+          }
+
           cleanupTask.run();
 
           uuid2CleanupTask.remove(cleanupTask.uuid);
@@ -157,17 +165,24 @@
       }
     }
 
-    final TsFileWriterManager writerManager =
-        uuid2WriterManager.computeIfAbsent(
-            uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
-    for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
-      if (!tsFileData.isModification()) {
-        ChunkData chunkData = (ChunkData) tsFileData;
-        writerManager.write(
-            new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
-      } else {
-        writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+    final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
+    cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+    try {
+      final TsFileWriterManager writerManager =
+          uuid2WriterManager.computeIfAbsent(
+              uuid,
+              o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
+      for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
+        if (!tsFileData.isModification()) {
+          ChunkData chunkData = (ChunkData) tsFileData;
+          writerManager.write(
+              new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
+        } else {
+          writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+        }
       }
+    } finally {
+      cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
     }
   }
 
@@ -176,7 +191,15 @@
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
     }
-    uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+
+    final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
+    cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+    try {
+      uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+    } finally {
+      cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
+    }
+
     clean(uuid);
     return true;
   }
@@ -412,12 +435,30 @@
   private class CleanupTask implements Runnable, Comparable<CleanupTask> {
 
     private final String uuid;
-    private final long scheduledTime;
 
+    private final long delayInMs;
+    private long scheduledTime;
+
+    private volatile boolean isLoadTaskRunning = false;
     private volatile boolean isCanceled = false;
 
     private CleanupTask(String uuid, long delayInMs) {
       this.uuid = uuid;
+      this.delayInMs = delayInMs;
+      resetScheduledTime();
+    }
+
+    public void markLoadTaskRunning() {
+      isLoadTaskRunning = true;
+      resetScheduledTime();
+    }
+
+    public void markLoadTaskNotRunning() {
+      isLoadTaskRunning = false;
+      resetScheduledTime();
+    }
+
+    public void resetScheduledTime() {
       scheduledTime = System.currentTimeMillis() + delayInMs;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 0d5f0bb..83a12e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -24,7 +24,6 @@
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -51,23 +50,29 @@
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.concurrent.SetThreadName;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class);
 
+  private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+  private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS =
+      new AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS());
+
   private String uuid;
   private final String localhostIpAddr;
   private final int localhostInternalPort;
@@ -76,8 +81,6 @@
   private final ExecutorService executor;
   private final boolean isGeneratedByPipe;
 
-  private static final String NODE_CONNECTION_ERROR = "can't connect to node {}";
-
   public LoadTsFileDispatcherImpl(
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
       boolean isGeneratedByPipe) {
@@ -137,78 +140,6 @@
     }
   }
 
-  private boolean isDispatchedToLocal(TEndPoint endPoint) {
-    return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
-  }
-
-  private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
-      throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
-      if (!loadResp.isAccepted()) {
-        LOGGER.warn(loadResp.message);
-        throw new FragmentInstanceDispatchException(loadResp.status);
-      }
-    } catch (ClientManagerException | TException e) {
-      String warning = NODE_CONNECTION_ERROR;
-      LOGGER.warn(warning, endPoint, e);
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage(warning + endPoint);
-      throw new FragmentInstanceDispatchException(status);
-    }
-  }
-
-  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
-      throws FragmentInstanceDispatchException {
-    try (SyncDataNodeInternalServiceClient client =
-        internalServiceClientManager.borrowClient(endPoint)) {
-      TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
-      if (!loadResp.isAccepted()) {
-        LOGGER.warn(loadResp.message);
-        throw new FragmentInstanceDispatchException(loadResp.status);
-      }
-    } catch (ClientManagerException | TException e) {
-      LOGGER.warn(NODE_CONNECTION_ERROR, endPoint, e);
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage(
-          "can't connect to node "
-              + endPoint
-              + ", please reset longer dn_connection_timeout_ms "
-              + "in iotdb-datanode.properties and restart iotdb.");
-      throw new FragmentInstanceDispatchException(status);
-    }
-  }
-
-  private void dispatchLocally(TLoadCommandReq loadCommandReq)
-      throws FragmentInstanceDispatchException {
-    final ProgressIndex progressIndex;
-    if (loadCommandReq.isSetProgressIndex()) {
-      progressIndex =
-          ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
-    } else {
-      // fallback to use local generated progress index for compatibility
-      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
-      LOGGER.info(
-          "Use local generated load progress index {} for uuid {}.",
-          progressIndex,
-          loadCommandReq.uuid);
-    }
-
-    final TSStatus resultStatus =
-        StorageEngine.getInstance()
-            .executeLoadCommand(
-                LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
-                loadCommandReq.uuid,
-                loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
-                progressIndex);
-    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
-      throw new FragmentInstanceDispatchException(resultStatus);
-    }
-  }
-
   public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDispatchException {
     LOGGER.info("Receive load node from uuid {}.", uuid);
 
@@ -259,6 +190,32 @@
     }
   }
 
+  private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+      final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
+      if (!loadResp.isAccepted()) {
+        LOGGER.warn(loadResp.message);
+        throw new FragmentInstanceDispatchException(loadResp.status);
+      }
+    } catch (Exception e) {
+      adjustTimeoutIfNecessary(e);
+
+      final String exceptionMessage =
+          String.format(
+              "failed to dispatch load command %s to node %s because of exception: %s",
+              loadTsFileReq, endPoint, e);
+      LOGGER.warn(exceptionMessage, e);
+      throw new FragmentInstanceDispatchException(
+          new TSStatus()
+              .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+              .setMessage(exceptionMessage));
+    }
+  }
+
   public Future<FragInstanceDispatchResult> dispatchCommand(
       TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
     Set<TEndPoint> allEndPoint = new HashSet<>();
@@ -290,6 +247,87 @@
     return immediateFuture(new FragInstanceDispatchResult(true));
   }
 
+  private void dispatchLocally(TLoadCommandReq loadCommandReq)
+      throws FragmentInstanceDispatchException {
+    final ProgressIndex progressIndex;
+    if (loadCommandReq.isSetProgressIndex()) {
+      progressIndex =
+          ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+    } else {
+      // fallback to use local generated progress index for compatibility
+      progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
+      LOGGER.info(
+          "Use local generated load progress index {} for uuid {}.",
+          progressIndex,
+          loadCommandReq.uuid);
+    }
+
+    final TSStatus resultStatus =
+        StorageEngine.getInstance()
+            .executeLoadCommand(
+                LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
+                loadCommandReq.uuid,
+                loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
+                progressIndex);
+    if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
+      throw new FragmentInstanceDispatchException(resultStatus);
+    }
+  }
+
+  private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
+      throws FragmentInstanceDispatchException {
+    try (SyncDataNodeInternalServiceClient client =
+        internalServiceClientManager.borrowClient(endPoint)) {
+      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+      final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+      if (!loadResp.isAccepted()) {
+        LOGGER.warn(loadResp.message);
+        throw new FragmentInstanceDispatchException(loadResp.status);
+      }
+    } catch (Exception e) {
+      adjustTimeoutIfNecessary(e);
+
+      final String exceptionMessage =
+          String.format(
+              "failed to dispatch load command %s to node %s because of exception: %s",
+              loadCommandReq, endPoint, e);
+      LOGGER.warn(exceptionMessage, e);
+      throw new FragmentInstanceDispatchException(
+          new TSStatus()
+              .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+              .setMessage(exceptionMessage));
+    }
+  }
+
+  private boolean isDispatchedToLocal(TEndPoint endPoint) {
+    return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+  }
+
+  private static void adjustTimeoutIfNecessary(Throwable e) {
+    do {
+      if (e instanceof SocketTimeoutException) {
+        int newConnectionTimeout;
+        try {
+          newConnectionTimeout =
+              Math.min(
+                  Math.toIntExact(CONNECTION_TIMEOUT_IN_MS.get() * 2L), MAX_CONNECTION_TIMEOUT_MS);
+        } catch (ArithmeticException arithmeticException) {
+          newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
+        }
+
+        if (newConnectionTimeout != CONNECTION_TIMEOUT_IN_MS.get()) {
+          CONNECTION_TIMEOUT_IN_MS.set(newConnectionTimeout);
+          LOGGER.info(
+              "Load remote procedure call connection timeout is adjusted to {} ms ({} mins)",
+              newConnectionTimeout,
+              newConnectionTimeout / 60000.0);
+        }
+        return;
+      }
+    } while ((e = e.getCause()) != null);
+  }
+
   @Override
   public void abort() {
     // Do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 5bcff34..c8c35e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -41,7 +41,6 @@
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
 import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -903,7 +902,7 @@
           status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
           status.setMessage(String.format("Wrong load command %s.", loadCommand));
       }
-    } catch (IOException | LoadFileException e) {
+    } catch (Exception e) {
       LOGGER.error("Execute load command {} error.", loadCommand, e);
       status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
       status.setMessage(e.getMessage());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4e4e76e..493ea06 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -185,8 +185,8 @@
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B
   private int pipeExtractorMatcherCacheSize = 1024;
 
-  private long pipeConnectorHandshakeTimeoutMs = 10 * 1000L; // 10 seconds
-  private long pipeConnectorTransferTimeoutMs = 15 * 60 * 1000L; // 15 minutes
+  private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+  private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
   // recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
@@ -634,20 +634,32 @@
     this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
   }
 
-  public long getPipeConnectorHandshakeTimeoutMs() {
+  public int getPipeConnectorHandshakeTimeoutMs() {
     return pipeConnectorHandshakeTimeoutMs;
   }
 
   public void setPipeConnectorHandshakeTimeoutMs(long pipeConnectorHandshakeTimeoutMs) {
-    this.pipeConnectorHandshakeTimeoutMs = pipeConnectorHandshakeTimeoutMs;
+    try {
+      this.pipeConnectorHandshakeTimeoutMs = Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+    } catch (ArithmeticException e) {
+      this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+      logger.warn(
+          "Given pipe connector handshake timeout is too large, set to {} ms.", Integer.MAX_VALUE);
+    }
   }
 
-  public long getPipeConnectorTransferTimeoutMs() {
+  public int getPipeConnectorTransferTimeoutMs() {
     return pipeConnectorTransferTimeoutMs;
   }
 
   public void setPipeConnectorTransferTimeoutMs(long pipeConnectorTransferTimeoutMs) {
-    this.pipeConnectorTransferTimeoutMs = pipeConnectorTransferTimeoutMs;
+    try {
+      this.pipeConnectorTransferTimeoutMs = Math.toIntExact(pipeConnectorTransferTimeoutMs);
+    } catch (ArithmeticException e) {
+      this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+      logger.warn(
+          "Given pipe connector transfer timeout is too large, set to {} ms.", Integer.MAX_VALUE);
+    }
   }
 
   public int getPipeConnectorReadFileBufferSize() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 7a0391d..dbe4b6a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -95,11 +95,11 @@
 
   /////////////////////////////// Connector ///////////////////////////////
 
-  public long getPipeConnectorHandshakeTimeoutMs() {
+  public int getPipeConnectorHandshakeTimeoutMs() {
     return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
   }
 
-  public long getPipeConnectorTransferTimeoutMs() {
+  public int getPipeConnectorTransferTimeoutMs() {
     return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index b645b91..bb1f006 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -20,11 +20,18 @@
 package org.apache.iotdb.commons.pipe.connector.client;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketTimeoutException;
 import java.util.List;
 
 public abstract class IoTDBClientManager {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClientManager.class);
+
   protected final List<TEndPoint> endPointList;
   protected long currentClientIndex = 0;
 
@@ -34,6 +41,9 @@
   // it is a DataNode receiver. The flag is useless for configNode receiver.
   protected boolean supportModsIfIsDataNodeReceiver = true;
 
+  private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+  protected int connectionTimeout = PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs();
+
   protected IoTDBClientManager(List<TEndPoint> endPointList, boolean useLeaderCache) {
     this.endPointList = endPointList;
     this.useLeaderCache = useLeaderCache;
@@ -42,4 +52,31 @@
   public boolean supportModsIfIsDataNodeReceiver() {
     return supportModsIfIsDataNodeReceiver;
   }
+
+  public void adjustTimeoutIfNecessary(Throwable e) {
+    do {
+      if (e instanceof SocketTimeoutException) {
+        int newConnectionTimeout;
+        try {
+          newConnectionTimeout =
+              Math.min(Math.toIntExact(connectionTimeout * 2L), MAX_CONNECTION_TIMEOUT_MS);
+        } catch (ArithmeticException arithmeticException) {
+          newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
+        }
+
+        if (newConnectionTimeout != connectionTimeout) {
+          connectionTimeout = newConnectionTimeout;
+          LOGGER.info(
+              "Pipe connection timeout is adjusted to {} ms ({} mins)",
+              connectionTimeout,
+              connectionTimeout / 60000.0);
+        }
+        return;
+      }
+    } while ((e = e.getCause()) != null);
+  }
+
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index c262313..a29d7bd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -142,7 +142,7 @@
       clientAndStatus.setLeft(
           new IoTDBSyncClient(
               new ThriftClientProperty.Builder()
-                  .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+                  .setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
                   .setRpcThriftCompressionEnabled(
                       PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
                   .build(),
@@ -193,7 +193,7 @@
             resp.getStatus());
       } else {
         clientAndStatus.setRight(true);
-        client.setTimeout((int) PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+        client.setTimeout(connectionTimeout);
         LOGGER.info(
             "Handshake success. Target server ip: {}, port: {}",
             client.getIpAddress(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 1977c32..3034b58 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -188,7 +188,7 @@
     } else {
       supportModsIfIsDataNodeReceiver = true;
     }
-    socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+    socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
     LOGGER.info("Handshake success. Socket: {}", socket);
   }