blob: b9bf277b68bb54e11ef60a3bbeb07861318b9132 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.exchange.sink;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE;
public class ShuffleSinkHandle implements ISinkHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleSinkHandle.class);
/** Each ISinkChannel in the list matches one downstream ISourceHandle. */
private final List<ISinkChannel> downStreamChannelList;
private final boolean[] hasSetNoMoreTsBlocks;
private final boolean[] channelOpened;
private final DownStreamChannelIndex downStreamChannelIndex;
private final int channelNum;
private final ShuffleStrategy shuffleStrategy;
private final TFragmentInstanceId localFragmentInstanceId;
private final MPPDataExchangeManager.SinkListener sinkListener;
private volatile boolean aborted = false;
private volatile boolean closed = false;
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET =
DataExchangeCostMetricSet.getInstance();
private final Lock lock = new ReentrantLock();
/** max bytes this ShuffleSinkHandle can reserve. */
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
public ShuffleSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
List<ISinkChannel> downStreamChannelList,
DownStreamChannelIndex downStreamChannelIndex,
ShuffleStrategyEnum shuffleStrategyEnum,
MPPDataExchangeManager.SinkListener sinkListener) {
this.localFragmentInstanceId =
Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can not be null.");
this.downStreamChannelList =
Validate.notNull(downStreamChannelList, "downStreamChannelList can not be null.");
this.downStreamChannelIndex =
Validate.notNull(downStreamChannelIndex, "downStreamChannelIndex can not be null.");
this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not be null.");
this.channelNum = downStreamChannelList.size();
this.shuffleStrategy = getShuffleStrategy(shuffleStrategyEnum);
this.hasSetNoMoreTsBlocks = new boolean[channelNum];
this.channelOpened = new boolean[channelNum];
}
@Override
public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
public ISinkChannel getChannel(int index) {
return downStreamChannelList.get(index);
}
@Override
public synchronized ListenableFuture<?> isFull() {
int currentIndex = downStreamChannelIndex.getCurrentIndex();
// try open channel
tryOpenChannel(currentIndex);
// It is safe to use currentChannel.isFull() to judge whether we can send a TsBlock only when
// downStreamChannelIndex will not be changed between we call isFull() and send() of
// ShuffleSinkHandle
return downStreamChannelList.get(currentIndex).isFull();
}
@Override
public synchronized void send(TsBlock tsBlock) {
long startTime = System.nanoTime();
try {
checkState();
if (closed) {
return;
}
ISinkChannel currentChannel =
downStreamChannelList.get(downStreamChannelIndex.getCurrentIndex());
currentChannel.send(tsBlock);
} finally {
switchChannelIfNecessary();
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
}
}
@Override
public void setNoMoreTsBlocks() {
if (closed || aborted) {
return;
}
try {
lock.lock();
for (int i = 0; i < downStreamChannelList.size(); i++) {
if (!hasSetNoMoreTsBlocks[i]) {
downStreamChannelList.get(i).setNoMoreTsBlocks();
hasSetNoMoreTsBlocks[i] = true;
}
}
sinkListener.onEndOfBlocks(this);
} finally {
lock.unlock();
}
}
@Override
public void setNoMoreTsBlocksOfOneChannel(int channelIndex) {
if (closed || aborted) {
// if this ShuffleSinkHandle has been closed, Driver.close() will attempt to setNoMoreTsBlocks
// for all the channels
return;
}
try {
lock.lock();
if (!hasSetNoMoreTsBlocks[channelIndex]) {
downStreamChannelList.get(channelIndex).setNoMoreTsBlocks();
hasSetNoMoreTsBlocks[channelIndex] = true;
}
} finally {
lock.unlock();
}
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public synchronized boolean isAborted() {
return aborted;
}
@Override
public synchronized boolean isFinished() {
for (ISink channel : downStreamChannelList) {
if (!channel.isFinished()) {
return false;
}
}
return true;
}
@Override
public void abort() {
if (aborted || closed) {
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortShuffleSinkHandle]");
}
boolean meetError = false;
Exception firstException = null;
for (ISink channel : downStreamChannelList) {
try {
channel.abort();
} catch (Exception e) {
if (!meetError) {
firstException = e;
meetError = true;
}
}
}
if (meetError) {
LOGGER.warn("Error occurred when try to abort channel.", firstException);
}
sinkListener.onAborted(this);
aborted = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndAbortShuffleSinkHandle]");
}
}
// Add synchronized on this method may lead to Dead Lock
// It is possible that when LocalSinkChannel revokes this close method and try to get Lock
// ShuffleSinkHandle while synchronized methods of ShuffleSinkHandle
// Lock ShuffleSinkHandle and wait to lock LocalSinkChannel
@Override
public void close() {
if (closed || aborted) {
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseShuffleSinkHandle]");
}
boolean meetError = false;
Exception firstException = null;
for (ISink channel : downStreamChannelList) {
try {
channel.close();
} catch (Exception e) {
if (!meetError) {
firstException = e;
meetError = true;
}
}
}
if (meetError) {
LOGGER.warn("Error occurred when try to close channel.", firstException);
}
sinkListener.onFinish(this);
closed = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndCloseShuffleSinkHandle]");
}
}
@Override
public void setMaxBytesCanReserve(long maxBytesCanReserve) {
this.maxBytesCanReserve = maxBytesCanReserve;
downStreamChannelList.forEach(
sinkHandle -> sinkHandle.setMaxBytesCanReserve(maxBytesCanReserve));
}
private void checkState() {
if (aborted) {
throw new IllegalStateException("ShuffleSinkHandle is aborted.");
}
}
private void switchChannelIfNecessary() {
shuffleStrategy.shuffle();
}
public void tryOpenChannel(int channelIndex) {
if (!channelOpened[channelIndex]) {
downStreamChannelList.get(channelIndex).open();
channelOpened[channelIndex] = true;
}
}
@Override
public boolean isChannelClosed(int index) {
return downStreamChannelList.get(index).isClosed();
}
// region ============ Shuffle Related ============
public enum ShuffleStrategyEnum {
PLAIN,
SIMPLE_ROUND_ROBIN,
}
@FunctionalInterface
interface ShuffleStrategy {
/**
* SinkHandle may have multiple channels. we need to choose the next channel each time we send a
* TsBlock.
*/
void shuffle();
}
class PlainShuffleStrategy implements ShuffleStrategy {
@Override
public void shuffle() {
// do nothing
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"PlainShuffleStrategy needs to do nothing, current channel index is {}",
downStreamChannelIndex.getCurrentIndex());
}
}
}
class SimpleRoundRobinStrategy implements ShuffleStrategy {
private final long channelMemoryThreshold = maxBytesCanReserve / channelNum * 3;
@Override
public void shuffle() {
int currentIndex = downStreamChannelIndex.getCurrentIndex();
for (int i = 1; i < channelNum; i++) {
int nextIndex = (currentIndex + i) % channelNum;
if (satisfy(nextIndex)) {
downStreamChannelIndex.setCurrentIndex(nextIndex);
return;
}
}
}
private boolean satisfy(int channelIndex) {
// downStreamChannel is always an ISinkChannel
ISinkChannel channel = downStreamChannelList.get(channelIndex);
if (channel.isNoMoreTsBlocks() || channel.isClosed()) {
return false;
}
return channel.getBufferRetainedSizeInBytes() <= channelMemoryThreshold
&& channel.getNumOfBufferedTsBlocks() < 3;
}
}
private ShuffleStrategy getShuffleStrategy(ShuffleStrategyEnum strategyEnum) {
switch (strategyEnum) {
case PLAIN:
return new PlainShuffleStrategy();
case SIMPLE_ROUND_ROBIN:
return new SimpleRoundRobinStrategy();
default:
throw new UnsupportedOperationException("Unsupported type of shuffle strategy");
}
}
// endregion
@Override
public long getBufferRetainedSizeInBytes() {
return downStreamChannelList.stream()
.map(ISink::getBufferRetainedSizeInBytes)
.reduce(Long::sum)
.orElse(0L);
}
}