| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.operator.process.fill.linear; |
| |
| import org.apache.iotdb.db.queryengine.execution.operator.process.fill.ILinearFill; |
| |
| import org.apache.tsfile.block.column.Column; |
| import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; |
| import org.apache.tsfile.read.common.block.column.TimeColumn; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| |
| /** |
| * The result of Linear Fill functions at timestamp "T" is calculated by performing a linear fitting |
| * method on two time series values, one is at the closest timestamp before T, and the other is at |
| * the closest timestamp after T. Linear Fill function calculation only supports numeric types |
| * including long, int, double and float. |
| */ |
| public abstract class LinearFill implements ILinearFill { |
| |
| // whether previous value is null |
| protected boolean previousIsNull = true; |
| |
| // next row index which is not null |
| private long nextRowIndex = -1; |
| // next row index in current column which is not null |
| private long nextRowIndexInCurrentColumn = -1; |
| |
| // previous time coresponding to previous not null value |
| protected long previousTime = -1; |
| |
| private long nextTime = -1; |
| |
| protected long nextTimeInCurrentColumn = -1; |
| |
| @Override |
| public Column fill(TimeColumn timeColumn, Column valueColumn, long startRowIndex) { |
| int size = valueColumn.getPositionCount(); |
| if (size == 0) { |
| return valueColumn; |
| } |
| // if this valueColumn doesn't have any null value, record the last value, and then return |
| // itself. |
| if (!valueColumn.mayHaveNull()) { |
| previousIsNull = false; |
| // update the value using last non-null value |
| previousTime = timeColumn.getEndTime(); |
| updatePreviousValue(valueColumn, valueColumn.getPositionCount() - 1); |
| return valueColumn; |
| } |
| |
| // if its values are all null |
| if (valueColumn instanceof RunLengthEncodedColumn) { |
| return doWithAllNulls(startRowIndex, size, timeColumn, valueColumn); |
| } else { |
| Object array = createValueArray(size); |
| boolean[] isNull = new boolean[size]; |
| // have null value |
| boolean hasNullValue = false; |
| |
| for (int i = 0; i < size; i++) { |
| // current value is null, we need to fill it |
| if (valueColumn.isNull(i)) { |
| hasNullValue = |
| fill(startRowIndex, i, isNull, timeColumn, valueColumn, array) || hasNullValue; |
| } else { // current is not null |
| // fill value using its own value |
| fillValue(valueColumn, i, array); |
| // update previous value |
| previousTime = timeColumn.getLong(i); |
| updatePreviousValue(valueColumn, i); |
| previousIsNull = false; |
| } |
| } |
| return createFilledValueColumn(array, isNull, hasNullValue, size); |
| } |
| } |
| |
| private Column doWithAllNulls( |
| long startRowIndex, int size, TimeColumn timeColumn, Column valueColumn) { |
| // previous value is null or next value is null, we just return NULL_VALUE_BLOCK |
| if (previousIsNull || nextRowIndex < startRowIndex) { |
| return new RunLengthEncodedColumn(createNullValueColumn(), size); |
| } else { |
| prepareForNextValueInCurrentColumn( |
| startRowIndex + timeColumn.getPositionCount() - 1, |
| timeColumn.getPositionCount(), |
| timeColumn, |
| valueColumn); |
| double[] factors = new double[size]; |
| for (int i = 0; i < size; i++) { |
| factors[i] = getFactor(timeColumn.getLong(i)); |
| } |
| return createFilledValueColumn(factors); |
| } |
| } |
| |
| private boolean fill( |
| long startRowIndex, |
| int i, |
| boolean[] isNull, |
| TimeColumn timeColumn, |
| Column valueColumn, |
| Object array) { |
| long currentRowIndex = startRowIndex + i; |
| prepareForNextValueInCurrentColumn(currentRowIndex, i + 1, timeColumn, valueColumn); |
| // we don't fill it, if either previous value or next value is null |
| if (previousIsNull || nextIsNull(currentRowIndex)) { |
| isNull[i] = true; |
| return true; |
| } else { |
| // fill value using previous and next value |
| // factor is (x - x0) / (x1 - x0) |
| double factor = getFactor(timeColumn.getLong(i)); |
| fillValue(array, i, factor); |
| return false; |
| } |
| } |
| |
| private double getFactor(long currentTime) { |
| return nextTimeInCurrentColumn - previousTime == 0 |
| ? 0.0 |
| : ((double) (currentTime - previousTime)) / (nextTimeInCurrentColumn - previousTime); |
| } |
| |
| /** |
| * Whether need prepare for next. |
| * |
| * @param rowIndex end time of current valueColumn that need to be filled |
| * @param valueColumn valueColumn that need to be filled |
| * @return true if valueColumn can't be filled using current information, and we need to get next |
| * TsBlock and then call prepareForNext. false if valueColumn can be filled using current |
| * information, and we can directly call fill() function |
| */ |
| @Override |
| public boolean needPrepareForNext(long rowIndex, Column valueColumn) { |
| return nextRowIndex < rowIndex && valueColumn.isNull(valueColumn.getPositionCount() - 1); |
| } |
| |
| @Override |
| public boolean prepareForNext( |
| long startRowIndex, long endRowIndex, TimeColumn nextTimeColumn, Column nextValueColumn) { |
| checkArgument( |
| nextTimeColumn.getPositionCount() > 0 && endRowIndex < startRowIndex, |
| "nextColumn's time should be greater than current time"); |
| if (endRowIndex <= nextRowIndex) { |
| return true; |
| } |
| |
| for (int i = 0; i < nextValueColumn.getPositionCount(); i++) { |
| if (!nextValueColumn.isNull(i)) { |
| updateNextValue(nextValueColumn, i); |
| this.nextTime = nextTimeColumn.getLong(i); |
| this.nextRowIndex = startRowIndex + i; |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private boolean nextIsNull(long rowIndex) { |
| return nextRowIndexInCurrentColumn <= rowIndex; |
| } |
| |
| private void prepareForNextValueInCurrentColumn( |
| long currentRowIndex, int startIndex, TimeColumn timeColumn, Column valueColumn) { |
| if (currentRowIndex <= nextRowIndexInCurrentColumn) { |
| return; |
| } |
| for (int i = startIndex; i < valueColumn.getPositionCount(); i++) { |
| if (!valueColumn.isNull(i)) { |
| this.nextRowIndexInCurrentColumn = currentRowIndex + (i - startIndex + 1); |
| this.nextTimeInCurrentColumn = timeColumn.getLong(i); |
| updateNextValueInCurrentColumn(valueColumn, i); |
| return; |
| } |
| } |
| |
| // current column's value is not enough for filling, we should use value of next Column |
| this.nextRowIndexInCurrentColumn = this.nextRowIndex; |
| this.nextTimeInCurrentColumn = this.nextTime; |
| updateNextValueInCurrentColumn(); |
| } |
| |
| abstract void fillValue(Column column, int index, Object array); |
| |
| abstract void fillValue(Object array, int index, double factor); |
| |
| abstract Object createValueArray(int size); |
| |
| abstract Column createNullValueColumn(); |
| |
| abstract Column createFilledValueColumn(double[] factors1); |
| |
| abstract Column createFilledValueColumn( |
| Object array, boolean[] isNull, boolean hasNullValue, int size); |
| |
| abstract void updatePreviousValue(Column column, int index); |
| |
| abstract void updateNextValue(Column nextValueColumn, int index); |
| |
| abstract void updateNextValueInCurrentColumn(Column nextValueColumn, int index); |
| |
| /** update nextValueInCurrentColumn using value of next Column. */ |
| abstract void updateNextValueInCurrentColumn(); |
| } |