降采样补空值查询

GroupByFill 查询的主要逻辑在 GroupByFillDataSet

  • org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet

GroupByFill是对原降采样结果进行填充,目前仅支持使用前值填充的方式。

  • 在Group By Fill中,Group by子句不支持滑动步长,否则抛异常
  • Fill子句中仅能使用Previous和PREVIOUSUNTILLAST这两种插值方式,Linear不支持
  • Previous和PREVIOUSUNTILLAST对fill的时间不做限制
  • 填充只针对last_value这一聚合函数,其他的函数不支持,如果其他函数的聚合值查询结果为null,依旧为null,不进行填充

PREVIOUSUNTILLAST与PREVIOUS填充的区别

Previous填充方式的语意没有变,只要前面有值,就可以拿过来填充; PREVIOUSUNTILLAST考虑到在某些业务场景下,所填充的值的时间不能大于该时间序列last的时间戳(从业务角度考虑,取历史数据不能取未来历史数据) 看下面的例子,或许更容易理解

A点时间戳为1,B为5,C为20,D为30,N为8,M为38

原始数据为

select temperature FROM root.ln.wf01.wt01 where time >= 1 and time <= 38

Timeroot.ln.wf01.wt01.temperature
121
323
525
2026
2729
2830
3040

当我们使用Previous插值方式时,即使D到M这一段是未来的数据,我们也会用D点的数据进行填充

SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[previous])

Timelast_temperature
825
1325
1826
2329
2840
3340
3840

当我们使用NONLASTPREVIOUS插值方式时,因为D到M这一段是未来的数据,我们不会进行插值,还是返回null

SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[PREVIOUSUNTILLAST])

Timelast_temperature
825
1325
1826
2329
2840
33null
38null

核心查询逻辑

GroupByFillDataSet中维护了两个主要变量

// the first value for each time series
private Object[] previousValue;
// last timestamp for each time series
private long[] lastTimeArray;

previousValue

previousValue这个变量维护了当前时间窗口的前一个降采样值,在GroupByFillDataSet构造函数中调用了initPreviousParis方法对其进行初始化。

  private void initPreviousParis(QueryContext context, GroupByFillPlan groupByFillPlan)
          throws StorageEngineException, IOException, QueryProcessException {
    previousValue = new Object[paths.size()];
    for (int i = 0; i < paths.size(); i++) {
      Path path = paths.get(i);
      TSDataType dataType = dataTypes.get(i);
      IFill fill = new PreviousFill(dataType, groupByEngineDataSet.getStartTime(), -1L);
      fill.constructReaders(path, groupByFillPlan.getAllMeasurementsInDevice(path.getDevice()), context);

      TimeValuePair timeValuePair = fill.getFillResult();
      if (timeValuePair == null || timeValuePair.getValue() == null) {
        previousValue[i] = null;
      } else {
        previousValue[i] = timeValuePair.getValue().getValue();
      }
    }
  }

initPreviousParis方法主要为每个时间序列构造了一个单点补空值查询,queryTime设置为降采样时间窗口的起始值,beforeRange不作限制。

lastTimeArray

lastTimeArray这个变量维护了每个时间序列的最近时间戳值,主要用于PREVIOUSUNTILLAST这一填充方式,在GroupByFillDataSet构造函数中调用了initLastTimeArray方法对其进行初始化。

  private void initLastTimeArray(QueryContext context)
      throws IOException, StorageEngineException, QueryProcessException {
    lastTimeArray = new long[paths.size()];
    Arrays.fill(lastTimeArray, Long.MAX_VALUE);
    for (int i = 0; i < paths.size(); i++) {
      TimeValuePair lastTimeValuePair =
          LastQueryExecutor.calculateLastPairForOneSeries(paths.get(i), dataTypes.get(i), context);
      if (lastTimeValuePair.getValue() != null) {
        lastTimeArray[i] = lastTimeValuePair.getTimestamp();
      }
    }
  }

initPreviousParis方法主要为每个时间序列构造了一个最近时间戳 Last 查询

填充过程

填充过程在nextWithoutConstraint方法中完成,主要逻辑如下:

protected RowRecord nextWithoutConstraint() throws IOException {

    // 首先通过groupByEngineDataSet,获得原始的降采样查询结果行
    RowRecord rowRecord = groupByEngineDataSet.nextWithoutConstraint();

    // 接下来对每个时间序列判断需不需要填充
    for (int i = 0; i < paths.size(); i++) {
      Field field = rowRecord.getFields().get(i);
      // 当前值为null,需要进行填充
      if (field.getDataType() == null) {
        // 当前一个值不为null 并且 (填充方式不是PREVIOUSUNTILLAST 或者 当前时间小于改时间序列的最近时间戳)
        if (previousValue[i] != null
            && (!((PreviousFill) fillTypes.get(dataTypes.get(i))).isUntilLast()
            || rowRecord.getTimestamp() <= lastTimeArray[i])) {
          rowRecord.getFields().set(i, Field.getField(previousValue[i], dataTypes.get(i)));
        }
      } else {
        // 当前值不为null,不需要填充,用当前值更新previousValue数组
        previousValue[i] = field.getObjectValue(field.getDataType());
      }
    }
    return rowRecord;
  }