降采样补空值查询

GroupByFill 查询的主要逻辑在 GroupByFillDataSet

  • org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet

GroupByFill 是对原降采样结果进行填充,目前仅支持使用前值填充的方式。

  • 在 Group By Fill 中,Group by 子句不支持滑动步长,否则抛异常
  • Fill 子句中仅能使用 Previous 和 PREVIOUSUNTILLAST 这两种插值方式,Linear 不支持
  • Previous 和 PREVIOUSUNTILLAST 对 fill 的时间不做限制
  • 填充只针对 last_value 这一聚合函数,其他的函数不支持,如果其他函数的聚合值查询结果为 null,依旧为 null,不进行填充

PREVIOUSUNTILLAST 与 PREVIOUS 填充的区别

Previous 填充方式的语意没有变,只要前面有值,就可以拿过来填充; PREVIOUSUNTILLAST 考虑到在某些业务场景下,所填充的值的时间不能大于该时间序列 last 的时间戳(从业务角度考虑,取历史数据不能取未来历史数据) 看下面的例子,或许更容易理解

A 点时间戳为 1,B 为 5,C 为 20,D 为 30,N 为 8,M 为 38

原始数据为

select temperature FROM root.ln.wf01.wt01 where time >= 1 and time <= 38

Timeroot.ln.wf01.wt01.temperature
121
323
525
2026
2729
2830
3040

当我们使用 Previous 插值方式时,即使 D 到 M 这一段是未来的数据,我们也会用 D 点的数据进行填充

SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[previous])

Timelast_temperature
825
1325
1826
2329
2840
3340
3840

当我们使用 NONLASTPREVIOUS 插值方式时,因为 D 到 M 这一段是未来的数据,我们不会进行插值,还是返回 null

SELECT last_value(temperature) as last_temperature FROM root.ln.wf01.wt01 GROUP BY([8, 39), 5m) FILL (int32[PREVIOUSUNTILLAST])

Timelast_temperature
825
1325
1826
2329
2840
33null
38null

核心查询逻辑

GroupByFillDataSet中维护了两个主要变量

// the first value for each time series
private Object[] previousValue;
// last timestamp for each time series
private long[] lastTimeArray;

previousValue

previousValue这个变量维护了当前时间窗口的前一个降采样值,在GroupByFillDataSet构造函数中调用了initPreviousParis方法对其进行初始化。

  private void initPreviousParis(QueryContext context, GroupByFillPlan groupByFillPlan)
          throws StorageEngineException, IOException, QueryProcessException {
    previousValue = new Object[paths.size()];
    for (int i = 0; i < paths.size(); i++) {
      Path path = paths.get(i);
      TSDataType dataType = dataTypes.get(i);
      IFill fill = new PreviousFill(dataType, groupByEngineDataSet.getStartTime(), -1L);
      fill.constructReaders(path, groupByFillPlan.getAllMeasurementsInDevice(path.getDevice()), context);

      TimeValuePair timeValuePair = fill.getFillResult();
      if (timeValuePair == null || timeValuePair.getValue() == null) {
        previousValue[i] = null;
      } else {
        previousValue[i] = timeValuePair.getValue().getValue();
      }
    }
  }

initPreviousParis方法主要为每个时间序列构造了一个单点补空值查询,queryTime设置为降采样时间窗口的起始值,beforeRange不作限制。

lastTimeArray

lastTimeArray这个变量维护了每个时间序列的最近时间戳值,主要用于PREVIOUSUNTILLAST这一填充方式,在GroupByFillDataSet构造函数中调用了initLastTimeArray方法对其进行初始化。

  private void initLastTimeArray(QueryContext context)
      throws IOException, StorageEngineException, QueryProcessException {
    lastTimeArray = new long[paths.size()];
    Arrays.fill(lastTimeArray, Long.MAX_VALUE);
    for (int i = 0; i < paths.size(); i++) {
      TimeValuePair lastTimeValuePair =
          LastQueryExecutor.calculateLastPairForOneSeries(paths.get(i), dataTypes.get(i), context);
      if (lastTimeValuePair.getValue() != null) {
        lastTimeArray[i] = lastTimeValuePair.getTimestamp();
      }
    }
  }

initPreviousParis方法主要为每个时间序列构造了一个最近时间戳 Last 查询

填充过程

填充过程在nextWithoutConstraint方法中完成,主要逻辑如下:

protected RowRecord nextWithoutConstraint() throws IOException {

    // 首先通过 groupByEngineDataSet,获得原始的降采样查询结果行
    RowRecord rowRecord = groupByEngineDataSet.nextWithoutConstraint();

    // 接下来对每个时间序列判断需不需要填充
    for (int i = 0; i < paths.size(); i++) {
      Field field = rowRecord.getFields().get(i);
      // 当前值为 null,需要进行填充
      if (field.getDataType() == null) {
        // 当前一个值不为 null 并且 (填充方式不是 PREVIOUSUNTILLAST 或者 当前时间小于改时间序列的最近时间戳)
        if (previousValue[i] != null
            && (!((PreviousFill) fillTypes.get(dataTypes.get(i))).isUntilLast()
            || rowRecord.getTimestamp() <= lastTimeArray[i])) {
          rowRecord.getFields().set(i, Field.getField(previousValue[i], dataTypes.get(i)));
        }
      } else {
        // 当前值不为 null,不需要填充,用当前值更新 previousValue 数组
        previousValue[i] = field.getObjectValue(field.getDataType());
      }
    }
    return rowRecord;
  }