Pipe: dynamically adjust connection timeout to handle SocketTimeoutException & avoid resource cleanup while a load task is in progress (#12485)
* Pipe: dynamically adjust connection timeout to handle SocketTimeoutException
* Load: dynamically adjust connection timeout to handle SocketTimeoutException & avoid resource cleanup while a load task is in progress
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 56ad0b9..02c02b9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -291,7 +291,7 @@
AsyncPipeDataTransferServiceClient client = null;
try {
client = clientManager.borrowClient();
- pipeTransferTsFileInsertionEventHandler.transfer(client);
+ pipeTransferTsFileInsertionEventHandler.transfer(clientManager, client);
} catch (final Exception ex) {
logOnClientException(client, ex);
pipeTransferTsFileInsertionEventHandler.onError(ex);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index bcafce0..9e426f4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -23,6 +23,7 @@
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
@@ -68,6 +69,7 @@
private final AtomicBoolean isSealSignalSent;
+ private IoTDBDataNodeAsyncClientManager clientManager;
private AsyncPipeDataTransferServiceClient client;
public PipeTransferTsFileInsertionEventHandler(
@@ -93,10 +95,15 @@
isSealSignalSent = new AtomicBoolean(false);
}
- public void transfer(final AsyncPipeDataTransferServiceClient client)
+ public void transfer(
+ IoTDBDataNodeAsyncClientManager clientManager,
+ final AsyncPipeDataTransferServiceClient client)
throws TException, IOException {
+ this.clientManager = clientManager;
this.client = client;
+
client.setShouldReturnSelf(false);
+ client.setTimeout(clientManager.getConnectionTimeout());
final int readLength = reader.read(readBuffer);
@@ -110,7 +117,7 @@
LOGGER.warn("Failed to close file reader when successfully transferred mod file.", e);
}
reader = new RandomAccessFile(tsFile, "r");
- transfer(client);
+ transfer(clientManager, client);
} else if (currentFile == tsFile) {
isSealSignalSent.set(true);
client.pipeTransfer(
@@ -205,7 +212,7 @@
}
}
- transfer(client);
+ transfer(clientManager, client);
} catch (final Exception e) {
onError(e);
}
@@ -221,6 +228,12 @@
exception);
try {
+ clientManager.adjustTimeoutIfNecessary(exception);
+ } catch (Exception e) {
+ LOGGER.warn("Failed to adjust timeout when failed to transfer file.", e);
+ }
+
+ try {
if (reader != null) {
reader.close();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 9919d954..f873cc2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -350,6 +350,7 @@
modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()));
} catch (final Exception e) {
clientAndStatus.setRight(false);
+ clientManager.adjustTimeoutIfNecessary(e);
throw new PipeConnectionException(
String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
e);
@@ -366,6 +367,7 @@
tsFile.getName(), tsFile.length()));
} catch (final Exception e) {
clientAndStatus.setRight(false);
+ clientManager.adjustTimeoutIfNecessary(e);
throw new PipeConnectionException(
String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()),
e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
index 90c6a2f..a24b6c4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java
@@ -208,7 +208,7 @@
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
return new IoTDBSyncClient(
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+ .setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
.build(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
index 99d454d..4cd97d7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java
@@ -68,7 +68,7 @@
@Override
public void runMayThrow() throws Throwable {
- socket.setSoTimeout((int) PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+ socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
socket.setKeepAlive(true);
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index ef4ed7f..b568b88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -54,6 +54,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@@ -104,6 +105,13 @@
final CleanupTask cleanupTask = cleanupTaskQueue.peek();
if (cleanupTask.scheduledTime <= System.currentTimeMillis()) {
+ if (cleanupTask.isLoadTaskRunning) {
+ cleanupTaskQueue.poll();
+ cleanupTask.resetScheduledTime();
+ cleanupTaskQueue.add(cleanupTask);
+ continue;
+ }
+
cleanupTask.run();
uuid2CleanupTask.remove(cleanupTask.uuid);
@@ -157,17 +165,24 @@
}
}
- final TsFileWriterManager writerManager =
- uuid2WriterManager.computeIfAbsent(
- uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
- for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
- if (!tsFileData.isModification()) {
- ChunkData chunkData = (ChunkData) tsFileData;
- writerManager.write(
- new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
- } else {
- writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+ // The cleanup task for this uuid may no longer be registered (the cleanup loop removes it
+ // from uuid2CleanupTask once it has run), so the lookup can return null. Optional.of(null)
+ // would throw NullPointerException; ofNullable makes the ifPresent guards below meaningful.
+ final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
+ cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+ try {
+ final TsFileWriterManager writerManager =
+ uuid2WriterManager.computeIfAbsent(
+ uuid,
+ o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
+ for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
+ if (!tsFileData.isModification()) {
+ ChunkData chunkData = (ChunkData) tsFileData;
+ writerManager.write(
+ new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData);
+ } else {
+ writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData);
+ }
}
+ } finally {
+ // Always clear the running flag, even when writing fails, so cleanup is not blocked forever.
+ cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
+ }
}
@@ -176,7 +191,15 @@
if (!uuid2WriterManager.containsKey(uuid)) {
return false;
}
- uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+
+ // Only uuid2WriterManager was checked above; uuid2CleanupTask may have no entry for this
+ // uuid, and Optional.of(null) would throw NullPointerException. Use ofNullable instead.
+ final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
+ cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
+ try {
+ uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+ } finally {
+ // Always clear the running flag, even when loading fails.
+ cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
+ }
+
clean(uuid);
return true;
}
@@ -412,12 +435,30 @@
private class CleanupTask implements Runnable, Comparable<CleanupTask> {
private final String uuid;
- private final long scheduledTime;
+ private final long delayInMs;
+ private long scheduledTime;
+
+ private volatile boolean isLoadTaskRunning = false;
private volatile boolean isCanceled = false;
private CleanupTask(String uuid, long delayInMs) {
this.uuid = uuid;
+ this.delayInMs = delayInMs;
+ resetScheduledTime();
+ }
+
// Flags that a load task for this uuid is executing and pushes the cleanup deadline out to
// delayInMs from now. The cleanup loop re-queues the task instead of running it while the
// flag is set.
+ public void markLoadTaskRunning() {
+ isLoadTaskRunning = true;
+ resetScheduledTime();
+ }
+
// Clears the running flag and restarts the delay, so cleanup happens delayInMs after the
// most recent load activity finished.
+ public void markLoadTaskNotRunning() {
+ isLoadTaskRunning = false;
+ resetScheduledTime();
+ }
+
// Sets scheduledTime to the current wall-clock time plus delayInMs.
// NOTE(review): the mark* methods call this while the task may still sit in
// cleanupTaskQueue. If the queue orders tasks by scheduledTime (compareTo is not visible
// here), changing the field in place does not re-sort the queue -- confirm this is acceptable.
// NOTE(review): scheduledTime is not volatile, unlike isLoadTaskRunning -- confirm whether
// it is read and written by different threads.
+ public void resetScheduledTime() {
scheduledTime = System.currentTimeMillis() + delayInMs;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 0d5f0bb..83a12e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -24,7 +24,6 @@
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -51,23 +50,29 @@
import org.apache.iotdb.rpc.TSStatusCode;
import io.airlift.concurrent.SetThreadName;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.util.concurrent.Futures.immediateFuture;
public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
+
private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileDispatcherImpl.class);
+ private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+ private static final AtomicInteger CONNECTION_TIMEOUT_IN_MS =
+ new AtomicInteger(IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS());
+
private String uuid;
private final String localhostIpAddr;
private final int localhostInternalPort;
@@ -76,8 +81,6 @@
private final ExecutorService executor;
private final boolean isGeneratedByPipe;
- private static final String NODE_CONNECTION_ERROR = "can't connect to node {}";
-
public LoadTsFileDispatcherImpl(
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
boolean isGeneratedByPipe) {
@@ -137,78 +140,6 @@
}
}
- private boolean isDispatchedToLocal(TEndPoint endPoint) {
- return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
- }
-
- private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
- throws FragmentInstanceDispatchException {
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
- if (!loadResp.isAccepted()) {
- LOGGER.warn(loadResp.message);
- throw new FragmentInstanceDispatchException(loadResp.status);
- }
- } catch (ClientManagerException | TException e) {
- String warning = NODE_CONNECTION_ERROR;
- LOGGER.warn(warning, endPoint, e);
- TSStatus status = new TSStatus();
- status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage(warning + endPoint);
- throw new FragmentInstanceDispatchException(status);
- }
- }
-
- private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
- throws FragmentInstanceDispatchException {
- try (SyncDataNodeInternalServiceClient client =
- internalServiceClientManager.borrowClient(endPoint)) {
- TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
- if (!loadResp.isAccepted()) {
- LOGGER.warn(loadResp.message);
- throw new FragmentInstanceDispatchException(loadResp.status);
- }
- } catch (ClientManagerException | TException e) {
- LOGGER.warn(NODE_CONNECTION_ERROR, endPoint, e);
- TSStatus status = new TSStatus();
- status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage(
- "can't connect to node "
- + endPoint
- + ", please reset longer dn_connection_timeout_ms "
- + "in iotdb-datanode.properties and restart iotdb.");
- throw new FragmentInstanceDispatchException(status);
- }
- }
-
- private void dispatchLocally(TLoadCommandReq loadCommandReq)
- throws FragmentInstanceDispatchException {
- final ProgressIndex progressIndex;
- if (loadCommandReq.isSetProgressIndex()) {
- progressIndex =
- ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
- } else {
- // fallback to use local generated progress index for compatibility
- progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
- LOGGER.info(
- "Use local generated load progress index {} for uuid {}.",
- progressIndex,
- loadCommandReq.uuid);
- }
-
- final TSStatus resultStatus =
- StorageEngine.getInstance()
- .executeLoadCommand(
- LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
- loadCommandReq.uuid,
- loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
- progressIndex);
- if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
- throw new FragmentInstanceDispatchException(resultStatus);
- }
- }
-
public void dispatchLocally(FragmentInstance instance) throws FragmentInstanceDispatchException {
LOGGER.info("Receive load node from uuid {}.", uuid);
@@ -259,6 +190,32 @@
}
}
+ /**
+ * Sends one TsFile piece request to the DataNode at {@code endPoint} through a client borrowed
+ * from {@code internalServiceClientManager}, using the current adaptive RPC timeout.
+ *
+ * @param loadTsFileReq the piece request to send
+ * @param endPoint the target DataNode
+ * @throws FragmentInstanceDispatchException carrying the receiver's status when the piece is
+ *     not accepted, or a DISPATCH_ERROR status when borrowing the client or the RPC fails
+ */
+ private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
+ throws FragmentInstanceDispatchException {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+ final TLoadResp loadResp = client.sendTsFilePieceNode(loadTsFileReq);
+ if (!loadResp.isAccepted()) {
+ LOGGER.warn(loadResp.message);
+ throw new FragmentInstanceDispatchException(loadResp.status);
+ }
+ } catch (FragmentInstanceDispatchException e) {
+ // The receiver rejected the piece: propagate its status untouched. Without this clause the
+ // generic handler below would re-wrap it as DISPATCH_ERROR and lose loadResp.status.
+ throw e;
+ } catch (Exception e) {
+ // Transport-level failure: grow the timeout if a SocketTimeoutException is in the chain.
+ adjustTimeoutIfNecessary(e);
+
+ final String exceptionMessage =
+ String.format(
+ "failed to dispatch load command %s to node %s because of exception: %s",
+ loadTsFileReq, endPoint, e);
+ LOGGER.warn(exceptionMessage, e);
+ throw new FragmentInstanceDispatchException(
+ new TSStatus()
+ .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+ .setMessage(exceptionMessage));
+ }
+ }
+
public Future<FragInstanceDispatchResult> dispatchCommand(
TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
Set<TEndPoint> allEndPoint = new HashSet<>();
@@ -290,6 +247,87 @@
return immediateFuture(new FragInstanceDispatchResult(true));
}
/**
 * Executes a load command on this node through StorageEngine.executeLoadCommand.
 *
 * The progress index is deserialized from the request when it is set; otherwise a locally
 * generated one is obtained from PipeAgent.runtime().
 *
 * @param loadCommandReq the command request (command type, uuid, optional progress index and
 *     generated-by-pipe flag)
 * @throws FragmentInstanceDispatchException carrying the returned status when it does not equal
 *     RpcUtils.SUCCESS_STATUS
 */
+ private void dispatchLocally(TLoadCommandReq loadCommandReq)
+ throws FragmentInstanceDispatchException {
+ final ProgressIndex progressIndex;
+ if (loadCommandReq.isSetProgressIndex()) {
+ progressIndex =
+ ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+ } else {
+ // fallback to use local generated progress index for compatibility
+ progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
+ LOGGER.info(
+ "Use local generated load progress index {} for uuid {}.",
+ progressIndex,
+ loadCommandReq.uuid);
+ }
+
// commandType is used as an index into LoadCommand.values().
// NOTE(review): an out-of-range commandType would throw ArrayIndexOutOfBoundsException here.
+ final TSStatus resultStatus =
+ StorageEngine.getInstance()
+ .executeLoadCommand(
+ LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
+ loadCommandReq.uuid,
+ loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
+ progressIndex);
+ if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
+ throw new FragmentInstanceDispatchException(resultStatus);
+ }
+ }
+
+ /**
+ * Sends a load command to the DataNode at {@code endPoint} through a client borrowed from
+ * {@code internalServiceClientManager}, using the current adaptive RPC timeout.
+ *
+ * @param loadCommandReq the command request to send
+ * @param endPoint the target DataNode
+ * @throws FragmentInstanceDispatchException carrying the receiver's status when the command is
+ *     not accepted, or a DISPATCH_ERROR status when borrowing the client or the RPC fails
+ */
+ private void dispatchRemote(TLoadCommandReq loadCommandReq, TEndPoint endPoint)
+ throws FragmentInstanceDispatchException {
+ try (SyncDataNodeInternalServiceClient client =
+ internalServiceClientManager.borrowClient(endPoint)) {
+ client.setTimeout(CONNECTION_TIMEOUT_IN_MS.get());
+
+ final TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+ if (!loadResp.isAccepted()) {
+ LOGGER.warn(loadResp.message);
+ throw new FragmentInstanceDispatchException(loadResp.status);
+ }
+ } catch (FragmentInstanceDispatchException e) {
+ // The receiver rejected the command: propagate its status untouched. Without this clause
+ // the generic handler below would re-wrap it as DISPATCH_ERROR and lose loadResp.status.
+ throw e;
+ } catch (Exception e) {
+ // Transport-level failure: grow the timeout if a SocketTimeoutException is in the chain.
+ adjustTimeoutIfNecessary(e);
+
+ final String exceptionMessage =
+ String.format(
+ "failed to dispatch load command %s to node %s because of exception: %s",
+ loadCommandReq, endPoint, e);
+ LOGGER.warn(exceptionMessage, e);
+ throw new FragmentInstanceDispatchException(
+ new TSStatus()
+ .setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode())
+ .setMessage(exceptionMessage));
+ }
+ }
+
// Returns true when endPoint has the same IP and port as this node's internal address
// (localhostIpAddr / localhostInternalPort).
+ private boolean isDispatchedToLocal(TEndPoint endPoint) {
+ return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
+ }
+
+ /**
+ * Doubles the shared load RPC timeout, capped at {@code MAX_CONNECTION_TIMEOUT_MS}, when a
+ * {@link SocketTimeoutException} appears anywhere in the cause chain of {@code e}. Does nothing
+ * for other failures or a {@code null} argument.
+ *
+ * @param e the failure to inspect, may be {@code null}
+ */
+ private static void adjustTimeoutIfNecessary(Throwable e) {
+ for (Throwable cause = e; cause != null; cause = cause.getCause()) {
+ if (!(cause instanceof SocketTimeoutException)) {
+ continue;
+ }
+
+ final int currentTimeout = CONNECTION_TIMEOUT_IN_MS.get();
+ // Double in long arithmetic so the product cannot overflow before it is capped.
+ final int newConnectionTimeout =
+ (int) Math.min(currentTimeout * 2L, MAX_CONNECTION_TIMEOUT_MS);
+
+ // compareAndSet keeps the read-modify-write atomic: if another thread already adjusted
+ // the timeout for the same burst of failures, this call leaves its value in place.
+ if (newConnectionTimeout != currentTimeout
+ && CONNECTION_TIMEOUT_IN_MS.compareAndSet(currentTimeout, newConnectionTimeout)) {
+ LOGGER.info(
+ "Load remote procedure call connection timeout is adjusted to {} ms ({} mins)",
+ newConnectionTimeout,
+ newConnectionTimeout / 60000.0);
+ }
+ return;
+ }
+ }
+
@Override
public void abort() {
// Do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 5bcff34..c8c35e0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -41,7 +41,6 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
import org.apache.iotdb.db.exception.DataRegionException;
-import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -903,7 +902,7 @@
status.setCode(TSStatusCode.ILLEGAL_PARAMETER.getStatusCode());
status.setMessage(String.format("Wrong load command %s.", loadCommand));
}
- } catch (IOException | LoadFileException e) {
+ } catch (Exception e) {
LOGGER.error("Execute load command {} error.", loadCommand, e);
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 4e4e76e..493ea06 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -185,8 +185,8 @@
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; // 50B
private int pipeExtractorMatcherCacheSize = 1024;
- private long pipeConnectorHandshakeTimeoutMs = 10 * 1000L; // 10 seconds
- private long pipeConnectorTransferTimeoutMs = 15 * 60 * 1000L; // 15 minutes
+ private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
+ private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
// recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
@@ -634,20 +634,32 @@
this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
}
// Returns the pipe connector handshake timeout in milliseconds (changed from long to int by
// this patch).
- public long getPipeConnectorHandshakeTimeoutMs() {
+ public int getPipeConnectorHandshakeTimeoutMs() {
return pipeConnectorHandshakeTimeoutMs;
}
// Stores the handshake timeout, narrowing the long argument to int. Values outside the int
// range are replaced by Integer.MAX_VALUE and a warning is logged.
// NOTE(review): Math.toIntExact also throws for values below Integer.MIN_VALUE, so such input
// is clamped to Integer.MAX_VALUE and reported as "too large" as well.
public void setPipeConnectorHandshakeTimeoutMs(long pipeConnectorHandshakeTimeoutMs) {
- this.pipeConnectorHandshakeTimeoutMs = pipeConnectorHandshakeTimeoutMs;
+ try {
+ this.pipeConnectorHandshakeTimeoutMs = Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
+ } catch (ArithmeticException e) {
+ this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
+ logger.warn(
+ "Given pipe connector handshake timeout is too large, set to {} ms.", Integer.MAX_VALUE);
+ }
}
// Returns the pipe connector transfer timeout in milliseconds (changed from long to int by
// this patch).
- public long getPipeConnectorTransferTimeoutMs() {
+ public int getPipeConnectorTransferTimeoutMs() {
return pipeConnectorTransferTimeoutMs;
}
// Stores the transfer timeout, narrowing the long argument to int. Values outside the int
// range are replaced by Integer.MAX_VALUE and a warning is logged.
// NOTE(review): Math.toIntExact also throws for values below Integer.MIN_VALUE, so such input
// is clamped to Integer.MAX_VALUE and reported as "too large" as well.
public void setPipeConnectorTransferTimeoutMs(long pipeConnectorTransferTimeoutMs) {
- this.pipeConnectorTransferTimeoutMs = pipeConnectorTransferTimeoutMs;
+ try {
+ this.pipeConnectorTransferTimeoutMs = Math.toIntExact(pipeConnectorTransferTimeoutMs);
+ } catch (ArithmeticException e) {
+ this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
+ logger.warn(
+ "Given pipe connector transfer timeout is too large, set to {} ms.", Integer.MAX_VALUE);
+ }
}
public int getPipeConnectorReadFileBufferSize() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 7a0391d..dbe4b6a 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -95,11 +95,11 @@
/////////////////////////////// Connector ///////////////////////////////
// Delegates to COMMON_CONFIG; the return type is narrowed from long to int by this patch so
// callers no longer need an (int) cast.
- public long getPipeConnectorHandshakeTimeoutMs() {
+ public int getPipeConnectorHandshakeTimeoutMs() {
return COMMON_CONFIG.getPipeConnectorHandshakeTimeoutMs();
}
// Delegates to COMMON_CONFIG; the return type is narrowed from long to int by this patch.
- public long getPipeConnectorTransferTimeoutMs() {
+ public int getPipeConnectorTransferTimeoutMs() {
return COMMON_CONFIG.getPipeConnectorTransferTimeoutMs();
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
index b645b91..bb1f006 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBClientManager.java
@@ -20,11 +20,18 @@
package org.apache.iotdb.commons.pipe.connector.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketTimeoutException;
import java.util.List;
public abstract class IoTDBClientManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClientManager.class);
+
protected final List<TEndPoint> endPointList;
protected long currentClientIndex = 0;
@@ -34,6 +41,9 @@
// it is a DataNode receiver. The flag is useless for configNode receiver.
protected boolean supportModsIfIsDataNodeReceiver = true;
+ private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day
+ protected int connectionTimeout = PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs();
+
protected IoTDBClientManager(List<TEndPoint> endPointList, boolean useLeaderCache) {
this.endPointList = endPointList;
this.useLeaderCache = useLeaderCache;
@@ -42,4 +52,31 @@
public boolean supportModsIfIsDataNodeReceiver() {
return supportModsIfIsDataNodeReceiver;
}
+
+ /**
+ * Doubles {@code connectionTimeout}, capped at {@code MAX_CONNECTION_TIMEOUT_MS}, when a
+ * {@link SocketTimeoutException} appears anywhere in the cause chain of {@code e}. Does nothing
+ * for other failures or a {@code null} argument.
+ *
+ * @param e the failure to inspect, may be {@code null}
+ */
+ public void adjustTimeoutIfNecessary(Throwable e) {
+ for (Throwable cause = e; cause != null; cause = cause.getCause()) {
+ if (!(cause instanceof SocketTimeoutException)) {
+ continue;
+ }
+
+ // Double in long arithmetic so the product cannot overflow before it is capped.
+ final int newConnectionTimeout =
+ (int) Math.min(connectionTimeout * 2L, MAX_CONNECTION_TIMEOUT_MS);
+
+ if (newConnectionTimeout != connectionTimeout) {
+ connectionTimeout = newConnectionTimeout;
+ LOGGER.info(
+ "Pipe connection timeout is adjusted to {} ms ({} mins)",
+ connectionTimeout,
+ connectionTimeout / 60000.0);
+ }
+ return;
+ }
+ }
+
// Returns the current connection timeout in milliseconds; the value grows when
// adjustTimeoutIfNecessary sees a SocketTimeoutException.
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
index c262313..a29d7bd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/client/IoTDBSyncClientManager.java
@@ -142,7 +142,7 @@
clientAndStatus.setLeft(
new IoTDBSyncClient(
new ThriftClientProperty.Builder()
- .setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
+ .setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
.setRpcThriftCompressionEnabled(
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
.build(),
@@ -193,7 +193,7 @@
resp.getStatus());
} else {
clientAndStatus.setRight(true);
- client.setTimeout((int) PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+ client.setTimeout(connectionTimeout);
LOGGER.info(
"Handshake success. Target server ip: {}, port: {}",
client.getIpAddress(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
index 1977c32..3034b58 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBAirGapConnector.java
@@ -188,7 +188,7 @@
} else {
supportModsIfIsDataNodeReceiver = true;
}
- socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
+ socket.setSoTimeout(PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
LOGGER.info("Handshake success. Socket: {}", socket);
}