blob: 1fd51be0699e3d484f8a57eac2da6e3137ddebc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.transformation.datastructure.tv;
import org.apache.iotdb.commons.udf.utils.UDFBinaryTransformer;
import org.apache.iotdb.db.queryengine.transformation.api.LayerPointReader;
import org.apache.iotdb.db.queryengine.transformation.api.YieldableState;
import org.apache.iotdb.db.queryengine.transformation.datastructure.Cache;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.type.Binary;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class ElasticSerializableTVList implements PointCollector {
public static ElasticSerializableTVList newElasticSerializableTVList(
TSDataType dataType, String queryId, float memoryLimitInMB, int cacheSize) {
return dataType.equals(TSDataType.TEXT)
? new ElasticSerializableBinaryTVList(queryId, memoryLimitInMB, cacheSize)
: new ElasticSerializableTVList(dataType, queryId, memoryLimitInMB, cacheSize);
}
protected TSDataType dataType;
protected String queryId;
protected float memoryLimitInMB;
protected int internalTVListCapacity;
protected int cacheSize;
protected LRUCache cache;
protected List<SerializableTVList> tvLists;
/**
* the bitmap used to indicate whether one value is null in the tvLists. The size of bitMap is the
* same as tvLists and the length of whole bits is the same as tvLists' length.
*/
protected List<BitMap> bitMaps;
protected int size;
protected int evictionUpperBound;
protected ElasticSerializableTVList(
TSDataType dataType, String queryId, float memoryLimitInMB, int cacheSize) {
this.dataType = dataType;
this.queryId = queryId;
this.memoryLimitInMB = memoryLimitInMB;
int allocatableCapacity = SerializableTVList.calculateCapacity(dataType, memoryLimitInMB);
internalTVListCapacity = allocatableCapacity / cacheSize;
if (internalTVListCapacity == 0) {
cacheSize = 1;
internalTVListCapacity = allocatableCapacity;
}
this.cacheSize = cacheSize;
cache = new LRUCache(cacheSize);
bitMaps = new ArrayList<>();
tvLists = new ArrayList<>();
size = 0;
evictionUpperBound = 0;
}
protected ElasticSerializableTVList(
TSDataType dataType,
String queryId,
float memoryLimitInMB,
int internalTVListCapacity,
int cacheSize) {
this.dataType = dataType;
this.queryId = queryId;
this.memoryLimitInMB = memoryLimitInMB;
this.internalTVListCapacity = internalTVListCapacity;
this.cacheSize = cacheSize;
cache = new LRUCache(cacheSize);
bitMaps = new ArrayList<>();
tvLists = new ArrayList<>();
size = 0;
evictionUpperBound = 0;
}
public TSDataType getDataType() {
return dataType;
}
public int size() {
return size;
}
public boolean isNull(int index) {
return bitMaps.get(index / internalTVListCapacity).isMarked(index % internalTVListCapacity);
}
public long getTime(int index) throws IOException {
return cache.get(index / internalTVListCapacity).getTimeByIndex(index % internalTVListCapacity);
}
public int getInt(int index) throws IOException {
return cache.get(index / internalTVListCapacity).getIntByIndex(index % internalTVListCapacity);
}
public long getLong(int index) throws IOException {
return cache.get(index / internalTVListCapacity).getLongByIndex(index % internalTVListCapacity);
}
public float getFloat(int index) throws IOException {
return cache
.get(index / internalTVListCapacity)
.getFloatByIndex(index % internalTVListCapacity);
}
public double getDouble(int index) throws IOException {
return cache
.get(index / internalTVListCapacity)
.getDoubleByIndex(index % internalTVListCapacity);
}
public boolean getBoolean(int index) throws IOException {
return cache
.get(index / internalTVListCapacity)
.getBooleanByIndex(index % internalTVListCapacity);
}
public org.apache.tsfile.utils.Binary getBinary(int index) throws IOException {
return cache
.get(index / internalTVListCapacity)
.getBinaryByIndex(index % internalTVListCapacity);
}
public String getString(int index) throws IOException {
return cache
.get(index / internalTVListCapacity)
.getBinaryByIndex(index % internalTVListCapacity)
.getStringValue(TSFileConfig.STRING_CHARSET);
}
public void put(long timestamp, Object value) throws IOException {
switch (dataType) {
case INT32:
putInt(timestamp, (Integer) value);
break;
case INT64:
putLong(timestamp, (Long) value);
break;
case FLOAT:
putFloat(timestamp, (Float) value);
break;
case DOUBLE:
putDouble(timestamp, (Double) value);
break;
case BOOLEAN:
putBoolean(timestamp, (Boolean) value);
break;
case TEXT:
putBinary(
timestamp,
UDFBinaryTransformer.transformToUDFBinary((org.apache.tsfile.utils.Binary) value));
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
}
@Override
public void putInt(long timestamp, int value) throws IOException {
checkExpansion();
cache.get(size / internalTVListCapacity).putInt(timestamp, value);
++size;
}
@Override
public void putLong(long timestamp, long value) throws IOException {
checkExpansion();
cache.get(size / internalTVListCapacity).putLong(timestamp, value);
++size;
}
@Override
public void putFloat(long timestamp, float value) throws IOException {
checkExpansion();
cache.get(size / internalTVListCapacity).putFloat(timestamp, value);
++size;
}
@Override
public void putDouble(long timestamp, double value) throws IOException {
checkExpansion();
cache.get(size / internalTVListCapacity).putDouble(timestamp, value);
++size;
}
@Override
public void putBoolean(long timestamp, boolean value) throws IOException {
checkExpansion();
cache.get(size / internalTVListCapacity).putBoolean(timestamp, value);
++size;
}
@Override
public void putBinary(long timestamp, Binary value) throws IOException {
checkExpansion();
cache
.get(size / internalTVListCapacity)
.putBinary(timestamp, UDFBinaryTransformer.transformToBinary(value));
++size;
}
@Override
public void putString(long timestamp, String value) throws IOException {
checkExpansion();
cache.get(size / internalTVListCapacity).putBinary(timestamp, BytesUtils.valueOf(value));
++size;
}
public void putNull(long timestamp) throws IOException {
switch (dataType) {
case INT32:
putInt(timestamp, 0);
break;
case INT64:
putLong(timestamp, 0L);
break;
case FLOAT:
putFloat(timestamp, 0.0F);
break;
case DOUBLE:
putDouble(timestamp, 0.0D);
break;
case BOOLEAN:
putBoolean(timestamp, false);
break;
case TEXT:
putBinary(
timestamp,
UDFBinaryTransformer.transformToUDFBinary(org.apache.tsfile.utils.Binary.EMPTY_VALUE));
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", dataType));
}
bitMaps.get((size - 1) / internalTVListCapacity).mark((size - 1) % internalTVListCapacity);
}
private void checkExpansion() {
if (size % internalTVListCapacity == 0) {
tvLists.add(SerializableTVList.newSerializableTVList(dataType, queryId));
bitMaps.add(new BitMap(internalTVListCapacity));
}
}
public LayerPointReader constructPointReaderUsingTrivialEvictionStrategy() {
return new LayerPointReader() {
private int currentPointIndex = -1;
@Override
public boolean isConstantPointReader() {
return false;
}
@Override
public YieldableState yield() {
throw new UnsupportedOperationException();
}
@Override
public boolean next() {
if (size - 1 <= currentPointIndex) {
return false;
}
++currentPointIndex;
return true;
}
@Override
public void readyForNext() {
setEvictionUpperBound(currentPointIndex + 1);
}
@Override
public TSDataType getDataType() {
return dataType;
}
@Override
public long currentTime() throws IOException {
return getTime(currentPointIndex);
}
@Override
public int currentInt() throws IOException {
return getInt(currentPointIndex);
}
@Override
public long currentLong() throws IOException {
return getLong(currentPointIndex);
}
@Override
public float currentFloat() throws IOException {
return getFloat(currentPointIndex);
}
@Override
public double currentDouble() throws IOException {
return getDouble(currentPointIndex);
}
@Override
public boolean currentBoolean() throws IOException {
return getBoolean(currentPointIndex);
}
@Override
public org.apache.tsfile.utils.Binary currentBinary() throws IOException {
return getBinary(currentPointIndex);
}
@Override
public boolean isCurrentNull() throws IOException {
return isNull(currentPointIndex);
}
};
}
/**
* Set the upper bound.
*
* @param evictionUpperBound the index of the first element that cannot be evicted. in other
* words, elements whose index are <b>less than</b> the evictionUpperBound can be evicted.
*/
public void setEvictionUpperBound(int evictionUpperBound) {
this.evictionUpperBound = evictionUpperBound;
}
private class LRUCache extends Cache {
LRUCache(int capacity) {
super(capacity);
}
BatchData get(int targetIndex) throws IOException {
if (!containsKey(targetIndex)) {
if (cacheCapacity <= super.size()) {
int lastIndex = getLast();
if (lastIndex < evictionUpperBound / internalTVListCapacity) {
tvLists.set(lastIndex, null);
bitMaps.set(lastIndex, null);
} else {
tvLists.get(lastIndex).serialize();
}
}
tvLists.get(targetIndex).deserialize();
}
putKey(targetIndex);
return tvLists.get(targetIndex);
}
}
}