blob: f54b1d5e9cc29e2276dcf774ba2722136166bf75 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow;
import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
public abstract class SlidingWindowAggregator extends Aggregator {
// cached partial aggregation result of pre-aggregate windows
protected Deque<PartialAggregationResult> deque;
protected TimeRange curTimeRange;
protected SlidingWindowAggregator(
Accumulator accumulator, List<InputLocation[]> inputLocationList, AggregationStep step) {
super(accumulator, step, inputLocationList);
this.deque = new LinkedList<>();
}
public void processTsBlock(TsBlock tsBlock) {
checkArgument(
step.isInputPartial(),
"Step in SlidingWindowAggregationOperator can only process partial result");
TimeColumn timeColumn = tsBlock.getTimeColumn();
Column[] valueColumn = new Column[inputLocationList.get(0).length];
for (int i = 0; i < inputLocationList.get(0).length; i++) {
InputLocation inputLocation = inputLocationList.get(0)[i];
checkArgument(
inputLocation.getTsBlockIndex() == 0,
"SlidingWindowAggregationOperator can only process one tsBlock input.");
valueColumn[i] = tsBlock.getColumn(inputLocation.getValueColumnIndex());
}
processPartialResult(new PartialAggregationResult(timeColumn, valueColumn));
}
public void updateTimeRange(TimeRange curTimeRange) {
this.curTimeRange = curTimeRange;
evictingExpiredValue();
}
/** evicting expired element in queue and reset expired aggregateResult. */
protected abstract void evictingExpiredValue();
/** update queue and aggregateResult. */
public abstract void processPartialResult(PartialAggregationResult partialResult);
protected static class PartialAggregationResult {
private final TimeColumn timeColumn;
private final Column[] partialResultColumns;
public PartialAggregationResult(TimeColumn timeColumn, Column[] partialResultColumns) {
this.timeColumn = timeColumn;
this.partialResultColumns = partialResultColumns;
}
public boolean isNull() {
return partialResultColumns[0].isNull(0);
}
public long getTime() {
return timeColumn.getLong(0);
}
public Column[] getPartialResult() {
return partialResultColumns;
}
public List<TSDataType> getDataTypes() {
return Arrays.stream(partialResultColumns)
.sequential()
.map(Column::getDataType)
.collect(Collectors.toList());
}
}
}