| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.utils; |
| |
| import org.apache.iotdb.commons.exception.IoTDBException; |
| import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution; |
| import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; |
| import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; |
| import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.read.common.Field; |
| import org.apache.iotdb.tsfile.read.common.RowRecord; |
| import org.apache.iotdb.tsfile.read.common.block.TsBlock; |
| import org.apache.iotdb.tsfile.read.common.block.column.Column; |
| import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; |
| import org.apache.iotdb.tsfile.utils.Binary; |
| import org.apache.iotdb.tsfile.utils.BitMap; |
| import org.apache.iotdb.tsfile.utils.BytesUtils; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Optional; |
| |
| /** QueryDataSetUtils provides static helpers to convert between thrift format and TsFile format. */ |
| public class QueryDataSetUtils { |
| |
// Bit OR-ed into the lowest position of a column's null bitmap to mark a row whose value is present.
| private static final int FLAG = 0x01; |
| |
// Private and empty: every member of this class is static, so it is never instantiated.
| private QueryDataSetUtils() {} |
| |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| public static TSQueryDataSet convertQueryDataSetByFetchSize( |
| QueryDataSet queryDataSet, int fetchSize, WatermarkEncoder watermarkEncoder) |
| throws IOException { |
| int columnNum = queryDataSet.getColumnNum(); |
| TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); |
| // one time column and each value column has a actual value buffer and a bitmap value to |
| // indicate whether it is a null |
| int columnNumWithTime = columnNum * 2 + 1; |
| DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; |
| ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; |
| for (int i = 0; i < columnNumWithTime; i++) { |
| byteArrayOutputStreams[i] = new ByteArrayOutputStream(); |
| dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); |
| } |
| |
| int rowCount = 0; |
| int[] valueOccupation = new int[columnNum]; |
| // used to record a bitmap for every 8 row record |
| int[] bitmap = new int[columnNum]; |
| for (int i = 0; i < fetchSize; i++) { |
| if (queryDataSet.hasNext()) { |
| RowRecord rowRecord = queryDataSet.next(); |
| // filter rows whose columns are null according to the rule |
| if (queryDataSet.withoutNullFilter(rowRecord)) { |
| // if the current RowRecord doesn't satisfy, we should also decrease |
| // AlreadyReturnedRowNum |
| queryDataSet.decreaseAlreadyReturnedRowNum(); |
| i--; |
| continue; |
| } |
| |
| if (watermarkEncoder != null) { |
| rowRecord = watermarkEncoder.encodeRecord(rowRecord); |
| } |
| // use columnOutput to write byte array |
| dataOutputStreams[0].writeLong(rowRecord.getTimestamp()); |
| List<Field> fields = rowRecord.getFields(); |
| for (int k = 0; k < fields.size(); k++) { |
| Field field = fields.get(k); |
| DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; // DO NOT FORGET +1 |
| if (field == null || field.getDataType() == null) { |
| bitmap[k] = (bitmap[k] << 1); |
| } else { |
| bitmap[k] = (bitmap[k] << 1) | FLAG; |
| TSDataType type = field.getDataType(); |
| switch (type) { |
| case INT32: |
| dataOutputStream.writeInt(field.getIntV()); |
| valueOccupation[k] += 4; |
| break; |
| case INT64: |
| dataOutputStream.writeLong(field.getLongV()); |
| valueOccupation[k] += 8; |
| break; |
| case FLOAT: |
| dataOutputStream.writeFloat(field.getFloatV()); |
| valueOccupation[k] += 4; |
| break; |
| case DOUBLE: |
| dataOutputStream.writeDouble(field.getDoubleV()); |
| valueOccupation[k] += 8; |
| break; |
| case BOOLEAN: |
| dataOutputStream.writeBoolean(field.getBoolV()); |
| valueOccupation[k] += 1; |
| break; |
| case TEXT: |
| dataOutputStream.writeInt(field.getBinaryV().getLength()); |
| dataOutputStream.write(field.getBinaryV().getValues()); |
| valueOccupation[k] = valueOccupation[k] + 4 + field.getBinaryV().getLength(); |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("Data type %s is not supported.", type)); |
| } |
| } |
| } |
| rowCount++; |
| if (rowCount % 8 == 0) { |
| for (int j = 0; j < bitmap.length; j++) { |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j + 1)]; |
| dataBitmapOutputStream.writeByte(bitmap[j]); |
| // we should clear the bitmap every 8 row record |
| bitmap[j] = 0; |
| } |
| } |
| } else { |
| break; |
| } |
| } |
| |
| // feed the remaining bitmap |
| int remaining = rowCount % 8; |
| if (remaining != 0) { |
| for (int j = 0; j < bitmap.length; j++) { |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (j + 1)]; |
| dataBitmapOutputStream.writeByte(bitmap[j] << (8 - remaining)); |
| } |
| } |
| |
| // calculate the time buffer size |
| int timeOccupation = rowCount * 8; |
| ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); |
| timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); |
| timeBuffer.flip(); |
| tsQueryDataSet.setTime(timeBuffer); |
| |
| // calculate the bitmap buffer size |
| int bitmapOccupation = rowCount / 8 + (rowCount % 8 == 0 ? 0 : 1); |
| |
| List<ByteBuffer> bitmapList = new LinkedList<>(); |
| List<ByteBuffer> valueList = new LinkedList<>(); |
| for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { |
| ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); |
| valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); |
| valueBuffer.flip(); |
| valueList.add(valueBuffer); |
| |
| ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); |
| bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); |
| bitmapBuffer.flip(); |
| bitmapList.add(bitmapBuffer); |
| } |
| tsQueryDataSet.setBitmapList(bitmapList); |
| tsQueryDataSet.setValueList(valueList); |
| return tsQueryDataSet; |
| } |
| |
| public static TSQueryDataSet convertTsBlockByFetchSize( |
| IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException { |
| int columnNum = queryExecution.getOutputValueColumnCount(); |
| TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); |
| // one time column and each value column has an actual value buffer and a bitmap value to |
| // indicate whether it is a null |
| int columnNumWithTime = columnNum * 2 + 1; |
| DataOutputStream[] dataOutputStreams = new DataOutputStream[columnNumWithTime]; |
| ByteArrayOutputStream[] byteArrayOutputStreams = new ByteArrayOutputStream[columnNumWithTime]; |
| for (int i = 0; i < columnNumWithTime; i++) { |
| byteArrayOutputStreams[i] = new ByteArrayOutputStream(); |
| dataOutputStreams[i] = new DataOutputStream(byteArrayOutputStreams[i]); |
| } |
| |
| int rowCount = 0; |
| int[] valueOccupation = new int[columnNum]; |
| |
| // used to record a bitmap for every 8 points |
| int[] bitmaps = new int[columnNum]; |
| while (rowCount < fetchSize) { |
| Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); |
| if (!optionalTsBlock.isPresent()) { |
| break; |
| } |
| TsBlock tsBlock = optionalTsBlock.get(); |
| if (tsBlock.isEmpty()) { |
| continue; |
| } |
| |
| int currentCount = tsBlock.getPositionCount(); |
| // serialize time column |
| for (int i = 0; i < currentCount; i++) { |
| // use columnOutput to write byte array |
| dataOutputStreams[0].writeLong(tsBlock.getTimeByIndex(i)); |
| } |
| |
| // serialize each value column and its bitmap |
| for (int k = 0; k < columnNum; k++) { |
| // get DataOutputStream for current value column and its bitmap |
| DataOutputStream dataOutputStream = dataOutputStreams[2 * k + 1]; |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; |
| |
| Column column = tsBlock.getColumn(k); |
| TSDataType type = column.getDataType(); |
| switch (type) { |
| case INT32: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeInt(column.getInt(i)); |
| valueOccupation[k] += 4; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case INT64: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeLong(column.getLong(i)); |
| valueOccupation[k] += 8; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case FLOAT: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeFloat(column.getFloat(i)); |
| valueOccupation[k] += 4; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case DOUBLE: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeDouble(column.getDouble(i)); |
| valueOccupation[k] += 8; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case BOOLEAN: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| dataOutputStream.writeBoolean(column.getBoolean(i)); |
| valueOccupation[k] += 1; |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| case TEXT: |
| for (int i = 0; i < currentCount; i++) { |
| rowCount++; |
| if (column.isNull(i)) { |
| bitmaps[k] = bitmaps[k] << 1; |
| } else { |
| bitmaps[k] = (bitmaps[k] << 1) | FLAG; |
| Binary binary = column.getBinary(i); |
| dataOutputStream.writeInt(binary.getLength()); |
| dataOutputStream.write(binary.getValues()); |
| valueOccupation[k] = valueOccupation[k] + 4 + binary.getLength(); |
| } |
| if (rowCount != 0 && rowCount % 8 == 0) { |
| dataBitmapOutputStream.writeByte(bitmaps[k]); |
| // we should clear the bitmap every 8 points |
| bitmaps[k] = 0; |
| } |
| } |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("Data type %s is not supported.", type)); |
| } |
| if (k != columnNum - 1) { |
| rowCount -= currentCount; |
| } |
| } |
| } |
| // feed the remaining bitmap |
| int remaining = rowCount % 8; |
| for (int k = 0; k < columnNum; k++) { |
| if (remaining != 0) { |
| DataOutputStream dataBitmapOutputStream = dataOutputStreams[2 * (k + 1)]; |
| dataBitmapOutputStream.writeByte(bitmaps[k] << (8 - remaining)); |
| } |
| } |
| |
| // calculate the time buffer size |
| int timeOccupation = rowCount * 8; |
| ByteBuffer timeBuffer = ByteBuffer.allocate(timeOccupation); |
| timeBuffer.put(byteArrayOutputStreams[0].toByteArray()); |
| timeBuffer.flip(); |
| tsQueryDataSet.setTime(timeBuffer); |
| |
| // calculate the bitmap buffer size |
| int bitmapOccupation = (rowCount + 7) / 8; |
| |
| List<ByteBuffer> bitmapList = new LinkedList<>(); |
| List<ByteBuffer> valueList = new LinkedList<>(); |
| for (int i = 1; i < byteArrayOutputStreams.length; i += 2) { |
| ByteBuffer valueBuffer = ByteBuffer.allocate(valueOccupation[(i - 1) / 2]); |
| valueBuffer.put(byteArrayOutputStreams[i].toByteArray()); |
| valueBuffer.flip(); |
| valueList.add(valueBuffer); |
| |
| ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapOccupation); |
| bitmapBuffer.put(byteArrayOutputStreams[i + 1].toByteArray()); |
| bitmapBuffer.flip(); |
| bitmapList.add(bitmapBuffer); |
| } |
| tsQueryDataSet.setBitmapList(bitmapList); |
| tsQueryDataSet.setValueList(valueList); |
| return tsQueryDataSet; |
| } |
| |
| public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) { |
| long[] times = new long[size]; |
| for (int i = 0; i < size; i++) { |
| times[i] = buffer.getLong(); |
| } |
| return times; |
| } |
| |
| public static long[] readTimesFromStream(DataInputStream stream, int size) throws IOException { |
| long[] times = new long[size]; |
| for (int i = 0; i < size; i++) { |
| times[i] = stream.readLong(); |
| } |
| return times; |
| } |
| |
| public static BitMap[] readBitMapsFromBuffer(ByteBuffer buffer, int columns, int size) { |
| if (!buffer.hasRemaining()) { |
| return null; |
| } |
| BitMap[] bitMaps = new BitMap[columns]; |
| for (int i = 0; i < columns; i++) { |
| boolean hasBitMap = BytesUtils.byteToBool(buffer.get()); |
| if (hasBitMap) { |
| byte[] bytes = new byte[size / Byte.SIZE + 1]; |
| for (int j = 0; j < bytes.length; j++) { |
| bytes[j] = buffer.get(); |
| } |
| bitMaps[i] = new BitMap(size, bytes); |
| } |
| } |
| return bitMaps; |
| } |
| |
| public static BitMap[] readBitMapsFromStream(DataInputStream stream, int columns, int size) |
| throws IOException { |
| if (stream.available() <= 0) { |
| return null; |
| } |
| BitMap[] bitMaps = new BitMap[columns]; |
| for (int i = 0; i < columns; i++) { |
| boolean hasBitMap = BytesUtils.byteToBool(stream.readByte()); |
| if (hasBitMap) { |
| byte[] bytes = new byte[size / Byte.SIZE + 1]; |
| for (int j = 0; j < bytes.length; j++) { |
| bytes[j] = stream.readByte(); |
| } |
| bitMaps[i] = new BitMap(size, bytes); |
| } |
| } |
| return bitMaps; |
| } |
| |
| public static Object[] readTabletValuesFromBuffer( |
| ByteBuffer buffer, List<Integer> types, int columns, int size) { |
| TSDataType[] dataTypes = new TSDataType[types.size()]; |
| for (int i = 0; i < dataTypes.length; i++) { |
| dataTypes[i] = TSDataType.values()[types.get(i)]; |
| } |
| return readTabletValuesFromBuffer(buffer, dataTypes, columns, size); |
| } |
| |
| public static Object[] readTabletValuesFromStream( |
| DataInputStream stream, List<Integer> types, int columns, int size) throws IOException { |
| TSDataType[] dataTypes = new TSDataType[types.size()]; |
| for (int i = 0; i < dataTypes.length; i++) { |
| dataTypes[i] = TSDataType.values()[types.get(i)]; |
| } |
| return readTabletValuesFromStream(stream, dataTypes, columns, size); |
| } |
| |
| /** |
| * @param buffer data values |
| * @param columns column number |
| * @param size value count in each column |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| public static Object[] readTabletValuesFromBuffer( |
| ByteBuffer buffer, TSDataType[] types, int columns, int size) { |
| Object[] values = new Object[columns]; |
| for (int i = 0; i < columns; i++) { |
| switch (types[i]) { |
| case BOOLEAN: |
| boolean[] boolValues = new boolean[size]; |
| for (int index = 0; index < size; index++) { |
| boolValues[index] = BytesUtils.byteToBool(buffer.get()); |
| } |
| values[i] = boolValues; |
| break; |
| case INT32: |
| int[] intValues = new int[size]; |
| for (int index = 0; index < size; index++) { |
| intValues[index] = buffer.getInt(); |
| } |
| values[i] = intValues; |
| break; |
| case INT64: |
| long[] longValues = new long[size]; |
| for (int index = 0; index < size; index++) { |
| longValues[index] = buffer.getLong(); |
| } |
| values[i] = longValues; |
| break; |
| case FLOAT: |
| float[] floatValues = new float[size]; |
| for (int index = 0; index < size; index++) { |
| floatValues[index] = buffer.getFloat(); |
| } |
| values[i] = floatValues; |
| break; |
| case DOUBLE: |
| double[] doubleValues = new double[size]; |
| for (int index = 0; index < size; index++) { |
| doubleValues[index] = buffer.getDouble(); |
| } |
| values[i] = doubleValues; |
| break; |
| case TEXT: |
| Binary[] binaryValues = new Binary[size]; |
| for (int index = 0; index < size; index++) { |
| int binarySize = buffer.getInt(); |
| byte[] binaryValue = new byte[binarySize]; |
| buffer.get(binaryValue); |
| binaryValues[index] = new Binary(binaryValue); |
| } |
| values[i] = binaryValues; |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("data type %s is not supported when convert data at client", types[i])); |
| } |
| } |
| return values; |
| } |
| |
| public static Object[] readTabletValuesFromStream( |
| DataInputStream stream, TSDataType[] types, int columns, int size) throws IOException { |
| Object[] values = new Object[columns]; |
| for (int i = 0; i < columns; i++) { |
| switch (types[i]) { |
| case BOOLEAN: |
| boolean[] boolValues = new boolean[size]; |
| for (int index = 0; index < size; index++) { |
| boolValues[index] = BytesUtils.byteToBool(stream.readByte()); |
| } |
| values[i] = boolValues; |
| break; |
| case INT32: |
| int[] intValues = new int[size]; |
| for (int index = 0; index < size; index++) { |
| intValues[index] = stream.readInt(); |
| } |
| values[i] = intValues; |
| break; |
| case INT64: |
| long[] longValues = new long[size]; |
| for (int index = 0; index < size; index++) { |
| longValues[index] = stream.readLong(); |
| } |
| values[i] = longValues; |
| break; |
| case FLOAT: |
| float[] floatValues = new float[size]; |
| for (int index = 0; index < size; index++) { |
| floatValues[index] = stream.readFloat(); |
| } |
| values[i] = floatValues; |
| break; |
| case DOUBLE: |
| double[] doubleValues = new double[size]; |
| for (int index = 0; index < size; index++) { |
| doubleValues[index] = stream.readDouble(); |
| } |
| values[i] = doubleValues; |
| break; |
| case TEXT: |
| Binary[] binaryValues = new Binary[size]; |
| for (int index = 0; index < size; index++) { |
| int binarySize = stream.readInt(); |
| byte[] binaryValue = new byte[binarySize]; |
| stream.read(binaryValue); |
| binaryValues[index] = new Binary(binaryValue); |
| } |
| values[i] = binaryValues; |
| break; |
| default: |
| throw new UnSupportedDataTypeException( |
| String.format("data type %s is not supported when convert data at client", types[i])); |
| } |
| } |
| return values; |
| } |
| } |