blob: 78de2ecf3868474150aa9958e33d4d27a77b66d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.exchange.source;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.GET_DATA_BLOCK_TASK_CALLER;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_DESERIALIZE_TSBLOCK_REMOTE;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_GET_TSBLOCK_REMOTE;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet.GET_DATA_BLOCK_NUM_CALLER;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_NUM_CALLER;
public class SourceHandle implements ISourceHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(SourceHandle.class);
public static final int MAX_ATTEMPT_TIMES = 3;
private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000;
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final TFragmentInstanceId localFragmentInstanceId;
private final String fullFragmentInstanceId;
private final String localPlanNodeId;
private final int indexOfUpstreamSinkHandle;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
private final TsBlockSerde serde;
private final SourceHandleListener sourceHandleListener;
private final Map<Integer, Long> sequenceIdToDataBlockSize = new HashMap<>();
private final Map<Integer, ByteBuffer> sequenceIdToTsBlock = new HashMap<>();
private final String threadName;
private long retryIntervalInMs;
private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager;
private SettableFuture<Void> blocked = SettableFuture.create();
private ListenableFuture<Void> blockedOnMemory;
/** The actual buffered memory in bytes, including the amount of memory being reserved. */
private long bufferRetainedSizeInBytes = 0L;
private int currSequenceId = 0;
private int nextSequenceId = 0;
private int lastSequenceId = Integer.MAX_VALUE;
private boolean aborted = false;
private boolean closed = false;
/** max bytes this SourceHandle can reserve. */
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
/**
* this is set to true after calling isBlocked() at least once which indicates that this
* SourceHandle needs to output data.
*/
private boolean canGetTsBlockFromRemote = false;
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET =
DataExchangeCostMetricSet.getInstance();
private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET =
DataExchangeCountMetricSet.getInstance();
@SuppressWarnings("squid:S107")
public SourceHandle(
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
int indexOfUpstreamSinkHandle,
LocalMemoryManager localMemoryManager,
ExecutorService executorService,
TsBlockSerde serde,
SourceHandleListener sourceHandleListener,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
this.remoteEndpoint = Validate.notNull(remoteEndpoint, "remoteEndpoint can not be null.");
this.remoteFragmentInstanceId =
Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null.");
this.localFragmentInstanceId =
Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can not be null.");
this.fullFragmentInstanceId =
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId can not be null.");
this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
this.localMemoryManager =
Validate.notNull(localMemoryManager, "localMemoryManager can not be null.");
this.executorService = Validate.notNull(executorService, "executorService can not be null.");
this.serde = Validate.notNull(serde, "serde can not be null.");
this.sourceHandleListener =
Validate.notNull(sourceHandleListener, "sourceHandleListener can not be null.");
this.bufferRetainedSizeInBytes = 0L;
this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId);
localMemoryManager
.getQueryPool()
.registerPlanNodeIdToQueryMemoryMap(
localFragmentInstanceId.queryId, fullFragmentInstanceId, localPlanNodeId);
}
@Override
public synchronized TsBlock receive() {
ByteBuffer tsBlock = getSerializedTsBlock();
if (tsBlock != null) {
long startTime = System.nanoTime();
try {
return serde.deserialize(tsBlock);
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SOURCE_HANDLE_DESERIALIZE_TSBLOCK_REMOTE, System.nanoTime() - startTime);
}
} else {
return null;
}
}
@Override
public synchronized ByteBuffer getSerializedTsBlock() {
long startTime = System.nanoTime();
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
if (!blocked.isDone()) {
throw new IllegalStateException("Source handle is blocked.");
}
ByteBuffer tsBlock = sequenceIdToTsBlock.remove(currSequenceId);
if (tsBlock == null) {
return null;
}
long retainedSize = sequenceIdToDataBlockSize.remove(currSequenceId);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[GetTsBlockFromBuffer] sequenceId:{}, size:{}", currSequenceId, retainedSize);
}
currSequenceId += 1;
bufferRetainedSizeInBytes -= retainedSize;
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
retainedSize);
if (sequenceIdToTsBlock.isEmpty() && !isFinished()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[WaitForMoreTsBlock]");
}
blocked = SettableFuture.create();
}
if (isFinished()) {
sourceHandleListener.onFinished(this);
}
trySubmitGetDataBlocksTask();
return tsBlock;
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SOURCE_HANDLE_GET_TSBLOCK_REMOTE, System.nanoTime() - startTime);
}
}
private synchronized void trySubmitGetDataBlocksTask() {
if (aborted || closed) {
return;
}
if (blockedOnMemory != null && !blockedOnMemory.isDone()) {
return;
}
final int startSequenceId = nextSequenceId;
int endSequenceId = nextSequenceId;
long reservedBytes = 0L;
Pair<ListenableFuture<Void>, Boolean> pair = null;
long blockedSize = 0L;
while (sequenceIdToDataBlockSize.containsKey(endSequenceId)) {
Long bytesToReserve = sequenceIdToDataBlockSize.get(endSequenceId);
if (bytesToReserve == null) {
throw new IllegalStateException("Data block size is null.");
}
pair =
localMemoryManager
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
bytesToReserve,
maxBytesCanReserve);
bufferRetainedSizeInBytes += bytesToReserve;
endSequenceId += 1;
reservedBytes += bytesToReserve;
if (!Boolean.TRUE.equals(pair.right)) {
blockedSize = bytesToReserve;
break;
}
}
if (pair == null) {
// Next data block not generated yet. Do nothing.
return;
}
nextSequenceId = endSequenceId;
if (!Boolean.TRUE.equals(pair.right)) {
endSequenceId--;
reservedBytes -= blockedSize;
// The future being not completed indicates,
// 1. Memory has been reserved for blocks in [startSequenceId, endSequenceId).
// 2. Memory reservation for block whose sequence ID equals endSequenceId - 1 is blocked.
// 3. Have not reserve memory for the rest of blocks.
//
// startSequenceId endSequenceId - 1 endSequenceId
// |-------- reserved --------|--- blocked ---|--- not reserved ---|
// Schedule another call of trySubmitGetDataBlocksTask for the rest of blocks.
blockedOnMemory = pair.left;
final int blockedSequenceId = endSequenceId;
final long blockedRetainedSize = blockedSize;
blockedOnMemory.addListener(
() ->
executorService.submit(
new GetDataBlocksTask(
blockedSequenceId, blockedSequenceId + 1, blockedRetainedSize)),
executorService);
}
if (endSequenceId > startSequenceId) {
executorService.submit(new GetDataBlocksTask(startSequenceId, endSequenceId, reservedBytes));
}
}
@Override
public synchronized ListenableFuture<?> isBlocked() {
checkState();
if (!canGetTsBlockFromRemote) {
canGetTsBlockFromRemote = true;
// submit get data task once isBlocked is called to ensure that the blocked future will be
// completed in case that trySubmitGetDataBlocksTask() is not called.
trySubmitGetDataBlocksTask();
}
return nonCancellationPropagating(blocked);
}
public synchronized void setNoMoreTsBlocks(int lastSequenceId) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[ReceiveNoMoreTsBlockEvent]");
}
this.lastSequenceId = lastSequenceId;
if (!blocked.isDone() && remoteTsBlockedConsumedUp()) {
blocked.set(null);
}
if (isFinished()) {
sourceHandleListener.onFinished(this);
}
}
public synchronized void updatePendingDataBlockInfo(
int startSequenceId, List<Long> dataBlockSizes) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[ReceiveNewTsBlockNotification] [{}, {}), each size is: {}",
startSequenceId,
startSequenceId + dataBlockSizes.size(),
dataBlockSizes);
}
for (int i = 0; i < dataBlockSizes.size(); i++) {
sequenceIdToDataBlockSize.put(i + startSequenceId, dataBlockSizes.get(i));
}
if (canGetTsBlockFromRemote) {
trySubmitGetDataBlocksTask();
}
}
@Override
public synchronized void abort() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (aborted || closed) {
return;
}
if (blocked != null && !blocked.isDone()) {
blocked.cancel(true);
}
if (blockedOnMemory != null) {
bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
}
sequenceIdToDataBlockSize.clear();
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
aborted = true;
sourceHandleListener.onAborted(this);
}
}
@Override
public synchronized void abort(Throwable t) {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (aborted || closed) {
return;
}
if (blocked != null && !blocked.isDone()) {
blocked.setException(t);
}
if (blockedOnMemory != null) {
bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
}
sequenceIdToDataBlockSize.clear();
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
aborted = true;
sourceHandleListener.onAborted(this);
}
}
@Override
public synchronized void close() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (aborted || closed) {
return;
}
if (blocked != null && !blocked.isDone()) {
blocked.set(null);
}
if (blockedOnMemory != null) {
bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
}
sequenceIdToDataBlockSize.clear();
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
closed = true;
executorService.submit(new SendCloseSinkChannelEventTask());
currSequenceId = lastSequenceId + 1;
sourceHandleListener.onFinished(this);
}
}
@Override
public boolean isFinished() {
return remoteTsBlockedConsumedUp();
}
// Return true indicates two points:
// 1. Remote SinkHandle has told SourceHandle the total count of TsBlocks by lastSequenceId
// 2. All the TsBlocks has been consumed up
private synchronized boolean remoteTsBlockedConsumedUp() {
return currSequenceId - 1 == lastSequenceId;
}
public TEndPoint getRemoteEndpoint() {
return remoteEndpoint;
}
public TFragmentInstanceId getRemoteFragmentInstanceId() {
return remoteFragmentInstanceId.deepCopy();
}
public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
public String getLocalPlanNodeId() {
return localPlanNodeId;
}
@Override
public long getBufferRetainedSizeInBytes() {
return bufferRetainedSizeInBytes;
}
@Override
public void setMaxBytesCanReserve(long maxBytesCanReserve) {
this.maxBytesCanReserve = Math.min(this.maxBytesCanReserve, maxBytesCanReserve);
}
@Override
public boolean isAborted() {
return aborted;
}
private void checkState() {
if (aborted) {
throw new IllegalStateException("Source handle is aborted.");
} else if (closed) {
throw new IllegalStateException("SourceHandle is closed.");
}
}
@Override
public String toString() {
return String.format(
"Query[%s]-[%s-%s-SourceHandle-%s]",
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getFragmentId(),
fullFragmentInstanceId,
localPlanNodeId);
}
@TestOnly
public void setRetryIntervalInMs(long retryIntervalInMs) {
this.retryIntervalInMs = retryIntervalInMs;
}
/** Get data blocks from an upstream fragment instance. */
class GetDataBlocksTask implements Runnable {
private final int startSequenceId;
private final int endSequenceId;
private final long reservedBytes;
GetDataBlocksTask(int startSequenceId, int endSequenceId, long reservedBytes) {
Validate.isTrue(
startSequenceId >= 0,
"Start sequence ID should be greater than or equal to zero. Start sequence ID: "
+ startSequenceId);
this.startSequenceId = startSequenceId;
Validate.isTrue(
endSequenceId > startSequenceId,
"End sequence ID should be greater than the start sequence ID. Start sequence ID: "
+ startSequenceId
+ ", end sequence ID: "
+ endSequenceId);
this.endSequenceId = endSequenceId;
Validate.isTrue(reservedBytes > 0L, "Reserved bytes should be greater than zero.");
this.reservedBytes = reservedBytes;
}
@Override
@SuppressWarnings("squid:S3776")
public void run() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[StartPullTsBlocksFromRemote] {}-{} [{}, {}) ",
remoteFragmentInstanceId,
indexOfUpstreamSinkHandle,
startSequenceId,
endSequenceId);
}
TGetDataBlockRequest req =
new TGetDataBlockRequest(
remoteFragmentInstanceId,
startSequenceId,
endSequenceId,
indexOfUpstreamSinkHandle);
int attempt = 0;
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
long startTime = System.nanoTime();
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
TGetDataBlockResponse resp = client.getDataBlock(req);
int tsBlockNum = resp.getTsBlocks().size();
if (tsBlockNum == 0) {
if (!closed) {
// failed to pull TsBlocks
LOGGER.warn(
"{} failed to pull TsBlocks [{}] to [{}] from SinkHandle {}, channel index {},",
localFragmentInstanceId,
startSequenceId,
endSequenceId,
remoteFragmentInstanceId,
indexOfUpstreamSinkHandle);
}
return;
}
List<ByteBuffer> tsBlocks = new ArrayList<>(tsBlockNum);
tsBlocks.addAll(resp.getTsBlocks());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndPullTsBlocksFromRemote] Count:{}", tsBlockNum);
}
DATA_EXCHANGE_COUNT_METRIC_SET.recordDataBlockNum(
GET_DATA_BLOCK_NUM_CALLER, tsBlockNum);
executorService.submit(
new SendAcknowledgeDataBlockEventTask(startSequenceId, endSequenceId));
synchronized (SourceHandle.this) {
if (aborted || closed) {
return;
}
for (int i = startSequenceId; i < endSequenceId; i++) {
sequenceIdToTsBlock.put(i, tsBlocks.get(i - startSequenceId));
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[PutTsBlocksIntoBuffer]");
}
if (!blocked.isDone()) {
blocked.set(null);
}
}
break;
} catch (Throwable e) {
LOGGER.warn(
"failed to get data block [{}, {}), attempt times: {}",
startSequenceId,
endSequenceId,
attempt,
e);
// reach retry max times
if (attempt == MAX_ATTEMPT_TIMES) {
fail(e);
return;
}
// sleep some time before retrying
try {
Thread.sleep(retryIntervalInMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
// if interrupted during sleeping, fast fail and don't retry any more
fail(e);
}
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
GET_DATA_BLOCK_TASK_CALLER, System.nanoTime() - startTime);
}
}
}
}
private void fail(Throwable t) {
synchronized (SourceHandle.this) {
if (aborted || closed) {
return;
}
bufferRetainedSizeInBytes -= reservedBytes;
localMemoryManager
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
reservedBytes);
sourceHandleListener.onFailure(SourceHandle.this, t);
}
}
}
class SendAcknowledgeDataBlockEventTask implements Runnable {
private final int startSequenceId;
private final int endSequenceId;
public SendAcknowledgeDataBlockEventTask(int startSequenceId, int endSequenceId) {
this.startSequenceId = startSequenceId;
this.endSequenceId = endSequenceId;
}
@Override
public void run() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[SendACKTsBlock] [{}, {}).", startSequenceId, endSequenceId);
}
int attempt = 0;
TAcknowledgeDataBlockEvent acknowledgeDataBlockEvent =
new TAcknowledgeDataBlockEvent(
remoteFragmentInstanceId,
startSequenceId,
endSequenceId,
indexOfUpstreamSinkHandle);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
long startTime = System.nanoTime();
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
client.onAcknowledgeDataBlockEvent(acknowledgeDataBlockEvent);
break;
} catch (Throwable e) {
LOGGER.warn(
"failed to send ack data block event [{}, {}), attempt times: {}",
startSequenceId,
endSequenceId,
attempt,
e);
if (attempt == MAX_ATTEMPT_TIMES) {
synchronized (SourceHandle.this) {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
try {
Thread.sleep(retryIntervalInMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
synchronized (SourceHandle.this) {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_CALLER, System.nanoTime() - startTime);
DATA_EXCHANGE_COUNT_METRIC_SET.recordDataBlockNum(
ON_ACKNOWLEDGE_DATA_BLOCK_NUM_CALLER, endSequenceId - startSequenceId);
}
}
}
}
}
class SendCloseSinkChannelEventTask implements Runnable {
@Override
public void run() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}]).",
remoteFragmentInstanceId,
indexOfUpstreamSinkHandle);
}
int attempt = 0;
TCloseSinkChannelEvent closeSinkChannelEvent =
new TCloseSinkChannelEvent(remoteFragmentInstanceId, indexOfUpstreamSinkHandle);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
client.onCloseSinkChannelEvent(closeSinkChannelEvent);
break;
} catch (Throwable e) {
LOGGER.warn(
"[SendCloseSinkChannelEvent] to [ShuffleSinkHandle: {}, index: {}] failed.).",
remoteFragmentInstanceId,
indexOfUpstreamSinkHandle);
if (attempt == MAX_ATTEMPT_TIMES) {
synchronized (SourceHandle.this) {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
try {
Thread.sleep(retryIntervalInMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
synchronized (SourceHandle.this) {
sourceHandleListener.onFailure(SourceHandle.this, e);
}
}
}
}
}
}
}
}