blob: b6bb5c46e76d2e34f7dc98e8419848528a811a08 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.transformation.datastructure.row;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.transformation.dag.util.InputRowUtils;
import org.apache.iotdb.db.queryengine.transformation.datastructure.Cache;
import org.apache.iotdb.db.queryengine.transformation.datastructure.SerializableList;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/** An elastic list of records that implements memory control using LRU strategy. */
public class ElasticSerializableRowRecordList {
protected static final int MEMORY_CHECK_THRESHOLD = 1000;
protected TSDataType[] dataTypes;
protected String queryId;
protected float memoryLimitInMB;
protected int internalRowRecordListCapacity;
protected int numCacheBlock;
protected LRUCache cache;
protected List<SerializableRowRecordList> rowRecordLists;
/** Mark bitMaps of correct index when one row has at least one null field. */
protected List<BitMap> bitMaps;
protected int size;
protected int evictionUpperBound;
protected boolean disableMemoryControl;
protected int[] indexListOfTextFields;
protected int byteArrayLengthForMemoryControl;
protected long totalByteArrayLengthLimit;
protected long totalByteArrayLength;
/**
* Construct a ElasticSerializableRowRecordList.
*
* @param dataTypes Data types of columns.
* @param queryId Query ID.
* @param memoryLimitInMB Memory limit.
* @param numCacheBlock Number of cache blocks.
* @throws QueryProcessException by SerializableRowRecordList.calculateCapacity
*/
public ElasticSerializableRowRecordList(
TSDataType[] dataTypes, String queryId, float memoryLimitInMB, int numCacheBlock)
throws QueryProcessException {
this.dataTypes = dataTypes;
this.queryId = queryId;
this.memoryLimitInMB = memoryLimitInMB;
int allocatableCapacity =
SerializableRowRecordList.calculateCapacity(
dataTypes,
memoryLimitInMB,
SerializableList.INITIAL_BYTE_ARRAY_LENGTH_FOR_MEMORY_CONTROL);
internalRowRecordListCapacity = allocatableCapacity / numCacheBlock;
if (internalRowRecordListCapacity == 0) {
numCacheBlock = 1;
internalRowRecordListCapacity = allocatableCapacity;
}
this.numCacheBlock = numCacheBlock;
cache = new ElasticSerializableRowRecordList.LRUCache(numCacheBlock);
rowRecordLists = new ArrayList<>();
bitMaps = new ArrayList<>();
size = 0;
evictionUpperBound = 0;
disableMemoryControl = true;
int textFieldsCount = 0;
for (TSDataType dataType : dataTypes) {
if (dataType.equals(TSDataType.TEXT)) {
++textFieldsCount;
disableMemoryControl = false;
}
}
indexListOfTextFields = new int[textFieldsCount];
int fieldIndex = 0;
for (int i = 0; i < dataTypes.length; ++i) {
if (dataTypes[i].equals(TSDataType.TEXT)) {
indexListOfTextFields[fieldIndex++] = i;
}
}
byteArrayLengthForMemoryControl = SerializableList.INITIAL_BYTE_ARRAY_LENGTH_FOR_MEMORY_CONTROL;
totalByteArrayLengthLimit = 0;
totalByteArrayLength = 0;
}
protected ElasticSerializableRowRecordList(
TSDataType[] dataTypes,
String queryId,
float memoryLimitInMB,
int internalRowRecordListCapacity,
int numCacheBlock) {
this.dataTypes = dataTypes;
this.queryId = queryId;
this.memoryLimitInMB = memoryLimitInMB;
this.internalRowRecordListCapacity = internalRowRecordListCapacity;
this.numCacheBlock = numCacheBlock;
cache = new ElasticSerializableRowRecordList.LRUCache(numCacheBlock);
rowRecordLists = new ArrayList<>();
bitMaps = new ArrayList<>();
size = 0;
evictionUpperBound = 0;
disableMemoryControl = true;
}
public int size() {
return size;
}
public TSDataType[] getDataTypes() {
return dataTypes;
}
public long getTime(int index) throws IOException {
return cache
.get(index / internalRowRecordListCapacity)
.getTime(index % internalRowRecordListCapacity);
}
public Object[] getRowRecord(int index) throws IOException {
return cache
.get(index / internalRowRecordListCapacity)
.getRowRecord(index % internalRowRecordListCapacity);
}
/** true if any field except the timestamp in the current row is null. */
public boolean fieldsHasAnyNull(int index) {
return bitMaps
.get(index / internalRowRecordListCapacity)
.isMarked(index % internalRowRecordListCapacity);
}
public void put(Object[] rowRecord) throws IOException, QueryProcessException {
put(rowRecord, InputRowUtils.hasNullField(rowRecord));
}
/**
* Put the row in the list with an any-field-null marker, this method is faster than calling put
* directly.
*
* @throws IOException by checkMemoryUsage()
* @throws QueryProcessException by checkMemoryUsage()
*/
private void put(Object[] rowRecord, boolean hasNullField)
throws IOException, QueryProcessException {
checkExpansion();
cache.get(size / internalRowRecordListCapacity).put(rowRecord);
if (hasNullField) {
bitMaps.get(size / internalRowRecordListCapacity).mark(size % internalRowRecordListCapacity);
}
++size;
if (!disableMemoryControl) {
totalByteArrayLengthLimit +=
(long) indexListOfTextFields.length * byteArrayLengthForMemoryControl;
if (rowRecord == null) {
totalByteArrayLength +=
(long) indexListOfTextFields.length * byteArrayLengthForMemoryControl;
} else {
for (int indexListOfTextField : indexListOfTextFields) {
Binary binary = (Binary) rowRecord[indexListOfTextField];
totalByteArrayLength += binary == null ? 0 : binary.getLength();
}
checkMemoryUsage();
}
}
}
private void checkExpansion() {
if (size % internalRowRecordListCapacity == 0) {
rowRecordLists.add(
SerializableRowRecordList.newSerializableRowRecordList(
queryId, dataTypes, internalRowRecordListCapacity));
bitMaps.add(new BitMap(internalRowRecordListCapacity));
}
}
protected void checkMemoryUsage() throws IOException, QueryProcessException {
if (size % MEMORY_CHECK_THRESHOLD != 0 || totalByteArrayLength <= totalByteArrayLengthLimit) {
return;
}
int newByteArrayLengthForMemoryControl = byteArrayLengthForMemoryControl;
while ((long) newByteArrayLengthForMemoryControl * size < totalByteArrayLength) {
newByteArrayLengthForMemoryControl *= 2;
}
int newInternalTVListCapacity =
SerializableRowRecordList.calculateCapacity(
dataTypes, memoryLimitInMB, newByteArrayLengthForMemoryControl)
/ numCacheBlock;
if (0 < newInternalTVListCapacity) {
applyNewMemoryControlParameters(
newByteArrayLengthForMemoryControl, newInternalTVListCapacity);
return;
}
int delta =
(int)
((totalByteArrayLength - totalByteArrayLengthLimit)
/ size
/ indexListOfTextFields.length
/ SerializableList.INITIAL_BYTE_ARRAY_LENGTH_FOR_MEMORY_CONTROL);
newByteArrayLengthForMemoryControl =
byteArrayLengthForMemoryControl
+ 2 * (delta + 1) * SerializableList.INITIAL_BYTE_ARRAY_LENGTH_FOR_MEMORY_CONTROL;
newInternalTVListCapacity =
SerializableRowRecordList.calculateCapacity(
dataTypes, memoryLimitInMB, newByteArrayLengthForMemoryControl)
/ numCacheBlock;
if (0 < newInternalTVListCapacity) {
applyNewMemoryControlParameters(
newByteArrayLengthForMemoryControl, newInternalTVListCapacity);
return;
}
throw new QueryProcessException("Memory is not enough for current query.");
}
protected void applyNewMemoryControlParameters(
int newByteArrayLengthForMemoryControl, int newInternalRowRecordListCapacity)
throws IOException, QueryProcessException {
ElasticSerializableRowRecordList newElasticSerializableRowRecordList =
new ElasticSerializableRowRecordList(
dataTypes, queryId, memoryLimitInMB, newInternalRowRecordListCapacity, numCacheBlock);
newElasticSerializableRowRecordList.evictionUpperBound = evictionUpperBound;
int internalListEvictionUpperBound = evictionUpperBound / newInternalRowRecordListCapacity;
for (int i = 0; i < internalListEvictionUpperBound; ++i) {
newElasticSerializableRowRecordList.rowRecordLists.add(null);
newElasticSerializableRowRecordList.bitMaps.add(null);
}
newElasticSerializableRowRecordList.size =
internalListEvictionUpperBound * newInternalRowRecordListCapacity;
for (int i = newElasticSerializableRowRecordList.size; i < evictionUpperBound; ++i) {
newElasticSerializableRowRecordList.put(null);
}
for (int i = evictionUpperBound; i < size; ++i) {
newElasticSerializableRowRecordList.put(getRowRecord(i), fieldsHasAnyNull(i));
}
internalRowRecordListCapacity = newInternalRowRecordListCapacity;
cache = newElasticSerializableRowRecordList.cache;
rowRecordLists = newElasticSerializableRowRecordList.rowRecordLists;
bitMaps = newElasticSerializableRowRecordList.bitMaps;
byteArrayLengthForMemoryControl = newByteArrayLengthForMemoryControl;
totalByteArrayLengthLimit =
(long) size * indexListOfTextFields.length * byteArrayLengthForMemoryControl;
}
/**
* Set the upper bound.
*
* @param evictionUpperBound the index of the first element that cannot be evicted. in other
* words, elements whose index are <b>less than</b> the evictionUpperBound can be evicted.
*/
public void setEvictionUpperBound(int evictionUpperBound) {
this.evictionUpperBound = evictionUpperBound;
}
private class LRUCache extends Cache {
LRUCache(int capacity) {
super(capacity);
}
SerializableRowRecordList get(int targetIndex) throws IOException {
if (!containsKey(targetIndex)) {
if (cacheCapacity <= super.size()) {
int lastIndex = getLast();
if (lastIndex < evictionUpperBound / internalRowRecordListCapacity) {
rowRecordLists.set(lastIndex, null);
bitMaps.set(lastIndex, null);
} else {
rowRecordLists.get(lastIndex).serialize();
}
}
rowRecordLists.get(targetIndex).deserialize();
}
putKey(targetIndex);
return rowRecordLists.get(targetIndex);
}
}
}