design draft
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java index a1f73ef..397319f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.aggregation; +import org.apache.iotdb.db.mpp.execution.operator.IWindow; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -40,7 +41,7 @@ protected List<InputLocation[]> inputLocationList; protected final AggregationStep step; - protected TimeRange curTimeRange = new TimeRange(0, Long.MAX_VALUE); + protected IWindow curWindow; // Used for SeriesAggregateScanOperator public Aggregator(Accumulator accumulator, AggregationStep step) { @@ -64,7 +65,7 @@ step.isInputRaw(), "Step in SeriesAggregateScanOperator and RawDataAggregateOperator can only process raw input"); if (inputLocationList == null) { - return accumulator.addInput(tsBlock.getTimeAndValueColumn(0), curTimeRange); + return accumulator.addInput(tsBlock.getTimeAndValueColumn(0), curWindow); } else { int lastReadReadIndex = 0; for (InputLocation[] inputLocations : inputLocationList) { @@ -75,7 +76,7 @@ timeValueColumn[0] = tsBlock.getTimeColumn(); timeValueColumn[1] = tsBlock.getColumn(inputLocations[0].getValueColumnIndex()); lastReadReadIndex = - Math.max(lastReadReadIndex, accumulator.addInput(timeValueColumn, curTimeRange)); + Math.max(lastReadReadIndex, accumulator.addInput(timeValueColumn, curWindow)); } return lastReadReadIndex; } @@ -128,20 +129,24 @@ } public void reset() { - curTimeRange = new TimeRange(0, Long.MAX_VALUE); + curWindow = null; accumulator.reset(); } public boolean hasFinalResult() { + if (!curWindow.isTimeWindow()) { + return false; + } return accumulator.hasFinalResult(); } public void updateTimeRange(TimeRange curTimeRange) { reset(); - this.curTimeRange = curTimeRange; + this.curWindow = new TimeWindow(curTimeRange); } - public TimeRange getCurTimeRange() { - return curTimeRange; + public void updateWindow(IWindow curWindow) { + reset(); + this.curWindow = curWindow; } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java index e58d239..23ed041 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.aggregation; +import org.apache.iotdb.db.mpp.execution.operator.IWindow; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; @@ -222,21 +223,25 @@ return firstValue.getDataType(); } - protected int addIntInput(Column[] column, TimeRange timeRange) { - int curPositionCount = column[0].getPositionCount(); - long curMinTime = timeRange.getMin(); - long curMaxTime = timeRange.getMax(); + protected int addIntInput(Column[] column, IWindow curWindow) { + int windowControlColumnIndex = curWindow.getControlColumnIndex(); + boolean isTimeWindow = curWindow.isTimeWindow(); + int curPositionCount = column[windowControlColumnIndex].getPositionCount(); + for (int i = 0; i < curPositionCount; i++) { - long curTime = column[0].getLong(i); - if (curTime > curMaxTime || curTime < curMinTime) { + if (!curWindow.satisfy(column[windowControlColumnIndex])) { return i; } + + curWindow.acceptOnePoint(); if (!column[1].isNull(i)) { updateIntFirstValue(column[1].getInt(i), curTime); - return i; + if (isTimeWindow) { + return i; + } } } - return column[0].getPositionCount(); + return column[windowControlColumnIndex].getPositionCount(); } protected void updateIntFirstValue(int value, long curTime) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java index 10035dd..eb0acde 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueAccumulator.java
@@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.aggregation; +import org.apache.iotdb.db.mpp.execution.operator.IWindow; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; @@ -221,7 +222,7 @@ return lastValue.getDataType(); } - protected int addIntInput(Column[] column, TimeRange timeRange) { + protected int addIntInput(Column[] column, IWindow curWindow) { int curPositionCount = column[0].getPositionCount(); long curMinTime = timeRange.getMin(); long curMaxTime = timeRange.getMax();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java index ec848b3..8d328d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/slidingwindow/SlidingWindowAggregator.java
@@ -45,6 +45,8 @@ // cached partial aggregation result of pre-aggregate windows protected Deque<PartialAggregationResult> deque; + protected TimeRange curTimeRange; + public SlidingWindowAggregator( Accumulator accumulator, List<InputLocation[]> inputLocationList, AggregationStep step) { super(accumulator, step, inputLocationList);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java index 5bbb1ad..fb33af7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/AggregationUtil.java
@@ -132,12 +132,10 @@ /** Append a row of aggregation results to the result tsBlock. */ public static void appendAggregationResult( - TsBlockBuilder tsBlockBuilder, - List<? extends Aggregator> aggregators, - ITimeRangeIterator timeRangeIterator) { + TsBlockBuilder tsBlockBuilder, List<? extends Aggregator> aggregators, long ouputTime) { TimeColumnBuilder timeColumnBuilder = tsBlockBuilder.getTimeColumnBuilder(); // Use start time of current time range as time column - timeColumnBuilder.writeLong(timeRangeIterator.currentOutputTime()); + timeColumnBuilder.writeLong(ouputTime); ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); int columnIndex = 0; for (Aggregator aggregator : aggregators) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindow.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindow.java new file mode 100644 index 0000000..21c4b70 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindow.java
@@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.execution.operator; + +import org.apache.iotdb.tsfile.read.common.block.column.Column; + +public interface IWindow { + + int getControlColumnIndex(); + + boolean satisfy(Column column); + + boolean isTimeWindow(); + + void acceptOnePoint(); +}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindowManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindowManager.java new file mode 100644 index 0000000..67cbb4a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/IWindowManager.java
@@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.execution.operator; + +import org.apache.iotdb.tsfile.read.common.block.TsBlock; + +public interface IWindowManager { + + boolean isCurWindowReady(); + + void initCurWindow(TsBlock tsBlock); + + boolean hasNext(); + + void next(); + + long currentOutputTime(); + + IWindow getCurWindow(); + + TsBlock skipOutOfWindowPoints(TsBlock inputTsBlock); +}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java index d959688..a0023e1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -21,14 +21,14 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.mpp.execution.operator.IWindow; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.utils.Pair; import java.util.List; -import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.calculateAggregationFromRawData; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.appendAggregationResult; +import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.isAllAggregatorsHasFinalResult; /** * RawDataAggregationOperator is used to process raw data tsBlock input calculating using value @@ -69,16 +69,54 @@ } } - // update result using aggregators - updateResultTsBlock(); - return true; } private boolean calcFromRawData() { - Pair<Boolean, TsBlock> calcResult = - calculateAggregationFromRawData(inputTsBlock, aggregators, curTimeRange, ascending); - inputTsBlock = calcResult.getRight(); - return calcResult.getLeft(); + if (inputTsBlock == null || inputTsBlock.isEmpty()) { + return false; + } + + if (!windowManager.isCurWindowReady()) { + windowManager.initCurWindow(inputTsBlock); + IWindow curWindow = windowManager.getCurWindow(); + for (Aggregator aggregator : aggregators) { + aggregator.updateWindow(curWindow); + } + } + + inputTsBlock = windowManager.skipOutOfWindowPoints(inputTsBlock); + + int lastReadRowIndex = 0; + for (Aggregator aggregator : aggregators) { + // current agg method has been calculated + if (aggregator.hasFinalResult()) { + continue; + } + + lastReadRowIndex = Math.max(lastReadRowIndex, aggregator.processTsBlock(inputTsBlock)); + } + if (lastReadRowIndex >= inputTsBlock.getPositionCount()) { + inputTsBlock = null; + if (isAllAggregatorsHasFinalResult(aggregators)) { + updateResultTsBlock(); + return true; + } + return false; + } else { + inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex); + updateResultTsBlock(); + return true; + } + } + + @Override + protected void updateResultTsBlock() { + appendAggregationResult(resultTsBlockBuilder, aggregators, windowManager.currentOutputTime()); + if (windowManager.hasNext()) { + windowManager.next(); + } else { + finish = true; + } } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java index 8632041..d9c7592 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -21,6 +21,7 @@ import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; +import org.apache.iotdb.db.mpp.execution.operator.IWindowManager; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -50,6 +51,8 @@ // current interval of aggregation window [curStartTime, curEndTime) protected TimeRange curTimeRange; + protected IWindowManager windowManager; + protected final List<Aggregator> aggregators; // using for building result tsBlock @@ -91,9 +94,14 @@ return child.isBlocked(); } + protected boolean finish; + @Override public boolean hasNext() { - return curTimeRange != null || timeRangeIterator.hasNextTimeRange(); + if (finish) { + return false; + } + return inputTsBlock != null || child.hasNext(); } @Override @@ -105,18 +113,17 @@ // reset operator state canCallNext = true; - while (System.nanoTime() - start < maxRuntime - && (curTimeRange != null || timeRangeIterator.hasNextTimeRange()) - && !resultTsBlockBuilder.isFull()) { - if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) { - // move to next time window - curTimeRange = timeRangeIterator.nextTimeRange(); + while (System.nanoTime() - start < maxRuntime && hasNext() && !resultTsBlockBuilder.isFull()) { - // clear previous aggregation result - for (Aggregator aggregator : aggregators) { - aggregator.updateTimeRange(curTimeRange); - } - } + // if (curTimeRange == null && timeRangeIterator.hasNextTimeRange()) { + // // move to next time window + // curTimeRange = timeRangeIterator.nextTimeRange(); + // + // // clear previous aggregation result + // for (Aggregator aggregator : aggregators) { + // aggregator.updateTimeRange(curTimeRange); + // } + // } // calculate aggregation result on current time window if (!calculateNextAggregationResult()) { @@ -147,7 +154,8 @@ protected void updateResultTsBlock() { curTimeRange = null; - appendAggregationResult(resultTsBlockBuilder, aggregators, timeRangeIterator); + appendAggregationResult( + resultTsBlockBuilder, aggregators, timeRangeIterator.currentOutputTime()); } @Override