/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.process;
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
import org.apache.iotdb.db.queryengine.transformation.dag.builder.EvaluationDAGBuilder;
import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
import org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet;
import org.apache.iotdb.db.queryengine.transformation.dag.udf.UDTFContext;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.exception.write.UnSupportedDataTypeException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
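/**
 * TransformOperator evaluates the output expressions of a query (including UDTFs) over the
 * TsBlocks produced by its child operator. It builds an evaluation DAG whose outputs are exposed
 * as {@link LayerPointReader}s, aligns those readers on timestamps with a {@link TimeSelector}
 * heap, and emits the aligned rows as result TsBlocks.
 */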
public class TransformOperator implements ProcessOperator {
protected final float udfReaderMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
protected final float udfTransformerMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
protected final float udfCollectorMemoryBudgetInMB =
IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
protected final OperatorContext operatorContext;
protected final Operator inputOperator;
protected final boolean keepNull;
protected QueryDataSetInputLayer inputLayer;
protected UDTFContext udtfContext;
protected LayerPointReader[] transformers;
protected List<TSDataType> outputDataTypes;
protected TimeSelector timeHeap;
protected boolean[] shouldIterateReadersToNextValid;
private final String udtfQueryId;
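/**
 * Builds the input layer, the UDTF context and the transformer point readers, then prepares the
 * time heap and the per-column iteration flags used to align the readers on timestamps.
 */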
@SuppressWarnings("squid:S107")
public TransformOperator(
OperatorContext operatorContext,
Operator inputOperator,
List<TSDataType> inputDataTypes,
Map<String, List<InputLocation>> inputLocations,
Expression[] outputExpressions,
boolean keepNull,
ZoneId zoneId,
Map<NodeRef<Expression>, TSDataType> expressionTypes,
boolean isAscending)
throws QueryProcessException {
this.operatorContext = operatorContext;
this.inputOperator = inputOperator;
this.keepNull = keepNull;
// Use DriverTaskID().getFullId() to ensure that the udtfQueryId of each TransformOperator is
// unique.
this.udtfQueryId = operatorContext.getDriverContext().getDriverTaskID().getFullId();
initInputLayer(inputDataTypes);
initUdtfContext(outputExpressions, zoneId);
initTransformers(inputLocations, outputExpressions, expressionTypes);
timeHeap = new TimeSelector(transformers.length << 1, isAscending);
shouldIterateReadersToNextValid = new boolean[outputExpressions.length];
Arrays.fill(shouldIterateReadersToNextValid, true);
}
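/** Wraps the child operator into a QueryDataSetInputLayer that feeds the evaluation DAG. */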
private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException {
inputLayer =
new QueryDataSetInputLayer(
udtfQueryId,
udfReaderMemoryBudgetInMB,
new TsBlockInputDataSet(inputOperator, inputDataTypes));
}
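/** Creates the UDTF context and constructs a UDF executor for every output expression. */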
private void initUdtfContext(Expression[] outputExpressions, ZoneId zoneId) {
udtfContext = new UDTFContext(zoneId);
udtfContext.constructUdfExecutors(outputExpressions);
}
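/**
 * Builds the evaluation DAG and obtains one output point reader per output expression. The DAG is
 * built while holding the UDF management lock so that UDF class loading stays consistent.
 */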
protected void initTransformers(
Map<String, List<InputLocation>> inputLocations,
Expression[] outputExpressions,
Map<NodeRef<Expression>, TSDataType> expressionTypes) {
UDFManagementService.getInstance().acquireLock();
try {
// This statement must be executed while holding the UDF registration lock.
UDFClassLoaderManager.getInstance().initializeUDFQuery(udtfQueryId);
// The UDF executors are initialized while the evaluation DAG is built.
transformers =
new EvaluationDAGBuilder(
udtfQueryId,
inputLayer,
inputLocations,
outputExpressions,
expressionTypes,
udtfContext,
udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB)
.buildLayerMemoryAssigner()
.bindInputLayerColumnIndexWithExpression()
.buildResultColumnPointReaders()
.getOutputPointReaders();
} finally {
UDFManagementService.getInstance().releaseLock();
}
}
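/**
 * Advances every reader flagged in shouldIterateReadersToNextValid to its next valid row and
 * pushes that row's timestamp into the time heap. Returns NOT_YIELDABLE_WAITING_FOR_DATA as soon
 * as any reader has to wait for upstream data.
 */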
protected YieldableState iterateAllColumnsToNextValid() throws Exception {
for (int i = 0, n = shouldIterateReadersToNextValid.length; i < n; ++i) {
if (shouldIterateReadersToNextValid[i]) {
final YieldableState yieldableState = iterateReaderToNextValid(transformers[i]);
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
}
shouldIterateReadersToNextValid[i] = false;
}
}
return YieldableState.YIELDABLE;
}
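/**
 * Advances a single reader to its next valid row, skipping null rows when keepNull is false, and
 * records the timestamp of that row in the time heap.
 */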
@SuppressWarnings("squid:S135")
protected YieldableState iterateReaderToNextValid(LayerPointReader reader) throws Exception {
// Since a constant operand is not allowed to be a result column, the reader will not be
// a ConstantLayerPointReader.
// If keepNull is false, we must iterate the reader until a non-null row is returned.
YieldableState yieldableState;
while ((yieldableState = reader.yield()) == YieldableState.YIELDABLE) {
if (reader.isCurrentNull() && !keepNull) {
reader.readyForNext();
continue;
}
timeHeap.add(reader.currentTime());
break;
}
return yieldableState;
}
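/**
 * Returns true if the time heap already holds timestamps to output, or if any reader is still
 * waiting for upstream data (in which case the caller should retry later).
 */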
@SuppressWarnings("squid:S112")
@Override
public final boolean hasNext() throws Exception {
if (!timeHeap.isEmpty()) {
return true;
}
try {
if (iterateAllColumnsToNextValid() == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return true;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return !timeHeap.isEmpty();
}
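/**
 * Pops timestamps from the time heap in order and, for each timestamp, writes the time column and
 * collects one value per transformer (null when a reader has no value at that time). If a reader
 * has to wait for upstream data, the rows built so far are returned as a partial TsBlock and the
 * pending timestamp is pushed back into the heap.
 */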
@SuppressWarnings("squid:S112")
@Override
public TsBlock next() throws Exception {
try {
YieldableState yieldableState = iterateAllColumnsToNextValid();
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return null;
}
final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
prepareTsBlockBuilder(tsBlockBuilder);
final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
final int columnCount = columnBuilders.length;
int rowCount = 0;
while (!timeHeap.isEmpty()) {
final long currentTime = timeHeap.pollFirst();
// time
timeBuilder.writeLong(currentTime);
// values
for (int i = 0; i < columnCount; ++i) {
yieldableState = collectDataPoint(transformers[i], columnBuilders[i], currentTime, i);
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
for (int j = 0; j <= i; ++j) {
shouldIterateReadersToNextValid[j] = false;
}
timeHeap.add(currentTime);
tsBlockBuilder.declarePositions(rowCount);
return tsBlockBuilder.build();
}
}
prepareEachColumn(columnCount);
++rowCount;
yieldableState = iterateAllColumnsToNextValid();
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
tsBlockBuilder.declarePositions(rowCount);
return tsBlockBuilder.build();
}
inputLayer.updateRowRecordListEvictionUpperBound();
}
tsBlockBuilder.declarePositions(rowCount);
return tsBlockBuilder.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
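/**
 * Lazily derives the output data types from the transformers and creates the value column
 * builders of the given TsBlockBuilder.
 */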
protected void prepareTsBlockBuilder(TsBlockBuilder tsBlockBuilder) {
if (outputDataTypes == null) {
outputDataTypes = new ArrayList<>();
for (LayerPointReader reader : transformers) {
outputDataTypes.add(reader.getDataType());
}
}
tsBlockBuilder.buildValueColumnBuilders(outputDataTypes);
}
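/** Moves the readers that consumed a value at the current timestamp on to their next row. */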
private void prepareEachColumn(int columnCount) {
for (int i = 0; i < columnCount; ++i) {
if (shouldIterateReadersToNextValid[i]) {
transformers[i].readyForNext();
}
}
}
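/**
 * Returns true if a null should be appended for this column at currentTime: the reader is
 * exhausted, positioned at a different timestamp, or its current value is null.
 */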
protected boolean collectReaderAppendIsNull(LayerPointReader reader, long currentTime)
throws Exception {
final YieldableState yieldableState = reader.yield();
if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
return true;
}
if (yieldableState != YieldableState.YIELDABLE) {
return false;
}
if (reader.currentTime() != currentTime) {
return true;
}
return reader.isCurrentNull();
}
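/**
 * Collects the value of one reader at currentTime into the given column builder. Appends null
 * when the reader is exhausted or positioned at a different timestamp; otherwise writes the typed
 * value and flags the column so that the reader is advanced before the next row.
 */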
protected YieldableState collectDataPoint(
LayerPointReader reader, ColumnBuilder writer, long currentTime, int readerIndex)
throws Exception {
final YieldableState yieldableState = reader.yield();
if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
writer.appendNull();
return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
if (yieldableState != YieldableState.YIELDABLE) {
return yieldableState;
}
if (reader.currentTime() != currentTime) {
writer.appendNull();
return YieldableState.YIELDABLE;
}
if (reader.isCurrentNull()) {
writer.appendNull();
} else {
TSDataType type = reader.getDataType();
switch (type) {
case INT32:
writer.writeInt(reader.currentInt());
break;
case INT64:
writer.writeLong(reader.currentLong());
break;
case FLOAT:
writer.writeFloat(reader.currentFloat());
break;
case DOUBLE:
writer.writeDouble(reader.currentDouble());
break;
case BOOLEAN:
writer.writeBoolean(reader.currentBoolean());
break;
case TEXT:
writer.writeBinary(reader.currentBinary());
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", type));
}
}
shouldIterateReadersToNextValid[readerIndex] = true;
return YieldableState.YIELDABLE;
}
@Override
public void close() throws Exception {
udtfContext.finalizeUDFExecutors(udtfQueryId);
inputOperator.close();
}
@Override
public ListenableFuture<?> isBlocked() {
return inputOperator.isBlocked();
}
@Override
public boolean isFinished() throws Exception {
// Call hasNext() first; otherwise data from inputOperator could be missed.
boolean flag = !hasNextWithTimer();
return timeHeap.isEmpty() && (flag || inputOperator.isFinished());
}
@Override
public OperatorContext getOperatorContext() {
return operatorContext;
}
@Override
public long calculateMaxPeekMemory() {
// Here we use the maximum estimated memory usage.
return (long)
(udfCollectorMemoryBudgetInMB
+ udfTransformerMemoryBudgetInMB
+ inputOperator.calculateMaxReturnSize());
}
@Override
public long calculateMaxReturnSize() {
// time + all value columns
return (long) (1 + transformers.length)
* TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
// The collector may cache points, so we use the maximum possible usage here.
return (long)
(inputOperator.calculateRetainedSizeAfterCallingNext() + udfCollectorMemoryBudgetInMB);
}
@TestOnly
public LayerPointReader[] getTransformers() {
return transformers;
}
}