blob: 2fe94bbeb5139530825eb587dbda2db4c8ecabfd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.exchange.sink;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener;
import org.apache.iotdb.db.queryengine.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_LOCAL;
public class LocalSinkChannel implements ISinkChannel {
private static final Logger LOGGER = LoggerFactory.getLogger(LocalSinkChannel.class);
private TFragmentInstanceId localFragmentInstanceId;
private final SinkListener sinkListener;
private final SharedTsBlockQueue queue;
@SuppressWarnings("squid:S3077")
private volatile ListenableFuture<Void> blocked;
private boolean aborted = false;
private boolean closed = false;
private boolean invokedOnFinished = false;
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET =
DataExchangeCostMetricSet.getInstance();
public LocalSinkChannel(SharedTsBlockQueue queue, SinkListener sinkListener) {
this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not be null.");
this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSinkChannel(this);
blocked = queue.getCanAddTsBlock();
}
public LocalSinkChannel(
TFragmentInstanceId localFragmentInstanceId,
SharedTsBlockQueue queue,
SinkListener sinkListener) {
this.localFragmentInstanceId =
Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can not be null.");
this.sinkListener = Validate.notNull(sinkListener, "sinkListener can not be null.");
this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSinkChannel(this);
// SinkChannel can send data after SourceHandle asks it to
blocked = queue.getCanAddTsBlock();
}
@Override
public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
@Override
public long getBufferRetainedSizeInBytes() {
synchronized (queue) {
return queue.getBufferRetainedSizeInBytes();
}
}
@Override
public synchronized ListenableFuture<?> isFull() {
checkState();
if (closed) {
return immediateVoidFuture();
}
return nonCancellationPropagating(blocked);
}
@Override
public boolean isAborted() {
return aborted;
}
@Override
public boolean isFinished() {
synchronized (queue) {
return queue.hasNoMoreTsBlocks() && queue.isEmpty();
}
}
public void checkAndInvokeOnFinished() {
synchronized (queue) {
if (isFinished()) {
synchronized (this) {
if (!invokedOnFinished) {
sinkListener.onFinish(this);
invokedOnFinished = true;
}
}
}
}
}
@Override
public void send(TsBlock tsBlock) {
long startTime = System.nanoTime();
try {
Validate.notNull(tsBlock, "tsBlocks is null");
synchronized (this) {
checkState();
if (closed) {
return;
}
if (!blocked.isDone()) {
throw new IllegalStateException("Sink handle is blocked.");
}
}
synchronized (queue) {
if (queue.hasNoMoreTsBlocks()) {
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartSendTsBlockOnLocal]");
}
synchronized (this) {
blocked = queue.add(tsBlock);
}
}
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
@Override
public void setNoMoreTsBlocks() {
synchronized (queue) {
synchronized (this) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartSetNoMoreTsBlocksOnLocal]");
}
if (aborted || closed) {
return;
}
queue.setNoMoreTsBlocks(true);
sinkListener.onEndOfBlocks(this);
}
}
checkAndInvokeOnFinished();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndSetNoMoreTsBlocksOnLocal]");
}
}
@Override
public void abort() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortLocalSinkChannel]");
}
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
return;
}
aborted = true;
Optional<Throwable> t = sinkListener.onAborted(this);
if (t.isPresent()) {
queue.abort(t.get());
} else {
queue.abort();
}
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndAbortLocalSinkChannel]");
}
}
@Override
public void close() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseLocalSinkChannel]");
}
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
return;
}
closed = true;
queue.close();
if (!invokedOnFinished) {
sinkListener.onFinish(this);
invokedOnFinished = true;
}
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndCloseLocalSinkChannel]");
}
}
public SharedTsBlockQueue getSharedTsBlockQueue() {
return queue;
}
private void checkState() {
if (aborted) {
throw new IllegalStateException("LocalSinkChannel is aborted.");
}
}
@Override
public void setMaxBytesCanReserve(long maxBytesCanReserve) {
if (maxBytesCanReserve < queue.getMaxBytesCanReserve()) {
queue.setMaxBytesCanReserve(maxBytesCanReserve);
}
}
// region ============ ISinkChannel related ============
@Override
public void open() {
// do nothing
}
@Override
public boolean isNoMoreTsBlocks() {
synchronized (queue) {
return queue.hasNoMoreTsBlocks();
}
}
@Override
public int getNumOfBufferedTsBlocks() {
synchronized (queue) {
return queue.getNumOfBufferedTsBlocks();
}
}
@Override
public boolean isClosed() {
synchronized (queue) {
return queue.isClosed();
}
}
// end region
}