blob: d88fadc167c24400f87b826ce893c5ade5b2b5eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.query.dataset.groupby;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.fill.IFill;
import org.apache.iotdb.db.query.executor.fill.LinearFill;
import org.apache.iotdb.db.query.executor.fill.PreviousFill;
import org.apache.iotdb.db.query.executor.fill.ValueFill;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
/**
 * Result set for GROUP BY ... FILL queries: wraps a raw group-by-time result set and fills the
 * empty time intervals per column using the configured fill strategy (PREVIOUS, LINEAR or VALUE).
 *
 * <p>Usage contract: after construction, the caller must inject the raw rows via
 * {@link #setDataSet(QueryDataSet)} and then call {@link #initCache()} before iterating.
 */
public class GroupByFillDataSet extends GroupByTimeDataSet {

  private static final Logger logger = LoggerFactory.getLogger(GroupByFillDataSet.class);

  /** Raw (unfilled) group-by rows; injected through {@link #setDataSet(QueryDataSet)}. */
  private QueryDataSet dataSet;

  /** Per-datatype fill strategies; {@code null} when a single shared fill is configured. */
  private final Map<TSDataType, IFill> fillTypes;

  /** Fill strategy shared by all columns; consulted only when {@link #fillTypes} is null. */
  private final IFill singleFill;

  private final List<String> aggregations;

  /** Once a column's fill hits an unsupported type, it is marked here and stays null. */
  private boolean[] unsupportedFillMethod;

  // the result datatype for each aggregation
  private final TSDataType[] resultDataType;

  // the last consumed (time, value) per column; Long.MAX_VALUE / null mean "none yet"
  private long[] previousTimes;
  private Object[] previousValues;

  // the next not-null and not-yet-consumed row id inside nextTVLists, per column
  private int[] nextIndices;

  // look-ahead cache of upcoming (time, value) pairs per column
  private List<ElasticSerializableTVList> nextTVLists;

  private final float groupByFillCacheSizeInMB =
      IoTDBDescriptor.getInstance().getConfig().getGroupByFillCacheSizeInMB();

  public GroupByFillDataSet(QueryContext context, GroupByTimeFillPlan groupByTimeFillPlan)
      throws QueryProcessException {
    super(context, groupByTimeFillPlan);
    this.aggregations = groupByTimeFillPlan.getDeduplicatedAggregations();
    this.fillTypes = groupByTimeFillPlan.getFillType();
    this.singleFill = groupByTimeFillPlan.getSingleFill();
    this.resultDataType = new TSDataType[aggregations.size()];
    initArrays(context);
  }

  /** Allocates the per-column bookkeeping arrays and one serializable TV cache per column. */
  private void initArrays(QueryContext context) throws QueryProcessException {
    int columnCount = aggregations.size();
    for (int i = 0; i < columnCount; i++) {
      resultDataType[i] = TypeInferenceUtils.getAggrDataType(aggregations.get(i), dataTypes.get(i));
    }
    previousTimes = new long[columnCount];
    // Object[], int[] and boolean[] default to null / 0 / false, which are exactly the
    // "nothing cached yet" markers, so only previousTimes needs explicit initialization.
    previousValues = new Object[columnCount];
    nextIndices = new int[columnCount];
    unsupportedFillMethod = new boolean[columnCount];
    Arrays.fill(previousTimes, Long.MAX_VALUE);
    nextTVLists = new ArrayList<>(columnCount);
    for (int i = 0; i < columnCount; i++) {
      nextTVLists.add(
          ElasticSerializableTVList.newElasticSerializableTVList(
              resultDataType[i], context.getQueryId(), groupByFillCacheSizeInMB, 2));
    }
  }

  public void setDataSet(QueryDataSet dataSet) {
    this.dataSet = dataSet;
  }

  /**
   * Pre-reads the underlying data set until every column has at least one value inside the query
   * range cached (or the data set is exhausted). Rows before the range (ascending) or at/after its
   * end (descending) only seed the "previous" slots used by PREVIOUS/LINEAR fills.
   *
   * @throws QueryProcessException if reading the underlying data set fails
   */
  public void initCache() throws QueryProcessException {
    BitSet cacheSet = new BitSet(aggregations.size());
    try {
      while (cacheSet.cardinality() < aggregations.size() && dataSet.hasNextWithoutConstraint()) {
        RowRecord record = dataSet.nextWithoutConstraint();
        long timestamp = record.getTimestamp();
        List<Field> fields = record.getFields();
        for (int i = 0; i < fields.size(); i++) {
          Field field = fields.get(i);
          if (field == null) {
            continue;
          }
          if (ascending && timestamp < startTime) {
            previousTimes[i] = timestamp;
            previousValues[i] = field.getObjectValue(resultDataType[i]);
          } else if (!ascending && timestamp >= endTime) {
            previousTimes[i] = timestamp;
            previousValues[i] = field.getObjectValue(resultDataType[i]);
          } else {
            nextTVLists.get(i).put(timestamp, field.getObjectValue(resultDataType[i]));
            cacheSet.set(i);
          }
        }
      }
    } catch (IOException e) {
      logger.error("Exception occurred while initializing the group-by fill cache: ", e);
      // NOTE(review): the cause is dropped here; confirm whether QueryProcessException offers a
      // (String, Throwable)-style constructor so the original stack trace can be preserved.
      throw new QueryProcessException(e.getMessage());
    }
  }

  /**
   * Emits the next aggregated row: for each column, uses the cached value whose timestamp matches
   * the current interval, otherwise fills it according to the configured fill strategy, then
   * advances the caches past the consumed timestamp.
   */
  @Override
  public RowRecord nextWithoutConstraint() throws IOException {
    if (!hasCachedTimeInterval) {
      throw new IOException(
          "need to call hasNext() before calling next() " + "in GroupByFillDataSet.");
    }
    hasCachedTimeInterval = false;
    RowRecord record;
    long curTimestamp;
    // leftCRightO: interval is [min, max), representative timestamp is min;
    // otherwise (min, max], representative timestamp is max - 1.
    if (leftCRightO) {
      curTimestamp = curAggrTimeRange.getMin();
      record = new RowRecord(curAggrTimeRange.getMin());
    } else {
      curTimestamp = curAggrTimeRange.getMax() - 1;
      record = new RowRecord(curAggrTimeRange.getMax() - 1);
    }
    for (int i = 0; i < aggregations.size(); i++) {
      if (nextTVLists.get(i).size() == nextIndices[i]) {
        // look-ahead cache exhausted for this column: nothing real to emit, so fill
        fillRecord(i, record);
        continue;
      }
      long cacheTime = nextTVLists.get(i).getTime(nextIndices[i]);
      if (cacheTime == curTimestamp) {
        record.addField(getNextCacheValue(i), resultDataType[i]);
      } else {
        fillRecord(i, record);
      }
    }
    try {
      slideCache(record.getTimestamp());
    } catch (QueryProcessException e) {
      // A failed slide leaves the previous/next caches inconsistent and would silently produce
      // wrong fill values for every following row (this used to be log-and-continue), so fail
      // the query instead — consistent with how fillRecord wraps the same exception type.
      logger.error("group by fill has an exception while sliding: ", e);
      throw new IOException(e);
    }
    return record;
  }

  /**
   * Appends a filled field for column {@code index} to {@code record}, or a null field when the
   * configured fill strategy cannot (or must not) produce a value for the current timestamp.
   */
  private void fillRecord(int index, RowRecord record) throws IOException {
    if (unsupportedFillMethod[index]) {
      // this column already failed with an unsupported-type error; never retry
      record.addField(null);
      return;
    }
    IFill fill;
    if (fillTypes != null) {
      // old type fill logic
      fill = fillTypes.get(resultDataType[index]);
    } else {
      fill = singleFill;
    }
    if (fill == null) {
      record.addField(null);
      return;
    }
    // Orient the (before, after) neighbors by scan direction: when descending, the look-ahead
    // cache holds the chronologically earlier point and "previous" holds the later one.
    Pair<Long, Object> beforePair, afterPair;
    if (ascending) {
      if (previousValues[index] != null) {
        beforePair = new Pair<>(previousTimes[index], previousValues[index]);
      } else {
        beforePair = null;
      }
      if (nextIndices[index] < nextTVLists.get(index).size()) {
        afterPair =
            new Pair<>(
                nextTVLists.get(index).getTime(nextIndices[index]), getNextCacheValue(index));
      } else {
        afterPair = null;
      }
    } else {
      if (nextIndices[index] < nextTVLists.get(index).size()) {
        beforePair =
            new Pair<>(
                nextTVLists.get(index).getTime(nextIndices[index]), getNextCacheValue(index));
      } else {
        beforePair = null;
      }
      if (previousValues[index] != null) {
        afterPair = new Pair<>(previousTimes[index], previousValues[index]);
      } else {
        afterPair = null;
      }
    }
    if (fill instanceof PreviousFill) {
      // beforeRange == -1 means "unbounded"; UNTIL_LAST additionally requires a later real value
      if (beforePair != null
          && (fill.getBeforeRange() == -1
              || fill.insideBeforeRange(beforePair.left, record.getTimestamp()))
          && ((!((PreviousFill) fill).isUntilLast())
              || (afterPair != null && afterPair.left < endTime))) {
        record.addField(beforePair.right, resultDataType[index]);
      } else {
        record.addField(null);
      }
    } else if (fill instanceof LinearFill) {
      // a fresh LinearFill is used only for the interpolation arithmetic;
      // the configured fill still supplies the before/after range checks
      LinearFill linearFill = new LinearFill();
      if (beforePair != null
          && afterPair != null
          && (fill.getBeforeRange() == -1
              || fill.insideBeforeRange(beforePair.left, record.getTimestamp()))
          && (fill.getAfterRange() == -1
              || fill.insideAfterRange(afterPair.left, record.getTimestamp()))) {
        try {
          TimeValuePair filledPair =
              linearFill.averageWithTimeAndDataType(
                  new TimeValuePair(
                      beforePair.left,
                      TsPrimitiveType.getByType(resultDataType[index], beforePair.right)),
                  new TimeValuePair(
                      afterPair.left,
                      TsPrimitiveType.getByType(resultDataType[index], afterPair.right)),
                  record.getTimestamp(),
                  resultDataType[index]);
          record.addField(filledPair.getValue().getValue(), resultDataType[index]);
        } catch (UnSupportedFillTypeException ignore) {
          // Don't fill and ignore unsupported fill type exception
          record.addField(null);
          unsupportedFillMethod[index] = true;
          logger.info("Linear fill doesn't support the {}-th column in SQL.", index);
        }
      } else {
        record.addField(null);
      }
    } else if (fill instanceof ValueFill) {
      try {
        TimeValuePair filledPair = fill.getFillResult();
        if (filledPair == null) {
          filledPair = ((ValueFill) fill).getSpecifiedFillResult(resultDataType[index]);
        }
        record.addField(filledPair.getValue().getValue(), resultDataType[index]);
      } catch (NumberFormatException ignore) {
        // Don't fill and ignore type convert exception
        record.addField(null);
        unsupportedFillMethod[index] = true;
        logger.info("Value fill doesn't support the {}-th column in SQL.", index);
      } catch (QueryProcessException | StorageEngineException e) {
        throw new IOException(e);
      }
    }
  }

  /**
   * Reads the next cached value of column {@code index} (at {@code nextIndices[index]}) through
   * the type-specific accessor matching the column's result data type.
   *
   * @throws IOException if the column's data type is not one of the six supported types
   */
  private Object getNextCacheValue(int index) throws IOException {
    switch (resultDataType[index]) {
      case INT32:
        return nextTVLists.get(index).getInt(nextIndices[index]);
      case INT64:
        return nextTVLists.get(index).getLong(nextIndices[index]);
      case FLOAT:
        return nextTVLists.get(index).getFloat(nextIndices[index]);
      case DOUBLE:
        return nextTVLists.get(index).getDouble(nextIndices[index]);
      case BOOLEAN:
        return nextTVLists.get(index).getBoolean(nextIndices[index]);
      case TEXT:
        return nextTVLists.get(index).getBinary(nextIndices[index]);
      default:
        throw new IOException("unknown data type!");
    }
  }

  /**
   * After emitting the row at {@code curTimestamp}: for each column whose cached head matched that
   * timestamp, promotes it to the "previous" slot, advances the cache index, and then reads more
   * rows from the underlying data set until every such column has a new look-ahead value (or the
   * data set is exhausted).
   */
  private void slideCache(long curTimestamp) throws IOException, QueryProcessException {
    BitSet slideSet = new BitSet(aggregations.size());
    for (int i = 0; i < aggregations.size(); i++) {
      if (nextTVLists.get(i).size() == nextIndices[i]) {
        continue;
      }
      // slide cache when the current TV is used
      if (nextTVLists.get(i).getTime(nextIndices[i]) == curTimestamp) {
        previousTimes[i] = curTimestamp;
        previousValues[i] = getNextCacheValue(i);
        nextIndices[i]++;
        // consumed entries may be evicted from the serializable list to bound memory
        nextTVLists.get(i).setEvictionUpperBound(nextIndices[i]);
        slideSet.set(i);
      }
    }
    // refill: keep reading raw rows until every slid column has a new look-ahead value
    while (slideSet.cardinality() > 0 && dataSet.hasNextWithoutConstraint()) {
      RowRecord record = dataSet.nextWithoutConstraint();
      long timestamp = record.getTimestamp();
      List<Field> fields = record.getFields();
      for (int i = 0; i < fields.size(); i++) {
        Field field = fields.get(i);
        if (field == null) {
          continue;
        }
        nextTVLists.get(i).put(timestamp, field.getObjectValue(resultDataType[i]));
        slideSet.clear(i);
      }
    }
  }
}