[IOTDB-4399] Control text chunk size in memtable (#7320)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index cd759c7..6a275d7 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -385,6 +385,11 @@
# Datatype: int
# avg_series_point_number_threshold=100000
+# When a chunk in memtable reaches this threshold, flush the memtable to disk.
+# The default threshold is 20 MB.
+# Datatype: long
+# max_chunk_raw_size_threshold = 20971520
+
# How many threads can concurrently flush. When <= 0, use CPU core number.
# Datatype: int
# concurrent_flush_thread=0
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 853e37d..c277e65 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -397,6 +397,9 @@
/** When average series point number reaches this, flush the memtable to disk */
private int avgSeriesPointNumberThreshold = 100000;
+ /** When a chunk in memtable reaches this threshold, flush the memtable to disk */
+ private long maxChunkRawSizeThreshold = 1024 * 1024 * 20L;
+
/** Enable inner space compaction for sequence files */
private boolean enableSeqSpaceCompaction = true;
@@ -1970,6 +1973,14 @@
this.avgSeriesPointNumberThreshold = avgSeriesPointNumberThreshold;
}
+ public long getMaxChunkRawSizeThreshold() {
+ return maxChunkRawSizeThreshold;
+ }
+
+ public void setMaxChunkRawSizeThreshold(long maxChunkRawSizeThreshold) {
+ this.maxChunkRawSizeThreshold = maxChunkRawSizeThreshold;
+ }
+
public long getCrossCompactionFileSelectionTimeBudget() {
return crossCompactionFileSelectionTimeBudget;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 081b54d..39d590c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -405,6 +405,12 @@
"avg_series_point_number_threshold",
Integer.toString(conf.getAvgSeriesPointNumberThreshold()))));
+ conf.setMaxChunkRawSizeThreshold(
+ Long.parseLong(
+ properties.getProperty(
+ "max_chunk_raw_size_threshold",
+ Long.toString(conf.getMaxChunkRawSizeThreshold()))));
+
conf.setCheckPeriodWhenInsertBlocked(
Integer.parseInt(
properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 756f94d..4eaa7df 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -440,7 +440,9 @@
Object[] objectValue) {
IWritableMemChunkGroup memChunkGroup =
createMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
- memChunkGroup.write(insertTime, objectValue, schemaList);
+ if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) {
+ shouldFlush = true;
+ }
}
@Override
@@ -451,7 +453,9 @@
Object[] objectValue) {
IWritableMemChunkGroup memChunkGroup =
createAlignedMemChunkGroupIfNotExistAndGet(deviceId, schemaList);
- memChunkGroup.write(insertTime, objectValue, schemaList);
+ if (memChunkGroup.writeWithFlushCheck(insertTime, objectValue, schemaList)) {
+ shouldFlush = true;
+ }
}
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@@ -472,13 +476,15 @@
}
IWritableMemChunkGroup memChunkGroup =
createMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(), schemaList);
- memChunkGroup.writeValues(
+ if (memChunkGroup.writeValuesWithFlushCheck(
insertTabletPlan.getTimes(),
insertTabletPlan.getColumns(),
insertTabletPlan.getBitMaps(),
schemaList,
start,
- end);
+ end)) {
+ shouldFlush = true;
+ }
}
public void write(InsertTabletNode insertTabletNode, int start, int end) {
@@ -498,13 +504,15 @@
}
IWritableMemChunkGroup memChunkGroup =
createMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), schemaList);
- memChunkGroup.writeValues(
+ if (memChunkGroup.writeValuesWithFlushCheck(
insertTabletNode.getTimes(),
insertTabletNode.getColumns(),
insertTabletNode.getBitMaps(),
schemaList,
start,
- end);
+ end)) {
+ shouldFlush = true;
+ }
}
@Override
@@ -527,13 +535,15 @@
}
IWritableMemChunkGroup memChunkGroup =
createAlignedMemChunkGroupIfNotExistAndGet(insertTabletPlan.getDeviceID(), schemaList);
- memChunkGroup.writeValues(
+ if (memChunkGroup.writeValuesWithFlushCheck(
insertTabletPlan.getTimes(),
insertTabletPlan.getColumns(),
insertTabletPlan.getBitMaps(),
schemaList,
start,
- end);
+ end)) {
+ shouldFlush = true;
+ }
}
public void writeAlignedTablet(InsertTabletNode insertTabletNode, int start, int end) {
@@ -556,13 +566,15 @@
}
IWritableMemChunkGroup memChunkGroup =
createAlignedMemChunkGroupIfNotExistAndGet(insertTabletNode.getDeviceID(), schemaList);
- memChunkGroup.writeValues(
+ if (memChunkGroup.writeValuesWithFlushCheck(
insertTabletNode.getTimes(),
insertTabletNode.getColumns(),
insertTabletNode.getBitMaps(),
schemaList,
start,
- end);
+ end)) {
+ shouldFlush = true;
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index c9cbcbb..845fc8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -108,7 +108,7 @@
}
@Override
- public void putBinary(long t, Binary v) {
+ public boolean putBinaryWithFlushCheck(long t, Binary v) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@@ -118,8 +118,9 @@
}
@Override
- public void putAlignedValue(long t, Object[] v, int[] columnIndexArray) {
+ public boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray) {
list.putAlignedValue(t, v, columnIndexArray);
+ return list.reachMaxChunkSizeThreshold();
}
@Override
@@ -143,7 +144,8 @@
}
@Override
- public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) {
+ public boolean putBinariesWithFlushCheck(
+ long[] t, Binary[] v, BitMap bitMap, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@@ -153,31 +155,32 @@
}
@Override
- public void putAlignedValues(
+ public boolean putAlignedValuesWithFlushCheck(
long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) {
list.putAlignedValues(t, v, bitMaps, columnIndexArray, start, end);
+ return list.reachMaxChunkSizeThreshold();
}
@Override
- public void write(long insertTime, Object objectValue) {
+ public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
- public void writeAlignedValue(
+ public boolean writeAlignedValueWithFlushCheck(
long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
- putAlignedValue(insertTime, objectValue, columnIndexArray);
+ return putAlignedValueWithFlushCheck(insertTime, objectValue, columnIndexArray);
}
@Override
- public void write(
+ public boolean writeWithFlushCheck(
long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + TSDataType.VECTOR);
}
@Override
- public void writeAlignedValues(
+ public boolean writeAlignedValuesWithFlushCheck(
long[] times,
Object[] valueList,
BitMap[] bitMaps,
@@ -185,7 +188,7 @@
int start,
int end) {
int[] columnIndexArray = checkColumnsInInsertPlan(schemaList);
- putAlignedValues(times, valueList, bitMaps, columnIndexArray, start, end);
+ return putAlignedValuesWithFlushCheck(times, valueList, bitMaps, columnIndexArray, start, end);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
index 31ed096..611b7a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunkGroup.java
@@ -45,14 +45,15 @@
private AlignedWritableMemChunkGroup() {}
@Override
- public void writeValues(
+ public boolean writeValuesWithFlushCheck(
long[] times,
Object[] columns,
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
int end) {
- memChunk.writeAlignedValues(times, columns, bitMaps, schemaList, start, end);
+ return memChunk.writeAlignedValuesWithFlushCheck(
+ times, columns, bitMaps, schemaList, start, end);
}
@Override
@@ -79,8 +80,9 @@
}
@Override
- public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
- memChunk.writeAlignedValue(insertTime, objectValue, schemaList);
+ public boolean writeWithFlushCheck(
+ long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+ return memChunk.writeAlignedValueWithFlushCheck(insertTime, objectValue, schemaList);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
index 7de03cc..5237e53 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunk.java
@@ -38,11 +38,11 @@
void putDouble(long t, double v);
- void putBinary(long t, Binary v);
+ boolean putBinaryWithFlushCheck(long t, Binary v);
void putBoolean(long t, boolean v);
- void putAlignedValue(long t, Object[] v, int[] columnIndexArray);
+ boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray);
void putLongs(long[] t, long[] v, BitMap bitMap, int start, int end);
@@ -52,26 +52,26 @@
void putDoubles(long[] t, double[] v, BitMap bitMap, int start, int end);
- void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end);
+ boolean putBinariesWithFlushCheck(long[] t, Binary[] v, BitMap bitMap, int start, int end);
void putBooleans(long[] t, boolean[] v, BitMap bitMap, int start, int end);
- void putAlignedValues(
+ boolean putAlignedValuesWithFlushCheck(
long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end);
- void write(long insertTime, Object objectValue);
+ boolean writeWithFlushCheck(long insertTime, Object objectValue);
- void writeAlignedValue(
+ boolean writeAlignedValueWithFlushCheck(
long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
/**
* write data in the range [start, end). Null value in the valueList will be replaced by the
* subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
*/
- void write(
+ boolean writeWithFlushCheck(
long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end);
- void writeAlignedValues(
+ boolean writeAlignedValuesWithFlushCheck(
long[] times,
Object[] valueList,
BitMap[] bitMaps,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
index 73f2abe..e3d089f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IWritableMemChunkGroup.java
@@ -29,7 +29,7 @@
public interface IWritableMemChunkGroup extends WALEntryValue {
- void writeValues(
+ boolean writeValuesWithFlushCheck(
long[] times,
Object[] columns,
BitMap[] bitMaps,
@@ -43,7 +43,8 @@
boolean contains(String measurement);
- void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
+ boolean writeWithFlushCheck(
+ long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList);
Map<String, IWritableMemChunk> getMemChunkMap();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
index 1951e47..c8bd614 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunk.java
@@ -52,7 +52,7 @@
private WritableMemChunk() {}
@Override
- public void write(long insertTime, Object objectValue) {
+ public boolean writeWithFlushCheck(long insertTime, Object objectValue) {
switch (schema.getType()) {
case BOOLEAN:
putBoolean(insertTime, (boolean) objectValue);
@@ -70,21 +70,21 @@
putDouble(insertTime, (double) objectValue);
break;
case TEXT:
- putBinary(insertTime, (Binary) objectValue);
- break;
+ return putBinaryWithFlushCheck(insertTime, (Binary) objectValue);
default:
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
}
+ return false;
}
@Override
- public void writeAlignedValue(
+ public boolean writeAlignedValueWithFlushCheck(
long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + list.getDataType());
}
@Override
- public void write(
+ public boolean writeWithFlushCheck(
long[] times, Object valueList, BitMap bitMap, TSDataType dataType, int start, int end) {
switch (dataType) {
case BOOLEAN:
@@ -109,15 +109,15 @@
break;
case TEXT:
Binary[] binaryValues = (Binary[]) valueList;
- putBinaries(times, binaryValues, bitMap, start, end);
- break;
+ return putBinariesWithFlushCheck(times, binaryValues, bitMap, start, end);
default:
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + dataType);
}
+ return false;
}
@Override
- public void writeAlignedValues(
+ public boolean writeAlignedValuesWithFlushCheck(
long[] times,
Object[] valueList,
BitMap[] bitMaps,
@@ -148,8 +148,9 @@
}
@Override
- public void putBinary(long t, Binary v) {
+ public boolean putBinaryWithFlushCheck(long t, Binary v) {
list.putBinary(t, v);
+ return list.reachMaxChunkSizeThreshold();
}
@Override
@@ -158,7 +159,7 @@
}
@Override
- public void putAlignedValue(long t, Object[] v, int[] columnOrder) {
+ public boolean putAlignedValueWithFlushCheck(long t, Object[] v, int[] columnIndexArray) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
}
@@ -183,8 +184,10 @@
}
@Override
- public void putBinaries(long[] t, Binary[] v, BitMap bitMap, int start, int end) {
+ public boolean putBinariesWithFlushCheck(
+ long[] t, Binary[] v, BitMap bitMap, int start, int end) {
list.putBinaries(t, v, bitMap, start, end);
+ return list.reachMaxChunkSizeThreshold();
}
@Override
@@ -193,8 +196,8 @@
}
@Override
- public void putAlignedValues(
- long[] t, Object[] v, BitMap[] bitMaps, int[] columnOrder, int start, int end) {
+ public boolean putAlignedValuesWithFlushCheck(
+ long[] t, Object[] v, BitMap[] bitMaps, int[] columnIndexArray, int start, int end) {
throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
index 703ac67..75e24d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkGroup.java
@@ -43,26 +43,29 @@
}
@Override
- public void writeValues(
+ public boolean writeValuesWithFlushCheck(
long[] times,
Object[] columns,
BitMap[] bitMaps,
List<IMeasurementSchema> schemaList,
int start,
int end) {
+ boolean flushFlag = false;
for (int i = 0; i < columns.length; i++) {
if (columns[i] == null) {
continue;
}
IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i));
- memChunk.write(
- times,
- columns[i],
- bitMaps == null ? null : bitMaps[i],
- schemaList.get(i).getType(),
- start,
- end);
+ flushFlag |=
+ memChunk.writeWithFlushCheck(
+ times,
+ columns[i],
+ bitMaps == null ? null : bitMaps[i],
+ schemaList.get(i).getType(),
+ start,
+ end);
}
+ return flushFlag;
}
private IWritableMemChunk createMemChunkIfNotExistAndGet(IMeasurementSchema schema) {
@@ -92,14 +95,17 @@
}
@Override
- public void write(long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+ public boolean writeWithFlushCheck(
+ long insertTime, Object[] objectValue, List<IMeasurementSchema> schemaList) {
+ boolean flushFlag = false;
for (int i = 0; i < objectValue.length; i++) {
if (objectValue[i] == null) {
continue;
}
IWritableMemChunk memChunk = createMemChunkIfNotExistAndGet(schemaList.get(i));
- memChunk.write(insertTime, objectValue[i]);
+ flushFlag |= memChunk.writeWithFlushCheck(insertTime, objectValue[i]);
}
+ return flushFlag;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index fb950b7..39cdabb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -45,6 +45,7 @@
import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
import static org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
@@ -54,6 +55,9 @@
// data types of this aligned tvList
protected List<TSDataType> dataTypes;
+ // record total memory size of binary column
+ protected long[] memoryBinaryChunkSize;
+
// data type list -> list of TVList, add 1 when expanded -> primitive array of basic type
// index relation: columnIndex(dataTypeIndex) -> arrayIndex -> elementIndex
protected List<List<Object>> values;
@@ -68,10 +72,16 @@
// index relation: columnIndex(dataTypeIndex) -> arrayIndex -> elementIndex
protected List<List<BitMap>> bitMaps;
+ // if a sensor chunk size of Text datatype reaches the threshold, this flag will be set true
+ boolean reachMaxChunkSizeFlag;
+
AlignedTVList(List<TSDataType> types) {
super();
indices = new ArrayList<>(types.size());
dataTypes = types;
+ memoryBinaryChunkSize = new long[dataTypes.size()];
+ reachMaxChunkSizeFlag = false;
+
values = new ArrayList<>(types.size());
for (int i = 0; i < types.size(); i++) {
values.add(new ArrayList<>());
@@ -89,6 +99,8 @@
public AlignedTVList clone() {
AlignedTVList cloneList = AlignedTVList.newAlignedList(dataTypes);
cloneAs(cloneList);
+ System.arraycopy(
+ memoryBinaryChunkSize, 0, cloneList.memoryBinaryChunkSize, 0, dataTypes.size());
for (int[] indicesArray : indices) {
cloneList.indices.add(cloneIndex(indicesArray));
}
@@ -136,6 +148,13 @@
case TEXT:
((Binary[]) columnValues.get(arrayIndex))[elementIndex] =
columnValue != null ? (Binary) columnValue : Binary.EMPTY_VALUE;
+ memoryBinaryChunkSize[i] +=
+ columnValue != null
+ ? getBinarySize((Binary) columnValue)
+ : getBinarySize(Binary.EMPTY_VALUE);
+ if (memoryBinaryChunkSize[i] >= maxChunkRawSizeThreshold) {
+ reachMaxChunkSizeFlag = true;
+ }
break;
case FLOAT:
((float[]) columnValues.get(arrayIndex))[elementIndex] =
@@ -312,6 +331,11 @@
this.bitMaps.add(columnBitMaps);
this.values.add(columnValue);
this.dataTypes.add(dataType);
+
+ long[] tmpValueChunkRawSize = memoryBinaryChunkSize;
+ memoryBinaryChunkSize = new long[dataTypes.size()];
+ System.arraycopy(
+ tmpValueChunkRawSize, 0, memoryBinaryChunkSize, 0, tmpValueChunkRawSize.length);
}
/**
@@ -457,6 +481,10 @@
int originRowIndex = getValueIndex(i);
int arrayIndex = originRowIndex / ARRAY_SIZE;
int elementIndex = originRowIndex % ARRAY_SIZE;
+ if (dataTypes.get(columnIndex) == TSDataType.TEXT) {
+ memoryBinaryChunkSize[columnIndex] -=
+ getBinarySize(((Binary[]) values.get(columnIndex).get(arrayIndex))[elementIndex]);
+ }
markNullValue(columnIndex, arrayIndex, elementIndex);
deletedNumber++;
} else {
@@ -468,6 +496,17 @@
public void deleteColumn(int columnIndex) {
dataTypes.remove(columnIndex);
+
+ long[] tmpValueChunkRawSize = memoryBinaryChunkSize;
+ memoryBinaryChunkSize = new long[dataTypes.size()];
+ int copyIndex = 0;
+ for (int i = 0; i < tmpValueChunkRawSize.length; i++) {
+ if (i == columnIndex) {
+ continue;
+ }
+ memoryBinaryChunkSize[copyIndex++] = tmpValueChunkRawSize[i];
+ }
+
for (Object array : values.get(columnIndex)) {
PrimitiveArrayManager.release(array);
}
@@ -548,6 +587,7 @@
columnBitMaps.clear();
}
}
+ memoryBinaryChunkSize[i] = 0;
}
}
@@ -618,6 +658,11 @@
}
}
+ @Override
+ public boolean reachMaxChunkSizeThreshold() {
+ return reachMaxChunkSizeFlag;
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
public void putAlignedValues(
@@ -688,6 +733,15 @@
case TEXT:
Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex));
System.arraycopy(value[columnIndexArray[i]], idx, arrayT, elementIndex, remaining);
+
+ // update raw size of Text chunk
+ for (int i1 = 0; i1 < remaining; i1++) {
+ memoryBinaryChunkSize[i] +=
+ arrayT[elementIndex + i1] != null ? getBinarySize(arrayT[elementIndex + i1]) : 0;
+ }
+ if (memoryBinaryChunkSize[i] > maxChunkRawSizeThreshold) {
+ reachMaxChunkSizeFlag = true;
+ }
break;
case FLOAT:
float[] arrayF = ((float[]) columnValues.get(arrayIndex));
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 37f1598..d5c7e24 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -38,15 +38,20 @@
import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.TVLIST_SORT_ALGORITHM;
+import static org.apache.iotdb.db.utils.MemUtils.getBinarySize;
public abstract class BinaryTVList extends TVList {
// list of primitive array, add 1 when expanded -> Binary primitive array
// index relation: arrayIndex -> elementIndex
protected List<Binary[]> values;
+ // record total memory size of binary tvlist
+ long memoryBinaryChunkSize;
+
BinaryTVList() {
super();
values = new ArrayList<>();
+ memoryBinaryChunkSize = 0;
}
public static BinaryTVList newList() {
@@ -60,6 +65,7 @@
public TimBinaryTVList clone() {
TimBinaryTVList cloneList = new TimBinaryTVList();
cloneAs(cloneList);
+ cloneList.memoryBinaryChunkSize = memoryBinaryChunkSize;
for (Binary[] valueArray : values) {
cloneList.values.add(cloneValue(valueArray));
}
@@ -84,6 +90,40 @@
if (sorted && rowCount > 1 && timestamp < getTime(rowCount - 2)) {
sorted = false;
}
+ memoryBinaryChunkSize += getBinarySize(value);
+ }
+
+ @Override
+ public boolean reachMaxChunkSizeThreshold() {
+ return memoryBinaryChunkSize >= maxChunkRawSizeThreshold;
+ }
+
+ @Override
+ public int delete(long lowerBound, long upperBound) {
+ int newSize = 0;
+ minTime = Long.MAX_VALUE;
+ for (int i = 0; i < rowCount; i++) {
+ long time = getTime(i);
+ if (time < lowerBound || time > upperBound) {
+ set(i, newSize++);
+ minTime = Math.min(time, minTime);
+ } else {
+ memoryBinaryChunkSize -= getBinarySize(getBinary(i));
+ }
+ }
+ int deletedNumber = rowCount - newSize;
+ rowCount = newSize;
+ // release primitive arrays that are empty
+ int newArrayNum = newSize / ARRAY_SIZE;
+ if (newSize % ARRAY_SIZE != 0) {
+ newArrayNum++;
+ }
+ int oldArrayNum = timestamps.size();
+ for (int releaseIdx = newArrayNum; releaseIdx < oldArrayNum; releaseIdx++) {
+ releaseLastTimeArray();
+ releaseLastValueArray();
+ }
+ return deletedNumber;
}
@Override
@@ -114,6 +154,7 @@
}
values.clear();
}
+ memoryBinaryChunkSize = 0;
}
@Override
@@ -176,6 +217,11 @@
updateMinTimeAndSorted(time, start, end);
}
+ // update raw size
+ for (int i = idx; i < end; i++) {
+ memoryBinaryChunkSize += getBinarySize(value[i]);
+ }
+
while (idx < end) {
int inputRemaining = end - idx;
int arrayIdx = rowCount / ARRAY_SIZE;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 6786dca..721f9cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
@@ -48,6 +49,8 @@
protected static final int SMALL_ARRAY_LENGTH = 32;
protected static final String ERR_DATATYPE_NOT_CONSISTENT = "DataType not consistent";
+ long maxChunkRawSizeThreshold =
+ IoTDBDescriptor.getInstance().getConfig().getMaxChunkRawSizeThreshold();
// list of timestamp array, add 1 when expanded -> data point timestamp array
// index relation: arrayIndex -> elementIndex
protected List<long[]> timestamps;
@@ -147,6 +150,10 @@
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
+ public boolean reachMaxChunkSizeThreshold() {
+ return false;
+ }
+
public void putBoolean(long time, boolean value) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index fd01cd0..acc6210 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -76,7 +76,7 @@
new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN));
int count = 1000;
for (int i = 0; i < count; i++) {
- series.write(i, i);
+ series.writeWithFlushCheck(i, i);
}
IPointReader it =
series.getSortedTvListForQuery().buildTsBlock().getTsBlockSingleColumnIterator();
@@ -95,11 +95,11 @@
new WritableMemChunk(new MeasurementSchema("s1", dataType, TSEncoding.PLAIN));
int count = 100;
for (int i = 0; i < count; i++) {
- series.write(i, i);
+ series.writeWithFlushCheck(i, i);
}
- series.write(0, 21);
- series.write(99, 20);
- series.write(20, 21);
+ series.writeWithFlushCheck(0, 21);
+ series.writeWithFlushCheck(99, 20);
+ series.writeWithFlushCheck(20, 21);
String str = series.toString();
Assert.assertFalse(series.getTVList().isSorted());
Assert.assertEquals(
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
index 15697f9..a685ce4 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/BinaryTVListTest.java
@@ -107,5 +107,35 @@
Assert.assertEquals(tvList.getBinary((int) i), clonedTvList.getBinary((int) i));
Assert.assertEquals(tvList.getTime((int) i), clonedTvList.getTime((int) i));
}
+ Assert.assertEquals(tvList.memoryBinaryChunkSize, clonedTvList.memoryBinaryChunkSize);
+ }
+
+ @Test
+ public void testCalculateChunkSize() {
+ BinaryTVList tvList = BinaryTVList.newList();
+ for (int i = 0; i < 10; i++) {
+ tvList.putBinary(i, Binary.valueOf(String.valueOf(i)));
+ }
+ Assert.assertEquals(tvList.memoryBinaryChunkSize, 360);
+
+ Binary[] binaryList = new Binary[10];
+ List<Long> timeList = new ArrayList<>();
+ BitMap bitMap = new BitMap(10);
+ for (int i = 0; i < 10; i++) {
+ timeList.add((long) i + 10);
+ binaryList[i] = Binary.valueOf(String.valueOf(i));
+ if (i % 2 == 0) {
+ bitMap.mark(i);
+ }
+ }
+ tvList.putBinaries(
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])), binaryList, bitMap, 0, 10);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize, 540);
+
+ tvList.delete(5, 15);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize, 252);
+
+ tvList.clear();
+ Assert.assertEquals(tvList.memoryBinaryChunkSize, 0);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
index 7ddad739..7f660e7 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
@@ -204,5 +204,72 @@
tvList.isNullValue((int) i, column), clonedTvList.isNullValue((int) i, column));
}
}
+
+ for (int i = 0; i < dataTypes.size(); i++) {
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[i], clonedTvList.memoryBinaryChunkSize[i]);
+ }
+ }
+
+ @Test
+ public void testCalculateChunkSize() {
+ List<TSDataType> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.INT32);
+ dataTypes.add(TSDataType.TEXT);
+ AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
+
+ int[] columnOrder = new int[2];
+ columnOrder[0] = 0;
+ columnOrder[1] = 1;
+ for (int i = 0; i < 10; i++) {
+ Object[] value = new Object[2];
+ value[0] = i;
+ value[1] = new Binary(String.valueOf(i));
+ tvList.putAlignedValue(i, value, columnOrder);
+ }
+
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 0);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 360);
+
+ Object[] vectorArray = new Object[2];
+ BitMap[] bitMaps = new BitMap[2];
+
+ vectorArray[0] = new int[10];
+ vectorArray[1] = new Binary[10];
+ bitMaps[0] = new BitMap(10);
+ bitMaps[1] = new BitMap(10);
+
+ List<Long> timeList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ timeList.add((long) i + 10);
+ ((int[]) vectorArray[0])[i] = i;
+ ((Binary[]) vectorArray[1])[i] = new Binary(String.valueOf(i));
+
+ if (i % 2 == 0) {
+ bitMaps[1].mark(i);
+ }
+ }
+
+ tvList.putAlignedValues(
+ ArrayUtils.toPrimitive(timeList.toArray(new Long[0])),
+ vectorArray,
+ bitMaps,
+ columnOrder,
+ 0,
+ 10);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 720);
+
+ tvList.delete(5, 15);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[1], 324);
+
+ tvList.deleteColumn(0);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize.length, 1);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 324);
+
+ tvList.extendColumn(TSDataType.INT32);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize.length, 2);
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 324);
+
+ tvList.clear();
+ Assert.assertEquals(tvList.memoryBinaryChunkSize[0], 0);
}
}