blob: 51af2315b95b8af5479fc95bf15090cfab0c8d72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.memtable;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.rescon.TVListAllocator;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
public abstract class AbstractMemTable implements IMemTable {
/** Chunks held by this memtable, keyed first by device id and then by measurement id. */
private final Map<String, Map<String, IWritableMemChunk>> memTableMap;
/**
* Passed to MemUtils whenever the size of inserted data is computed. The initial value is true
* because we want to calculate the text data size when recovering a memTable.
*/
protected boolean disableMemControl = true;
/** Flush-request flag: set by setShouldFlush() and reported by shouldFlush(). */
private boolean shouldFlush = false;
/** Per-series point-number threshold, read once from the IoTDB configuration. */
private int avgSeriesPointNumThreshold =
IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
/** memory size of data points, including TEXT values */
private long memSize = 0;
/**
* memory usage of all TVLists, regardless of whether these TVLists are full, including TEXT
* values
*/
private long tvListRamCost = 0;
/** Number of series (mem chunks) created in this memtable since construction or clear(). */
private int seriesNumber = 0;
/** Number of points added by the insert methods, minus the points removed by delete(). */
private long totalPointsNum = 0;
/**
* Grows by avgSeriesPointNumThreshold for every series created; compared against
* totalPointsNum in reachTotalPointNumThreshold().
*/
private long totalPointsNumThreshold = 0;
/** Largest plan index passed to updatePlanIndexes(); initially Long.MIN_VALUE. */
private long maxPlanIndex = Long.MIN_VALUE;
/** Smallest plan index passed to updatePlanIndexes(); initially Long.MAX_VALUE. */
private long minPlanIndex = Long.MAX_VALUE;
/** Value of System.currentTimeMillis() captured when this object was constructed. */
private long createdTime = System.currentTimeMillis();
/** Creates an empty memtable backed by a fresh {@link HashMap}. */
public AbstractMemTable() {
  this(new HashMap<>());
}

/**
 * Creates a memtable that uses the supplied map as its backing store.
 *
 * @param memTableMap device id -> (measurement id -> chunk); stored by reference, not copied
 */
public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) {
  this.memTableMap = memTableMap;
}
/**
* Returns the backing map of this memtable (device id -> measurement id -> chunk). The map
* itself is returned, not a copy.
*/
@Override
public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() {
return memTableMap;
}
/**
 * Tells whether a chunk is registered under the given device and measurement.
 *
 * @param deviceId device id to look up
 * @param measurement measurement id to look up under that device
 * @return true only if the device is present and its series map contains the measurement
 */
private boolean checkPath(String deviceId, String measurement) {
  if (!memTableMap.containsKey(deviceId)) {
    return false;
  }
  return memTableMap.get(deviceId).containsKey(measurement);
}
/**
 * Returns the chunk stored for the device under the schema's measurement id, creating the
 * per-device map and the chunk itself when they are missing.
 *
 * <p>Each time a chunk is created, {@code seriesNumber} is incremented and {@code
 * totalPointsNumThreshold} is raised by {@code avgSeriesPointNumThreshold}.
 *
 * @param deviceId device id
 * @param schema measurement schema; its measurement id is the key inside the device map
 * @return the existing or newly generated chunk
 */
private IWritableMemChunk createIfNotExistAndGet(String deviceId, IMeasurementSchema schema) {
  return memTableMap
      .computeIfAbsent(deviceId, device -> new HashMap<>())
      .computeIfAbsent(
          schema.getMeasurementId(),
          measurementId -> {
            // bookkeeping happens only when a new series is actually created
            seriesNumber++;
            totalPointsNumThreshold += avgSeriesPointNumThreshold;
            return genMemSeries(schema);
          });
}
/**
* Creates the chunk for a new series. Called by createIfNotExistAndGet() when no chunk is
* registered yet for the schema's measurement id.
*
* @param schema schema of the series being created
* @return the chunk to store in the memtable
*/
protected abstract IWritableMemChunk genMemSeries(IMeasurementSchema schema);
/**
 * Writes one row into this memtable.
 *
 * <p>For an aligned plan only the first measurement node is used: if it is non-null, as many
 * leading values as the schema has value data types are written as one vector record. For a
 * non-aligned plan every non-null value is written to the series of the node at the same
 * position. {@code memSize} is increased before each write, and {@code totalPointsNum} grows
 * by the number of measurements minus the plan's failed-measurement count.
 *
 * @param insertRowPlan the row to insert
 */
@Override
public void insert(InsertRowPlan insertRowPlan) {
  updatePlanIndexes(insertRowPlan.getIndex());
  Object[] rowValues = insertRowPlan.getValues();
  IMeasurementMNode[] nodes = insertRowPlan.getMeasurementMNodes();

  if (!insertRowPlan.isAligned()) {
    for (int col = 0; col < nodes.length; col++) {
      if (rowValues[col] != null) {
        IMeasurementMNode node = nodes[col];
        memSize +=
            MemUtils.getRecordSize(node.getSchema().getType(), rowValues[col], disableMemControl);
        write(
            insertRowPlan.getPrefixPath().getFullPath(),
            node.getSchema(),
            insertRowPlan.getTime(),
            rowValues[col]);
      }
    }
  } else {
    IMeasurementMNode vectorNode = nodes[0];
    if (vectorNode != null) {
      // write vector: the leading values of the row form a single vector record
      int width = vectorNode.getSchema().getValueTSDataTypeList().size();
      Object[] vectorRecord = new Object[width];
      for (int col = 0; col < vectorRecord.length; col++) {
        vectorRecord[col] = rowValues[col];
      }
      memSize +=
          MemUtils.getVectorRecordSize(
              vectorNode.getSchema().getValueTSDataTypeList(), vectorRecord, disableMemControl);
      write(
          insertRowPlan.getPrefixPath().getFullPath(),
          vectorNode.getSchema(),
          insertRowPlan.getTime(),
          vectorRecord);
    }
  }

  totalPointsNum +=
      insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber();
}
/**
 * Writes rows {@code [start, end)} of a tablet and updates the memory and point counters.
 *
 * @param insertTabletPlan the tablet to insert
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 * @throws WriteProcessException wrapping any RuntimeException raised while writing or while
 *     updating the counters
 */
@Override
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end)
    throws WriteProcessException {
  updatePlanIndexes(insertTabletPlan.getIndex());
  final int rowCount = end - start;
  try {
    write(insertTabletPlan, start, end);
    long addedBytes = MemUtils.getRecordSize(insertTabletPlan, start, end, disableMemControl);
    memSize += addedBytes;
    totalPointsNum +=
        (insertTabletPlan.getDataTypes().length - insertTabletPlan.getFailedMeasurementNumber())
            * rowCount;
  } catch (RuntimeException cause) {
    throw new WriteProcessException(cause);
  }
}
/**
 * Writes a single value into the series of the given device and schema, creating the series
 * first if it does not exist yet.
 *
 * @param deviceId device id
 * @param schema schema of the target series
 * @param insertTime timestamp of the value
 * @param objectValue the value to write
 */
@Override
public void write(
    String deviceId, IMeasurementSchema schema, long insertTime, Object objectValue) {
  createIfNotExistAndGet(deviceId, schema).write(insertTime, objectValue);
}
/**
 * Writes rows {@code [start, end)} of a tablet into the matching chunks.
 *
 * <p>Columns that are null are skipped. For a non-aligned tablet each remaining column is
 * written to the series of the measurement node at the same position. For an aligned tablet
 * the columns are gathered into one vector write, after which the loop stops.
 *
 * @param insertTabletPlan the tablet to write
 * @param start index of the first row to write (inclusive)
 * @param end index after the last row to write (exclusive)
 */
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
  updatePlanIndexes(insertTabletPlan.getIndex());
  int col = 0;
  for (int node = 0; node < insertTabletPlan.getMeasurements().length; node++) {
    if (insertTabletPlan.getColumns()[col] == null) {
      // no data for this column
      col++;
      continue;
    }
    String devicePath = insertTabletPlan.getPrefixPath().getFullPath();
    IMeasurementSchema schema = insertTabletPlan.getMeasurementMNodes()[node].getSchema();
    IWritableMemChunk chunk = createIfNotExistAndGet(devicePath, schema);

    if (!insertTabletPlan.isAligned()) {
      BitMap columnBitMap = null;
      if (insertTabletPlan.getBitMaps() != null) {
        columnBitMap = insertTabletPlan.getBitMaps()[col];
      }
      chunk.write(
          insertTabletPlan.getTimes(),
          columnBitMap,
          insertTabletPlan.getColumns()[col],
          insertTabletPlan.getDataTypes()[col],
          start,
          end);
      col++;
      continue;
    }

    // aligned: collect one column (and bitmap, if any) per value measurement of the vector
    VectorMeasurementSchema vectorSchema = (VectorMeasurementSchema) schema;
    int valueCount = vectorSchema.getValueMeasurementIdList().size();
    Object[] vectorColumns = new Object[valueCount];
    BitMap[] vectorBitMaps = new BitMap[valueCount];
    for (int v = 0; v < valueCount; v++, col++) {
      vectorColumns[v] = insertTabletPlan.getColumns()[col];
      if (insertTabletPlan.getBitMaps() != null) {
        vectorBitMaps[v] = insertTabletPlan.getBitMaps()[col];
      }
    }
    chunk.write(
        insertTabletPlan.getTimes(), vectorBitMaps, vectorColumns, TSDataType.VECTOR, start, end);
    break;
  }
}
/**
 * Tells whether no chunk is stored for the given device and measurement.
 *
 * @param deviceId device id
 * @param measurement measurement id
 * @return true if the device is unknown or has no chunk under the measurement id
 */
@Override
public boolean checkIfChunkDoesNotExist(String deviceId, String measurement) {
  Map<String, IWritableMemChunk> deviceChunks = memTableMap.get(deviceId);
  return deviceChunks == null || !deviceChunks.containsKey(measurement);
}
/**
 * Returns the size of the TVList of the chunk stored for the given device and measurement.
 *
 * @param deviceId device id
 * @param measurement measurement id
 * @return the TVList size of the chunk, or 0 when the device or the measurement has no chunk
 *     in this memtable (previously this case failed with a NullPointerException)
 */
@Override
public int getCurrentChunkPointNum(String deviceId, String measurement) {
  Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId);
  if (memSeries == null) {
    return 0;
  }
  IWritableMemChunk memChunk = memSeries.get(measurement);
  if (memChunk == null) {
    return 0;
  }
  return memChunk.getTVList().size();
}
/** Returns the number of series created in this memtable since construction or clear(). */
@Override
public int getSeriesNumber() {
return seriesNumber;
}
/**
* Returns the running point counter: increased by the insert methods and decreased by
* delete().
*/
@Override
public long getTotalPointsNum() {
return totalPointsNum;
}
/**
 * Sums {@code count()} over every chunk of every device in this memtable.
 *
 * @return the total of all chunk counts, 0 when the memtable holds no chunk
 */
@Override
public long size() {
  return memTableMap.values().stream()
      .flatMap(deviceChunks -> deviceChunks.values().stream())
      .mapToLong(IWritableMemChunk::count)
      .sum();
}
/** Returns the accumulated memory size of the data points, including TEXT values. */
@Override
public long memSize() {
return memSize;
}
/**
 * Tells whether the point counter has reached the accumulated per-series threshold.
 *
 * @return false while no point is counted; otherwise whether {@code totalPointsNum} is at
 *     least {@code totalPointsNumThreshold}
 */
@Override
public boolean reachTotalPointNumThreshold() {
  return totalPointsNum != 0 && totalPointsNum >= totalPointsNumThreshold;
}
/**
 * Removes every chunk from this memtable and resets its size counters, point counters and
 * plan-index bounds. The flush flag and the creation time are left untouched.
 */
@Override
public void clear() {
  memTableMap.clear();
  memSize = 0;
  seriesNumber = 0;
  totalPointsNum = 0;
  totalPointsNumThreshold = 0;
  tvListRamCost = 0;
  // Restore both bounds to the sentinels used by the field initializers. Otherwise the
  // Math.max / Math.min updates in updatePlanIndexes() would keep comparing against values
  // left over from before the clear (in particular a stale minimum).
  maxPlanIndex = Long.MIN_VALUE;
  minPlanIndex = Long.MAX_VALUE;
}
/** Returns true when the backing map contains no device entry. */
@Override
public boolean isEmpty() {
return memTableMap.isEmpty();
}
/**
 * Builds a read-only view of the chunk that holds the requested series.
 *
 * <p>If the schema type is VECTOR, the chunk is looked up under the schema's measurement id
 * and only the value columns named in the schema are selected. Otherwise the chunk must be
 * registered under {@code measurement} and is then fetched under the schema's measurement
 * id.
 *
 * @param deviceId device id
 * @param measurement measurement id; used for the existence check and as the name of the
 *     returned chunk in the non-vector case
 * @param partialVectorSchema schema describing the series (or the vector sub-columns) to read
 * @param ttlLowerBound not used by this implementation
 * @param deletionList deleted time ranges, handed to the returned chunk
 * @return the read-only chunk, or null when this memtable holds no matching chunk
 * @throws IOException declared for the ReadOnlyMemChunk construction
 * @throws QueryProcessException declared for the ReadOnlyMemChunk construction
 */
@Override
public ReadOnlyMemChunk query(
    String deviceId,
    String measurement,
    IMeasurementSchema partialVectorSchema,
    long ttlLowerBound,
    List<TimeRange> deletionList)
    throws IOException, QueryProcessException {
  if (partialVectorSchema.getType() == TSDataType.VECTOR) {
    // single lookup instead of containsKey + get
    Map<String, IWritableMemChunk> deviceChunks = memTableMap.get(deviceId);
    if (deviceChunks == null) {
      return null;
    }
    IWritableMemChunk vectorMemChunk = deviceChunks.get(partialVectorSchema.getMeasurementId());
    if (vectorMemChunk == null) {
      return null;
    }
    // translate each queried sub-measurement into its column index inside the stored vector
    // NOTE(review): indexOf yields -1 for a sub-measurement the stored vector lacks; that
    // value is passed on unchanged - confirm getSortedTvListForQuery tolerates it.
    List<String> measurementIdList = partialVectorSchema.getValueMeasurementIdList();
    List<Integer> columns = new ArrayList<>();
    IMeasurementSchema vectorSchema = vectorMemChunk.getSchema();
    for (String queryingMeasurement : measurementIdList) {
      columns.add(vectorSchema.getValueMeasurementIdList().indexOf(queryingMeasurement));
    }
    // get sorted tv list is synchronized so different query can get right sorted list reference
    TVList vectorTvListCopy = vectorMemChunk.getSortedTvListForQuery(columns);
    int curSize = vectorTvListCopy.size();
    return new ReadOnlyMemChunk(partialVectorSchema, vectorTvListCopy, curSize, deletionList);
  } else {
    if (!checkPath(deviceId, measurement)) {
      return null;
    }
    IWritableMemChunk memChunk =
        memTableMap.get(deviceId).get(partialVectorSchema.getMeasurementId());
    if (memChunk == null) {
      // existence was checked under `measurement`, but the lookup key is the schema's
      // measurement id; if the two differ there is nothing to read here
      return null;
    }
    // get sorted tv list is synchronized so different query can get right sorted list reference
    TVList chunkCopy = memChunk.getSortedTvListForQuery();
    int curSize = chunkCopy.size();
    return new ReadOnlyMemChunk(
        measurement,
        partialVectorSchema.getType(),
        partialVectorSchema.getEncodingType(),
        chunkCopy,
        partialVectorSchema.getProps(),
        curSize,
        deletionList);
  }
}
/**
 * Deletes the points in {@code [startTimestamp, endTimestamp]} from the chunks of one device
 * that are selected by {@code originalPath}.
 *
 * <p>A chunk is selected when its full path matches {@code originalPath}, or when it is a
 * VECTOR chunk and the measurements extracted from {@code originalPath} contain all of its
 * value measurement ids. If the range spans Long.MIN_VALUE to Long.MAX_VALUE the chunk is
 * also removed from the device map. {@code totalPointsNum} is reduced by the number of points
 * each chunk reports as deleted.
 *
 * @param originalPath path (pattern) of the series to delete from
 * @param devicePath path of the device whose chunks are examined
 * @param startTimestamp start of the deletion range
 * @param endTimestamp end of the deletion range
 */
@SuppressWarnings("squid:S3776") // high Cognitive Complexity
@Override
public void delete(
    PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) {
  Map<String, IWritableMemChunk> deviceChunks = memTableMap.get(devicePath.getFullPath());
  if (deviceChunks == null) {
    return;
  }
  final boolean wholeTimeRange =
      startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE;

  Iterator<Entry<String, IWritableMemChunk>> it = deviceChunks.entrySet().iterator();
  while (it.hasNext()) {
    Entry<String, IWritableMemChunk> series = it.next();
    IWritableMemChunk memChunk = series.getValue();
    PartialPath seriesPath = devicePath.concatNode(series.getKey());
    IMeasurementSchema chunkSchema = memChunk.getSchema();

    // direct path match first; the vector check is only evaluated when that fails
    boolean selected =
        originalPath.matchFullPath(seriesPath)
            || (chunkSchema.getType() == TSDataType.VECTOR
                && MetaUtils.getMeasurementsInPartialPath(originalPath)
                    .containsAll(chunkSchema.getValueMeasurementIdList()));
    if (!selected) {
      continue;
    }
    if (wholeTimeRange) {
      it.remove();
    }
    totalPointsNum -= memChunk.delete(startTimestamp, endTimestamp);
  }
}
/**
* Adds the given amount to the tracked TVList memory usage.
*
* @param cost amount to add to tvListRamCost
*/
@Override
public void addTVListRamCost(long cost) {
this.tvListRamCost += cost;
}
/** Returns the tracked memory usage of all TVLists, as accumulated by addTVListRamCost(). */
@Override
public long getTVListsRamCost() {
return tvListRamCost;
}
/**
* Adds the given amount to memSize.
*
* @param testDataSize amount to add. NOTE(review): given the method name this is presumably
*     the size of TEXT data ("textDataSize") - confirm.
*/
@Override
public void addTextDataSize(long testDataSize) {
this.memSize += testDataSize;
}
/** Sets the flush flag; nothing in this class sets it back to false. */
@Override
public void setShouldFlush() {
shouldFlush = true;
}
/** Returns true once setShouldFlush() has been called on this memtable. */
@Override
public boolean shouldFlush() {
return shouldFlush;
}
/**
 * Hands the TVList of every chunk whose reference count is 0 back to the {@link
 * TVListAllocator}. Lists with a non-zero reference count are left alone.
 */
@Override
public void release() {
  for (Map<String, IWritableMemChunk> deviceChunks : memTableMap.values()) {
    for (IWritableMemChunk memChunk : deviceChunks.values()) {
      TVList tvList = memChunk.getTVList();
      if (tvList.getReferenceCount() != 0) {
        continue;
      }
      TVListAllocator.getInstance().release(tvList);
    }
  }
}
/** Returns the largest plan index recorded by updatePlanIndexes(). */
@Override
public long getMaxPlanIndex() {
return maxPlanIndex;
}
/** Returns the smallest plan index recorded by updatePlanIndexes(). */
@Override
public long getMinPlanIndex() {
return minPlanIndex;
}
/**
 * Widens the recorded plan-index range so that it includes {@code index}.
 *
 * @param index plan index of the plan being applied
 */
void updatePlanIndexes(long index) {
  if (index > maxPlanIndex) {
    maxPlanIndex = index;
  }
  if (index < minPlanIndex) {
    minPlanIndex = index;
  }
}
/** Returns the System.currentTimeMillis() value captured when this object was constructed. */
@Override
public long getCreatedTime() {
return createdTime;
}
}