blob: 16328bf2294a794563b158fac7e46e261f55b4d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import java.util.List;
/**
 * Merges more than one input column into a single output column when the time ranges of those
 * input columns may overlap.
 *
 * <p>For every output row, each configured {@link InputLocation} is probed in list order. The
 * first location holding a non-null value at the row's timestamp supplies the output value. Every
 * location that is positioned on that timestamp has its read index advanced past it, whether or
 * not its value was used. If no location supplies a value, a null is appended.
 */
public class MultiColumnMerger implements ColumnMerger {

  private final List<InputLocation> inputLocations;

  /**
   * Creates a merger over the given input locations.
   *
   * @param inputLocations locations of the input columns to merge; the list is stored by
   *     reference (not copied) and its iteration order decides which location wins when several
   *     carry a non-null value for the same timestamp
   */
  public MultiColumnMerger(List<InputLocation> inputLocations) {
    this.inputLocations = inputLocations;
  }

  /**
   * Merges one output value per timestamp already present in {@code timeBuilder}.
   *
   * <p>Exactly {@code timeBuilder.getPositionCount()} entries (values or nulls) are appended to
   * {@code columnBuilder}.
   *
   * @param inputTsBlocks input blocks, indexed by {@link InputLocation#getTsBlockIndex()}
   * @param inputIndex current read position of each input block; only read here
   * @param updatedInputIndex receives the read position of each referenced input block after the
   *     merge; it is first reset to the matching {@code inputIndex} entry
   * @param timeBuilder supplies the output timestamps, one per row
   * @param currentEndTime not used by this implementation
   * @param columnBuilder builder the merged values are appended to
   */
  @Override
  public void mergeColumn(
      TsBlock[] inputTsBlocks,
      int[] inputIndex,
      int[] updatedInputIndex,
      TimeColumnBuilder timeBuilder,
      long currentEndTime,
      ColumnBuilder columnBuilder) {
    resetUpdatedInputIndex(inputIndex, updatedInputIndex);

    int rowCount = timeBuilder.getPositionCount();
    for (int row = 0; row < rowCount; row++) {
      mergeRow(inputTsBlocks, updatedInputIndex, timeBuilder.getTime(row), columnBuilder);
    }
  }

  /**
   * Merges a single output value for {@code currentTime}.
   *
   * <p>Exactly one entry (a value or a null) is appended to {@code columnBuilder}.
   *
   * @param inputTsBlocks input blocks, indexed by {@link InputLocation#getTsBlockIndex()}
   * @param inputIndex current read position of each input block; only read here
   * @param updatedInputIndex receives the read position of each referenced input block after the
   *     merge; it is first reset to the matching {@code inputIndex} entry
   * @param currentTime timestamp of the output row
   * @param columnBuilder builder the merged value is appended to
   */
  @Override
  public void mergeColumn(
      TsBlock[] inputTsBlocks,
      int[] inputIndex,
      int[] updatedInputIndex,
      long currentTime,
      ColumnBuilder columnBuilder) {
    resetUpdatedInputIndex(inputIndex, updatedInputIndex);
    mergeRow(inputTsBlocks, updatedInputIndex, currentTime, columnBuilder);
  }

  /** Initializes the start index of every referenced input block from {@code inputIndex}. */
  private void resetUpdatedInputIndex(int[] inputIndex, int[] updatedInputIndex) {
    for (InputLocation inputLocation : inputLocations) {
      int tsBlockIndex = inputLocation.getTsBlockIndex();
      updatedInputIndex[tsBlockIndex] = inputIndex[tsBlockIndex];
    }
  }

  /**
   * Appends the merged value (or a null) for one output row and advances every input location
   * that is positioned on {@code rowTime}.
   */
  private void mergeRow(
      TsBlock[] inputTsBlocks, int[] updatedInputIndex, long rowTime, ColumnBuilder columnBuilder) {
    // records whether the current row already has a value appended
    boolean appendValue = false;

    // We don't use a MinHeap here to choose the right column, because inputLocations.size() won't
    // be very large. Assuming it stays below 5, a plain for-loop may perform better than a
    // PriorityQueue.
    for (InputLocation location : inputLocations) {
      int tsBlockIndex = location.getTsBlockIndex();
      int columnIndex = location.getValueColumnIndex();
      int index = updatedInputIndex[tsBlockIndex];

      // skip locations whose input ColumnMerger.empty reports as having nothing to read
      if (!ColumnMerger.empty(tsBlockIndex, inputTsBlocks, updatedInputIndex)) {
        TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
        Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);

        // time of the current location's input column is equal to the current row's time
        if (timeColumn.getLong(index) == rowTime) {
          // only the first non-null value (in inputLocations order) is written for this row
          if (!appendValue && !valueColumn.isNull(index)) {
            columnBuilder.write(valueColumn, index);
            appendValue = true;
          }
          // Always consume this position, even when a value was already appended. Because the
          // inputs' times overlap, several locations can sit on the same timestamp; stopping
          // early would leave the remaining ones behind the row time, so they would never match
          // on later rows.
          updatedInputIndex[tsBlockIndex] = index + 1;
        }
      }
    }

    // no input column supplied a value at the current row, so just append a null
    if (!appendValue) {
      columnBuilder.appendNull();
    }
  }
}