blob: c24b76798e416c31245df80918cba6f8ace527dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.read.reader.page;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.series.PaginationController;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import static org.apache.iotdb.tsfile.read.reader.series.PaginationController.UNLIMITED_PAGINATION_CONTROLLER;
public class AlignedPageReader implements IPageReader {
private final TimePageReader timePageReader;
private final List<ValuePageReader> valuePageReaderList;
private final int valueCount;
private final Filter globalTimeFilter;
private Filter pushDownFilter;
private PaginationController paginationController = UNLIMITED_PAGINATION_CONTROLLER;
private boolean isModified;
private TsBlockBuilder builder;
private static final int MASK = 0x80;
@SuppressWarnings("squid:S107")
public AlignedPageReader(
PageHeader timePageHeader,
ByteBuffer timePageData,
Decoder timeDecoder,
List<PageHeader> valuePageHeaderList,
List<ByteBuffer> valuePageDataList,
List<TSDataType> valueDataTypeList,
List<Decoder> valueDecoderList,
Filter globalTimeFilter) {
timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder);
isModified = timePageReader.isModified();
valuePageReaderList = new ArrayList<>(valuePageHeaderList.size());
for (int i = 0; i < valuePageHeaderList.size(); i++) {
if (valuePageHeaderList.get(i) != null) {
ValuePageReader valuePageReader =
new ValuePageReader(
valuePageHeaderList.get(i),
valuePageDataList.get(i),
valueDataTypeList.get(i),
valueDecoderList.get(i));
valuePageReaderList.add(valuePageReader);
isModified = isModified || valuePageReader.isModified();
} else {
valuePageReaderList.add(null);
}
}
this.globalTimeFilter = globalTimeFilter;
this.valueCount = valuePageReaderList.size();
}
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false);
int timeIndex = -1;
Object[] rowValues = new Object[valueCount];
while (timePageReader.hasNextTime()) {
long timestamp = timePageReader.nextTime();
timeIndex++;
TsPrimitiveType[] v = new TsPrimitiveType[valueCount];
// if all the sub sensors' value are null in current row, just discard it
boolean hasNotNullValues = false;
for (int i = 0; i < valueCount; i++) {
ValuePageReader pageReader = valuePageReaderList.get(i);
if (pageReader != null) {
v[i] = pageReader.nextValue(timestamp, timeIndex);
rowValues[i] = (v[i] == null) ? null : v[i].getValue();
} else {
v[i] = null;
rowValues[i] = null;
}
if (rowValues[i] != null) {
hasNotNullValues = true;
}
}
if (hasNotNullValues && satisfyRecordFilter(timestamp, rowValues)) {
pageData.putVector(timestamp, v);
}
}
return pageData.flip();
}
private boolean satisfyRecordFilter(long timestamp, Object[] rowValues) {
return (globalTimeFilter == null || globalTimeFilter.satisfyRow(timestamp, rowValues))
&& (pushDownFilter == null || pushDownFilter.satisfyRow(timestamp, rowValues));
}
@Override
public boolean timeAllSelected() {
for (int index = 0; index < getMeasurementCount(); index++) {
if (!hasNullValue(index)) {
// When there is any value page point number that is the same as the time page,
// it means that all timestamps in time page will be selected.
return true;
}
}
return false;
}
@Override
public int getMeasurementCount() {
return valueCount;
}
public IPointReader getLazyPointReader() throws IOException {
return new LazyLoadAlignedPagePointReader(timePageReader, valuePageReaderList);
}
private boolean allPageDataSatisfy() {
return !isModified
&& timeAllSelected()
&& globalTimeFilterAllSatisfy()
&& pushDownFilterAllSatisfy();
}
private boolean globalTimeFilterAllSatisfy() {
return globalTimeFilter == null || globalTimeFilter.allSatisfy(this);
}
private boolean pushDownFilterAllSatisfy() {
return pushDownFilter == null || pushDownFilter.allSatisfy(this);
}
@Override
public TsBlock getAllSatisfiedData() throws IOException {
long[] timeBatch = timePageReader.getNextTimeBatch();
if (allPageDataSatisfy()) {
buildResultWithoutAnyFilterAndDelete(timeBatch);
return builder.build();
}
// if all the sub sensors' value are null in current row, just discard it
// if !filter.satisfy, discard this row
boolean[] keepCurrentRow = new boolean[timeBatch.length];
boolean globalTimeFilterAllSatisfy = globalTimeFilterAllSatisfy();
if (globalTimeFilterAllSatisfy) {
Arrays.fill(keepCurrentRow, true);
} else {
updateKeepCurrentRowThroughGlobalTimeFilter(keepCurrentRow, timeBatch);
}
boolean[][] isDeleted = null;
if ((isModified || !timeAllSelected()) && valueCount != 0) {
// using bitMap in valuePageReaders to indicate whether columns of current row are all null.
byte[] bitmask = new byte[(timeBatch.length - 1) / 8 + 1];
Arrays.fill(bitmask, (byte) 0x00);
isDeleted = new boolean[valueCount][timeBatch.length];
fillIsDeletedAndBitMask(timeBatch, isDeleted, bitmask);
updateKeepCurrentRowThroughBitmask(keepCurrentRow, bitmask);
}
boolean pushDownFilterAllSatisfy = pushDownFilterAllSatisfy();
// construct time column
// when pushDownFilterAllSatisfy = true, we can skip rows by OFFSET & LIMIT
int readEndIndex = buildTimeColumn(timeBatch, keepCurrentRow, pushDownFilterAllSatisfy);
// construct value columns
buildValueColumns(readEndIndex, keepCurrentRow, isDeleted);
TsBlock unFilteredBlock = builder.build();
if (pushDownFilterAllSatisfy) {
// OFFSET & LIMIT has been consumed in buildTimeColumn
return unFilteredBlock;
}
builder.reset();
return TsBlockUtil.applyFilterAndLimitOffsetToTsBlock(
unFilteredBlock, builder, pushDownFilter, paginationController);
}
private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) {
if (paginationController.hasCurOffset(timeBatch.length)) {
paginationController.consumeOffset(timeBatch.length);
} else {
int readStartIndex = 0;
if (paginationController.hasCurOffset()) {
readStartIndex = (int) paginationController.getCurOffset();
// consume the remaining offset
paginationController.consumeOffset(readStartIndex);
}
// not included
int readEndIndex = timeBatch.length;
if (paginationController.hasCurLimit() && paginationController.getCurLimit() > 0) {
readEndIndex =
Math.min(readEndIndex, readStartIndex + (int) paginationController.getCurLimit());
paginationController.consumeLimit((long) readEndIndex - readStartIndex);
}
// construct time column
for (int i = readStartIndex; i < readEndIndex; i++) {
builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
builder.declarePosition();
}
// construct value columns
for (int i = 0; i < valueCount; i++) {
ValuePageReader pageReader = valuePageReaderList.get(i);
if (pageReader != null) {
pageReader.writeColumnBuilderWithNextBatch(
readStartIndex, readEndIndex, builder.getColumnBuilder(i));
} else {
builder.getColumnBuilder(i).appendNull(readEndIndex - readStartIndex);
}
}
}
}
private int buildTimeColumn(
long[] timeBatch, boolean[] keepCurrentRow, boolean pushDownFilterAllSatisfy) {
if (pushDownFilterAllSatisfy) {
return buildTimeColumnWithPagination(timeBatch, keepCurrentRow);
} else {
return buildTimeColumnWithoutPagination(timeBatch, keepCurrentRow);
}
}
private int buildTimeColumnWithPagination(long[] timeBatch, boolean[] keepCurrentRow) {
int readEndIndex = timeBatch.length;
for (int rowIndex = 0; rowIndex < timeBatch.length; rowIndex++) {
if (keepCurrentRow[rowIndex]) {
if (paginationController.hasCurOffset()) {
paginationController.consumeOffset();
keepCurrentRow[rowIndex] = false;
} else if (paginationController.hasCurLimit()) {
builder.getTimeColumnBuilder().writeLong(timeBatch[rowIndex]);
builder.declarePosition();
paginationController.consumeLimit();
} else {
readEndIndex = rowIndex;
break;
}
}
}
return readEndIndex;
}
private int buildTimeColumnWithoutPagination(long[] timeBatch, boolean[] keepCurrentRow) {
int readEndIndex = 0;
for (int i = 0; i < timeBatch.length; i++) {
if (keepCurrentRow[i]) {
builder.getTimeColumnBuilder().writeLong(timeBatch[i]);
builder.declarePosition();
readEndIndex = i;
}
}
return readEndIndex + 1;
}
private void buildValueColumns(
int readEndIndex, boolean[] keepCurrentRow, boolean[][] isDeleted) {
for (int i = 0; i < valueCount; i++) {
ValuePageReader pageReader = valuePageReaderList.get(i);
if (pageReader != null) {
if (pageReader.isModified()) {
pageReader.writeColumnBuilderWithNextBatch(
readEndIndex,
builder.getColumnBuilder(i),
keepCurrentRow,
Objects.requireNonNull(isDeleted)[i]);
} else {
pageReader.writeColumnBuilderWithNextBatch(
readEndIndex, builder.getColumnBuilder(i), keepCurrentRow);
}
} else {
for (int j = 0; j < readEndIndex; j++) {
if (keepCurrentRow[j]) {
builder.getColumnBuilder(i).appendNull();
}
}
}
}
}
private void fillIsDeletedAndBitMask(long[] timeBatch, boolean[][] isDeleted, byte[] bitmask) {
for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) {
ValuePageReader pageReader = valuePageReaderList.get(columnIndex);
if (pageReader != null) {
byte[] bitmap = pageReader.getBitmap();
if (pageReader.isModified()) {
pageReader.fillIsDeleted(timeBatch, isDeleted[columnIndex]);
updateBitmapThroughIsDeleted(bitmap, isDeleted[columnIndex]);
}
for (int i = 0, n = bitmask.length; i < n; i++) {
bitmask[i] = (byte) (bitmap[i] | bitmask[i]);
}
}
}
}
private void updateBitmapThroughIsDeleted(byte[] bitmap, boolean[] isDeleted) {
for (int i = 0, n = isDeleted.length; i < n; i++) {
if (isDeleted[i]) {
int shift = i % 8;
bitmap[i / 8] = (byte) (bitmap[i / 8] & (~(MASK >>> shift)));
}
}
}
private void updateKeepCurrentRowThroughGlobalTimeFilter(
boolean[] keepCurrentRow, long[] timeBatch) {
for (int i = 0, n = timeBatch.length; i < n; i++) {
keepCurrentRow[i] = globalTimeFilter.satisfy(timeBatch[i], null);
}
}
private void updateKeepCurrentRowThroughBitmask(boolean[] keepCurrentRow, byte[] bitmask) {
for (int i = 0, n = bitmask.length; i < n; i++) {
if (bitmask[i] == (byte) 0xFF) {
// 8 rows are not all null, do nothing
} else if (bitmask[i] == (byte) 0x00) {
Arrays.fill(keepCurrentRow, i * 8, Math.min(i * 8 + 8, keepCurrentRow.length), false);
} else {
for (int j = 0; j < 8 && (i * 8 + j < keepCurrentRow.length); j++) {
if (((bitmask[i] & 0xFF) & (MASK >>> j)) == 0) {
keepCurrentRow[i * 8 + j] = false;
}
}
}
}
}
public void setDeleteIntervalList(List<List<TimeRange>> list) {
for (int i = 0; i < valueCount; i++) {
if (valuePageReaderList.get(i) != null) {
valuePageReaderList.get(i).setDeleteIntervalList(list.get(i));
}
}
}
@Override
public Statistics<? extends Serializable> getStatistics() {
return valuePageReaderList.size() == 1 && valuePageReaderList.get(0) != null
? valuePageReaderList.get(0).getStatistics()
: timePageReader.getStatistics();
}
@Override
public Statistics<? extends Serializable> getTimeStatistics() {
return timePageReader.getStatistics();
}
@Override
public Optional<Statistics<? extends Serializable>> getMeasurementStatistics(
int measurementIndex) {
ValuePageReader valuePageReader = valuePageReaderList.get(measurementIndex);
return Optional.ofNullable(valuePageReader == null ? null : valuePageReader.getStatistics());
}
@Override
public boolean hasNullValue(int measurementIndex) {
long rowCount = getTimeStatistics().getCount();
Optional<Statistics<? extends Serializable>> statistics =
getMeasurementStatistics(measurementIndex);
return statistics.map(stat -> stat.hasNullValue(rowCount)).orElse(true);
}
@Override
public void addRecordFilter(Filter filter) {
this.pushDownFilter = filter;
}
@Override
public void setLimitOffset(PaginationController paginationController) {
this.paginationController = paginationController;
}
@Override
public boolean isModified() {
return isModified;
}
@Override
public void initTsBlockBuilder(List<TSDataType> dataTypes) {
if (paginationController.hasCurLimit()) {
builder =
new TsBlockBuilder(
(int)
Math.min(
paginationController.getCurLimit(),
timePageReader.getStatistics().getCount()),
dataTypes);
} else {
builder = new TsBlockBuilder((int) timePageReader.getStatistics().getCount(), dataTypes);
}
}
}