blob: 6fbc248b04c6a6e4dfcb17ea0bd9f328ebf0d3c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.transformation.dag.intermediate;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
import org.apache.iotdb.db.queryengine.transformation.api.LayerRowReader;
import org.apache.iotdb.db.queryengine.transformation.api.LayerRowWindowReader;
import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
import org.apache.iotdb.db.queryengine.transformation.dag.adapter.ElasticSerializableRowRecordListBackedMultiColumnRow;
import org.apache.iotdb.db.queryengine.transformation.dag.adapter.ElasticSerializableRowRecordListBackedMultiColumnWindow;
import org.apache.iotdb.db.queryengine.transformation.dag.input.IUDFInputDataSet;
import org.apache.iotdb.db.queryengine.transformation.dag.util.InputRowUtils;
import org.apache.iotdb.db.queryengine.transformation.dag.util.LayerCacheUtils;
import org.apache.iotdb.db.queryengine.transformation.datastructure.row.ElasticSerializableRowRecordList;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class MultiInputColumnIntermediateLayer extends IntermediateLayer
implements IUDFInputDataSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(MultiInputColumnIntermediateLayer.class);
private final LayerPointReader[] layerPointReaders;
private final TSDataType[] dataTypes;
private final TimeSelector timeHeap;
private final boolean[] shouldMoveNext;
private boolean isFirstIteration = true;
private Object[] cachedRow = null;
public MultiInputColumnIntermediateLayer(
Expression expression,
String queryId,
float memoryBudgetInMB,
List<LayerPointReader> parentLayerPointReaders) {
super(expression, queryId, memoryBudgetInMB);
layerPointReaders = parentLayerPointReaders.toArray(new LayerPointReader[0]);
dataTypes = new TSDataType[layerPointReaders.length];
for (int i = 0; i < layerPointReaders.length; ++i) {
dataTypes[i] = layerPointReaders[i].getDataType();
}
timeHeap = new TimeSelector(layerPointReaders.length << 1, true);
shouldMoveNext = new boolean[dataTypes.length];
}
/**
 * Returns the data types of the input columns, in reader order.
 *
 * @return a fixed-size list view backed by this layer's internal data type array
 */
@Override
public List<TSDataType> getDataTypes() {
  final List<TSDataType> columnTypes = Arrays.asList(dataTypes);
  return columnTypes;
}
/**
 * Assembles the next row (if any) and keeps it cached until {@link #nextRowInObjects()} is
 * called.
 *
 * <p>A row is an {@code Object[]} of length {@code columnCount + 1}: slot {@code i} holds the
 * value of column {@code i} at the row's timestamp (or {@code null} when that column has no
 * non-null value at that timestamp), and the last slot holds the timestamp itself.
 *
 * <p>The row is built in a local array and only published to {@code cachedRow} once it is
 * complete, so a failure while reading a column never leaves a partially filled row behind.
 *
 * @return {@code true} if a row is cached and ready to be fetched, {@code false} if the time heap
 *     is exhausted
 * @throws IOException if a reader fails; a {@link QueryProcessException} raised while filling
 *     the row is wrapped with its message and kept as the cause
 */
@Override
public boolean hasNextRowInObjects() throws IOException {
  if (cachedRow != null) {
    return true;
  }

  if (isFirstIteration) {
    // Seed the heap with the first timestamp of every non-constant reader.
    for (LayerPointReader reader : layerPointReaders) {
      if (reader.isConstantPointReader()) {
        continue;
      }
      try {
        if (reader.next()) {
          timeHeap.add(reader.currentTime());
        }
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
    isFirstIteration = false;
  }

  if (timeHeap.isEmpty()) {
    return false;
  }

  final long minTime = timeHeap.pollFirst();
  final int columnLength = layerPointReaders.length;
  final Object[] row = new Object[columnLength + 1];
  row[columnLength] = minTime;

  try {
    for (int i = 0; i < columnLength; ++i) {
      final LayerPointReader reader = layerPointReaders[i];
      // Skip readers that are exhausted or whose current point belongs to a later timestamp;
      // constant readers contribute to every row regardless of time.
      if (!reader.next()
          || (!reader.isConstantPointReader() && reader.currentTime() != minTime)) {
        continue;
      }

      if (!reader.isCurrentNull()) {
        switch (reader.getDataType()) {
          case INT32:
            row[i] = reader.currentInt();
            break;
          case INT64:
            row[i] = reader.currentLong();
            break;
          case FLOAT:
            row[i] = reader.currentFloat();
            break;
          case DOUBLE:
            row[i] = reader.currentDouble();
            break;
          case BOOLEAN:
            row[i] = reader.currentBoolean();
            break;
          case TEXT:
            row[i] = reader.currentBinary();
            break;
          default:
            throw new UnSupportedDataTypeException("Unsupported data type.");
        }
      }

      // The point has been consumed: advance the reader and register its next timestamp.
      reader.readyForNext();
      if (!reader.isConstantPointReader() && reader.next()) {
        timeHeap.add(reader.currentTime());
      }
    }
  } catch (QueryProcessException e) {
    // Keep the original exception as the cause so its stack trace is not lost.
    throw new IOException(e.getMessage(), e);
  }

  cachedRow = row;
  return true;
}
/**
 * Yield-style counterpart of {@link #hasNextRowInObjects()}: tries to assemble the next row
 * and reports whether it is available, still pending, or will never arrive.
 *
 * <p>The method is re-entrant. Whenever a reader reports
 * {@code NOT_YIELDABLE_WAITING_FOR_DATA} the method returns that state immediately, and a later
 * call repeats the work:
 *
 * <ul>
 *   <li>On the first iteration every non-constant reader is asked to yield and its current
 *       timestamp is added to the time heap.
 *   <li>On later iterations, readers whose point was consumed by the previous row (tracked in
 *       {@code shouldMoveNext}) are advanced first, then every non-constant reader is asked to
 *       yield and its current timestamp is added to the heap.
 *   <li>The smallest timestamp is polled and a row is filled from every reader positioned at
 *       that timestamp (constant readers always contribute).
 * </ul>
 *
 * <p>The row layout is the same as in {@link #hasNextRowInObjects()}: one slot per column plus
 * the timestamp in the last slot.
 *
 * @return {@code YIELDABLE} when a row is cached, {@code NOT_YIELDABLE_WAITING_FOR_DATA} when a
 *     reader is still waiting for data, {@code NOT_YIELDABLE_NO_MORE_DATA} when the time heap is
 *     empty
 * @throws Exception if a reader fails; a {@link QueryProcessException} raised while filling the
 *     row is wrapped in an {@link IOException} that keeps it as the cause
 */
@Override
public YieldableState canYieldNextRowInObjects() throws Exception {
  if (cachedRow != null) {
    return YieldableState.YIELDABLE;
  }

  if (isFirstIteration) {
    for (LayerPointReader reader : layerPointReaders) {
      if (reader.isConstantPointReader()) {
        continue;
      }
      try {
        final YieldableState yieldableState = reader.yield();
        if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
          // isFirstIteration stays true, so the seeding is retried on the next call.
          return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
        }
        if (yieldableState == YieldableState.YIELDABLE) {
          timeHeap.add(reader.currentTime());
        }
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
    isFirstIteration = false;
  } else {
    // Advance the readers whose current point was emitted in the previous row.
    for (int i = 0, columnLength = layerPointReaders.length; i < columnLength; ++i) {
      if (shouldMoveNext[i]) {
        layerPointReaders[i].readyForNext();
        shouldMoveNext[i] = false;
      }
    }

    for (LayerPointReader layerPointReader : layerPointReaders) {
      try {
        if (!layerPointReader.isConstantPointReader()) {
          final YieldableState yieldableState = layerPointReader.yield();
          if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
          }
          if (yieldableState == YieldableState.YIELDABLE) {
            timeHeap.add(layerPointReader.currentTime());
          }
        }
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  if (timeHeap.isEmpty()) {
    return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
  }

  final long minTime = timeHeap.pollFirst();
  final int columnLength = layerPointReaders.length;
  final Object[] row = new Object[columnLength + 1];
  row[columnLength] = minTime;

  try {
    for (int i = 0; i < columnLength; ++i) {
      final LayerPointReader reader = layerPointReaders[i];
      final YieldableState yieldableState = reader.yield();
      if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
        // The row is abandoned: undo the "consumed" marks set so far so that no reader is
        // advanced past a point that was never emitted.
        for (int j = 0; j <= i; ++j) {
          shouldMoveNext[j] = false;
        }
        return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
      }
      if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
        continue;
      }
      // Non-constant readers only contribute when positioned exactly at the row's timestamp.
      if (!reader.isConstantPointReader() && reader.currentTime() != minTime) {
        continue;
      }

      if (!reader.isCurrentNull()) {
        switch (reader.getDataType()) {
          case INT32:
            row[i] = reader.currentInt();
            break;
          case INT64:
            row[i] = reader.currentLong();
            break;
          case FLOAT:
            row[i] = reader.currentFloat();
            break;
          case DOUBLE:
            row[i] = reader.currentDouble();
            break;
          case BOOLEAN:
            row[i] = reader.currentBoolean();
            break;
          case TEXT:
            row[i] = reader.currentBinary();
            break;
          default:
            throw new UnSupportedDataTypeException("Unsupported data type.");
        }
      }

      // Remember to advance this reader at the start of the next call.
      shouldMoveNext[i] = true;
    }

    cachedRow = row;
  } catch (QueryProcessException e) {
    // Keep the original exception as the cause so its stack trace is not lost.
    throw new IOException(e.getMessage(), e);
  }

  return YieldableState.YIELDABLE;
}
/**
 * Hands out the cached row and clears the cache.
 *
 * @return the row cached by the last successful {@code hasNextRowInObjects()} or {@code
 *     canYieldNextRowInObjects()} call, or {@code null} if nothing is cached
 */
@Override
public Object[] nextRowInObjects() {
  try {
    return cachedRow;
  } finally {
    // The row is handed out exactly once; the next call must assemble a new one.
    cachedRow = null;
  }
}
/**
 * Point readers are not available for this layer.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public LayerPointReader constructPointReader() {
  // Carry a message so the failure is diagnosable from logs, as the state-window factory does.
  throw new UnsupportedOperationException(
      "MultiInputColumnIntermediateLayer does not support constructing a point reader.");
}
/**
 * Builds a row reader on top of this layer's row stream.
 *
 * <p>The returned reader keeps at most one row cached: {@code yield()} / {@code next()} fetch a
 * row only when none is cached, and {@code readyForNext()} drops the cached row.
 *
 * @return a reader exposing the rows produced by this layer one at a time
 */
@Override
public LayerRowReader constructRowReader() {
  return new LayerRowReader() {

    private final ElasticSerializableRowRecordListBackedMultiColumnRow row =
        new ElasticSerializableRowRecordListBackedMultiColumnRow(dataTypes);

    private boolean hasCached = false;
    private boolean currentNull = false;

    @Override
    public YieldableState yield() throws Exception {
      if (!hasCached) {
        final YieldableState upstreamState = canYieldNextRowInObjects();
        if (upstreamState != YieldableState.YIELDABLE) {
          return upstreamState;
        }

        final Object[] fetched = nextRowInObjects();
        currentNull = InputRowUtils.isAllNull(fetched);
        row.setRowRecord(fetched);
        hasCached = true;
      }
      return YieldableState.YIELDABLE;
    }

    @Override
    public boolean next() throws IOException {
      if (!hasCached) {
        if (!hasNextRowInObjects()) {
          return false;
        }

        final Object[] fetched = nextRowInObjects();
        currentNull = InputRowUtils.isAllNull(fetched);
        row.setRowRecord(fetched);
        hasCached = true;
      }
      return true;
    }

    @Override
    public void readyForNext() {
      // Forget the current row so the next yield()/next() pulls a fresh one.
      currentNull = false;
      hasCached = false;
    }

    @Override
    public TSDataType[] getDataTypes() {
      return dataTypes;
    }

    @Override
    public long currentTime() {
      return row.getTime();
    }

    @Override
    public Row currentRow() {
      return row;
    }

    @Override
    public boolean isCurrentNull() {
      return currentNull;
    }
  };
}
/**
 * Builds a window reader that exposes fixed-size, count-based sliding windows over this layer's
 * rows.
 *
 * <p>Each window starts {@code slidingStep} rows after the previous one and spans up to {@code
 * windowSize} rows; the last window may be shorter when the input runs out.
 *
 * @param strategy supplies the window size and the sliding step
 * @param memoryBudgetInMB memory budget handed to the backing row record list
 * @return a reader over the sliding-size windows
 * @throws QueryProcessException declared by this factory; presumably raised when the backing
 *     row record list cannot be created - confirm against ElasticSerializableRowRecordList
 */
@Override
protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
    SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
    throws QueryProcessException {
  final IUDFInputDataSet udfInputDataSet = this;

  final int windowSize = strategy.getWindowSize();
  final int slidingStep = strategy.getSlidingStep();

  final ElasticSerializableRowRecordList rowRecordList =
      new ElasticSerializableRowRecordList(
          dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
  final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
      new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);

  return new LayerRowWindowReader() {

    private boolean hasCached = false;
    // Starts one step before index 0 so that the first advance lands on 0.
    private int beginIndex = -slidingStep;

    @Override
    public YieldableState yield() throws Exception {
      if (hasCached) {
        return YieldableState.YIELDABLE;
      }

      beginIndex += slidingStep;
      final int endIndex = beginIndex + windowSize;
      // Negative values mean the int arithmetic above wrapped around.
      if (beginIndex < 0 || endIndex < 0) {
        LOGGER.warn(
            "LayerRowWindowReader index overflow. beginIndex: {}, endIndex: {}, windowSize: {}.",
            beginIndex,
            endIndex,
            windowSize);
        return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
      }

      int windowEnd = endIndex;
      final int missingRows = endIndex - rowRecordList.size();
      if (missingRows > 0) {
        final YieldableState upstreamState =
            LayerCacheUtils.yieldRows(udfInputDataSet, rowRecordList, missingRows);
        if (upstreamState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
          // Undo the advance so the same window is attempted again on the next call.
          beginIndex -= slidingStep;
          return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
        }

        // The window ends wherever the cached rows end.
        windowEnd = rowRecordList.size();
        if (windowEnd <= beginIndex) {
          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
        }
      }

      window.seek(
          beginIndex,
          windowEnd,
          rowRecordList.getTime(beginIndex),
          rowRecordList.getTime(windowEnd - 1));
      hasCached = true;
      return YieldableState.YIELDABLE;
    }

    @Override
    public boolean next() throws IOException, QueryProcessException {
      if (hasCached) {
        return true;
      }

      beginIndex += slidingStep;
      final int endIndex = beginIndex + windowSize;
      // Negative values mean the int arithmetic above wrapped around.
      if (beginIndex < 0 || endIndex < 0) {
        LOGGER.warn(
            "LayerRowWindowReader index overflow. beginIndex: {}, endIndex: {}, windowSize: {}.",
            beginIndex,
            endIndex,
            windowSize);
        return false;
      }

      int windowEnd = endIndex;
      final int missingRows = endIndex - rowRecordList.size();
      if (missingRows > 0) {
        LayerCacheUtils.cacheRows(udfInputDataSet, rowRecordList, missingRows);

        // The window ends wherever the cached rows end.
        windowEnd = rowRecordList.size();
        if (windowEnd <= beginIndex) {
          return false;
        }
      }

      window.seek(
          beginIndex,
          windowEnd,
          rowRecordList.getTime(beginIndex),
          rowRecordList.getTime(windowEnd - 1));
      hasCached = true;
      return true;
    }

    @Override
    public void readyForNext() {
      hasCached = false;
      rowRecordList.setEvictionUpperBound(beginIndex + 1);
    }

    @Override
    public TSDataType[] getDataTypes() {
      return dataTypes;
    }

    @Override
    public RowWindow currentWindow() {
      return window;
    }
  };
}
/**
 * Builds a window reader that exposes time-based sliding windows over this layer's rows.
 *
 * <p>Windows start at the strategy's display window begin (or at the first row's timestamp when
 * that begin is {@code Long.MIN_VALUE}), advance by the sliding step on every {@code
 * readyForNext()}, and cover {@code [begin, min(begin + timeInterval, displayWindowEnd))}.
 *
 * @param strategy supplies the time interval, sliding step and display window bounds
 * @param memoryBudgetInMB memory budget handed to the backing row record list
 * @return a reader over the sliding-time windows
 * @throws QueryProcessException declared by this factory; presumably raised when the backing
 *     row record list cannot be created - confirm against ElasticSerializableRowRecordList
 */
@Override
protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
    SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
    throws QueryProcessException {
  final long timeInterval = strategy.getTimeInterval();
  final long slidingStep = strategy.getSlidingStep();
  final long displayWindowEnd = strategy.getDisplayWindowEnd();

  final IUDFInputDataSet udfInputDataSet = this;
  final ElasticSerializableRowRecordList rowRecordList =
      new ElasticSerializableRowRecordList(
          dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
  final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
      new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);

  return new LayerRowWindowReader() {

    private boolean isFirstIteration = true;
    private boolean hasAtLeastOneRow = false;
    private boolean hasCached = false;
    private long nextWindowTimeBegin = strategy.getDisplayWindowBegin();
    private int nextIndexBegin = 0;

    @Override
    public YieldableState yield() throws Exception {
      if (isFirstIteration) {
        if (rowRecordList.size() == 0) {
          final YieldableState firstRowState =
              LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
          if (firstRowState != YieldableState.YIELDABLE) {
            return firstRowState;
          }
          if (nextWindowTimeBegin == Long.MIN_VALUE) {
            // display window begin should be set to the same as the min timestamp of the query
            // result set
            nextWindowTimeBegin = rowRecordList.getTime(0);
          }
        }
        hasAtLeastOneRow = rowRecordList.size() != 0;
        isFirstIteration = false;
      }

      if (hasCached) {
        return YieldableState.YIELDABLE;
      }
      if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
        return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
      }

      final long nextWindowTimeEnd =
          Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);

      // Pull rows until the cache reaches the end of the window or the input is exhausted.
      while (rowRecordList.getTime(rowRecordList.size() - 1) < nextWindowTimeEnd) {
        final YieldableState rowState =
            LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
        if (rowState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
          return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
        }
        if (rowState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
          break;
        }
      }

      return locateWindow(nextWindowTimeEnd)
          ? YieldableState.YIELDABLE
          : YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
    }

    @Override
    public boolean next() throws IOException, QueryProcessException {
      if (isFirstIteration) {
        if (rowRecordList.size() == 0) {
          if (LayerCacheUtils.cacheRow(udfInputDataSet, rowRecordList)) {
            if (nextWindowTimeBegin == Long.MIN_VALUE) {
              // display window begin should be set to the same as the min timestamp of the
              // query result set
              nextWindowTimeBegin = rowRecordList.getTime(0);
            }
          }
        }
        hasAtLeastOneRow = rowRecordList.size() != 0;
        isFirstIteration = false;
      }

      if (hasCached) {
        return true;
      }
      if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
        return false;
      }

      final long nextWindowTimeEnd =
          Math.min(nextWindowTimeBegin + timeInterval, displayWindowEnd);

      // Pull rows until the cache reaches the end of the window or the input is exhausted.
      while (rowRecordList.getTime(rowRecordList.size() - 1) < nextWindowTimeEnd) {
        if (!LayerCacheUtils.cacheRow(udfInputDataSet, rowRecordList)) {
          break;
        }
      }

      return locateWindow(nextWindowTimeEnd);
    }

    /**
     * Positions {@code window} on the rows whose timestamps fall in {@code
     * [nextWindowTimeBegin, nextWindowTimeEnd)} and reports whether a window is available.
     */
    private boolean locateWindow(long nextWindowTimeEnd)
        throws IOException, QueryProcessException {
      // Move the begin index forward to the first row at or after the window's begin time; if
      // no such row is cached it ends up at the end of the cache.
      int firstInWindow = nextIndexBegin;
      while (firstInWindow < rowRecordList.size()
          && rowRecordList.getTime(firstInWindow) < nextWindowTimeBegin) {
        ++firstInWindow;
      }
      nextIndexBegin = firstInWindow;

      // The end index is the first row at or after the window's end time, or the cache size.
      int nextIndexEnd = rowRecordList.size();
      for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
        if (rowRecordList.getTime(i) >= nextWindowTimeEnd) {
          nextIndexEnd = i;
          break;
        }
      }

      // No row inside the window but later rows exist: expose an empty window. The cached flag
      // is intentionally left untouched on this path.
      if (nextIndexBegin == nextIndexEnd
          && nextWindowTimeEnd < rowRecordList.getTime(rowRecordList.size() - 1)) {
        window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
        return true;
      }

      window.seek(
          nextIndexBegin,
          nextIndexEnd,
          nextWindowTimeBegin,
          nextWindowTimeBegin + timeInterval - 1);

      // An empty range sitting at the very end of the cache means the input is exhausted.
      hasCached = nextIndexBegin != nextIndexEnd || nextIndexEnd != rowRecordList.size();
      return hasCached;
    }

    @Override
    public void readyForNext() {
      hasCached = false;
      nextWindowTimeBegin += slidingStep;
      rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
    }

    @Override
    public TSDataType[] getDataTypes() {
      return dataTypes;
    }

    @Override
    public RowWindow currentWindow() {
      return window;
    }
  };
}
/**
 * Builds a window reader that groups this layer's rows into session windows: a window is closed
 * when the time difference between two consecutive rows exceeds the strategy's session time gap.
 *
 * <p>Only the yield-style API is implemented by the returned reader; its {@code next()} always
 * returns {@code false}.
 *
 * @param strategy supplies the display window bounds and the session time gap
 * @param memoryBudgetInMB memory budget handed to the backing row record list
 * @return a reader over the session windows
 * @throws QueryProcessException declared by this factory; presumably raised when the backing row
 *     record list cannot be created - TODO confirm against ElasticSerializableRowRecordList
 */
@Override
protected LayerRowWindowReader constructRowSessionTimeWindowReader(
SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException {
final long displayWindowBegin = strategy.getDisplayWindowBegin();
final long displayWindowEnd = strategy.getDisplayWindowEnd();
final long sessionTimeGap = strategy.getSessionTimeGap();
final IUDFInputDataSet udfInputDataSet = this;
final ElasticSerializableRowRecordList rowRecordList =
new ElasticSerializableRowRecordList(
dataTypes, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
final ElasticSerializableRowRecordListBackedMultiColumnWindow window =
new ElasticSerializableRowRecordListBackedMultiColumnWindow(rowRecordList);
return new LayerRowWindowReader() {
private boolean isFirstIteration = true;
private boolean hasAtLeastOneRow = false;
// Begin/end timestamps passed to window.seek for the window currently being built.
private long nextWindowTimeBegin = displayWindowBegin;
private long nextWindowTimeEnd = 0;
// Row index range [nextIndexBegin, nextIndexEnd) of the window currently being built.
private int nextIndexBegin = 0;
private int nextIndexEnd = 1;
@Override
public YieldableState yield() throws Exception {
if (isFirstIteration) {
// Make sure at least one row is cached before the first window is computed.
if (rowRecordList.size() == 0) {
final YieldableState yieldableState =
LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
if (yieldableState != YieldableState.YIELDABLE) {
return yieldableState;
}
}
// The first window cannot start before the first cached row.
nextWindowTimeBegin = Math.max(displayWindowBegin, rowRecordList.getTime(0));
hasAtLeastOneRow = rowRecordList.size() != 0;
isFirstIteration = false;
}
if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
// Keep pulling rows while the newest cached row is still before the display window end.
while (rowRecordList.getTime(rowRecordList.size() - 1) < displayWindowEnd) {
final YieldableState yieldableState =
LayerCacheUtils.yieldRow(udfInputDataSet, rowRecordList);
if (yieldableState == YieldableState.YIELDABLE) {
// A gap larger than sessionTimeGap between the two newest rows closes the window just
// before the newest row, provided the older row is not before displayWindowBegin.
if (rowRecordList.getTime(rowRecordList.size() - 2) >= displayWindowBegin
&& rowRecordList.getTime(rowRecordList.size() - 1)
- rowRecordList.getTime(rowRecordList.size() - 2)
> sessionTimeGap) {
nextIndexEnd = rowRecordList.size() - 1;
break;
} else {
nextIndexEnd++;
}
} else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
} else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
// Input exhausted: the window extends to the last cached row.
nextIndexEnd = rowRecordList.size();
break;
}
}
nextWindowTimeEnd = rowRecordList.getTime(nextIndexEnd - 1);
// An empty index range means there is no further window to expose.
if (nextIndexBegin == nextIndexEnd) {
return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
// This loop only iterates past its first element when the user has set the strategy's
// displayWindowBegin: it then searches for the index of the first row inside the first
// window. In every other situation the condition
// nextWindowTimeBegin <= rowRecordList.getTime(i) holds on the first iteration.
for (int i = nextIndexBegin; i < rowRecordList.size(); ++i) {
if (nextWindowTimeBegin <= rowRecordList.getTime(i)) {
nextIndexBegin = i;
break;
}
// The first window's begin time is greater than every timestamp in the query result
// set, so there is nothing to expose.
if (i == rowRecordList.size() - 1) {
return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
}
}
window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
return YieldableState.YIELDABLE;
}
@Override
public boolean next() throws IOException, QueryProcessException {
// The blocking API is not implemented for session windows; this always reports no data.
return false;
}
@Override
public void readyForNext() throws IOException, QueryProcessException {
// The next window starts at the row that closed the current one, if such a row is cached.
if (nextIndexEnd < rowRecordList.size()) {
nextWindowTimeBegin = rowRecordList.getTime(nextIndexEnd);
}
rowRecordList.setEvictionUpperBound(nextIndexBegin + 1);
nextIndexBegin = nextIndexEnd;
}
@Override
public TSDataType[] getDataTypes() {
return dataTypes;
}
@Override
public RowWindow currentWindow() {
return window;
}
};
}
/**
 * State windows are not available for layers with more than one input column.
 *
 * @param strategy unused
 * @param memoryBudgetInMB unused
 * @throws UnsupportedOperationException always
 */
@Override
protected LayerRowWindowReader constructRowStateWindowReader(
    StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
  final String reason = "StateWindowAccessStrategy only support one input series for now.";
  throw new UnsupportedOperationException(reason);
}
}