| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.operator.process; |
| |
| import org.apache.iotdb.db.queryengine.execution.operator.Operator; |
| import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; |
| import org.apache.iotdb.db.queryengine.execution.operator.process.fill.ILinearFill; |
| |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.apache.tsfile.block.column.Column; |
| import org.apache.tsfile.read.common.block.TsBlock; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static java.util.Objects.requireNonNull; |
| |
| /** Used for linear fill. */ |
| public class LinearFillOperator implements ProcessOperator { |
| |
| private final OperatorContext operatorContext; |
| private final ILinearFill[] fillArray; |
| private final Operator child; |
| private final int outputColumnCount; |
| private final List<TsBlock> cachedTsBlock; |
| |
| private final List<Long> cachedRowIndex; |
| |
| private long currentRowIndex = 0; |
| // next TsBlock Index for each Column |
| private final int[] nextTsBlockIndex; |
| |
| /** |
| * indicate whether we can call child.next(). it's used to make sure that child.next() will only |
| * be called once in LinearFillOperator.next(). |
| */ |
| private boolean canCallNext; |
| // indicate whether there is more TsBlock for child operator |
| private boolean noMoreTsBlock; |
| |
| public LinearFillOperator( |
| OperatorContext operatorContext, ILinearFill[] fillArray, Operator child) { |
| this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); |
| checkArgument( |
| fillArray != null && fillArray.length > 0, "fillArray should not be null or empty"); |
| this.fillArray = fillArray; |
| this.child = requireNonNull(child, "child operator is null"); |
| this.outputColumnCount = fillArray.length; |
| this.cachedTsBlock = new ArrayList<>(); |
| this.cachedRowIndex = new ArrayList<>(); |
| this.nextTsBlockIndex = new int[outputColumnCount]; |
| Arrays.fill(this.nextTsBlockIndex, 1); |
| this.canCallNext = false; |
| this.noMoreTsBlock = true; |
| } |
| |
| @Override |
| public OperatorContext getOperatorContext() { |
| return operatorContext; |
| } |
| |
| @Override |
| public ListenableFuture<?> isBlocked() { |
| return child.isBlocked(); |
| } |
| |
| @SuppressWarnings("squid:S3776") |
| @Override |
| public TsBlock next() throws Exception { |
| |
| // make sure we call child.next() at most once |
| if (cachedTsBlock.isEmpty()) { |
| canCallNext = false; |
| TsBlock nextTsBlock = child.nextWithTimer(); |
| // child operator's calculation is not finished, so we just return null |
| if (nextTsBlock == null || nextTsBlock.isEmpty()) { |
| return nextTsBlock; |
| } else { // otherwise, we cache it |
| cachedTsBlock.add(nextTsBlock); |
| cachedRowIndex.add(currentRowIndex); |
| currentRowIndex += nextTsBlock.getPositionCount(); |
| } |
| } |
| |
| TsBlock originTsBlock = cachedTsBlock.get(0); |
| long currentEndRowIndex = cachedRowIndex.get(0) + originTsBlock.getPositionCount() - 1; |
| // Step 1: judge whether we can fill current TsBlock, if TsBlock that we can get is not enough, |
| // we just return null |
| for (int columnIndex = 0; columnIndex < outputColumnCount; columnIndex++) { |
| // current valueColumn can't be filled using current information |
| if (fillArray[columnIndex].needPrepareForNext( |
| currentEndRowIndex, originTsBlock.getColumn(columnIndex))) { |
| // current cached TsBlock is not enough to fill this column |
| while (!isCachedTsBlockEnough(columnIndex, currentEndRowIndex)) { |
| // if we failed to get next TsBlock |
| if (!tryToGetNextTsBlock()) { |
| // there is no more TsBlock, so we have to fill this Column |
| if (noMoreTsBlock) { |
| // break the while-loop, continue to judge next Column |
| break; |
| } else { |
| // there is still more TsBlock, so current calculation is not finished, and we just |
| // return null |
| return null; |
| } |
| } |
| } |
| } |
| } |
| // Step 2: fill current TsBlock |
| originTsBlock = cachedTsBlock.remove(0); |
| long startRowIndex = cachedRowIndex.remove(0); |
| Column[] columns = new Column[outputColumnCount]; |
| for (int i = 0; i < outputColumnCount; i++) { |
| columns[i] = |
| fillArray[i].fill( |
| originTsBlock.getTimeColumn(), originTsBlock.getColumn(i), startRowIndex); |
| } |
| TsBlock result = |
| new TsBlock(originTsBlock.getPositionCount(), originTsBlock.getTimeColumn(), columns); |
| for (int i = 0; i < outputColumnCount; i++) { |
| // make sure nextTsBlockIndex for each column >= 1 |
| nextTsBlockIndex[i] = Math.max(1, nextTsBlockIndex[i] - 1); |
| } |
| return result; |
| } |
| |
| @Override |
| public boolean hasNext() throws Exception { |
| // if child.hasNext() return false, it means that there is no more tsBlocks |
| noMoreTsBlock = !child.hasNextWithTimer(); |
| // if there is more tsBlock, we can call child.next() once |
| canCallNext = !noMoreTsBlock; |
| return !cachedTsBlock.isEmpty() || !noMoreTsBlock; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| child.close(); |
| } |
| |
| @Override |
| public boolean isFinished() throws Exception { |
| return cachedTsBlock.isEmpty() && child.isFinished(); |
| } |
| |
| @Override |
| public long calculateMaxPeekMemory() { |
| // while doing linear fill, we may need to copy the corresponding column if there exists null |
| // values, and we may also need to cache next TsBlock to get next not null value |
| // so the max peek memory may be triple or more, here we just use 3 as the estimated factor |
| // because in most cases, we will get next not null value in next TsBlock |
| return 3 * child.calculateMaxPeekMemory() + child.calculateRetainedSizeAfterCallingNext(); |
| } |
| |
| @Override |
| public long calculateMaxReturnSize() { |
| return child.calculateMaxReturnSize(); |
| } |
| |
| @Override |
| public long calculateRetainedSizeAfterCallingNext() { |
| // we can safely ignore two lines cached in LinearFill |
| return child.calculateRetainedSizeAfterCallingNext(); |
| } |
| |
| /** |
| * Judge whether we can use current cached TsBlock to fill Column. |
| * |
| * @param columnIndex index for column which need to be filled |
| * @param currentEndRowIndex row index for endTime of column which need to be filled |
| * @return true if current cached TsBlock is enough to fill Column at columnIndex, otherwise |
| * false. |
| */ |
| private boolean isCachedTsBlockEnough(int columnIndex, long currentEndRowIndex) { |
| // next TsBlock has already been in the cachedTsBlock |
| while (nextTsBlockIndex[columnIndex] < cachedTsBlock.size()) { |
| TsBlock nextTsBlock = cachedTsBlock.get(nextTsBlockIndex[columnIndex]); |
| long startRowIndex = cachedRowIndex.get(nextTsBlockIndex[columnIndex]); |
| nextTsBlockIndex[columnIndex]++; |
| if (fillArray[columnIndex].prepareForNext( |
| startRowIndex, |
| currentEndRowIndex, |
| nextTsBlock.getTimeColumn(), |
| nextTsBlock.getColumn(columnIndex))) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Try to get next TsBlock |
| * |
| * @return true if we succeed to get next TsBlock and add it into cachedTsBlock, otherwise false |
| * @throws Exception errors happened while getting next batch data |
| */ |
| private boolean tryToGetNextTsBlock() throws Exception { |
| if (canCallNext) { // if we can call child.next(), we call that and cache it in |
| // cachedTsBlock |
| canCallNext = false; |
| TsBlock nextTsBlock = child.nextWithTimer(); |
| // child operator's calculation is not finished, so we just return null |
| if (nextTsBlock == null || nextTsBlock.isEmpty()) { |
| return false; |
| } else { // otherwise, we cache it |
| cachedTsBlock.add(nextTsBlock); |
| cachedRowIndex.add(currentRowIndex); |
| currentRowIndex += nextTsBlock.getPositionCount(); |
| return true; |
| } |
| } |
| return false; |
| } |
| } |