/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| package org.apache.iotdb.db.utils.datastructure; |
| |
| import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; |
| import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager; |
| import org.apache.iotdb.db.utils.MathUtils; |
| |
| import org.apache.tsfile.enums.TSDataType; |
| import org.apache.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.tsfile.read.TimeValuePair; |
| import org.apache.tsfile.read.common.TimeRange; |
| import org.apache.tsfile.read.common.block.TsBlockBuilder; |
| import org.apache.tsfile.utils.BitMap; |
| import org.apache.tsfile.utils.TsPrimitiveType; |
| |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.ARRAY_SIZE; |
| import static org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM; |
| |
| public abstract class FloatTVList extends TVList { |
| // list of primitive array, add 1 when expanded -> float primitive array |
| // index relation: arrayIndex -> elementIndex |
| protected List<float[]> values; |
| |
| FloatTVList() { |
| super(); |
| values = new ArrayList<>(); |
| } |
| |
| public static FloatTVList newList() { |
| switch (TVLIST_SORT_ALGORITHM) { |
| case QUICK: |
| return new QuickFloatTVList(); |
| case BACKWARD: |
| return new BackFloatTVList(); |
| default: |
| return new TimFloatTVList(); |
| } |
| } |
| |
| @Override |
| public FloatTVList clone() { |
| FloatTVList cloneList = FloatTVList.newList(); |
| cloneAs(cloneList); |
| for (float[] valueArray : values) { |
| cloneList.values.add(cloneValue(valueArray)); |
| } |
| return cloneList; |
| } |
| |
| private float[] cloneValue(float[] array) { |
| float[] cloneArray = new float[array.length]; |
| System.arraycopy(array, 0, cloneArray, 0, array.length); |
| return cloneArray; |
| } |
| |
| @Override |
| public void putFloat(long timestamp, float value) { |
| checkExpansion(); |
| int arrayIndex = rowCount / ARRAY_SIZE; |
| int elementIndex = rowCount % ARRAY_SIZE; |
| maxTime = Math.max(maxTime, timestamp); |
| timestamps.get(arrayIndex)[elementIndex] = timestamp; |
| values.get(arrayIndex)[elementIndex] = value; |
| rowCount++; |
| if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) { |
| sorted = false; |
| } |
| } |
| |
| @Override |
| public float getFloat(int index) { |
| if (index >= rowCount) { |
| throw new ArrayIndexOutOfBoundsException(index); |
| } |
| int arrayIndex = index / ARRAY_SIZE; |
| int elementIndex = index % ARRAY_SIZE; |
| return values.get(arrayIndex)[elementIndex]; |
| } |
| |
| protected void set(int index, long timestamp, float value) { |
| if (index >= rowCount) { |
| throw new ArrayIndexOutOfBoundsException(index); |
| } |
| int arrayIndex = index / ARRAY_SIZE; |
| int elementIndex = index % ARRAY_SIZE; |
| timestamps.get(arrayIndex)[elementIndex] = timestamp; |
| values.get(arrayIndex)[elementIndex] = value; |
| } |
| |
| @Override |
| void clearValue() { |
| if (values != null) { |
| for (float[] dataArray : values) { |
| PrimitiveArrayManager.release(dataArray); |
| } |
| values.clear(); |
| } |
| } |
| |
| @Override |
| protected void expandValues() { |
| values.add((float[]) getPrimitiveArraysByType(TSDataType.FLOAT)); |
| } |
| |
| @Override |
| public TimeValuePair getTimeValuePair(int index) { |
| return new TimeValuePair( |
| getTime(index), TsPrimitiveType.getByType(TSDataType.FLOAT, getFloat(index))); |
| } |
| |
| @Override |
| protected TimeValuePair getTimeValuePair( |
| int index, long time, Integer floatPrecision, TSEncoding encoding) { |
| float value = getFloat(index); |
| if (!Float.isNaN(value) && (encoding == TSEncoding.RLE || encoding == TSEncoding.TS_2DIFF)) { |
| value = MathUtils.roundWithGivenPrecision(value, floatPrecision); |
| } |
| return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.FLOAT, value)); |
| } |
| |
| @Override |
| protected void writeValidValuesIntoTsBlock( |
| TsBlockBuilder builder, |
| int floatPrecision, |
| TSEncoding encoding, |
| List<TimeRange> deletionList) { |
| Integer deleteCursor = 0; |
| for (int i = 0; i < rowCount; i++) { |
| if (!isPointDeleted(getTime(i), deletionList, deleteCursor) |
| && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) { |
| builder.getTimeColumnBuilder().writeLong(getTime(i)); |
| builder |
| .getColumnBuilder(0) |
| .writeFloat(roundValueWithGivenPrecision(getFloat(i), floatPrecision, encoding)); |
| builder.declarePosition(); |
| } |
| } |
| } |
| |
| @Override |
| protected void releaseLastValueArray() { |
| PrimitiveArrayManager.release(values.remove(values.size() - 1)); |
| } |
| |
| @Override |
| public void putFloats(long[] time, float[] value, BitMap bitMap, int start, int end) { |
| checkExpansion(); |
| |
| int idx = start; |
| // constraint: time.length + timeIdxOffset == value.length |
| int timeIdxOffset = 0; |
| if (bitMap != null && !bitMap.isAllUnmarked()) { |
| // time array is a reference, should clone necessary time values |
| long[] clonedTime = new long[end - start]; |
| System.arraycopy(time, start, clonedTime, 0, end - start); |
| time = clonedTime; |
| timeIdxOffset = start; |
| // drop null at the end of value array |
| int nullCnt = |
| dropNullValThenUpdateMaxTimeAndSorted(time, value, bitMap, start, end, timeIdxOffset); |
| end -= nullCnt; |
| } else { |
| updateMaxTimeAndSorted(time, start, end); |
| } |
| |
| while (idx < end) { |
| int inputRemaining = end - idx; |
| int arrayIdx = rowCount / ARRAY_SIZE; |
| int elementIdx = rowCount % ARRAY_SIZE; |
| int internalRemaining = ARRAY_SIZE - elementIdx; |
| if (internalRemaining >= inputRemaining) { |
| // the remaining inputs can fit the last array, copy all remaining inputs into last array |
| System.arraycopy( |
| time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, inputRemaining); |
| System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, inputRemaining); |
| rowCount += inputRemaining; |
| break; |
| } else { |
| // the remaining inputs cannot fit the last array, fill the last array and create a new |
| // one and enter the next loop |
| System.arraycopy( |
| time, idx - timeIdxOffset, timestamps.get(arrayIdx), elementIdx, internalRemaining); |
| System.arraycopy(value, idx, values.get(arrayIdx), elementIdx, internalRemaining); |
| idx += internalRemaining; |
| rowCount += internalRemaining; |
| checkExpansion(); |
| } |
| } |
| } |
| |
| // move null values to the end of time array and value array, then return number of null values |
| int dropNullValThenUpdateMaxTimeAndSorted( |
| long[] time, float[] values, BitMap bitMap, int start, int end, int tIdxOffset) { |
| long inPutMinTime = Long.MAX_VALUE; |
| boolean inputSorted = true; |
| |
| int nullCnt = 0; |
| for (int vIdx = start; vIdx < end; vIdx++) { |
| if (bitMap.isMarked(vIdx)) { |
| nullCnt++; |
| continue; |
| } |
| // move value ahead to replace null |
| int tIdx = vIdx - tIdxOffset; |
| if (nullCnt != 0) { |
| time[tIdx - nullCnt] = time[tIdx]; |
| values[vIdx - nullCnt] = values[vIdx]; |
| } |
| // update maxTime and sorted |
| tIdx = tIdx - nullCnt; |
| inPutMinTime = Math.min(inPutMinTime, time[tIdx]); |
| maxTime = Math.max(maxTime, time[tIdx]); |
| if (inputSorted && tIdx > 0 && time[tIdx - 1] > time[tIdx]) { |
| inputSorted = false; |
| } |
| } |
| |
| sorted = sorted && inputSorted && (rowCount == 0 || inPutMinTime >= getTime(rowCount - 1)); |
| return nullCnt; |
| } |
| |
| @Override |
| public TSDataType getDataType() { |
| return TSDataType.FLOAT; |
| } |
| |
| @Override |
| public int serializedSize() { |
| return Byte.BYTES + Integer.BYTES + rowCount * (Long.BYTES + Float.BYTES); |
| } |
| |
| @Override |
| public void serializeToWAL(IWALByteBufferView buffer) { |
| WALWriteUtils.write(TSDataType.FLOAT, buffer); |
| buffer.putInt(rowCount); |
| for (int rowIdx = 0; rowIdx < rowCount; ++rowIdx) { |
| buffer.putLong(getTime(rowIdx)); |
| buffer.putFloat(getFloat(rowIdx)); |
| } |
| } |
| |
| public static FloatTVList deserialize(DataInputStream stream) throws IOException { |
| FloatTVList tvList = FloatTVList.newList(); |
| int rowCount = stream.readInt(); |
| long[] times = new long[rowCount]; |
| float[] values = new float[rowCount]; |
| for (int rowIdx = 0; rowIdx < rowCount; ++rowIdx) { |
| times[rowIdx] = stream.readLong(); |
| values[rowIdx] = stream.readFloat(); |
| } |
| tvList.putFloats(times, values, null, 0, rowCount); |
| return tvList; |
| } |
| } |