Pipe: Globally adjust timeout when syncing huge tsfiles & Speed up file transfer after sync task failed (#12491)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 02c02b9..1e01b82 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -161,6 +161,11 @@
       return;
     }
 
+    transferWithoutCheck(tabletInsertionEvent);
+  }
+
+  private void transferWithoutCheck(final TabletInsertionEvent tabletInsertionEvent)
+      throws Exception {
     if (isTabletBatchModeEnabled) {
       if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
         final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler =
@@ -265,6 +270,11 @@
       return;
     }
 
+    transferWithoutCheck(tsFileInsertionEvent);
+  }
+
+  private void transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionEvent)
+      throws Exception {
     final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
         (PipeTsFileInsertionEvent) tsFileInsertionEvent;
     // We increase the reference count for this event to determine if the event may be released.
@@ -275,15 +285,23 @@
       return;
     }
 
-    // Just in case. To avoid the case that exception occurred when constructing the handler.
-    if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
-      throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
+    // We assume that no exceptions will be thrown after reference count is increased.
+    try {
+      // Just in case. To avoid the case that exception occurred when constructing the handler.
+      if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
+        throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
+      }
+
+      final PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler =
+          new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this);
+
+      transfer(pipeTransferTsFileInsertionEventHandler);
+    } catch (Exception e) {
+      // Just in case. To avoid the case that exception occurred when constructing the handler.
+      pipeTsFileInsertionEvent.decreaseReferenceCount(
+          IoTDBDataRegionAsyncConnector.class.getName(), false);
+      throw e;
     }
-
-    final PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler =
-        new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this);
-
-    transfer(pipeTransferTsFileInsertionEventHandler);
   }
 
   private void transfer(
@@ -350,7 +368,8 @@
       } else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
         retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
       } else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
-        retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
+        // Using the async connector to transfer the event for performance.
+        transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent);
       } else {
         LOGGER.warn(
             "IoTDBThriftAsyncConnector does not support transfer generic event: {}.", peekedEvent);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 9e426f4..213d980 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -103,7 +103,7 @@
     this.client = client;
 
     client.setShouldReturnSelf(false);
-    client.setTimeout(clientManager.getConnectionTimeout());
+    client.setTimeoutDynamically(clientManager.getConnectionTimeout());
 
     final int readLength = reader.read(readBuffer);
 
@@ -228,7 +228,9 @@
         exception);
 
     try {
-      clientManager.adjustTimeoutIfNecessary(exception);
+      if (Objects.nonNull(clientManager)) {
+        clientManager.adjustTimeoutIfNecessary(exception);
+      }
     } catch (Exception e) {
       LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 83a12e0..a630c94 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -61,6 +61,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -70,7 +71,8 @@
   private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class);
 
   private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
-  private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS =
+  private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; // 6 hours
+  private static final AtomicInteger CONNECTION_TIMEOUT_MS =
       new AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS());
 
   private String uuid;
@@ -194,7 +196,7 @@
       throws FragmentInstanceDispatchException {
     try (SyncDataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
-      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+      client.setTimeout(CONNECTION_TIMEOUT_MS.get());
 
       final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
       if (!loadResp.isAccepted()) {
@@ -278,7 +280,7 @@
       throws FragmentInstanceDispatchException {
     try (SyncDataNodeInternalServiceClient client =
         internalServiceClientManager.borrowClient(endPoint)) {
-      client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+      client.setTimeout(CONNECTION_TIMEOUT_MS.get());
 
       final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
       if (!loadResp.isAccepted()) {
@@ -306,18 +308,21 @@
 
   private static void adjustTimeoutIfNecessary(Throwable e) {
     do {
-      if (e instanceof SocketTimeoutException) {
+      if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
         int newConnectionTimeout;
         try {
           newConnectionTimeout =
               Math.min(
-                  Math.toIntExact(CONNECTION_TIMEOUT_IN_MS.get() * 2L), MAX_CONNECTION_TIMEOUT_MS);
+                  Math.max(
+                      FIRST_ADJUSTMENT_TIMEOUT_MS,
+                      Math.toIntExact(CONNECTION_TIMEOUT_MS.get() * 2L)),
+                  MAX_CONNECTION_TIMEOUT_MS);
         } catch (ArithmeticException arithmeticException) {
           newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
         }
 
-        if (newConnectionTimeout != CONNECTION_TIMEOUT_IN_MS.get()) {
-          CONNECTION_TIMEOUT_IN_MS.set(newConnectionTimeout);
+        if (newConnectionTimeout != CONNECTION_TIMEOUT_MS.get()) {
+          CONNECTION_TIMEOUT_MS.set(newConnectionTimeout);
           LOGGER.info(
               "Load remote procedure call connection timeout is adjusted to {} ms ({} mins)",
               newConnectionTimeout,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 2b8110b..186db8e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -31,6 +31,7 @@
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.transport.TNonblockingSocket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,6 +118,15 @@
     this.shouldReturnSelf.set(shouldReturnSelf);
   }
 
+  public void setTimeoutDynamically(int timeout) {
+    try {
+      ((TNonblockingSocket) ___transport).setTimeout(timeout);
+    } catch (Exception e) {
+      setTimeout(timeout);
+      LOGGER.error("Failed to set timeout dynamically, set it statically", e);
+    }
+  }
+
   private void close() {
     ___transport.close();
     ___currentMethod = null;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index bb1f006..73e0543 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -27,6 +27,8 @@
 
 import java.net.SocketTimeoutException;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class IoTDBClientManager {
 
@@ -42,7 +44,9 @@
   protected boolean supportModsIfIsDataNodeReceiver = true;
 
   private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
-  protected int connectionTimeout = PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs();
+  private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; // 6 hours
+  protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
+      new AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
 
   protected IoTDBClientManager(List<TEndPoint> endPointList, boolean useLeaderCache) {
     this.endPointList = endPointList;
@@ -55,21 +59,25 @@
 
   public void adjustTimeoutIfNecessary(Throwable e) {
     do {
-      if (e instanceof SocketTimeoutException) {
+      if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
         int newConnectionTimeout;
         try {
           newConnectionTimeout =
-              Math.min(Math.toIntExact(connectionTimeout * 2L), MAX_CONNECTION_TIMEOUT_MS);
+              Math.min(
+                  Math.max(
+                      FIRST_ADJUSTMENT_TIMEOUT_MS,
+                      Math.toIntExact(CONNECTION_TIMEOUT_MS.get() * 2L)),
+                  MAX_CONNECTION_TIMEOUT_MS);
         } catch (ArithmeticException arithmeticException) {
           newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
         }
 
-        if (newConnectionTimeout != connectionTimeout) {
-          connectionTimeout = newConnectionTimeout;
+        if (newConnectionTimeout != CONNECTION_TIMEOUT_MS.get()) {
+          CONNECTION_TIMEOUT_MS.set(newConnectionTimeout);
           LOGGER.info(
               "Pipe connection timeout is adjusted to {} ms ({} mins)",
-              connectionTimeout,
-              connectionTimeout / 60000.0);
+              newConnectionTimeout,
+              newConnectionTimeout / 60000.0);
         }
         return;
       }
@@ -77,6 +85,6 @@
   }
 
   public int getConnectionTimeout() {
-    return connectionTimeout;
+    return CONNECTION_TIMEOUT_MS.get();
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index a29d7bd..667313e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -193,7 +193,7 @@
             resp.getStatus());
       } else {
         clientAndStatus.setRight(true);
-        client.setTimeout(connectionTimeout);
+        client.setTimeout(CONNECTION_TIMEOUT_MS.get());
         LOGGER.info(
             "Handshake success. Target server ip: {}, port: {}",
             client.getIpAddress(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index eeec9c8..fb0830d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -63,6 +63,15 @@
   public synchronized void commit(EnrichedEvent event) {
     commitQueue.offer(event);
 
+    LOGGER.info(
+        "COMMIT QUEUE OFFER: pipe name {}, creation time {}, region id {}, event commit id {}, last commit id {}, commit queue size {}",
+        pipeName,
+        creationTime,
+        dataRegionId,
+        event.getCommitId(),
+        lastCommitId.get(),
+        commitQueue.size());
+
     while (!commitQueue.isEmpty()) {
       final EnrichedEvent e = commitQueue.peek();
 
@@ -84,6 +93,14 @@
       e.onCommitted();
       lastCommitId.incrementAndGet();
       commitQueue.poll();
+
+      LOGGER.info(
+          "COMMIT QUEUE POLL: pipe name {}, creation time {}, region id {}, last commit id {}, commit queue size after commit {}",
+          pipeName,
+          creationTime,
+          dataRegionId,
+          lastCommitId.get(),
+          commitQueue.size());
     }
   }