| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.utils; |
| |
| import org.apache.iotdb.commons.exception.IoTDBException; |
| import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; |
| import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; |
| |
| import org.apache.tsfile.block.column.Column; |
| import org.apache.tsfile.enums.TSDataType; |
| import org.apache.tsfile.read.common.block.TsBlock; |
| import org.apache.tsfile.utils.Binary; |
| import org.apache.tsfile.utils.BitMap; |
| import org.apache.tsfile.utils.BytesUtils; |
| import org.apache.tsfile.utils.Pair; |
| import org.apache.tsfile.write.UnSupportedDataTypeException; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Optional; |
| |
| /** TimeValuePairUtils to convert between thrift format and TsFile format. */ |
| public class QueryDataSetUtils { |
| |
| private static final int FLAG = 0x01; |
| |
| private QueryDataSetUtils() {} |
| |
| public static Pair<TSQueryDataSet, Boolean> convertTsBlockByFetchSize( |
| IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException { |
| boolean finished = false; |
| int columnNum = queryExecution.getOutputValueColumnCount(); |
| // one time column and each value column has an actual value buffer and a bitmap value to |
| // indicate whether it is a null |
| int columnNumWithTime = columnNum * 2 + 1; |
| DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; |
| ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; |
| for (int i = 0; i < columnNumWithTime; i++) { |
| byteArrayOutputStreams[i] = new ByteArrayOutputStream(); |
| dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); |
| } |
| |
| int rowCount = 0; |
| int[] valueOccupation = new int[columnNum]; |
| |
| // used to record a bitmap for every 8 points |
| int[] bitmaps = new int[columnNum]; |
| while (rowCount < fetchSize) { |
| Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); |
| if (!optionalTsBlock.isPresent()) { |
| finished = true; |
| break; |
| } |
| TsBlock tsBlock = optionalTsBlock.get(); |
| if (!tsBlock.isEmpty()) { |
| int currentCount = tsBlock.getPositionCount(); |
| serializeTsBlock( |
| rowCount, |
| currentCount, |
| tsBlock, |
| columnNum, |
| dataOutputStreams, |
| valueOccupation, |
| bitmaps); |
| rowCount += currentCount; |
| } |
| } |
| |
| fillRemainingBitMap(rowCount, columnNum, dataOutputStreams, bitmaps); |
| |
| TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); |
| |
| fillTimeColumn(rowCount, byteArrayOutputStreams, tsQueryDataSet); |
| |
| fillValueColumnsAndBitMaps(rowCount, byteArrayOutputStreams, valueOccupation, tsQueryDataSet); |
| |
| return new Pair<>(tsQueryDataSet, finished); |
| } |
| |
| public static TSQueryDataSet convertTsBlockByFetchSize(List<TsBlock> tsBlocks) |
| throws IOException { |
| TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); |
| |
| // one time column and each value column has an actual value buffer and a bitmap value to |
| // indicate whether it is a null |
| int columnNum = 1; |
| int columnNumWithTime = columnNum * 2 + 1; |
| DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; |
| ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; |
| for (int i = 0; i < columnNumWithTime; i++) { |
| byteArrayOutputStreams[i] = new ByteArrayOutputStream(); |
| dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); |
| } |
| |
| int rowCount = 0; |
| int[] valueOccupation = new int[columnNum]; |
| |
| // used to record a bitmap for every 8 points |
| int[] bitmaps = new int[columnNum]; |
| for (TsBlock tsBlock : tsBlocks) { |
| if (tsBlock.isEmpty()) { |
| continue; |
| } |
| |
| int currentCount = tsBlock.getPositionCount(); |
| // serialize time column |
| for (int i = 0; i < currentCount; i++) { |
| // use columnOutput to write byte array |
| dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i)); |
| } |
| |
| // serialize each value column and its bitmap |
| for (int k = 0; k < columnNum; k++) { |
| // get DataOutputStream for current value column and its bitmap |
| DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; |
| |
| Column column = tsBlock.getColumn(k); |
| TSDataType type = column.getDataType(); |
| switch (type) { |
| case INT32: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeInt(column.getInt(i)); |
| valueOccupation[k] += 4; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case INT64: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeLong(column.getLong(i)); |
| valueOccupation[k] += 8; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case FLOAT: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeFloat(column.getFloat(i)); |
| valueOccupation[k] += 4; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case DOUBLE: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeDouble(column.getDouble(i)); |
| valueOccupation[k] += 8; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case BOOLEAN: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeBoolean(column.getBoolean(i)); |
| valueOccupation[k] += 1; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case TEXT: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| Binary binary = column.getBinary(i); |
| dataOutputStream.writeInt(binary.getLength()); |
| dataOutputStream.write(binary.getValues()); |
| valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength(); |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("Data type %s is not supported.", type)); |
| } |
| if (k != columnNum - 1) { |
| rowCount -= currentCount; |
| } |
| } |
| } |
| // feed the remaining bitmap |
| int remaining = rowCount % 8; |
| for (int k = 0; k < columnNum; k++) { |
| if (remaining != 0) { |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; |
| dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining)); |
| } |
| } |
| |
| // calculate the time buffer size |
| int timeOccupation = rowCount * 8; |
| ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); |
| timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); |
| timeBuffer.flip(); |
| tsQueryDataSet.setTime(timeBuffer); |
| |
| // calculate the bitmap buffer size |
| int bitmapOccupation = (rowCount + 7) / 8; |
| |
| List<ByteBuffer> bitmapList = new LinkedList<>(); |
| List<ByteBuffer> valueList = new LinkedList<>(); |
| for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { |
| ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); |
| valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); |
| valueBuffer.flip(); |
| valueList.add(valueBuffer); |
| |
| ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); |
| bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); |
| bitmapBuffer.flip(); |
| bitmapList.add(bitmapBuffer); |
| } |
| tsQueryDataSet.setBitmapList(bitmapList); |
| tsQueryDataSet.setValueList(valueList); |
| return tsQueryDataSet; |
| } |
| |
| private static void serializeTsBlock( |
| int rowCount, |
| int currentCount, |
| TsBlock tsBlock, |
| int columnNum, |
| DataOutputStream[] dataOutputStreams, |
| int[] valueOccupation, |
| int[] bitmaps) |
| throws IOException { |
| // serialize time column |
| for (int i = 0; i < currentCount; i++) { |
| // use columnOutput to write byte array |
| dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i)); |
| } |
| |
| // serialize each value column and its bitmap |
| for (int k = 0; k < columnNum; k++) { |
| // get DataOutputStream for current value column and its bitmap |
| DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; |
| |
| Column column = tsBlock.getColumn(k); |
| TSDataType type = column.getDataType(); |
| switch (type) { |
| case INT32: |
| doWithInt32Column( |
| rowCount, |
| column, |
| bitmaps, |
| k, |
| dataOutputStream, |
| valueOccupation, |
| dataBitmapOutputStream); |
| break; |
| case INT64: |
| doWithInt64Column( |
| rowCount, |
| column, |
| bitmaps, |
| k, |
| dataOutputStream, |
| valueOccupation, |
| dataBitmapOutputStream); |
| break; |
| case FLOAT: |
| doWithFloatColumn( |
| rowCount, |
| column, |
| bitmaps, |
| k, |
| dataOutputStream, |
| valueOccupation, |
| dataBitmapOutputStream); |
| break; |
| case DOUBLE: |
| doWithDoubleColumn( |
| rowCount, |
| column, |
| bitmaps, |
| k, |
| dataOutputStream, |
| valueOccupation, |
| dataBitmapOutputStream); |
| break; |
| case BOOLEAN: |
| doWithBooleanColumn( |
| rowCount, |
| column, |
| bitmaps, |
| k, |
| dataOutputStream, |
| valueOccupation, |
| dataBitmapOutputStream); |
| break; |
| case TEXT: |
| doWithTextColumn( |
| rowCount, |
| column, |
| bitmaps, |
| k, |
| dataOutputStream, |
| valueOccupation, |
| dataBitmapOutputStream); |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("Data type %s is not supported.", type)); |
| } |
| } |
| } |
| |
| private static void doWithInt32Column( |
| int rowCount, |
| Column column, |
| int[] bitmaps, |
| int columnIndex, |
| DataOutputStream dataOutputStream, |
| int[] valueOccupation, |
| DataOutputStream dataBitmapOutputStream) |
| throws IOException { |
| for (int i = 0, size = column.getPositionCount(); i < size; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[columnIndex] = bitmaps[columnIndex] << 1; |
| } else { |
| bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG; |
| dataOutputStream.writeInt(column.getInt(i)); |
| valueOccupation[columnIndex] += 4; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[columnIndex]); |
| // we should clear the bitmap every 8 points |
| bitmaps[columnIndex] = 0; |
| } |
| } |
| } |
| |
| private static void doWithInt64Column( |
| int rowCount, |
| Column column, |
| int[] bitmaps, |
| int columnIndex, |
| DataOutputStream dataOutputStream, |
| int[] valueOccupation, |
| DataOutputStream dataBitmapOutputStream) |
| throws IOException { |
| for (int i = 0, size = column.getPositionCount(); i < size; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[columnIndex] = bitmaps[columnIndex] << 1; |
| } else { |
| bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG; |
| dataOutputStream.writeLong(column.getLong(i)); |
| valueOccupation[columnIndex] += 8; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[columnIndex]); |
| // we should clear the bitmap every 8 points |
| bitmaps[columnIndex] = 0; |
| } |
| } |
| } |
| |
| private static void doWithFloatColumn( |
| int rowCount, |
| Column column, |
| int[] bitmaps, |
| int columnIndex, |
| DataOutputStream dataOutputStream, |
| int[] valueOccupation, |
| DataOutputStream dataBitmapOutputStream) |
| throws IOException { |
| for (int i = 0, size = column.getPositionCount(); i < size; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[columnIndex] = bitmaps[columnIndex] << 1; |
| } else { |
| bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG; |
| dataOutputStream.writeFloat(column.getFloat(i)); |
| valueOccupation[columnIndex] += 4; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[columnIndex]); |
| // we should clear the bitmap every 8 points |
| bitmaps[columnIndex] = 0; |
| } |
| } |
| } |
| |
| private static void doWithDoubleColumn( |
| int rowCount, |
| Column column, |
| int[] bitmaps, |
| int columnIndex, |
| DataOutputStream dataOutputStream, |
| int[] valueOccupation, |
| DataOutputStream dataBitmapOutputStream) |
| throws IOException { |
| for (int i = 0, size = column.getPositionCount(); i < size; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[columnIndex] = bitmaps[columnIndex] << 1; |
| } else { |
| bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG; |
| dataOutputStream.writeDouble(column.getDouble(i)); |
| valueOccupation[columnIndex] += 8; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[columnIndex]); |
| // we should clear the bitmap every 8 points |
| bitmaps[columnIndex] = 0; |
| } |
| } |
| } |
| |
| private static void doWithBooleanColumn( |
| int rowCount, |
| Column column, |
| int[] bitmaps, |
| int columnIndex, |
| DataOutputStream dataOutputStream, |
| int[] valueOccupation, |
| DataOutputStream dataBitmapOutputStream) |
| throws IOException { |
| for (int i = 0, size = column.getPositionCount(); i < size; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[columnIndex] = bitmaps[columnIndex] << 1; |
| } else { |
| bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG; |
| dataOutputStream.writeBoolean(column.getBoolean(i)); |
| valueOccupation[columnIndex] += 1; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[columnIndex]); |
| // we should clear the bitmap every 8 points |
| bitmaps[columnIndex] = 0; |
| } |
| } |
| } |
| |
| private static void doWithTextColumn( |
| int rowCount, |
| Column column, |
| int[] bitmaps, |
| int columnIndex, |
| DataOutputStream dataOutputStream, |
| int[] valueOccupation, |
| DataOutputStream dataBitmapOutputStream) |
| throws IOException { |
| for (int i = 0, size = column.getPositionCount(); i < size; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[columnIndex] = bitmaps[columnIndex] << 1; |
| } else { |
| bitmaps[columnIndex] = (bitmaps[columnIndex] << 1) | FLAG; |
| Binary binary = column.getBinary(i); |
| dataOutputStream.writeInt(binary.getLength()); |
| dataOutputStream.write(binary.getValues()); |
| valueOccupation[columnIndex] = valueOccupation[columnIndex] + 4 + binary.getLength(); |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[columnIndex]); |
| // we should clear the bitmap every 8 points |
| bitmaps[columnIndex] = 0; |
| } |
| } |
| } |
| |
| private static void fillRemainingBitMap( |
| int rowCount, int columnNum, DataOutputStream[] dataOutputStreams, int[] bitmaps) |
| throws IOException { |
| // feed the remaining bitmap |
| int remaining = rowCount % 8; |
| for (int k = 0; k < columnNum; k++) { |
| if (remaining != 0) { |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; |
| dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining)); |
| } |
| } |
| } |
| |
| private static void fillTimeColumn( |
| int rowCount, ByteArrayOutputStream[] byteArrayOutputStreams, TSQueryDataSet tsQueryDataSet) { |
| // calculate the time buffer size |
| int timeOccupation = rowCount * 8; |
| ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); |
| timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); |
| timeBuffer.flip(); |
| tsQueryDataSet.setTime(timeBuffer); |
| } |
| |
| private static void fillValueColumnsAndBitMaps( |
| int rowCount, |
| ByteArrayOutputStream[] byteArrayOutputStreams, |
| int[] valueOccupation, |
| TSQueryDataSet tsQueryDataSet) { |
| // calculate the bitmap buffer size |
| int bitmapOccupation = (rowCount + 7) / 8; |
| |
| List<ByteBuffer> bitmapList = new LinkedList<>(); |
| List<ByteBuffer> valueList = new LinkedList<>(); |
| for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { |
| ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); |
| valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); |
| valueBuffer.flip(); |
| valueList.add(valueBuffer); |
| |
| ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); |
| bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); |
| bitmapBuffer.flip(); |
| bitmapList.add(bitmapBuffer); |
| } |
| tsQueryDataSet.setBitmapList(bitmapList); |
| tsQueryDataSet.setValueList(valueList); |
| } |
| |
| /** |
| * To fetch required amounts of data and combine them through List |
| * |
| * @param queryExecution used to get TsBlock from and judge whether there is more data. |
| * @param fetchSize wanted row size |
| * @return pair.left is serialized TsBlock pair.right indicates if the read finished |
| * @throws IoTDBException IoTDBException may be thrown if error happened while getting TsBlock |
| * from IQueryExecution |
| */ |
| public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize( |
| IQueryExecution queryExecution, int fetchSize) throws IoTDBException { |
| int rowCount = 0; |
| List<ByteBuffer> res = new ArrayList<>(); |
| while (rowCount < fetchSize) { |
| Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult(); |
| if (!optionalByteBuffer.isPresent()) { |
| break; |
| } |
| ByteBuffer byteBuffer = optionalByteBuffer.get(); |
| byteBuffer.mark(); |
| int valueColumnCount = byteBuffer.getInt(); |
| for (int i = 0; i < valueColumnCount; i++) { |
| byteBuffer.get(); |
| } |
| int positionCount = byteBuffer.getInt(); |
| byteBuffer.reset(); |
| if (positionCount != 0) { |
| res.add(byteBuffer); |
| } |
| rowCount += positionCount; |
| } |
| return new Pair<>(res, !queryExecution.hasNextResult()); |
| } |
| |
| public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) { |
| long[] times = new long[size]; |
| for (int i = 0; i < size; i++) { |
| times[i] = buffer.getLong(); |
| } |
| return times; |
| } |
| |
| public static long[] readTimesFromStream(DataInputStream stream, int size) throws IOException { |
| long[] times = new long[size]; |
| for (int i = 0; i < size; i++) { |
| times[i] = stream.readLong(); |
| } |
| return times; |
| } |
| |
| public static Optional<BitMap[]> readBitMapsFromBuffer(ByteBuffer buffer, int columns, int size) { |
| if (!buffer.hasRemaining()) { |
| return Optional.empty(); |
| } |
| BitMap[] bitMaps = new BitMap[columns]; |
| for (int i = 0; i < columns; i++) { |
| boolean hasBitMap = BytesUtils.byteToBool(buffer.get()); |
| if (hasBitMap) { |
| byte[] bytes = new byte[size / Byte.SIZE + 1]; |
| for (int j = 0; j < bytes.length; j++) { |
| bytes[j] = buffer.get(); |
| } |
| bitMaps[i] = new BitMap(size, bytes); |
| } |
| } |
| return Optional.of(bitMaps); |
| } |
| |
| public static Optional<BitMap[]> readBitMapsFromStream( |
| DataInputStream stream, int columns, int size) throws IOException { |
| if (stream.available() <= 0) { |
| return Optional.empty(); |
| } |
| BitMap[] bitMaps = new BitMap[columns]; |
| for (int i = 0; i < columns; i++) { |
| boolean hasBitMap = BytesUtils.byteToBool(stream.readByte()); |
| if (hasBitMap) { |
| byte[] bytes = new byte[size / Byte.SIZE + 1]; |
| for (int j = 0; j < bytes.length; j++) { |
| bytes[j] = stream.readByte(); |
| } |
| bitMaps[i] = new BitMap(size, bytes); |
| } |
| } |
| return Optional.of(bitMaps); |
| } |
| |
| public static Object[] readTabletValuesFromBuffer( |
| ByteBuffer buffer, List<Integer> types, int columns, int size) { |
| TSDataType[] dataTypes = new TSDataType[types.size()]; |
| for (int i = 0; i < dataTypes.length; i++) { |
| dataTypes[i] = TSDataType.values()[types.get(i)]; |
| } |
| return readTabletValuesFromBuffer(buffer, dataTypes, columns, size); |
| } |
| |
| /** |
| * Deserialize Tablet Values From Buffer |
| * |
| * @param buffer data values |
| * @param columns column number |
| * @param size value count in each column |
| * @throws UnSupportedDataTypeException if TSDataType is unknown, UnSupportedDataTypeException |
| * will be thrown. |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| public static Object[] readTabletValuesFromBuffer( |
| ByteBuffer buffer, TSDataType[] types, int columns, int size) { |
| Object[] values = new Object[columns]; |
| for (int i = 0; i < columns; i++) { |
| switch (types[i]) { |
| case BOOLEAN: |
| boolean[] boolValues = new boolean[size]; |
| for (int index = 0; index < size; index++) { |
| boolValues[index] = BytesUtils.byteToBool(buffer.get()); |
| } |
| values[i] = boolValues; |
| break; |
| case INT32: |
| int[] intValues = new int[size]; |
| for (int index = 0; index < size; index++) { |
| intValues[index] = buffer.getInt(); |
| } |
| values[i] = intValues; |
| break; |
| case INT64: |
| long[] longValues = new long[size]; |
| for (int index = 0; index < size; index++) { |
| longValues[index] = buffer.getLong(); |
| } |
| values[i] = longValues; |
| break; |
| case FLOAT: |
| float[] floatValues = new float[size]; |
| for (int index = 0; index < size; index++) { |
| floatValues[index] = buffer.getFloat(); |
| } |
| values[i] = floatValues; |
| break; |
| case DOUBLE: |
| double[] doubleValues = new double[size]; |
| for (int index = 0; index < size; index++) { |
| doubleValues[index] = buffer.getDouble(); |
| } |
| values[i] = doubleValues; |
| break; |
| case TEXT: |
| Binary[] binaryValues = new Binary[size]; |
| for (int index = 0; index < size; index++) { |
| int binarySize = buffer.getInt(); |
| byte[] binaryValue = new byte[binarySize]; |
| buffer.get(binaryValue); |
| binaryValues[index] = new Binary(binaryValue); |
| } |
| values[i] = binaryValues; |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("data type %s is not supported when convert data at client", types[i])); |
| } |
| } |
| return values; |
| } |
| |
| public static Object[] readTabletValuesFromStream( |
| DataInputStream stream, TSDataType[] types, int columns, int size) throws IOException { |
| Object[] values = new Object[columns]; |
| for (int i = 0; i < columns; i++) { |
| switch (types[i]) { |
| case BOOLEAN: |
| parseBooleanColumn(size, stream, values, i); |
| break; |
| case INT32: |
| parseInt32Column(size, stream, values, i); |
| break; |
| case INT64: |
| parseInt64Column(size, stream, values, i); |
| break; |
| case FLOAT: |
| parseFloatColumn(size, stream, values, i); |
| break; |
| case DOUBLE: |
| parseDoubleColumn(size, stream, values, i); |
| break; |
| case TEXT: |
| parseTextColumn(size, stream, values, i); |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("data type %s is not supported when convert data at client", types[i])); |
| } |
| } |
| return values; |
| } |
| |
| private static void parseBooleanColumn( |
| int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException { |
| boolean[] boolValues = new boolean[size]; |
| for (int index = 0; index < size; index++) { |
| boolValues[index] = BytesUtils.byteToBool(stream.readByte()); |
| } |
| values[columnIndex] = boolValues; |
| } |
| |
| private static void parseInt32Column( |
| int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException { |
| int[] intValues = new int[size]; |
| for (int index = 0; index < size; index++) { |
| intValues[index] = stream.readInt(); |
| } |
| values[columnIndex] = intValues; |
| } |
| |
| private static void parseInt64Column( |
| int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException { |
| long[] longValues = new long[size]; |
| for (int index = 0; index < size; index++) { |
| longValues[index] = stream.readLong(); |
| } |
| values[columnIndex] = longValues; |
| } |
| |
| private static void parseFloatColumn( |
| int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException { |
| float[] floatValues = new float[size]; |
| for (int index = 0; index < size; index++) { |
| floatValues[index] = stream.readFloat(); |
| } |
| values[columnIndex] = floatValues; |
| } |
| |
| private static void parseDoubleColumn( |
| int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException { |
| double[] doubleValues = new double[size]; |
| for (int index = 0; index < size; index++) { |
| doubleValues[index] = stream.readDouble(); |
| } |
| values[columnIndex] = doubleValues; |
| } |
| |
| private static void parseTextColumn( |
| int size, DataInputStream stream, Object[] values, int columnIndex) throws IOException { |
| Binary[] binaryValues = new Binary[size]; |
| for (int index = 0; index < size; index++) { |
| int binarySize = stream.readInt(); |
| byte[] binaryValue = new byte[binarySize]; |
| int actualReadSize = stream.read(binaryValue); |
| if (actualReadSize != binarySize) { |
| throw new IllegalStateException( |
| "Expect to read " + binarySize + " bytes, actually read " + actualReadSize + "bytes."); |
| } |
| binaryValues[index] = new Binary(binaryValue); |
| } |
| values[columnIndex] = binaryValues; |
| } |
| } |