GroupByFill 查询的主要逻辑在 GroupByFillDataSet
org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSetGroupByFill是对原降采样结果进行填充,目前仅支持使用前值填充的方式。
Previous填充方式的语意没有变,只要前面有值,就可以拿过来填充; PREVIOUSUNTILLAST考虑到在某些业务场景下,所填充的值的时间不能大于该时间序列last的时间戳(从业务角度考虑,取历史数据不能取未来历史数据) 看下面的例子,或许更容易理解
A点时间戳为1,B为5,C为20,D为30,N为8,M为38
原始数据为
select temperature FROM root.ln.wf01.wt01 where time >= 1 and time <= 38
| Time | root.ln.wf01.wt01.temperature |
|---|---|
| 1 | 21 |
| 3 | 23 |
| 5 | 25 |
| 20 | 26 |
| 27 | 29 |
| 28 | 30 |
| 30 | 40 |
当我们使用Previous插值方式时,即使D到M这一段是未来的数据,我们也会用D点的数据进行填充
SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[previous])
| Time | last_temperature |
|---|---|
| 8 | 25 |
| 13 | 25 |
| 18 | 26 |
| 23 | 29 |
| 28 | 40 |
| 33 | 40 |
| 38 | 40 |
当我们使用NONLASTPREVIOUS插值方式时,因为D到M这一段是未来的数据,我们不会进行插值,还是返回null
SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[PREVIOUSUNTILLAST])
| Time | last_temperature |
|---|---|
| 8 | 25 |
| 13 | 25 |
| 18 | 26 |
| 23 | 29 |
| 28 | 40 |
| 33 | null |
| 38 | null |
在GroupByFillDataSet中维护了两个主要变量
// the first value for each time series private Object[] previousValue; // last timestamp for each time series private long[] lastTimeArray;
previousValuepreviousValue这个变量维护了当前时间窗口的前一个降采样值,在GroupByFillDataSet构造函数中调用了initPreviousParis方法对其进行初始化。
private void initPreviousParis(QueryContext context, GroupByFillPlan groupByFillPlan)
throws StorageEngineException, IOException, QueryProcessException {
previousValue = new Object[paths.size()];
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
TSDataType dataType = dataTypes.get(i);
IFill fill = new PreviousFill(dataType, groupByEngineDataSet.getStartTime(), -1L);
fill.constructReaders(path, groupByFillPlan.getAllMeasurementsInDevice(path.getDevice()), context);
TimeValuePair timeValuePair = fill.getFillResult();
if (timeValuePair == null || timeValuePair.getValue() == null) {
previousValue[i] = null;
} else {
previousValue[i] = timeValuePair.getValue().getValue();
}
}
}
initPreviousParis方法主要为每个时间序列构造了一个单点补空值查询,queryTime设置为降采样时间窗口的起始值,beforeRange不作限制。
lastTimeArraylastTimeArray这个变量维护了每个时间序列的最近时间戳值,主要用于PREVIOUSUNTILLAST这一填充方式,在GroupByFillDataSet构造函数中调用了initLastTimeArray方法对其进行初始化。
private void initLastTimeArray(QueryContext context)
throws IOException, StorageEngineException, QueryProcessException {
lastTimeArray = new long[paths.size()];
Arrays.fill(lastTimeArray, Long.MAX_VALUE);
for (int i = 0; i < paths.size(); i++) {
TimeValuePair lastTimeValuePair =
LastQueryExecutor.calculateLastPairForOneSeries(paths.get(i), dataTypes.get(i), context);
if (lastTimeValuePair.getValue() != null) {
lastTimeArray[i] = lastTimeValuePair.getTimestamp();
}
}
}
initPreviousParis方法主要为每个时间序列构造了一个最近时间戳 Last 查询
填充过程在nextWithoutConstraint方法中完成,主要逻辑如下:
protected RowRecord nextWithoutConstraint() throws IOException {
// 首先通过groupByEngineDataSet,获得原始的降采样查询结果行
RowRecord rowRecord = groupByEngineDataSet.nextWithoutConstraint();
// 接下来对每个时间序列判断需不需要填充
for (int i = 0; i < paths.size(); i++) {
Field field = rowRecord.getFields().get(i);
// 当前值为null,需要进行填充
if (field.getDataType() == null) {
// 当前一个值不为null 并且 (填充方式不是PREVIOUSUNTILLAST 或者 当前时间小于改时间序列的最近时间戳)
if (previousValue[i] != null
&& (!((PreviousFill) fillTypes.get(dataTypes.get(i))).isUntilLast()
|| rowRecord.getTimestamp() <= lastTimeArray[i])) {
rowRecord.getFields().set(i, Field.getField(previousValue[i], dataTypes.get(i)));
}
} else {
// 当前值不为null,不需要填充,用当前值更新previousValue数组
previousValue[i] = field.getObjectValue(field.getDataType());
}
}
return rowRecord;
}