blob: 7c6b56fd5ffcab54aeb6f3b8556624b436ee9293 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.exchange.source;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.queryengine.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
import static org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL;
import static org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet.SOURCE_HANDLE_GET_TSBLOCK_LOCAL;
public class LocalSourceHandle implements ISourceHandle {
private static final Logger LOGGER = LoggerFactory.getLogger(LocalSourceHandle.class);
private TFragmentInstanceId localFragmentInstanceId;
private String localPlanNodeId;
private final SourceHandleListener sourceHandleListener;
protected final SharedTsBlockQueue queue;
private boolean aborted = false;
private boolean closed = false;
private int currSequenceId;
private final String threadName;
private static final TsBlockSerde serde = new TsBlockSerde();
private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET =
DataExchangeCostMetricSet.getInstance();
// For pipeline
public LocalSourceHandle(
SharedTsBlockQueue queue, SourceHandleListener sourceHandleListener, String threadName) {
this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSourceHandle(this);
this.sourceHandleListener =
Validate.notNull(sourceHandleListener, "sourceHandleListener can not be null.");
this.threadName = threadName;
}
// For fragment
public LocalSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
SharedTsBlockQueue queue,
SourceHandleListener sourceHandleListener) {
this.localFragmentInstanceId =
Validate.notNull(localFragmentInstanceId, "localFragmentInstanceId can not be null.");
this.localPlanNodeId = Validate.notNull(localPlanNodeId, "localPlanNodeId can not be null.");
this.queue = Validate.notNull(queue, "queue can not be null.");
this.queue.setSourceHandle(this);
this.sourceHandleListener =
Validate.notNull(sourceHandleListener, "sourceHandleListener can not be null.");
this.threadName = createFullIdFrom(localFragmentInstanceId, localPlanNodeId);
}
@Override
public TFragmentInstanceId getLocalFragmentInstanceId() {
return localFragmentInstanceId;
}
@Override
public String getLocalPlanNodeId() {
return localPlanNodeId;
}
@Override
public long getBufferRetainedSizeInBytes() {
return queue.getBufferRetainedSizeInBytes();
}
@Override
public TsBlock receive() {
long startTime = System.nanoTime();
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
checkState();
if (!queue.isBlocked().isDone()) {
throw new IllegalStateException("Source handle is blocked.");
}
TsBlock tsBlock;
synchronized (queue) {
tsBlock = queue.remove();
}
if (tsBlock != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"[GetTsBlockFromQueue] TsBlock:{} size:{}",
currSequenceId,
tsBlock.getRetainedSizeInBytes());
}
currSequenceId++;
}
checkAndInvokeOnFinished();
return tsBlock;
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SOURCE_HANDLE_GET_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
}
@Override
public ByteBuffer getSerializedTsBlock() throws IoTDBException {
TsBlock tsBlock = receive();
if (tsBlock != null) {
long startTime = System.nanoTime();
try {
return serde.serialize(tsBlock);
} catch (Exception e) {
throw new IoTDBException(e, TSStatusCode.TSBLOCK_SERIALIZE_ERROR.getStatusCode());
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SOURCE_HANDLE_DESERIALIZE_TSBLOCK_LOCAL, System.nanoTime() - startTime);
}
} else {
return null;
}
}
@Override
public boolean isFinished() {
synchronized (queue) {
return queue.hasNoMoreTsBlocks() && queue.isEmpty();
}
}
public void checkAndInvokeOnFinished() {
synchronized (queue) {
if (isFinished()) {
// Putting synchronized here rather than marking in method is to avoid deadlock.
// There are two locks need to invoke this method. One is lock of SharedTsBlockQueue,
// the other is lock of LocalSourceHandle.
synchronized (this) {
sourceHandleListener.onFinished(this);
}
}
}
}
@Override
public ListenableFuture<?> isBlocked() {
checkState();
return nonCancellationPropagating(queue.isBlocked());
}
@Override
public boolean isAborted() {
return aborted;
}
@Override
public void abort() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortLocalSourceHandle]");
}
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
return;
}
queue.abort();
aborted = true;
sourceHandleListener.onAborted(this);
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndAbortLocalSourceHandle]");
}
}
}
@Override
public void abort(Throwable t) {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartAbortLocalSourceHandle]");
}
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
return;
}
queue.abort(t);
aborted = true;
sourceHandleListener.onAborted(this);
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndAbortLocalSourceHandle]");
}
}
}
@Override
public void close() {
try (SetThreadName sourceHandleName = new SetThreadName(threadName)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[StartCloseLocalSourceHandle]");
}
synchronized (queue) {
synchronized (this) {
if (aborted || closed) {
return;
}
queue.close();
closed = true;
sourceHandleListener.onFinished(this);
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[EndCloseLocalSourceHandle]");
}
}
}
private void checkState() {
if (aborted) {
throw new IllegalStateException("Source handle is aborted.");
} else if (closed) {
throw new IllegalStateException("Source Handle is closed.");
}
}
public SharedTsBlockQueue getSharedTsBlockQueue() {
return queue;
}
@Override
public void setMaxBytesCanReserve(long maxBytesCanReserve) {
// do nothing, the maxBytesCanReserve of SharedTsBlockQueue should be set by corresponding
// LocalSinkChannel
}
}