blob: 3160668b7c74e72ed44af1a186915321e552ab45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.chunk.IChunkWriter;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class AlignedWritableMemChunk implements IWritableMemChunk {
private final Map<String, Integer> measurementIndexMap;
private final List<IMeasurementSchema> schemaList;
private AlignedTVList list;
private static final int MAX_NUMBER_OF_POINTS_IN_PAGE =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
public AlignedWritableMemChunk(List<IMeasurementSchema> schemaList) {
this.measurementIndexMap = new LinkedHashMap<>();
List<TSDataType> dataTypeList = new ArrayList<>();
this.schemaList = schemaList;
for (int i = 0; i < schemaList.size(); i++) {
measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
dataTypeList.add(schemaList.get(i).getType());
}
this.list = AlignedTVList.newAlignedList(dataTypeList);
}
private AlignedWritableMemChunk(List<IMeasurementSchema> schemaList, AlignedTVList list) {
this.measurementIndexMap = new LinkedHashMap<>();
this.schemaList = schemaList;
for (int i = 0; i < schemaList.size(); i++) {
measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
}
this.list = list;
}
public Set<String> getAllMeasurements() {
return measurementIndexMap.keySet();
}
public boolean containsMeasurement(String measurementId) {
return measurementIndexMap.containsKey(measurementId);
}
@Override
public void putLong(long t, long v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putInt(long t, int v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putFloat(long t, float v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putDouble(long t, double v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public boolean putBinaryWithFlushCheck(long t, Binary v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putBoolean(long t, boolean v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public boolean putAlignedValueWithFlushCheck(long t, Object[] v) {
list.putAlignedValue(t, v);
return list.reachMaxChunkSizeThreshold();
}
@Override
public void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putInts(long[] t, int[] v, BitMap bitMap, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putFloats(long[] t, float[] v, BitMap bitMap, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public boolean putBinariesWithFlushCheck(
long[] t, Binary[] v, BitMap bitMap, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public boolean putAlignedValuesWithFlushCheck(
long[] t, Object[] v, BitMap[] bitMaps, int start, int end) {
list.putAlignedValues(t, v, bitMaps, start, end);
return list.reachMaxChunkSizeThreshold();
}
@Override
public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public boolean writeWithFlushCheck(
long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
public boolean writeAlignedValueWithFlushCheck(
long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
Object[] reorderedValue =
checkAndReorderColumnValuesInInsertPlan(schemaList, objectValue, null).left;
return putAlignedValueWithFlushCheck(insertTime, reorderedValue);
}
@Override
public boolean writeAlignedValuesWithFlushCheck(
long[] times,
Object[] valueList,
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
int end) {
Pair<Object[], BitMap[]> pair =
checkAndReorderColumnValuesInInsertPlan(schemaList, valueList, bitMaps);
Object[] reorderedColumnValues = pair.left;
BitMap[] reorderedBitMaps = pair.right;
return putAlignedValuesWithFlushCheck(
times, reorderedColumnValues, reorderedBitMaps, start, end);
}
/**
* Check metadata of columns and return array that mapping existed metadata to index of data
* column.
*
* @param schemaListInInsertPlan Contains all existed schema in InsertPlan. If some timeseries
* have been deleted, there will be null in its slot.
* @return columnIndexArray: schemaList[i] is schema of columns[columnIndexArray[i]]
*/
private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
List<IMeasurementSchema> schemaListInInsertPlan, Object[] columnValues, BitMap[] bitMaps) {
Object[] reorderedColumnValues = new Object[schemaList.size()];
BitMap[] reorderedBitMaps = bitMaps == null ? null : new BitMap[schemaList.size()];
for (int i = 0; i < schemaListInInsertPlan.size(); i++) {
IMeasurementSchema measurementSchema = schemaListInInsertPlan.get(i);
if (measurementSchema != null) {
Integer index = this.measurementIndexMap.get(measurementSchema.getMeasurementId());
// Index is null means this measurement was not in this AlignedTVList before.
// We need to extend a new column in AlignedMemChunk and AlignedTVList.
// And the reorderedColumnValues should extend one more column for the new measurement
if (index == null) {
index = measurementIndexMap.size();
this.measurementIndexMap.put(schemaListInInsertPlan.get(i).getMeasurementId(), index);
this.schemaList.add(schemaListInInsertPlan.get(i));
this.list.extendColumn(schemaListInInsertPlan.get(i).getType());
reorderedColumnValues =
Arrays.copyOf(reorderedColumnValues, reorderedColumnValues.length + 1);
if (reorderedBitMaps != null) {
reorderedBitMaps = Arrays.copyOf(reorderedBitMaps, reorderedBitMaps.length + 1);
}
}
reorderedColumnValues[index] = columnValues[i];
if (bitMaps != null) {
reorderedBitMaps[index] = bitMaps[i];
}
}
}
return new Pair<>(reorderedColumnValues, reorderedBitMaps);
}
@Override
public TVList getTVList() {
return list;
}
@Override
public long count() {
return (long) list.rowCount() * measurementIndexMap.size();
}
public long alignedListSize() {
return list.rowCount();
}
@Override
public IMeasurementSchema getSchema() {
return null;
}
@Override
public long getMaxTime() {
return list.getMaxTime();
}
@Override
public synchronized TVList getSortedTvListForQuery() {
sortTVList();
// increase reference count
list.increaseReferenceCount();
return list;
}
@Override
public synchronized TVList getSortedTvListForQuery(List<IMeasurementSchema> schemaList) {
sortTVList();
// increase reference count
list.increaseReferenceCount();
List<Integer> columnIndexList = new ArrayList<>();
List<TSDataType> dataTypeList = new ArrayList<>();
for (IMeasurementSchema measurementSchema : schemaList) {
columnIndexList.add(
measurementIndexMap.getOrDefault(measurementSchema.getMeasurementId(), -1));
dataTypeList.add(measurementSchema.getType());
}
return list.getTvListByColumnIndex(columnIndexList, dataTypeList);
}
private void sortTVList() {
// check reference count
if ((list.getReferenceCount() > 0 && !list.isSorted())) {
list = list.clone();
}
if (!list.isSorted()) {
list.sort();
}
}
@Override
public synchronized void sortTvListForFlush() {
sortTVList();
}
@Override
public int delete(long lowerBound, long upperBound) {
return list.delete(lowerBound, upperBound);
}
public Pair<Integer, Boolean> deleteDataFromAColumn(
long lowerBound, long upperBound, String measurementId) {
return list.delete(lowerBound, upperBound, measurementIndexMap.get(measurementId));
}
public void removeColumn(String measurementId) {
list.deleteColumn(measurementIndexMap.get(measurementId));
IMeasurementSchema schemaToBeRemoved = schemaList.get(measurementIndexMap.get(measurementId));
schemaList.remove(schemaToBeRemoved);
measurementIndexMap.clear();
for (int i = 0; i < schemaList.size(); i++) {
measurementIndexMap.put(schemaList.get(i).getMeasurementId(), i);
}
}
@Override
public IChunkWriter createIChunkWriter() {
return new AlignedChunkWriterImpl(schemaList);
}
@SuppressWarnings({"squid:S6541", "squid:S3776"})
@Override
public void encode(IChunkWriter chunkWriter) {
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
BitMap rowBitMap = list.getRowBitMap();
boolean[] timeDuplicateInfo = null;
List<Integer> pageRange = new ArrayList<>();
int range = 0;
for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) {
long time = list.getTime(sortedRowIndex);
if (range == 0) {
pageRange.add(sortedRowIndex);
}
range++;
if (range == MAX_NUMBER_OF_POINTS_IN_PAGE) {
pageRange.add(sortedRowIndex);
range = 0;
}
int nextRowIndex = sortedRowIndex + 1;
while (nextRowIndex < list.rowCount()
&& rowBitMap != null
&& rowBitMap.isMarked(list.getValueIndex(nextRowIndex))) {
nextRowIndex++;
}
if (nextRowIndex != list.rowCount() && time == list.getTime(nextRowIndex)) {
if (Objects.isNull(timeDuplicateInfo)) {
timeDuplicateInfo = new boolean[list.rowCount()];
}
timeDuplicateInfo[sortedRowIndex] = true;
}
sortedRowIndex = nextRowIndex - 1;
}
if (range != 0) {
pageRange.add(list.rowCount() - 1);
}
List<TSDataType> dataTypes = list.getTsDataTypes();
Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()];
for (int pageNum = 0; pageNum < pageRange.size() / 2; pageNum += 1) {
for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
// Pair of Time and Index
if (Objects.nonNull(timeDuplicateInfo)
&& lastValidPointIndexForTimeDupCheck[columnIndex] == null) {
lastValidPointIndexForTimeDupCheck[columnIndex] = new Pair<>(Long.MIN_VALUE, null);
}
TSDataType tsDataType = dataTypes.get(columnIndex);
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (rowBitMap != null && rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
continue;
}
// skip time duplicated rows
long time = list.getTime(sortedRowIndex);
if (Objects.nonNull(timeDuplicateInfo)) {
if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) {
lastValidPointIndexForTimeDupCheck[columnIndex].left = time;
lastValidPointIndexForTimeDupCheck[columnIndex].right =
list.getValueIndex(sortedRowIndex);
}
if (timeDuplicateInfo[sortedRowIndex]) {
if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) {
long recordSize =
MemUtils.getRecordSize(
tsDataType,
tsDataType == TSDataType.TEXT
? list.getBinaryByValueIndex(
list.getValueIndex(sortedRowIndex), columnIndex)
: null,
true);
CompressionRatio.decreaseDuplicatedMemorySize(recordSize);
}
continue;
}
}
// The part of code solves the following problem:
// Time: 1,2,2,3
// Value: 1,2,null,null
// When rowIndex:1, pair(min,null), timeDuplicateInfo:false, write(T:1,V:1)
// When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing value
// When rowIndex:3, pair(2,2), timeDuplicateInfo:false, T:2!=air.left:2, write(T:2,V:2)
// When rowIndex:4, pair(2,2), timeDuplicateInfo:false, T:3!=pair.left:2,
// write(T:3,V:null)
int originRowIndex;
if (Objects.nonNull(lastValidPointIndexForTimeDupCheck[columnIndex])
&& (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) {
originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right;
} else {
originRowIndex = list.getValueIndex(sortedRowIndex);
}
boolean isNull = list.isNullValue(originRowIndex, columnIndex);
switch (tsDataType) {
case BOOLEAN:
alignedChunkWriter.writeByColumn(
time, list.getBooleanByValueIndex(originRowIndex, columnIndex), isNull);
break;
case INT32:
alignedChunkWriter.writeByColumn(
time, list.getIntByValueIndex(originRowIndex, columnIndex), isNull);
break;
case INT64:
alignedChunkWriter.writeByColumn(
time, list.getLongByValueIndex(originRowIndex, columnIndex), isNull);
break;
case FLOAT:
alignedChunkWriter.writeByColumn(
time, list.getFloatByValueIndex(originRowIndex, columnIndex), isNull);
break;
case DOUBLE:
alignedChunkWriter.writeByColumn(
time, list.getDoubleByValueIndex(originRowIndex, columnIndex), isNull);
break;
case TEXT:
alignedChunkWriter.writeByColumn(
time, list.getBinaryByValueIndex(originRowIndex, columnIndex), isNull);
break;
default:
break;
}
}
alignedChunkWriter.nextColumn();
}
long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
int pointsInPage = 0;
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (rowBitMap != null && rowBitMap.isMarked(list.getValueIndex(sortedRowIndex))) {
continue;
}
if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) {
times[pointsInPage++] = list.getTime(sortedRowIndex);
}
}
alignedChunkWriter.write(times, pointsInPage, 0);
}
}
@Override
public void release() {
if (list.getReferenceCount() == 0) {
list.clear();
}
}
@Override
public long getFirstPoint() {
if (list.rowCount() == 0) {
return Long.MAX_VALUE;
}
return getSortedTvListForQuery().getTimeValuePair(0).getTimestamp();
}
@Override
public long getLastPoint() {
if (list.rowCount() == 0) {
return Long.MIN_VALUE;
}
return getSortedTvListForQuery()
.getTimeValuePair(getSortedTvListForQuery().rowCount() - 1)
.getTimestamp();
}
@Override
public boolean isEmpty() {
return list.rowCount() == 0;
}
@Override
public int serializedSize() {
int size = 0;
size += Integer.BYTES;
for (IMeasurementSchema schema : schemaList) {
size += schema.serializedSize();
}
size += list.serializedSize();
return size;
}
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
WALWriteUtils.write(schemaList.size(), buffer);
for (IMeasurementSchema schema : schemaList) {
byte[] bytes = new byte[schema.serializedSize()];
schema.serializeTo(ByteBuffer.wrap(bytes));
buffer.put(bytes);
}
list.serializeToWAL(buffer);
}
public static AlignedWritableMemChunk deserialize(DataInputStream stream) throws IOException {
int schemaListSize = stream.readInt();
List<IMeasurementSchema> schemaList = new ArrayList<>(schemaListSize);
for (int i = 0; i < schemaListSize; i++) {
IMeasurementSchema schema = MeasurementSchema.deserializeFrom(stream);
schemaList.add(schema);
}
AlignedTVList list = (AlignedTVList) TVList.deserialize(stream);
return new AlignedWritableMemChunk(schemaList, list);
}
}