| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.tsfile.read.common; |
| |
| import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.utils.Binary; |
| import org.apache.iotdb.tsfile.utils.TsPrimitiveType; |
| |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.util.LinkedList; |
| |
| /** |
| * This class is for reading and writing batch data in reverse. The data source is from mergeReader. |
| * For example, the time sequence from mergeReader is 1000 -> 1, to keep the consistency that the |
| * timestamp should be ascending. It will be written in reverse, i.e. the timeRet will be [1, 1000]. |
| * Then it can be handled the same as DescReadBatchData. |
| */ |
| public class DescReadWriteBatchData extends DescReadBatchData { |
| |
| public DescReadWriteBatchData(TSDataType dataType) { |
| super(); |
| this.batchDataType = BatchDataType.DESC_READ_WRITE; |
| this.dataType = dataType; |
| this.readCurListIndex = 0; |
| this.readCurArrayIndex = 0; |
| this.writeCurListIndex = 0; |
| this.writeCurArrayIndex = capacity - 1; |
| |
| timeRet = new LinkedList<>(); |
| timeRet.add(new long[capacity]); |
| count = 0; |
| |
| switch (dataType) { |
| case BOOLEAN: |
| booleanRet = new LinkedList<>(); |
| booleanRet.add(new boolean[capacity]); |
| break; |
| case INT32: |
| intRet = new LinkedList<>(); |
| intRet.add(new int[capacity]); |
| break; |
| case INT64: |
| longRet = new LinkedList<>(); |
| longRet.add(new long[capacity]); |
| break; |
| case FLOAT: |
| floatRet = new LinkedList<>(); |
| floatRet.add(new float[capacity]); |
| break; |
| case DOUBLE: |
| doubleRet = new LinkedList<>(); |
| doubleRet.add(new double[capacity]); |
| break; |
| case TEXT: |
| binaryRet = new LinkedList<>(); |
| binaryRet.add(new Binary[capacity]); |
| break; |
| case VECTOR: |
| vectorRet = new LinkedList<>(); |
| vectorRet.add(new TsPrimitiveType[capacity][]); |
| break; |
| default: |
| throw new UnSupportedDataTypeException(String.valueOf(dataType)); |
| } |
| } |
| |
| /** |
| * put boolean data reversely. |
| * |
| * @param t timestamp |
| * @param v boolean data |
| */ |
| @Override |
| public void putBoolean(long t, boolean v) { |
| if (writeCurArrayIndex == -1) { |
| if (capacity >= CAPACITY_THRESHOLD) { |
| ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]); |
| ((LinkedList<boolean[]>) booleanRet).addFirst(new boolean[capacity]); |
| writeCurListIndex++; |
| writeCurArrayIndex = capacity - 1; |
| } else { |
| int newCapacity = capacity << 1; |
| |
| long[] newTimeData = new long[newCapacity]; |
| boolean[] newValueData = new boolean[newCapacity]; |
| |
| System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); |
| System.arraycopy(booleanRet.get(0), 0, newValueData, newCapacity - capacity, capacity); |
| |
| timeRet.set(0, newTimeData); |
| booleanRet.set(0, newValueData); |
| |
| writeCurArrayIndex = newCapacity - capacity - 1; |
| capacity = newCapacity; |
| } |
| } |
| timeRet.get(0)[writeCurArrayIndex] = t; |
| booleanRet.get(0)[writeCurArrayIndex] = v; |
| |
| writeCurArrayIndex--; |
| count++; |
| } |
| |
| /** |
| * put int data reversely. |
| * |
| * @param t timestamp |
| * @param v int data |
| */ |
| @Override |
| public void putInt(long t, int v) { |
| if (writeCurArrayIndex == -1) { |
| if (capacity >= CAPACITY_THRESHOLD) { |
| ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]); |
| ((LinkedList<int[]>) intRet).addFirst(new int[capacity]); |
| writeCurListIndex++; |
| writeCurArrayIndex = capacity - 1; |
| } else { |
| int newCapacity = capacity << 1; |
| |
| long[] newTimeData = new long[newCapacity]; |
| int[] newValueData = new int[newCapacity]; |
| |
| System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); |
| System.arraycopy(intRet.get(0), 0, newValueData, newCapacity - capacity, capacity); |
| |
| timeRet.set(0, newTimeData); |
| intRet.set(0, newValueData); |
| |
| writeCurArrayIndex = newCapacity - capacity - 1; |
| capacity = newCapacity; |
| } |
| } |
| timeRet.get(0)[writeCurArrayIndex] = t; |
| intRet.get(0)[writeCurArrayIndex] = v; |
| |
| writeCurArrayIndex--; |
| count++; |
| } |
| |
| /** |
| * put long data reversely. |
| * |
| * @param t timestamp |
| * @param v long data |
| */ |
| @Override |
| public void putLong(long t, long v) { |
| if (writeCurArrayIndex == -1) { |
| if (capacity >= CAPACITY_THRESHOLD) { |
| ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]); |
| ((LinkedList<long[]>) longRet).addFirst(new long[capacity]); |
| writeCurListIndex++; |
| writeCurArrayIndex = capacity - 1; |
| } else { |
| int newCapacity = capacity << 1; |
| |
| long[] newTimeData = new long[newCapacity]; |
| long[] newValueData = new long[newCapacity]; |
| |
| System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); |
| System.arraycopy(longRet.get(0), 0, newValueData, newCapacity - capacity, capacity); |
| |
| timeRet.set(0, newTimeData); |
| longRet.set(0, newValueData); |
| |
| writeCurArrayIndex = newCapacity - capacity - 1; |
| capacity = newCapacity; |
| } |
| } |
| timeRet.get(0)[writeCurArrayIndex] = t; |
| longRet.get(0)[writeCurArrayIndex] = v; |
| |
| writeCurArrayIndex--; |
| count++; |
| } |
| |
| /** |
| * put float data reversely. |
| * |
| * @param t timestamp |
| * @param v float data |
| */ |
| @Override |
| public void putFloat(long t, float v) { |
| if (writeCurArrayIndex == -1) { |
| if (capacity >= CAPACITY_THRESHOLD) { |
| ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]); |
| ((LinkedList<float[]>) floatRet).addFirst(new float[capacity]); |
| writeCurListIndex++; |
| writeCurArrayIndex = capacity - 1; |
| } else { |
| int newCapacity = capacity << 1; |
| |
| long[] newTimeData = new long[newCapacity]; |
| float[] newValueData = new float[newCapacity]; |
| |
| System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); |
| System.arraycopy(floatRet.get(0), 0, newValueData, newCapacity - capacity, capacity); |
| |
| timeRet.set(0, newTimeData); |
| floatRet.set(0, newValueData); |
| |
| writeCurArrayIndex = newCapacity - capacity - 1; |
| capacity = newCapacity; |
| } |
| } |
| timeRet.get(0)[writeCurArrayIndex] = t; |
| floatRet.get(0)[writeCurArrayIndex] = v; |
| |
| writeCurArrayIndex--; |
| count++; |
| } |
| |
| /** |
| * put double data reversely. |
| * |
| * @param t timestamp |
| * @param v double data |
| */ |
| @Override |
| public void putDouble(long t, double v) { |
| if (writeCurArrayIndex == -1) { |
| if (capacity >= CAPACITY_THRESHOLD) { |
| ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]); |
| ((LinkedList<double[]>) doubleRet).addFirst(new double[capacity]); |
| writeCurListIndex++; |
| writeCurArrayIndex = capacity - 1; |
| } else { |
| int newCapacity = capacity << 1; |
| |
| long[] newTimeData = new long[newCapacity]; |
| double[] newValueData = new double[newCapacity]; |
| |
| System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); |
| System.arraycopy(doubleRet.get(0), 0, newValueData, newCapacity - capacity, capacity); |
| |
| timeRet.set(0, newTimeData); |
| doubleRet.set(0, newValueData); |
| |
| writeCurArrayIndex = newCapacity - capacity - 1; |
| capacity = newCapacity; |
| } |
| } |
| timeRet.get(0)[writeCurArrayIndex] = t; |
| doubleRet.get(0)[writeCurArrayIndex] = v; |
| |
| writeCurArrayIndex--; |
| count++; |
| } |
| |
| /** |
| * put binary data reversely. |
| * |
| * @param t timestamp |
| * @param v binary data. |
| */ |
| @Override |
| public void putBinary(long t, Binary v) { |
| if (writeCurArrayIndex == -1) { |
| if (capacity >= CAPACITY_THRESHOLD) { |
| ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]); |
| ((LinkedList<Binary[]>) binaryRet).addFirst(new Binary[capacity]); |
| writeCurListIndex++; |
| writeCurArrayIndex = capacity - 1; |
| } else { |
| int newCapacity = capacity << 1; |
| |
| long[] newTimeData = new long[newCapacity]; |
| Binary[] newValueData = new Binary[newCapacity]; |
| |
| System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); |
| System.arraycopy(binaryRet.get(0), 0, newValueData, newCapacity - capacity, capacity); |
| |
| timeRet.set(0, newTimeData); |
| binaryRet.set(0, newValueData); |
| |
| writeCurArrayIndex = newCapacity - capacity - 1; |
| capacity = newCapacity; |
| } |
| } |
| timeRet.get(0)[writeCurArrayIndex] = t; |
| binaryRet.get(0)[writeCurArrayIndex] = v; |
| |
| writeCurArrayIndex--; |
| count++; |
| } |
| |
| /** |
| * put vector data. |
| * |
| * @param t timestamp |
| * @param v vector data. |
| */ |
| @Override |
| public void putVector(long t, TsPrimitiveType[] v) { |
| if (writeCurArrayIndex == -1) { |
| if (capacity >= CAPACITY_THRESHOLD) { |
| ((LinkedList<long[]>) timeRet).addFirst(new long[capacity]); |
| ((LinkedList<TsPrimitiveType[][]>) vectorRet).addFirst(new TsPrimitiveType[capacity][]); |
| writeCurListIndex++; |
| writeCurArrayIndex = capacity - 1; |
| } else { |
| int newCapacity = capacity << 1; |
| |
| long[] newTimeData = new long[newCapacity]; |
| TsPrimitiveType[][] newValueData = new TsPrimitiveType[newCapacity][]; |
| |
| System.arraycopy(timeRet.get(0), 0, newTimeData, newCapacity - capacity, capacity); |
| System.arraycopy(vectorRet.get(0), 0, newValueData, newCapacity - capacity, capacity); |
| |
| timeRet.set(0, newTimeData); |
| vectorRet.set(0, newValueData); |
| |
| writeCurArrayIndex = newCapacity - capacity - 1; |
| capacity = newCapacity; |
| } |
| } |
| timeRet.get(0)[writeCurArrayIndex] = t; |
| vectorRet.get(0)[writeCurArrayIndex] = v; |
| |
| writeCurArrayIndex--; |
| count++; |
| } |
| |
| @Override |
| public boolean hasCurrent() { |
| return (readCurListIndex == 0 && readCurArrayIndex > writeCurArrayIndex) |
| || (readCurListIndex > 0 && readCurArrayIndex >= 0); |
| } |
| |
| @Override |
| public void next() { |
| super.readCurArrayIndex--; |
| if ((readCurListIndex == 0 && readCurArrayIndex <= writeCurArrayIndex) |
| || readCurArrayIndex == -1) { |
| super.readCurListIndex--; |
| super.readCurArrayIndex = capacity - 1; |
| } |
| } |
| |
| @Override |
| public void resetBatchData() { |
| super.readCurArrayIndex = capacity - 1; |
| super.readCurListIndex = writeCurListIndex; |
| } |
| |
| @Override |
| public long getTimeByIndex(int idx) { |
| return timeRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public long getLongByIndex(int idx) { |
| return longRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public double getDoubleByIndex(int idx) { |
| return doubleRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public int getIntByIndex(int idx) { |
| return intRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public float getFloatByIndex(int idx) { |
| return floatRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public Binary getBinaryByIndex(int idx) { |
| return binaryRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public boolean getBooleanByIndex(int idx) { |
| return booleanRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public TsPrimitiveType[] getVectorByIndex(int idx) { |
| return vectorRet |
| .get((idx + writeCurArrayIndex + 1) / capacity)[(idx + writeCurArrayIndex + 1) % capacity]; |
| } |
| |
| @Override |
| public void serializeData(DataOutputStream outputStream) throws IOException { |
| switch (dataType) { |
| case BOOLEAN: |
| for (int i = length() - 1; i >= 0; i--) { |
| outputStream.writeLong(getTimeByIndex(i)); |
| outputStream.writeBoolean(getBooleanByIndex(i)); |
| } |
| break; |
| case DOUBLE: |
| for (int i = length() - 1; i >= 0; i--) { |
| outputStream.writeLong(getTimeByIndex(i)); |
| outputStream.writeDouble(getDoubleByIndex(i)); |
| } |
| break; |
| case FLOAT: |
| for (int i = length() - 1; i >= 0; i--) { |
| outputStream.writeLong(getTimeByIndex(i)); |
| outputStream.writeFloat(getFloatByIndex(i)); |
| } |
| break; |
| case TEXT: |
| for (int i = length() - 1; i >= 0; i--) { |
| outputStream.writeLong(getTimeByIndex(i)); |
| Binary binary = getBinaryByIndex(i); |
| outputStream.writeInt(binary.getLength()); |
| outputStream.write(binary.getValues()); |
| } |
| break; |
| case INT64: |
| for (int i = length() - 1; i >= 0; i--) { |
| outputStream.writeLong(getTimeByIndex(i)); |
| outputStream.writeLong(getLongByIndex(i)); |
| } |
| break; |
| case INT32: |
| for (int i = length() - 1; i >= 0; i--) { |
| outputStream.writeLong(getTimeByIndex(i)); |
| outputStream.writeInt(getIntByIndex(i)); |
| } |
| break; |
| case VECTOR: |
| for (int i = length() - 1; i >= 0; i--) { |
| outputStream.writeLong(getTimeByIndex(i)); |
| TsPrimitiveType[] values = getVectorByIndex(i); |
| outputStream.writeInt(values.length); |
| for (TsPrimitiveType value : values) { |
| if (value == null) { |
| outputStream.write(0); |
| } else { |
| outputStream.write(1); |
| outputStream.write(value.getDataType().serialize()); |
| switch (value.getDataType()) { |
| case BOOLEAN: |
| outputStream.writeBoolean(value.getBoolean()); |
| break; |
| case DOUBLE: |
| outputStream.writeDouble(value.getDouble()); |
| break; |
| case FLOAT: |
| outputStream.writeFloat(value.getFloat()); |
| break; |
| case TEXT: |
| Binary binary = value.getBinary(); |
| outputStream.writeInt(binary.getLength()); |
| outputStream.write(binary.getValues()); |
| break; |
| case INT64: |
| outputStream.writeLong(value.getLong()); |
| break; |
| case INT32: |
| outputStream.writeInt(value.getInt()); |
| break; |
| default: |
| throw new UnSupportedDataTypeException(String.valueOf(dataType)); |
| } |
| } |
| } |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException(String.valueOf(dataType)); |
| } |
| } |
| |
| /** |
| * Read: When put data, the writeIndex increases while the readIndex remains 0. For descending |
| * read, we need to read from writeIndex to writeCurArrayIndex |
| */ |
| @Override |
| public BatchData flip() { |
| super.readCurArrayIndex = capacity - 1; |
| super.readCurListIndex = writeCurListIndex; |
| return this; |
| } |
| } |