| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge; |
| |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlock; |
| import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; |
| import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; |
| |
| import java.util.List; |
| |
| /** has more than one input column, but these columns' time is not overlapped. */ |
| public class NonOverlappedMultiColumnMerger implements ColumnMerger { |
| |
| private final List<InputLocation> inputLocations; |
| |
| private final TimeComparator comparator; |
| |
| // index for inputLocations indicating current iterating TsBlock's InputLocation |
| private int index; |
| |
| /** |
| * these columns' time should never be overlapped. |
| * |
| * @param inputLocations The time order in TsBlock represented by inputLocations should be |
| * incremented by timestamp if it is order by time asc, otherwise decreased by timestamp if it |
| * is order by time desc |
| */ |
| public NonOverlappedMultiColumnMerger( |
| List<InputLocation> inputLocations, TimeComparator comparator) { |
| this.inputLocations = inputLocations; |
| this.comparator = comparator; |
| this.index = 0; |
| } |
| |
| @Override |
| public void mergeColumn( |
| TsBlock[] inputTsBlocks, |
| int[] inputIndex, |
| int[] updatedInputIndex, |
| TimeColumnBuilder timeBuilder, |
| long currentEndTime, |
| ColumnBuilder columnBuilder) { |
| // move to next InputLocation if current InputLocation's column has been consumed up |
| moveToNextIfNecessary(inputTsBlocks); |
| // merge current column |
| SingleColumnMerger.mergeOneColumn( |
| inputTsBlocks, |
| inputIndex, |
| updatedInputIndex, |
| timeBuilder, |
| currentEndTime, |
| columnBuilder, |
| inputLocations.get(index), |
| comparator); |
| } |
| |
| @Override |
| public void mergeColumn( |
| TsBlock[] inputTsBlocks, |
| int[] inputIndex, |
| int[] updatedInputIndex, |
| long currentTime, |
| ColumnBuilder columnBuilder) { |
| // move to next InputLocation if current InputLocation's column has been consumed up |
| moveToNextIfNecessary(inputTsBlocks); |
| // merge current column |
| SingleColumnMerger.mergeOneColumn( |
| inputTsBlocks, |
| inputIndex, |
| updatedInputIndex, |
| currentTime, |
| columnBuilder, |
| inputLocations.get(index)); |
| } |
| |
| private void moveToNextIfNecessary(TsBlock[] inputTsBlocks) { |
| // if it is already at the last index, don't need to move |
| if (index == inputLocations.size() - 1) { |
| return; |
| } |
| // if inputTsBlocks[tsBlockIndex] is null, means current index's TsBlock has been consumed up |
| while (index < inputLocations.size() - 1 |
| && inputTsBlocks[inputLocations.get(index).getTsBlockIndex()] == null) { |
| index++; |
| } |
| } |
| } |