blob: 5be353e4d47812d66f8bfdfbf1c3cba0710df284 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
/** only has one input column. */
public class SingleColumnMerger implements ColumnMerger {
private final InputLocation location;
private final TimeComparator comparator;
public SingleColumnMerger(InputLocation location, TimeComparator comparator) {
this.location = location;
this.comparator = comparator;
}
@Override
public void mergeColumn(
TsBlock[] inputTsBlocks,
int[] inputIndex,
int[] updatedInputIndex,
TimeColumnBuilder timeBuilder,
long currentEndTime,
ColumnBuilder columnBuilder) {
mergeOneColumn(
inputTsBlocks,
inputIndex,
updatedInputIndex,
timeBuilder,
currentEndTime,
columnBuilder,
location,
comparator);
}
@Override
public void mergeColumn(
TsBlock[] inputTsBlocks,
int[] inputIndex,
int[] updatedInputIndex,
long currentTime,
ColumnBuilder columnBuilder) {
mergeOneColumn(
inputTsBlocks, inputIndex, updatedInputIndex, currentTime, columnBuilder, location);
}
@SuppressWarnings({"squid:S107", "squid:S3776"})
public static void mergeOneColumn(
TsBlock[] inputTsBlocks,
int[] inputIndex,
int[] updatedInputIndex,
TimeColumnBuilder timeBuilder,
long currentEndTime,
ColumnBuilder columnBuilder,
InputLocation location,
TimeComparator comparator) {
int tsBlockIndex = location.getTsBlockIndex();
int columnIndex = location.getValueColumnIndex();
int rowCount = timeBuilder.getPositionCount();
int index = inputIndex[tsBlockIndex];
// input column is empty or current time of input column is already larger than currentEndTime
// just appendNull rowCount null
if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
|| !comparator.satisfyCurEndTime(
inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
columnBuilder.appendNull(rowCount);
} else {
// read from input column and write it into columnBuilder
TimeColumn timeColumn = inputTsBlocks[tsBlockIndex].getTimeColumn();
Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
for (int i = 0; i < rowCount; i++) {
// current index reaches the size of input column or current time of input column is already
// larger than currentEndTime, use null column to fill the remaining
if (timeColumn.getPositionCount() == index
|| !comparator.satisfyCurEndTime(
inputTsBlocks[tsBlockIndex].getTimeByIndex(index), currentEndTime)) {
columnBuilder.appendNull(rowCount - i);
break;
}
// current time of input column is equal to result row's time
if (timeColumn.getLong(index) == timeBuilder.getTime(i)) {
// if input column's value at index is null, append a null value
if (valueColumn.isNull(index)) {
columnBuilder.appendNull();
} else {
// if input column's value at index is not null, append the value
columnBuilder.write(valueColumn, index);
}
// increase the index
index++;
} else {
// otherwise, append a null
columnBuilder.appendNull();
}
}
}
// update the index after merging
updatedInputIndex[tsBlockIndex] = index;
}
public static void mergeOneColumn(
TsBlock[] inputTsBlocks,
int[] inputIndex,
int[] updatedInputIndex,
long currentTime,
ColumnBuilder columnBuilder,
InputLocation location) {
int tsBlockIndex = location.getTsBlockIndex();
int columnIndex = location.getValueColumnIndex();
int index = inputIndex[tsBlockIndex];
// input column is empty or current time of input column is already larger than currentEndTime
// just appendNull
if (ColumnMerger.empty(tsBlockIndex, inputTsBlocks, inputIndex)
|| inputTsBlocks[tsBlockIndex].getTimeByIndex(index) != currentTime) {
columnBuilder.appendNull();
} else {
// read from input column and write it into columnBuilder
Column valueColumn = inputTsBlocks[tsBlockIndex].getColumn(columnIndex);
if (valueColumn.isNull(index)) {
columnBuilder.appendNull();
} else {
columnBuilder.write(valueColumn, index);
}
index++;
}
// update the index after merging
updatedInputIndex[tsBlockIndex] = index;
}
}