| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.engine.memtable; |
| |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.exception.metadata.MetadataException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; |
| import org.apache.iotdb.db.rescon.TVListAllocator; |
| import org.apache.iotdb.db.utils.MemUtils; |
| import org.apache.iotdb.db.utils.datastructure.TVList; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; |
| import org.apache.iotdb.tsfile.read.common.TimeRange; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| |
| public abstract class AbstractMemTable implements IMemTable { |
| |
| private final Map<String, Map<String, IWritableMemChunk>> memTableMap; |
| /** |
| * The initial value is true because we want calculate the text data size when recover memTable!! |
| */ |
| protected boolean disableMemControl = true; |
| |
| private int avgSeriesPointNumThreshold = |
| IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold(); |
| /** memory size of data points, including TEXT values */ |
| private long memSize = 0; |
| /** |
| * memory usage of all TVLists memory usage regardless of whether these TVLists are full, |
| * including TEXT values |
| */ |
| private long tvListRamCost = 0; |
| |
| private int seriesNumber = 0; |
| |
| private long totalPointsNum = 0; |
| |
| private long totalPointsNumThreshold = 0; |
| |
| private long maxPlanIndex = Long.MIN_VALUE; |
| |
| private long minPlanIndex = Long.MAX_VALUE; |
| |
| public AbstractMemTable() { |
| this.memTableMap = new HashMap<>(); |
| } |
| |
| public AbstractMemTable(Map<String, Map<String, IWritableMemChunk>> memTableMap) { |
| this.memTableMap = memTableMap; |
| } |
| |
| @Override |
| public Map<String, Map<String, IWritableMemChunk>> getMemTableMap() { |
| return memTableMap; |
| } |
| |
| /** |
| * check whether the given seriesPath is within this memtable. |
| * |
| * @return true if seriesPath is within this memtable |
| */ |
| private boolean checkPath(String deviceId, String measurement) { |
| return memTableMap.containsKey(deviceId) && memTableMap.get(deviceId).containsKey(measurement); |
| } |
| |
| private IWritableMemChunk createIfNotExistAndGet( |
| String deviceId, String measurement, MeasurementSchema schema) { |
| Map<String, IWritableMemChunk> memSeries = |
| memTableMap.computeIfAbsent(deviceId, k -> new HashMap<>()); |
| |
| return memSeries.computeIfAbsent( |
| measurement, |
| k -> { |
| seriesNumber++; |
| totalPointsNumThreshold += avgSeriesPointNumThreshold; |
| return genMemSeries(schema); |
| }); |
| } |
| |
| protected abstract IWritableMemChunk genMemSeries(MeasurementSchema schema); |
| |
| @Override |
| public void insert(InsertRowPlan insertRowPlan) { |
| updatePlanIndexes(insertRowPlan.getIndex()); |
| Object[] values = insertRowPlan.getValues(); |
| |
| MeasurementMNode[] measurementMNodes = insertRowPlan.getMeasurementMNodes(); |
| String[] measurements = insertRowPlan.getMeasurements(); |
| for (int i = 0; i < values.length; i++) { |
| Object value = values[i]; |
| if (value == null) { |
| continue; |
| } |
| |
| memSize += |
| MemUtils.getRecordSize( |
| measurementMNodes[i].getSchema().getType(), value, disableMemControl); |
| |
| write( |
| insertRowPlan.getDeviceId().getFullPath(), |
| measurements[i], |
| measurementMNodes[i].getSchema(), |
| insertRowPlan.getTime(), |
| value); |
| } |
| |
| totalPointsNum += |
| insertRowPlan.getMeasurements().length - insertRowPlan.getFailedMeasurementNumber(); |
| } |
| |
| @Override |
| public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end) |
| throws WriteProcessException { |
| updatePlanIndexes(insertTabletPlan.getIndex()); |
| try { |
| write(insertTabletPlan, start, end); |
| memSize += MemUtils.getRecordSize(insertTabletPlan, start, end, disableMemControl); |
| totalPointsNum += |
| (insertTabletPlan.getMeasurements().length |
| - insertTabletPlan.getFailedMeasurementNumber()) |
| * (end - start); |
| } catch (RuntimeException e) { |
| throw new WriteProcessException(e); |
| } |
| } |
| |
| @Override |
| public void write( |
| String deviceId, |
| String measurement, |
| MeasurementSchema schema, |
| long insertTime, |
| Object objectValue) { |
| IWritableMemChunk memSeries = createIfNotExistAndGet(deviceId, measurement, schema); |
| memSeries.write(insertTime, objectValue); |
| } |
| |
| @Override |
| public void write(InsertTabletPlan insertTabletPlan, int start, int end) { |
| updatePlanIndexes(insertTabletPlan.getIndex()); |
| for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) { |
| if (insertTabletPlan.getColumns()[i] == null) { |
| continue; |
| } |
| IWritableMemChunk memSeries = |
| createIfNotExistAndGet( |
| insertTabletPlan.getDeviceId().getFullPath(), |
| insertTabletPlan.getMeasurements()[i], |
| insertTabletPlan.getMeasurementMNodes()[i].getSchema()); |
| memSeries.write( |
| insertTabletPlan.getTimes(), |
| insertTabletPlan.getColumns()[i], |
| insertTabletPlan.getDataTypes()[i], |
| start, |
| end); |
| } |
| } |
| |
| @Override |
| public boolean checkIfChunkDoesNotExist(String deviceId, String measurement) { |
| Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId); |
| if (null == memSeries) { |
| return true; |
| } |
| |
| return !memSeries.containsKey(measurement); |
| } |
| |
| @Override |
| public int getCurrentChunkPointNum(String deviceId, String measurement) { |
| Map<String, IWritableMemChunk> memSeries = memTableMap.get(deviceId); |
| IWritableMemChunk memChunk = memSeries.get(measurement); |
| return memChunk.getTVList().size(); |
| } |
| |
| @Override |
| public int getSeriesNumber() { |
| return seriesNumber; |
| } |
| |
| @Override |
| public long getTotalPointsNum() { |
| return totalPointsNum; |
| } |
| |
| @Override |
| public long size() { |
| long sum = 0; |
| for (Map<String, IWritableMemChunk> seriesMap : memTableMap.values()) { |
| for (IWritableMemChunk writableMemChunk : seriesMap.values()) { |
| sum += writableMemChunk.count(); |
| } |
| } |
| return sum; |
| } |
| |
| @Override |
| public long memSize() { |
| return memSize; |
| } |
| |
| @Override |
| public boolean reachTotalPointNumThreshold() { |
| if (totalPointsNum == 0) { |
| return false; |
| } |
| return totalPointsNum >= totalPointsNumThreshold; |
| } |
| |
| @Override |
| public void clear() { |
| memTableMap.clear(); |
| memSize = 0; |
| seriesNumber = 0; |
| totalPointsNum = 0; |
| totalPointsNumThreshold = 0; |
| tvListRamCost = 0; |
| maxPlanIndex = 0; |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return memTableMap.isEmpty(); |
| } |
| |
| @Override |
| public ReadOnlyMemChunk query( |
| String deviceId, |
| String measurement, |
| TSDataType dataType, |
| TSEncoding encoding, |
| Map<String, String> props, |
| long timeLowerBound, |
| List<TimeRange> deletionList) |
| throws IOException, QueryProcessException, MetadataException { |
| if (!checkPath(deviceId, measurement)) { |
| return null; |
| } |
| IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement); |
| // get sorted tv list is synchronized so different query can get right sorted list reference |
| TVList chunkCopy = memChunk.getSortedTVListForQuery(); |
| int curSize = chunkCopy.size(); |
| |
| return new ReadOnlyMemChunk( |
| measurement, dataType, encoding, chunkCopy, props, curSize, deletionList); |
| } |
| |
| @Override |
| public void delete( |
| PartialPath originalPath, PartialPath devicePath, long startTimestamp, long endTimestamp) { |
| Map<String, IWritableMemChunk> deviceMap = memTableMap.get(devicePath.getFullPath()); |
| if (deviceMap == null) { |
| return; |
| } |
| |
| Iterator<Entry<String, IWritableMemChunk>> iter = deviceMap.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Entry<String, IWritableMemChunk> entry = iter.next(); |
| IWritableMemChunk chunk = entry.getValue(); |
| PartialPath fullPath = devicePath.concatNode(entry.getKey()); |
| if (originalPath.matchFullPath(fullPath)) { |
| if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) { |
| iter.remove(); |
| } |
| int deletedPointsNumber = chunk.delete(startTimestamp, endTimestamp); |
| totalPointsNum -= deletedPointsNumber; |
| } |
| } |
| } |
| |
| @Override |
| public void addTVListRamCost(long cost) { |
| this.tvListRamCost += cost; |
| } |
| |
| @Override |
| public long getTVListsRamCost() { |
| return tvListRamCost; |
| } |
| |
| @Override |
| public void addTextDataSize(long testDataSize) { |
| this.memSize += testDataSize; |
| } |
| |
| @Override |
| public void release() { |
| for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) { |
| for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) { |
| TVList list = subEntry.getValue().getTVList(); |
| if (list.getReferenceCount() == 0) { |
| TVListAllocator.getInstance().release(list); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public long getMaxPlanIndex() { |
| return maxPlanIndex; |
| } |
| |
| @Override |
| public long getMinPlanIndex() { |
| return minPlanIndex; |
| } |
| |
| void updatePlanIndexes(long index) { |
| maxPlanIndex = Math.max(index, maxPlanIndex); |
| minPlanIndex = Math.min(index, minPlanIndex); |
| } |
| } |