| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.storageengine.dataregion.memtable; |
| |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; |
| import org.apache.iotdb.db.utils.MemUtils; |
| import org.apache.iotdb.db.utils.datastructure.TVList; |
| |
| import org.apache.tsfile.enums.TSDataType; |
| import org.apache.tsfile.utils.Binary; |
| import org.apache.tsfile.utils.BitMap; |
| import org.apache.tsfile.write.UnSupportedDataTypeException; |
| import org.apache.tsfile.write.chunk.ChunkWriterImpl; |
| import org.apache.tsfile.write.chunk.IChunkWriter; |
| import org.apache.tsfile.write.schema.IMeasurementSchema; |
| import org.apache.tsfile.write.schema.MeasurementSchema; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| |
| public class WritableMemChunk implements IWritableMemChunk { |
| |
| private IMeasurementSchema schema; |
| private TVList list; |
| private static final String UNSUPPORTED_TYPE = "Unsupported data type:"; |
| private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunk.class); |
| |
| private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); |
| |
| public WritableMemChunk(IMeasurementSchema schema) { |
| this.schema = schema; |
| this.list = TVList.newList(schema.getType()); |
| } |
| |
| private WritableMemChunk() {} |
| |
| @Override |
| public boolean writeWithFlushCheck(long insertTime, Object objectValue) { |
| switch (schema.getType()) { |
| case BOOLEAN: |
| putBoolean(insertTime, (boolean) objectValue); |
| break; |
| case INT32: |
| putInt(insertTime, (int) objectValue); |
| break; |
| case INT64: |
| putLong(insertTime, (long) objectValue); |
| break; |
| case FLOAT: |
| putFloat(insertTime, (float) objectValue); |
| break; |
| case DOUBLE: |
| putDouble(insertTime, (double) objectValue); |
| break; |
| case TEXT: |
| return putBinaryWithFlushCheck(insertTime, (Binary) objectValue); |
| default: |
| throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean writeAlignedValueWithFlushCheck( |
| long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) { |
| throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType()); |
| } |
| |
| @Override |
| public boolean writeWithFlushCheck( |
| long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) { |
| switch (dataType) { |
| case BOOLEAN: |
| boolean[] boolValues = (boolean[]) valueList; |
| putBooleans(times, boolValues, bitMap, start, end); |
| break; |
| case INT32: |
| int[] intValues = (int[]) valueList; |
| putInts(times, intValues, bitMap, start, end); |
| break; |
| case INT64: |
| long[] longValues = (long[]) valueList; |
| putLongs(times, longValues, bitMap, start, end); |
| break; |
| case FLOAT: |
| float[] floatValues = (float[]) valueList; |
| putFloats(times, floatValues, bitMap, start, end); |
| break; |
| case DOUBLE: |
| double[] doubleValues = (double[]) valueList; |
| putDoubles(times, doubleValues, bitMap, start, end); |
| break; |
| case TEXT: |
| Binary[] binaryValues = (Binary[]) valueList; |
| return putBinariesWithFlushCheck(times, binaryValues, bitMap, start, end); |
| default: |
| throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType); |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean writeAlignedValuesWithFlushCheck( |
| long[] times, |
| Object[] valueList, |
| BitMap[] bitMaps, |
| List<IMeasurementSchema> schemaList, |
| int start, |
| int end) { |
| throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType()); |
| } |
| |
| @Override |
| public void putLong(long t, long v) { |
| list.putLong(t, v); |
| } |
| |
| @Override |
| public void putInt(long t, int v) { |
| list.putInt(t, v); |
| } |
| |
| @Override |
| public void putFloat(long t, float v) { |
| list.putFloat(t, v); |
| } |
| |
| @Override |
| public void putDouble(long t, double v) { |
| list.putDouble(t, v); |
| } |
| |
| @Override |
| public boolean putBinaryWithFlushCheck(long t, Binary v) { |
| list.putBinary(t, v); |
| return list.reachMaxChunkSizeThreshold(); |
| } |
| |
| @Override |
| public void putBoolean(long t, boolean v) { |
| list.putBoolean(t, v); |
| } |
| |
| @Override |
| public boolean putAlignedValueWithFlushCheck(long t, Object[] v) { |
| throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); |
| } |
| |
| @Override |
| public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) { |
| list.putLongs(t, v, bitMap, start, end); |
| } |
| |
| @Override |
| public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) { |
| list.putInts(t, v, bitMap, start, end); |
| } |
| |
| @Override |
| public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) { |
| list.putFloats(t, v, bitMap, start, end); |
| } |
| |
| @Override |
| public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) { |
| list.putDoubles(t, v, bitMap, start, end); |
| } |
| |
| @Override |
| public boolean putBinariesWithFlushCheck( |
| long[] t, Binary[] v, BitMap bitMap, int start, int end) { |
| list.putBinaries(t, v, bitMap, start, end); |
| return list.reachMaxChunkSizeThreshold(); |
| } |
| |
| @Override |
| public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) { |
| list.putBooleans(t, v, bitMap, start, end); |
| } |
| |
| @Override |
| public boolean putAlignedValuesWithFlushCheck( |
| long[] t, Object[] v, BitMap[] bitMaps, int start, int end) { |
| throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); |
| } |
| |
| @Override |
| public synchronized TVList getSortedTvListForQuery() { |
| sortTVList(); |
| // increase reference count |
| list.increaseReferenceCount(); |
| return list; |
| } |
| |
| @Override |
| public synchronized TVList getSortedTvListForQuery(List<IMeasurementSchema> measurementSchema) { |
| throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType()); |
| } |
| |
| private void sortTVList() { |
| // check reference count |
| if ((list.getReferenceCount() > 0 && !list.isSorted())) { |
| list = list.clone(); |
| } |
| |
| if (!list.isSorted()) { |
| list.sort(); |
| } |
| } |
| |
| @Override |
| public synchronized void sortTvListForFlush() { |
| sortTVList(); |
| } |
| |
| @Override |
| public TVList getTVList() { |
| return list; |
| } |
| |
| @Override |
| public long count() { |
| return list.rowCount(); |
| } |
| |
| @Override |
| public IMeasurementSchema getSchema() { |
| return schema; |
| } |
| |
| @Override |
| public long getMaxTime() { |
| return list.getMaxTime(); |
| } |
| |
| @Override |
| public long getFirstPoint() { |
| if (list.rowCount() == 0) { |
| return Long.MAX_VALUE; |
| } |
| return getSortedTvListForQuery().getTimeValuePair(0).getTimestamp(); |
| } |
| |
| @Override |
| public long getLastPoint() { |
| if (list.rowCount() == 0) { |
| return Long.MIN_VALUE; |
| } |
| return getSortedTvListForQuery() |
| .getTimeValuePair(getSortedTvListForQuery().rowCount() - 1) |
| .getTimestamp(); |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return list.rowCount() == 0; |
| } |
| |
| @Override |
| public int delete(long lowerBound, long upperBound) { |
| return list.delete(lowerBound, upperBound); |
| } |
| |
| @Override |
| public IChunkWriter createIChunkWriter() { |
| return new ChunkWriterImpl(schema); |
| } |
| |
| @Override |
| public String toString() { |
| int size = list.rowCount(); |
| int firstIndex = 0; |
| int lastIndex = size - 1; |
| long minTime = Long.MAX_VALUE; |
| long maxTime = Long.MIN_VALUE; |
| for (int i = 0; i < size; i++) { |
| long currentTime = list.getTime(i); |
| if (currentTime < minTime) { |
| firstIndex = i; |
| minTime = currentTime; |
| } |
| if (currentTime >= maxTime) { |
| lastIndex = i; |
| maxTime = currentTime; |
| } |
| } |
| |
| StringBuilder out = new StringBuilder("MemChunk Size: " + size + System.lineSeparator()); |
| if (size != 0) { |
| out.append("Data type:").append(schema.getType()).append(System.lineSeparator()); |
| out.append("First point:") |
| .append(list.getTimeValuePair(firstIndex)) |
| .append(System.lineSeparator()); |
| out.append("Last point:") |
| .append(list.getTimeValuePair(lastIndex)) |
| .append(System.lineSeparator()); |
| } |
| return out.toString(); |
| } |
| |
| @Override |
| public void encode(IChunkWriter chunkWriter) { |
| |
| ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter; |
| |
| for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { |
| long time = list.getTime(sortedRowIndex); |
| |
| TSDataType tsDataType = schema.getType(); |
| |
| // skip duplicated data |
| if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) { |
| long recordSize = |
| MemUtils.getRecordSize( |
| tsDataType, |
| tsDataType == TSDataType.TEXT ? list.getBinary(sortedRowIndex) : null, |
| true); |
| CompressionRatio.decreaseDuplicatedMemorySize(recordSize); |
| continue; |
| } |
| |
| // store last point for SDT |
| if (sortedRowIndex + 1 == list.rowCount()) { |
| chunkWriterImpl.setLastPoint(true); |
| } |
| |
| switch (tsDataType) { |
| case BOOLEAN: |
| chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex)); |
| break; |
| case INT32: |
| chunkWriterImpl.write(time, list.getInt(sortedRowIndex)); |
| break; |
| case INT64: |
| chunkWriterImpl.write(time, list.getLong(sortedRowIndex)); |
| break; |
| case FLOAT: |
| chunkWriterImpl.write(time, list.getFloat(sortedRowIndex)); |
| break; |
| case DOUBLE: |
| chunkWriterImpl.write(time, list.getDouble(sortedRowIndex)); |
| break; |
| case TEXT: |
| chunkWriterImpl.write(time, list.getBinary(sortedRowIndex)); |
| break; |
| default: |
| LOGGER.error("WritableMemChunk does not support data type: {}", tsDataType); |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public void release() { |
| if (list.getReferenceCount() == 0) { |
| list.clear(); |
| } |
| } |
| |
| @Override |
| public int serializedSize() { |
| return schema.serializedSize() + list.serializedSize(); |
| } |
| |
| @Override |
| public void serializeToWAL(IWALByteBufferView buffer) { |
| byte[] bytes = new byte[schema.serializedSize()]; |
| schema.serializeTo(ByteBuffer.wrap(bytes)); |
| buffer.put(bytes); |
| |
| list.serializeToWAL(buffer); |
| } |
| |
| public static WritableMemChunk deserialize(DataInputStream stream) throws IOException { |
| WritableMemChunk memChunk = new WritableMemChunk(); |
| memChunk.schema = MeasurementSchema.deserializeFrom(stream); |
| memChunk.list = TVList.deserialize(stream); |
| return memChunk; |
| } |
| } |