blob: 0920eefb9ce9fa7efa7a37b9e8ec9fd2123d8762 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.exchange.sink;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.queryengine.common.FragmentInstanceId.createFullId;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet.SEND_NEW_DATA_BLOCK_NUM_CALLER;
public class SinkChannel implements ISinkChannel {
/** Logger shared by every SinkChannel instance. */
private static final Logger LOGGER = LoggerFactory.getLogger(SinkChannel.class);
/** Maximum number of attempts each event-sending task makes before it reports a failure. */
public static final int MAX_ATTEMPT_TIMES = 3;
/** Initial value of {@code retryIntervalInMs}, in milliseconds. */
private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000L;
// Downstream (remote) side: the endpoint a client is borrowed for and the ids put into the events.
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final String remotePlanNodeId;
// Local side: these ids are also the keys passed to the query memory pool on reserve/free.
private final String localPlanNodeId;
private final TFragmentInstanceId localFragmentInstanceId;
private final String fullFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
/** Executor to which the event-sending tasks are submitted. */
private final ExecutorService executorService;
/** Serializer used when a buffered TsBlock is requested through getSerializedTsBlock(int). */
private final TsBlockSerde serde;
/** Callback notified on abort, finish, failure and end-of-blocks. */
private final SinkListener sinkListener;
/** Name handed to SetThreadName while an event-sending task is running. */
private final String threadName;
/** Sleep between two attempts of an event-sending task; tests may override it. */
private long retryIntervalInMs;
/** TsBlock size limit read once from the TsFile configuration; reserved by open(). */
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
// Use LinkedHashMap to meet 2 needs,
// 1. Predictable iteration order so that removing buffered TsBlocks can be efficient.
// 2. Fast lookup.
// The Long stored next to each TsBlock is the byte count released when that block is acknowledged.
private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock =
new LinkedHashMap<>();
// size for current TsBlock to reserve and free
private long currentTsBlockSize;
/** Source of the RPC clients used by the event-sending tasks. */
private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager;
/**
 * Future returned by the latest memory-pool reserve call (assigned in open() and send()). It is
 * null until open() has run.
 */
@SuppressWarnings("squid:S3077")
private volatile ListenableFuture<Void> blocked;
/** Sequence id that the next call to send() will assign. */
private int nextSequenceId = 0;
/** The actual buffered memory in bytes, including the amount of memory being reserved. */
private long bufferRetainedSizeInBytes;
// Lifecycle flags: aborted/closed are set at the end of abort()/close(); noMoreTsBlocks is set by
// SendEndOfDataBlockEventTask after the end-of-data event has been delivered.
private boolean aborted = false;
private boolean closed = false;
private boolean noMoreTsBlocks = false;
/** Ensures sinkListener.onFinish is invoked at most once (see invokeOnFinished()). */
private final AtomicBoolean invokedOnFinished = new AtomicBoolean(false);
/** max bytes this SinkChannel can reserve. */
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET =
DataExchangeCostMetricSet.getInstance();
private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET =
DataExchangeCountMetricSet.getInstance();
/**
 * Creates a channel that ships the TsBlocks of one local fragment instance to one remote plan
 * node.
 *
 * <p>Every argument except {@code mppDataExchangeServiceClientManager} is null-checked, in
 * parameter order. The constructor also registers the local plan node with the query memory
 * pool; no memory is reserved until {@link #open()} is called.
 *
 * @param remoteEndpoint address of the downstream data node
 * @param remoteFragmentInstanceId id of the downstream fragment instance
 * @param remotePlanNodeId id of the downstream plan node
 * @param localPlanNodeId id of the local plan node owning this channel
 * @param localFragmentInstanceId id of the local fragment instance
 * @param localMemoryManager provider of the query memory pool
 * @param executorService executor that runs the event-sending tasks
 * @param serde serializer for buffered TsBlocks
 * @param sinkListener callback for abort/finish/failure/end-of-blocks notifications
 * @param mppDataExchangeServiceClientManager source of RPC clients (stored as given)
 * @throws NullPointerException if any null-checked argument is null
 */
@SuppressWarnings("squid:S107")
public SinkChannel(
    TEndPoint remoteEndpoint,
    TFragmentInstanceId remoteFragmentInstanceId,
    String remotePlanNodeId,
    String localPlanNodeId,
    TFragmentInstanceId localFragmentInstanceId,
    LocalMemoryManager localMemoryManager,
    ExecutorService executorService,
    TsBlockSerde serde,
    SinkListener sinkListener,
    IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
        mppDataExchangeServiceClientManager) {
  // Each argument is checked immediately before it is stored, so the first null argument (in
  // parameter order) decides which message is reported.
  Validate.notNull(remoteEndpoint, "remoteEndPoint can not be null.");
  this.remoteEndpoint = remoteEndpoint;

  Validate.notNull(remoteFragmentInstanceId, "remoteFragmentInstanceId can not be null.");
  this.remoteFragmentInstanceId = remoteFragmentInstanceId;

  Validate.notNull(remotePlanNodeId, "remotePlanNodeId can not be null.");
  this.remotePlanNodeId = remotePlanNodeId;

  Validate.notNull(localPlanNodeId, "localPlanNodeId can not be null.");
  this.localPlanNodeId = localPlanNodeId;

  Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can not be null.");
  this.localFragmentInstanceId = localFragmentInstanceId;
  this.fullFragmentInstanceId =
      FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);

  Validate.notNull(localMemoryManager, "localMemoryManager can not be null.");
  this.localMemoryManager = localMemoryManager;

  Validate.notNull(executorService, "executorService can not be null.");
  this.executorService = executorService;

  Validate.notNull(serde, "serde can not be null.");
  this.serde = serde;

  Validate.notNull(sinkListener, "sinkListener can not be null.");
  this.sinkListener = sinkListener;

  this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
  this.retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
  this.threadName =
      createFullId(
          localFragmentInstanceId.queryId,
          localFragmentInstanceId.fragmentId,
          localFragmentInstanceId.instanceId);

  // Nothing is buffered yet; the size paired with the first TsBlock is the default block size.
  this.bufferRetainedSizeInBytes = 0;
  this.currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;

  this.localMemoryManager
      .getQueryPool()
      .registerPlanNodeIdToQueryMemoryMap(
          localFragmentInstanceId.queryId, this.fullFragmentInstanceId, localPlanNodeId);
}
/**
 * Returns a future that tells the caller when it may send again.
 *
 * @return an already-completed future when the channel is closed, otherwise a
 *     non-cancellation-propagating view of the current reservation future
 * @throws IllegalStateException if the channel has been aborted
 */
@Override
public synchronized ListenableFuture<?> isFull() {
  checkState();
  // A channel closed before ShuffleSinkHandle opened it still has blocked == null, so the closed
  // case must be answered without touching that field to avoid an NPE.
  return closed ? immediateVoidFuture() : nonCancellationPropagating(blocked);
}
/**
 * Hands a new-data-block notification for the given sequence range to the executor.
 *
 * @param startSequenceId sequence id of the first block being announced
 * @param blockSizes sizes in bytes of the announced blocks, in sequence order
 */
private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
  final Runnable notifyTask = new SendNewDataBlockEventTask(startSequenceId, blockSizes);
  executorService.submit(notifyTask);
}
/**
 * Buffers one TsBlock, reserves memory for it and schedules a notification to the downstream.
 *
 * <p>The call is a no-op when the channel is already closed or when no more TsBlocks are
 * expected. The elapsed time is always recorded in the data-exchange cost metrics.
 *
 * @param tsBlock the block to send; must not be null
 * @throws NullPointerException if {@code tsBlock} is null
 * @throws IllegalStateException if the channel is aborted or the previous reservation has not
 *     completed yet
 */
@Override
public synchronized void send(TsBlock tsBlock) {
  final long beginNanos = System.nanoTime();
  try {
    Validate.notNull(tsBlock, "tsBlocks is null");
    if (closed) {
      // The downstream SourceHandle may already have closed this channel.
      return;
    }
    checkState();
    if (!blocked.isDone()) {
      throw new IllegalStateException("Sink handle is blocked.");
    }
    if (noMoreTsBlocks) {
      return;
    }

    final long blockBytes = tsBlock.getRetainedSizeInBytes();
    final int sequenceId = nextSequenceId;

    blocked =
        localMemoryManager
            .getQueryPool()
            .reserve(
                localFragmentInstanceId.getQueryId(),
                fullFragmentInstanceId,
                localPlanNodeId,
                blockBytes,
                maxBytesCanReserve)
            .left;
    bufferRetainedSizeInBytes += blockBytes;

    // The block is stored together with the size tracked before this call; this block's own
    // size becomes the tracked size afterwards.
    sequenceIdToTsBlock.put(sequenceId, new Pair<>(tsBlock, currentTsBlockSize));
    nextSequenceId = sequenceId + 1;
    currentTsBlockSize = blockBytes;

    // TODO: consider merge multiple NewDataBlockEvent for less network traffic.
    submitSendNewDataBlockEventTask(sequenceId, ImmutableList.of(blockBytes));
  } finally {
    final long elapsedNanos = System.nanoTime() - beginNanos;
    DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
        SINK_HANDLE_SEND_TSBLOCK_REMOTE, elapsedNanos);
  }
}
/**
 * Schedules the end-of-data notification for the downstream, unless the channel has already
 * been aborted or closed.
 */
@Override
public synchronized void setNoMoreTsBlocks() {
  if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("[StartSetNoMoreTsBlocks]");
  }
  final boolean stillActive = !aborted && !closed;
  if (stillActive) {
    executorService.submit(new SendEndOfDataBlockEventTask());
  }
}
/**
 * Aborts the channel: drops all buffered TsBlocks, gives reserved memory back to the query pool
 * and notifies the listener through {@code onAborted}. Does nothing if the channel is already
 * aborted or closed.
 */
@Override
public synchronized void abort() {
  if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("[StartAbortSinkChannel]");
  }
  if (closed || aborted) {
    return;
  }

  sequenceIdToTsBlock.clear();

  // Whatever tryCancel reports for the pending reservation is taken off the retained total.
  if (blocked != null) {
    final long cancelledBytes = localMemoryManager.getQueryPool().tryCancel(blocked);
    bufferRetainedSizeInBytes -= cancelledBytes;
  }

  // Whatever remains is still held by this channel and has to be freed explicitly.
  final long bytesToFree = bufferRetainedSizeInBytes;
  if (bytesToFree > 0) {
    localMemoryManager
        .getQueryPool()
        .free(
            localFragmentInstanceId.getQueryId(),
            fullFragmentInstanceId,
            localPlanNodeId,
            bytesToFree);
    bufferRetainedSizeInBytes = 0;
  }

  sinkListener.onAborted(this);
  aborted = true;

  if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("[EndAbortSinkChannel]");
  }
}
/**
 * Closes the channel: drops all buffered TsBlocks, gives reserved memory back to the query pool
 * and fires the finish callback (at most once). Does nothing if the channel is already closed
 * or aborted.
 */
@Override
public synchronized void close() {
  if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("[StartCloseSinkChannel]");
  }
  if (aborted || closed) {
    return;
  }

  sequenceIdToTsBlock.clear();

  // Whatever tryCancel reports for the pending reservation is taken off the retained total.
  if (blocked != null) {
    final long cancelledBytes = localMemoryManager.getQueryPool().tryCancel(blocked);
    bufferRetainedSizeInBytes -= cancelledBytes;
  }

  // Whatever remains is still held by this channel and has to be freed explicitly.
  final long bytesToFree = bufferRetainedSizeInBytes;
  if (bytesToFree > 0) {
    localMemoryManager
        .getQueryPool()
        .free(
            localFragmentInstanceId.getQueryId(),
            fullFragmentInstanceId,
            localPlanNodeId,
            bytesToFree);
    bufferRetainedSizeInBytes = 0;
  }

  invokeOnFinished();
  closed = true;

  if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("[EndCloseSinkChannel]");
  }
}
/** Notifies the listener that this channel has finished; only the first call has any effect. */
private void invokeOnFinished() {
  final boolean firstCall = invokedOnFinished.compareAndSet(false, true);
  if (!firstCall) {
    return;
  }
  sinkListener.onFinish(this);
}
/**
 * Reports whether {@link #close()} has completed on this channel.
 *
 * <p>NOTE(review): unlike {@link #isAborted()}, this accessor is not synchronized and the
 * {@code closed} field is not volatile — confirm that callers on other threads tolerate a stale
 * value.
 *
 * @return the current value of the closed flag
 */
@Override
public boolean isClosed() {
return closed;
}
/**
 * Reports whether {@link #abort()} has completed on this channel.
 *
 * @return the current value of the aborted flag, read under this channel's monitor
 */
@Override
public synchronized boolean isAborted() {
return aborted;
}
/**
 * Reports whether the channel has nothing left to deliver.
 *
 * @return {@code true} only when the no-more-TsBlocks flag is set and no TsBlock is still
 *     buffered
 */
@Override
public synchronized boolean isFinished() {
  if (!noMoreTsBlocks) {
    return false;
  }
  return sequenceIdToTsBlock.isEmpty();
}
/**
 * Returns the number of bytes currently accounted to this channel's buffer.
 *
 * @return the retained size in bytes, read under this channel's monitor
 */
@Override
public synchronized long getBufferRetainedSizeInBytes() {
return bufferRetainedSizeInBytes;
}
/**
 * Partition-addressed lookup, which this channel does not support.
 *
 * @param partition ignored
 * @param sequenceId ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public ByteBuffer getSerializedTsBlock(int partition, int sequenceId) {
throw new UnsupportedOperationException();
}
/**
 * Serializes the buffered TsBlock that was sent with the given sequence id.
 *
 * @param sequenceId sequence id assigned to the block by {@link #send(TsBlock)}
 * @return the serialized form of the block
 * @throws GetTsBlockFromClosedOrAbortedChannelException if the channel is aborted or closed
 * @throws IllegalStateException if no block is buffered under {@code sequenceId}
 * @throws IOException if serialization fails
 */
public synchronized ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
  if (closed || aborted) {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug(
          "SinkChannel still receive getting TsBlock request after being aborted={} or closed={}",
          aborted,
          closed);
    }
    throw new GetTsBlockFromClosedOrAbortedChannelException("SinkChannel is aborted or closed. ");
  }

  final Pair<TsBlock, Long> buffered = sequenceIdToTsBlock.get(sequenceId);
  final TsBlock requested = (buffered == null) ? null : buffered.left;
  if (requested == null) {
    LOGGER.warn(
        "The TsBlock doesn't exist. Sequence ID is {}, remaining map is {}",
        sequenceId,
        sequenceIdToTsBlock.entrySet());
    throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
  }
  return serde.serialize(requested);
}
/**
 * Drops the buffered TsBlocks whose sequence id lies in {@code [startSequenceId, endSequenceId)}
 * and returns their memory to the query pool. Fires the finish callback when the channel has
 * become finished as a result.
 *
 * <p>Does nothing when the channel is aborted or closed.
 *
 * @param startSequenceId first acknowledged sequence id (inclusive)
 * @param endSequenceId end of the acknowledged range (exclusive)
 */
public void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
  synchronized (this) {
    if (closed || aborted) {
      return;
    }

    long releasedBytes = 0L;
    final Iterator<Entry<Integer, Pair<TsBlock, Long>>> cursor =
        sequenceIdToTsBlock.entrySet().iterator();
    while (cursor.hasNext()) {
      final Entry<Integer, Pair<TsBlock, Long>> buffered = cursor.next();
      if (buffered.getKey() < startSequenceId) {
        // Not part of the acknowledged range yet; keep scanning in insertion order.
        continue;
      } else if (buffered.getKey() >= endSequenceId) {
        // Past the acknowledged range; stop scanning.
        break;
      } else {
        releasedBytes += buffered.getValue().right;
        bufferRetainedSizeInBytes -= buffered.getValue().right;
        cursor.remove();
        if (LOGGER.isDebugEnabled()) {
          LOGGER.debug("[ACKTsBlock] {}.", buffered.getKey());
        }
      }
    }

    // A retried (duplicate) ack finds nothing left to remove, so there may be nothing to free.
    if (releasedBytes > 0) {
      localMemoryManager
          .getQueryPool()
          .free(
              localFragmentInstanceId.getQueryId(),
              fullFragmentInstanceId,
              localPlanNodeId,
              releasedBytes);
    }
  }

  if (isFinished()) {
    invokeOnFinished();
  }
}
/**
 * Returns the id of the local fragment instance this channel belongs to.
 *
 * @return the id passed to the constructor
 */
@Override
public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
/**
 * Lowers the reservation limit of this channel. The limit can only shrink: a value that is not
 * smaller than the current limit is ignored.
 *
 * @param maxBytesCanReserve proposed new upper bound in bytes
 */
@Override
public void setMaxBytesCanReserve(long maxBytesCanReserve) {
  if (maxBytesCanReserve < this.maxBytesCanReserve) {
    this.maxBytesCanReserve = maxBytesCanReserve;
  }
}
/**
 * Describes this channel by the query id, fragment id and instance id of its local fragment
 * instance.
 */
@Override
public String toString() {
  final TFragmentInstanceId localId = localFragmentInstanceId;
  return String.format(
      "Query[%s]-[%s-%s-SinkChannel]:", localId.queryId, localId.fragmentId, localId.instanceId);
}
/**
 * Rejects further use of an aborted channel.
 *
 * @throws IllegalStateException if the channel has been aborted
 */
private void checkState() {
  if (!aborted) {
    return;
  }
  throw new IllegalStateException("SinkChannel is aborted.");
}
// region ============ ISinkChannel related ============
/**
 * Opens the channel by reserving memory for one default-sized TsBlock. Called when
 * ShuffleSinkHandle picks this channel as the next one. Does nothing if the channel is already
 * aborted or closed.
 */
@Override
public synchronized void open() {
  if (closed || aborted) {
    return;
  }
  // maxBytesCanReserve only becomes known after the handle is created, so the default block size
  // is what gets reserved here. That is fine because this channel has not reserved any memory
  // before this point.
  blocked =
      localMemoryManager
          .getQueryPool()
          .reserve(
              localFragmentInstanceId.getQueryId(),
              fullFragmentInstanceId,
              localPlanNodeId,
              DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
              maxBytesCanReserve)
          .left;
  bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
}
/**
 * Reports whether the end-of-data event has been delivered to the downstream. The flag is set
 * by SendEndOfDataBlockEventTask after a successful send; this read is not synchronized.
 *
 * @return the current value of the no-more-TsBlocks flag
 */
@Override
public boolean isNoMoreTsBlocks() {
return noMoreTsBlocks;
}
/**
 * Returns how many TsBlocks are currently buffered, i.e. sent but not yet acknowledged, dropped
 * by abort or dropped by close. This read is not synchronized.
 *
 * @return the size of the sequence-id-to-TsBlock map
 */
@Override
public int getNumOfBufferedTsBlocks() {
return sequenceIdToTsBlock.size();
}
// endregion
// region ============ TestOnly ============
/**
 * Overrides the sleep between two attempts of the event-sending tasks.
 *
 * @param retryIntervalInMs new interval in milliseconds
 */
@TestOnly
public void setRetryIntervalInMs(long retryIntervalInMs) {
this.retryIntervalInMs = retryIntervalInMs;
}
// endregion
// region ============ inner class ============
/**
 * Send a {@link org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent} to downstream fragment
 * instance.
 *
 * <p>The event is sent at most {@link #MAX_ATTEMPT_TIMES} times, sleeping
 * {@code retryIntervalInMs} between attempts. The listener's {@code onFailure} is invoked exactly
 * once, either when the last attempt fails or when the retry sleep is interrupted; in both cases
 * the task stops immediately.
 */
class SendNewDataBlockEventTask implements Runnable {
  /** Sequence id of the first block announced by this task. */
  private final int startSequenceId;

  /** Sizes in bytes of the announced blocks, in sequence order. */
  private final List<Long> blockSizes;

  /**
   * @param startSequenceId sequence id of the first announced block; must be non-negative
   * @param blockSizes sizes of the announced blocks; must not be null
   * @throws IllegalArgumentException if {@code startSequenceId} is negative
   * @throws NullPointerException if {@code blockSizes} is null
   */
  SendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
    Validate.isTrue(
        startSequenceId >= 0,
        "Start sequence ID should be greater than or equal to zero, but was: "
            + startSequenceId
            + ".");
    this.startSequenceId = startSequenceId;
    this.blockSizes = Validate.notNull(blockSizes);
  }

  @Override
  public void run() {
    try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
      if (LOGGER.isDebugEnabled()) {
        LOGGER.debug(
            "[NotifyNewTsBlock] [{}, {}) to {}.{}",
            startSequenceId,
            startSequenceId + blockSizes.size(),
            remoteFragmentInstanceId,
            remotePlanNodeId);
      }
      int attempt = 0;
      TNewDataBlockEvent newDataBlockEvent =
          new TNewDataBlockEvent(
              remoteFragmentInstanceId,
              remotePlanNodeId,
              localFragmentInstanceId,
              startSequenceId,
              blockSizes);
      while (attempt < MAX_ATTEMPT_TIMES) {
        attempt += 1;
        long startTime = System.nanoTime();
        try (SyncDataNodeMPPDataExchangeServiceClient client =
            mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
          client.onNewDataBlockEvent(newDataBlockEvent);
          break;
        } catch (Exception e) {
          LOGGER.warn("Failed to send new data block event, attempt times: {}", attempt, e);
          if (attempt == MAX_ATTEMPT_TIMES) {
            // Out of attempts: report once and stop. Sleeping here would only delay the worker
            // and, if interrupted, would report the same failure a second time.
            sinkListener.onFailure(SinkChannel.this, e);
            return;
          }
          try {
            Thread.sleep(retryIntervalInMs);
          } catch (InterruptedException ex) {
            // Restore the interrupt flag, report the send failure and give up. Looping on would
            // make every later sleep fail immediately and report the failure again.
            Thread.currentThread().interrupt();
            sinkListener.onFailure(SinkChannel.this, e);
            return;
          }
        } finally {
          // Runs for every attempt, including the ones that return from the catch block above.
          DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
              SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER, System.nanoTime() - startTime);
          DATA_EXCHANGE_COUNT_METRIC_SET.recordDataBlockNum(
              SEND_NEW_DATA_BLOCK_NUM_CALLER, blockSizes.size());
        }
      }
    }
  }
}
/**
* Send a {@link org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent} to downstream fragment
* instance.
*/
class SendEndOfDataBlockEventTask implements Runnable {
@Override
public void run() {
try (SetThreadName sinkChannelName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[NotifyNoMoreTsBlock]");
}
int attempt = 0;
TEndOfDataBlockEvent endOfDataBlockEvent =
new TEndOfDataBlockEvent(
remoteFragmentInstanceId,
remotePlanNodeId,
localFragmentInstanceId,
nextSequenceId - 1);
while (attempt < MAX_ATTEMPT_TIMES) {
attempt += 1;
try (SyncDataNodeMPPDataExchangeServiceClient client =
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
client.onEndOfDataBlockEvent(endOfDataBlockEvent);
break;
} catch (Exception e) {
LOGGER.warn("Failed to send end of data block event, attempt times: {}", attempt, e);
if (attempt == MAX_ATTEMPT_TIMES) {
LOGGER.warn("Failed to send end of data block event after all retry", e);
sinkListener.onFailure(SinkChannel.this, e);
return;
}
try {
Thread.sleep(retryIntervalInMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
sinkListener.onFailure(SinkChannel.this, e);
}
}
}
noMoreTsBlocks = true;
if (isFinished()) {
invokeOnFinished();
}
sinkListener.onEndOfBlocks(SinkChannel.this);
}
}
}
// endregion
}