Pipe: Globally adjust timeout when syncing huge tsfiles & Speed up file transfer after sync task failed (#12491)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 02c02b9..1e01b82 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -161,6 +161,11 @@
return;
}
+ transferWithoutCheck(tabletInsertionEvent);
+ }
+
+ private void transferWithoutCheck(final TabletInsertionEvent tabletInsertionEvent)
+ throws Exception {
if (isTabletBatchModeEnabled) {
if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
final PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler =
@@ -265,6 +270,11 @@
return;
}
+ transferWithoutCheck(tsFileInsertionEvent);
+ }
+
+ private void transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionEvent)
+ throws Exception {
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
// We increase the reference count for this event to determine if the event may be released.
@@ -275,15 +285,23 @@
return;
}
- // Just in case. To avoid the case that exception occurred when constructing the handler.
- if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
- throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
+ // The reference count has been increased; it is decreased again below if anything throws.
+ try {
+ // Just in case. To avoid the case that an exception occurs when constructing the handler.
+ if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
+ throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
+ }
+
+ final PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler =
+ new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this);
+
+ transfer(pipeTransferTsFileInsertionEventHandler);
+ } catch (Exception e) {
+ // Undo the reference count increase above before propagating the failure.
+ pipeTsFileInsertionEvent.decreaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName(), false);
+ throw e;
}
-
- final PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler =
- new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this);
-
- transfer(pipeTransferTsFileInsertionEventHandler);
}
private void transfer(
@@ -350,7 +368,8 @@
} else if (peekedEvent instanceof PipeRawTabletInsertionEvent) {
retryConnector.transfer((PipeRawTabletInsertionEvent) peekedEvent);
} else if (peekedEvent instanceof PipeTsFileInsertionEvent) {
- retryConnector.transfer((PipeTsFileInsertionEvent) peekedEvent);
+ // Using the async connector to transfer the event for performance.
+ transferWithoutCheck((PipeTsFileInsertionEvent) peekedEvent);
} else {
LOGGER.warn(
"IoTDBThriftAsyncConnector does not support transfer generic event: {}.", peekedEvent);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 9e426f4..213d980 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -103,7 +103,7 @@
this.client = client;
client.setShouldReturnSelf(false);
- client.setTimeout(clientManager.getConnectionTimeout());
+ client.setTimeoutDynamically(clientManager.getConnectionTimeout());
final int readLength = reader.read(readBuffer);
@@ -228,7 +228,9 @@
exception);
try {
- clientManager.adjustTimeoutIfNecessary(exception);
+ if (Objects.nonNull(clientManager)) {
+ clientManager.adjustTimeoutIfNecessary(exception);
+ }
} catch (Exception e) {
LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 83a12e0..a630c94 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -61,6 +61,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -70,7 +71,8 @@
private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class);
private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
- private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS =
+ private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; // 6 hours
+ private static final AtomicInteger CONNECTION_TIMEOUT_MS =
new AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS());
private String uuid;
@@ -194,7 +196,7 @@
throws FragmentInstanceDispatchException {
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+ client.setTimeout(CONNECTION_TIMEOUT_MS.get());
final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
if (!loadResp.isAccepted()) {
@@ -278,7 +280,7 @@
throws FragmentInstanceDispatchException {
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+ client.setTimeout(CONNECTION_TIMEOUT_MS.get());
final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
if (!loadResp.isAccepted()) {
@@ -306,18 +308,21 @@
private static void adjustTimeoutIfNecessary(Throwable e) {
do {
- if (e instanceof SocketTimeoutException) {
+ if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
int newConnectionTimeout;
try {
newConnectionTimeout =
Math.min(
- Math.toIntExact(CONNECTION_TIMEOUT_IN_MS.get() * 2L), MAX_CONNECTION_TIMEOUT_MS);
+ Math.max(
+ FIRST_ADJUSTMENT_TIMEOUT_MS,
+ Math.toIntExact(CONNECTION_TIMEOUT_MS.get() * 2L)),
+ MAX_CONNECTION_TIMEOUT_MS);
} catch (ArithmeticException arithmeticException) {
newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
}
- if (newConnectionTimeout != CONNECTION_TIMEOUT_IN_MS.get()) {
- CONNECTION_TIMEOUT_IN_MS.set(newConnectionTimeout);
+ if (newConnectionTimeout != CONNECTION_TIMEOUT_MS.get()) {
+ CONNECTION_TIMEOUT_MS.set(newConnectionTimeout);
LOGGER.info(
"Load remote procedure call connection timeout is adjusted to {} ms ({} mins)",
newConnectionTimeout,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index 2b8110b..186db8e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -31,6 +31,7 @@
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.transport.TNonblockingSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,6 +118,15 @@
this.shouldReturnSelf.set(shouldReturnSelf);
}
+ public void setTimeoutDynamically(int timeout) {
+ try {
+ ((TNonblockingSocket) ___transport).setTimeout(timeout);
+ } catch (Exception e) {
+ setTimeout(timeout);
+ LOGGER.error("Failed to set timeout dynamically, set it statically", e);
+ }
+ }
+
private void close() {
___transport.close();
___currentMethod = null;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index bb1f006..73e0543 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -27,6 +27,8 @@
import java.net.SocketTimeoutException;
import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
public abstract class IoTDBClientManager {
@@ -42,7 +44,9 @@
protected boolean supportModsIfIsDataNodeReceiver = true;
private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
- protected int connectionTimeout = PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs();
+ private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; // 6 hours
+ protected static final AtomicInteger CONNECTION_TIMEOUT_MS =
+ new AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
protected IoTDBClientManager(List<TEndPoint> endPointList, boolean useLeaderCache) {
this.endPointList = endPointList;
@@ -55,21 +59,25 @@
public void adjustTimeoutIfNecessary(Throwable e) {
do {
- if (e instanceof SocketTimeoutException) {
+ if (e instanceof SocketTimeoutException || e instanceof TimeoutException) {
int newConnectionTimeout;
try {
newConnectionTimeout =
- Math.min(Math.toIntExact(connectionTimeout * 2L), MAX_CONNECTION_TIMEOUT_MS);
+ Math.min(
+ Math.max(
+ FIRST_ADJUSTMENT_TIMEOUT_MS,
+ Math.toIntExact(CONNECTION_TIMEOUT_MS.get() * 2L)),
+ MAX_CONNECTION_TIMEOUT_MS);
} catch (ArithmeticException arithmeticException) {
newConnectionTimeout = MAX_CONNECTION_TIMEOUT_MS;
}
- if (newConnectionTimeout != connectionTimeout) {
- connectionTimeout = newConnectionTimeout;
+ if (newConnectionTimeout != CONNECTION_TIMEOUT_MS.get()) {
+ CONNECTION_TIMEOUT_MS.set(newConnectionTimeout);
LOGGER.info(
"Pipe connection timeout is adjusted to {} ms ({} mins)",
- connectionTimeout,
- connectionTimeout / 60000.0);
+ newConnectionTimeout,
+ newConnectionTimeout / 60000.0);
}
return;
}
@@ -77,6 +85,6 @@
}
public int getConnectionTimeout() {
- return connectionTimeout;
+ return CONNECTION_TIMEOUT_MS.get();
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index a29d7bd..667313e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -193,7 +193,7 @@
resp.getStatus());
} else {
clientAndStatus.setRight(true);
- client.setTimeout(connectionTimeout);
+ client.setTimeout(CONNECTION_TIMEOUT_MS.get());
LOGGER.info(
"Handshake success. Target server ip: {}, port: {}",
client.getIpAddress(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index eeec9c8..fb0830d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -63,6 +63,15 @@
public synchronized void commit(EnrichedEvent event) {
commitQueue.offer(event);
+ LOGGER.info(
+ "COMMIT QUEUE OFFER: pipe name {}, creation time {}, region id {}, event commit id {}, last commit id {}, commit queue size {}",
+ pipeName,
+ creationTime,
+ dataRegionId,
+ event.getCommitId(),
+ lastCommitId.get(),
+ commitQueue.size());
+
while (!commitQueue.isEmpty()) {
final EnrichedEvent e = commitQueue.peek();
@@ -84,6 +93,14 @@
e.onCommitted();
lastCommitId.incrementAndGet();
commitQueue.poll();
+
+ LOGGER.info(
+ "COMMIT QUEUE POLL: pipe name {}, creation time {}, region id {}, last commit id {}, commit queue size after commit {}",
+ pipeName,
+ creationTime,
+ dataRegionId,
+ lastCommitId.get(),
+ commitQueue.size());
}
}