| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge; |
| |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlock; |
| import org.apache.iotdb.tsfile.read.common.block.column.Column; |
| import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; |
| import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; |
| import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; |
| |
| import java.util.List; |
| |
| /** has more than one input column, but these columns' time is overlapped. */ |
| public class MultiColumnMerger implements ColumnMerger { |
| |
| private final List<InputLocation> inputLocations; |
| |
| public MultiColumnMerger(List<InputLocation> inputLocations) { |
| this.inputLocations = inputLocations; |
| } |
| |
| @SuppressWarnings("squid:S3776") |
| @Override |
| public void mergeColumn( |
| TsBlock[] inputTsBlocks, |
| int[] inputIndex, |
| int[] updatedInputIndex, |
| TimeColumnBuilder timeBuilder, |
| long currentEndTime, |
| ColumnBuilder columnBuilder) { |
| int rowCount = timeBuilder.getPositionCount(); |
| |
| // init startIndex for each input locations |
| for (InputLocation inputLocation : inputLocations) { |
| int tsBlockIndex = inputLocation.getTsBlockIndex(); |
| updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex]; |
| } |
| |
| for (int i = 0; i < rowCount; i++) { |
| // record whether current row already has value to be appended |
| boolean appendValue = false; |
| // we don't use MinHeap here to choose the right column, because inputLocations.size() won't |
| // be very large. |
| // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better |
| // than PriorityQueue. |
| for (InputLocation location : inputLocations) { |
| int tsBlockIndex = location.getTsBlockIndex(); |
| int columnIndex = location.getValueColumnIndex(); |
| int index = updatedInputIndex[tsBlockIndex]; |
| |
| // current location's input column is not empty |
| if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) { |
| TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn(); |
| Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex); |
| // time of current location's input column is equal to current row's time |
| if (timeColumn.getLong(index) == timeBuilder.getTime(i)) { |
| // value of current location's input column is not null |
| if (!valueColumn.isNull(index)) { |
| columnBuilder.write(valueColumn, index); |
| appendValue = true; |
| } |
| // increase the index |
| index++; |
| // update the index after merging |
| updatedInputIndex[tsBlockIndex] = index; |
| // we can safely set appendValue to true and then break the loop, because these input |
| // columns' time is not overlapped |
| if (appendValue) { |
| break; |
| } |
| } |
| } |
| } |
| // all input columns are null at current row, so just append a null |
| if (!appendValue) { |
| columnBuilder.appendNull(); |
| } |
| } |
| } |
| |
| @Override |
| public void mergeColumn( |
| TsBlock[] inputTsBlocks, |
| int[] inputIndex, |
| int[] updatedInputIndex, |
| long currentTime, |
| ColumnBuilder columnBuilder) { |
| |
| // init startIndex for each input locations |
| for (InputLocation inputLocation : inputLocations) { |
| int tsBlockIndex = inputLocation.getTsBlockIndex(); |
| updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex]; |
| } |
| |
| // record whether current row already has value to be appended |
| boolean appendValue = false; |
| // we don't use MinHeap here to choose the right column, because inputLocations.size() won't |
| // be very large. |
| // Assuming inputLocations.size() will be less than 5, performance of for-loop may be better |
| // than PriorityQueue. |
| for (InputLocation location : inputLocations) { |
| int tsBlockIndex = location.getTsBlockIndex(); |
| int columnIndex = location.getValueColumnIndex(); |
| int index = updatedInputIndex[tsBlockIndex]; |
| |
| // current location's input column is not empty |
| if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) { |
| TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn(); |
| Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex); |
| // time of current location's input column is equal to current row's time |
| if (timeColumn.getLong(index) == currentTime) { |
| // value of current location's input column is not null |
| // here we only append value if there is no value appended before and current value is |
| // null |
| // belonging to more than one DataRegion, we need to choose which one is latest |
| if (!appendValue && !valueColumn.isNull(index)) { |
| columnBuilder.write(valueColumn, index); |
| appendValue = true; |
| } |
| // increase the index |
| index++; |
| // update the index after merging |
| updatedInputIndex[tsBlockIndex] = index; |
| } |
| } |
| } |
| // all input columns are null at current row, so just append a null |
| if (!appendValue) { |
| columnBuilder.appendNull(); |
| } |
| } |
| } |