blob: c34df6bedac39149db7d7084c3c47bb12be84cf7 [file] [log] [blame] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# 降采样补空值查询
GroupByFill 查询的主要逻辑在 `GroupByFillDataSet`
* `org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet`
GroupByFill是对原降采样结果进行填充,目前仅支持使用前值填充的方式。
* 在Group By Fill中,Group by子句不支持滑动步长,否则抛异常
* Fill子句中仅能使用Previous和PREVIOUSUNTILLAST这两种插值方式,Linear不支持
* Previous和PREVIOUSUNTILLAST对fill的时间不做限制
* 填充只针对last_value这一聚合函数,其他的函数不支持,如果其他函数的聚合值查询结果为null,依旧为null,不进行填充
## PREVIOUSUNTILLAST与PREVIOUS填充的区别
Previous填充方式的语意没有变,只要前面有值,就可以拿过来填充;
PREVIOUSUNTILLAST考虑到在某些业务场景下,所填充的值的时间不能大于该时间序列last的时间戳(从业务角度考虑,取历史数据不能取未来历史数据)
看下面的例子,或许更容易理解
A点时间戳为1,B为5,C为20,D为30,N为8,M为38
原始数据为<img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/16079446/78784824-9f41ae00-79d8-11ea-9920-0825e081cae0.png">
`select temperature FROM root.ln.wf01.wt01 where time >= 1 and time <= 38`
| Time | root.ln.wf01.wt01.temperature |
| ------ | ------------------------------ |
| 1 | 21 |
| 3 | 23 |
| 5 | 25 |
| 20 | 26 |
| 27 | 29 |
| 28 | 30 |
| 30 | 40 |
当我们使用Previous插值方式时,即使D到M这一段是未来的数据,我们也会用D点的数据进行填充
`SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[previous])`
| Time | last_temperature |
| ------ | ---------------- |
| 8 | 25 |
| 13 | 25 |
| 18 | 26 |
| 23 | 29 |
| 28 | 40 |
| 33 | 40 |
| 38 | 40 |
当我们使用NONLASTPREVIOUS插值方式时,因为D到M这一段是未来的数据,我们不会进行插值,还是返回null
`SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[PREVIOUSUNTILLAST])`
| Time | last_temperature |
| ------ | ---------------- |
| 8 | 25 |
| 13 | 25 |
| 18 | 26 |
| 23 | 29 |
| 28 | 40 |
| 33 | null |
| 38 | null |
## 核心查询逻辑
在`GroupByFillDataSet`中维护了两个主要变量
```
// the first value for each time series
private Object[] previousValue;
// last timestamp for each time series
private long[] lastTimeArray;
```
### `previousValue`
`previousValue`这个变量维护了当前时间窗口的前一个降采样值,在`GroupByFillDataSet`构造函数中调用了`initPreviousParis`方法对其进行初始化。
```
private void initPreviousParis(QueryContext context, GroupByFillPlan groupByFillPlan)
throws StorageEngineException, IOException, QueryProcessException {
previousValue = new Object[paths.size()];
for (int i = 0; i < paths.size(); i++) {
Path path = paths.get(i);
TSDataType dataType = dataTypes.get(i);
IFill fill = new PreviousFill(dataType, groupByEngineDataSet.getStartTime(), -1L);
fill.constructReaders(path, groupByFillPlan.getAllMeasurementsInDevice(path.getDevice()), context);
TimeValuePair timeValuePair = fill.getFillResult();
if (timeValuePair == null || timeValuePair.getValue() == null) {
previousValue[i] = null;
} else {
previousValue[i] = timeValuePair.getValue().getValue();
}
}
}
```
`initPreviousParis`方法主要为每个时间序列构造了一个单点补空值查询,`queryTime`设置为降采样时间窗口的起始值,`beforeRange`不作限制。
### `lastTimeArray`
`lastTimeArray`这个变量维护了每个时间序列的最近时间戳值,主要用于`PREVIOUSUNTILLAST`这一填充方式,在`GroupByFillDataSet`构造函数中调用了`initLastTimeArray`方法对其进行初始化。
```
private void initLastTimeArray(QueryContext context)
throws IOException, StorageEngineException, QueryProcessException {
lastTimeArray = new long[paths.size()];
Arrays.fill(lastTimeArray, Long.MAX_VALUE);
for (int i = 0; i < paths.size(); i++) {
TimeValuePair lastTimeValuePair =
LastQueryExecutor.calculateLastPairForOneSeries(paths.get(i), dataTypes.get(i), context);
if (lastTimeValuePair.getValue() != null) {
lastTimeArray[i] = lastTimeValuePair.getTimestamp();
}
}
}
```
`initPreviousParis`方法主要为每个时间序列构造了一个最近时间戳 Last 查询
### 填充过程
填充过程在`nextWithoutConstraint`方法中完成,主要逻辑如下:
```
protected RowRecord nextWithoutConstraint() throws IOException {
// 首先通过groupByEngineDataSet,获得原始的降采样查询结果行
RowRecord rowRecord = groupByEngineDataSet.nextWithoutConstraint();
// 接下来对每个时间序列判断需不需要填充
for (int i = 0; i < paths.size(); i++) {
Field field = rowRecord.getFields().get(i);
// 当前值为null,需要进行填充
if (field.getDataType() == null) {
// 当前一个值不为null 并且 (填充方式不是PREVIOUSUNTILLAST 或者 当前时间小于改时间序列的最近时间戳)
if (previousValue[i] != null
&& (!((PreviousFill) fillTypes.get(dataTypes.get(i))).isUntilLast()
|| rowRecord.getTimestamp() <= lastTimeArray[i])) {
rowRecord.getFields().set(i, Field.getField(previousValue[i], dataTypes.get(i)));
}
} else {
// 当前值不为null,不需要填充,用当前值更新previousValue数组
previousValue[i] = field.getObjectValue(field.getDataType());
}
}
return rowRecord;
}
```