temporay support parallel insert and pass session-example
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d5f4045..088f3f1 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -90,11 +90,11 @@ // createTemplate(); createTimeseries(); createMultiTimeseries(); - insertRecord(); - insertTablet(); + // insertRecord(); + // insertTablet(); // insertTabletWithNullValues(); // insertTablets(); - // insertRecords(); + insertRecords(); // insertText(); // selectInto(); // createAndDropContinuousQueries();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 854401d..3b50f45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1061,6 +1061,9 @@ // customizedProperties, this should be empty by default. private Properties customizedProperties = new Properties(); + private boolean enableMultiThreadingInsert = true; + private int insertThreadNum = Runtime.getRuntime().availableProcessors(); + // IoTConsensus Config private int maxLogEntriesNumPerBatch = 1024; private int maxSizePerBatch = 16 * 1024 * 1024; @@ -3801,4 +3804,20 @@ double innerCompactionTaskSelectionDiskRedundancy) { this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy; } + + public boolean isEnableMultiThreadingInsert() { + return enableMultiThreadingInsert; + } + + public void setEnableMultiThreadingInsert(boolean enableMultiThreadingInsert) { + this.enableMultiThreadingInsert = enableMultiThreadingInsert; + } + + public int getInsertThreadNum() { + return insertThreadNum; + } + + public void setInsertThreadNum(int insertThreadNum) { + this.insertThreadNum = insertThreadNum; + } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e4c5c24..13a0b2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -663,6 +663,16 @@ "inner_compaction_task_selection_mods_file_threshold", Long.toString(conf.getInnerCompactionTaskSelectionModsFileThreshold())))); + conf.setEnableMultiThreadingInsert( + Boolean.parseBoolean( + properties.getProperty( + "enable_multi_threading_insert", + String.valueOf(conf.isEnableMultiThreadingInsert())))); + conf.setInsertThreadNum( + Integer.parseInt( + properties.getProperty( + "insert_thread_num", Integer.toString(conf.getInsertThreadNum())))); + conf.setEnablePartialInsert( Boolean.parseBoolean( properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java new file mode 100644 index 0000000..4e312b5 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertTask.java
@@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.experiment; + +import org.apache.iotdb.db.exception.WriteProcessException; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; +import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +public class InsertTask implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(InsertTask.class); + private TsFileProcessor processor; + private InsertRowNode insertRowNode; + private CountDownLatch countDownLatch; + private AtomicLong[] costForMetrics; + private List<InsertRowNode> executedList; + + public InsertTask( + TsFileProcessor processor, + InsertRowNode insertRowNode, + AtomicLong[] costForMetrics, + CountDownLatch countDownLatch, + List<InsertRowNode> executedList) { + this.processor = processor; + this.insertRowNode = insertRowNode; + this.countDownLatch = countDownLatch; + this.costForMetrics = costForMetrics; + this.executedList = executedList; + } + + @Override + public void run() { + try { + long[] tempCostForMetrics = new long[4]; + processor.insert(insertRowNode, tempCostForMetrics); + for (int i = 0; i < 4; i++) { + costForMetrics[i].addAndGet(tempCostForMetrics[i]); + } + executedList.add(insertRowNode); + } catch (WriteProcessException e) { + LOGGER.error("Insertion failed", e); + } finally { + countDownLatch.countDown(); + } + } +}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java new file mode 100644 index 0000000..6e254d2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/experiment/InsertWorkers.java
@@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.experiment; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class InsertWorkers { + private static final Logger LOGGER = LoggerFactory.getLogger(InsertWorkers.class); + private static ThreadPoolExecutor threadPool = null; + + static { + if (IoTDBDescriptor.getInstance().getConfig().isEnableMultiThreadingInsert()) { + LOGGER.info("Initializing thread pool for parallel insert"); + threadPool = + new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + IoTDBDescriptor.getInstance().getConfig().getInsertThreadNum(), + 10, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(8196), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + } + + public static void submit(InsertTask task) { + threadPool.submit(task); + } +}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index c408b73..e3c847d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -163,6 +163,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -214,6 +215,11 @@ private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = PerformanceOverviewMetrics.getInstance(); + public static final AtomicLong partitionSettingTimeInTotal = new AtomicLong(0); + public static final AtomicLong partitionSettingTimeCount = new AtomicLong(0); + public static final AtomicLong partitionFetchingTimeInTotal = new AtomicLong(0); + public static final AtomicLong partitionFetchingTimeCount = new AtomicLong(0); + public AnalyzeVisitor(IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { this.partitionFetcher = partitionFetcher; this.schemaFetcher = schemaFetcher; @@ -2526,6 +2532,7 @@ private Analysis computeAnalysisForInsertRows( Analysis analysis, InsertRowsStatement insertRowsStatement, String userName) { + final long startTime = System.nanoTime(); Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>(); for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) { Set<TTimePartitionSlot> timePartitionSlotSet = @@ -2541,7 +2548,15 @@ dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue())); dataPartitionQueryParams.add(dataPartitionQueryParam); } - + long timeCost = System.nanoTime() - startTime; + long time = partitionSettingTimeInTotal.addAndGet(timeCost); + long count = partitionSettingTimeCount.incrementAndGet(); + if (count % 1000 == 0) { + logger.info( + String.format( + "The average time cost of setting partition is %.2f ms", + (double) time / count / 1000000)); + } return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName); } @@ -2703,7 +2718,7 @@ /** get analysis according to statement and params */ private Analysis getAnalysisForWriting( Analysis analysis, List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) { - + final long startTime = System.nanoTime(); DataPartition dataPartition = partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, userName); if (dataPartition.isEmpty()) { @@ -2715,6 +2730,14 @@ + "because enable_auto_create_schema is FALSE.")); } analysis.setDataPartitionInfo(dataPartition); + long timeCost = System.nanoTime() - startTime; + long time = partitionFetchingTimeCount.addAndGet(timeCost); + long count = partitionFetchingTimeCount.incrementAndGet(); + if (count % 1000 == 0) { + logger.info( + String.format( + "Average time cost of fetching partition is %.2f ms", (double) time / count / 1e6)); + } return analysis; }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index d5cf082..da1675a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -49,6 +49,8 @@ import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.quota.ExceedQuotaException; +import org.apache.iotdb.db.experiment.InsertTask; +import org.apache.iotdb.db.experiment.InsertWorkers; import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; @@ -144,6 +146,7 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.Phaser; @@ -3199,7 +3202,11 @@ > lastFlushTimeMap.getFlushedTime( timePartitionIds[i], insertRowNode.getDevicePath().getFullPath()); } - insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds); + if (IoTDBDescriptor.getInstance().getConfig().isEnableMultiThreadingInsert()) { + insertToTsFileProcessorsParallel(insertRowsNode, areSequence, timePartitionIds); + } else { + insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds); + } if (!insertRowsNode.getResults().isEmpty()) { throw new BatchProcessException("Partial failed inserting rows"); } @@ -3208,6 +3215,61 @@ } } + public void insertToTsFileProcessorsParallel( + InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) { + List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>(); + AtomicLong[] costsForMetrics = new AtomicLong[4]; + for (int i = 0; i < costsForMetrics.length; i++) { + costsForMetrics[i] = new AtomicLong(0); + } + Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new HashMap<>(); + CountDownLatch latch = new CountDownLatch(insertRowsNode.getInsertRowNodeList().size()); + for (int i = 0; i < areSequence.length; i++) { + InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i); + if (insertRowNode.allMeasurementFailed()) { + latch.countDown(); + continue; + } + TsFileProcessor tsFileProcessor = + getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]); + if (tsFileProcessor == null) { + latch.countDown(); + continue; + } + tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]); + InsertWorkers.submit( + new InsertTask( + tsFileProcessor, insertRowNode, costsForMetrics, latch, executedInsertRowNodeList)); + } + try { + latch.await(); + } catch (InterruptedException e) { + logger.error("Insertion is interrupted", e); + return; + } + // check memtable size and may asyncTryToFlush the work memtable + for (Map.Entry<TsFileProcessor, Boolean> entry : tsFileProcessorMapForFlushing.entrySet()) { + if (entry.getKey().shouldFlush()) { + fileFlushPolicy.apply(this, entry.getKey(), entry.getValue()); + } + } + + PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2].get()); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3].get()); + if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { + if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) + && insertRowsNode.isSyncFromLeaderWhenUsingIoTConsensus())) { + return; + } + // disable updating last cache on follower + long startTime = System.nanoTime(); + tryToUpdateInsertRowsLastCache(executedInsertRowNodeList); + PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime); + } + } + /** * insert batch of tablets belongs to multiple devices *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index e54e7ef..62cc8ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -51,6 +51,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -70,13 +71,15 @@ private volatile FlushStatus flushStatus = FlushStatus.WORKING; private final int avgSeriesPointNumThreshold = IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold(); + /** Memory size of data points, including TEXT values. */ - private long memSize = 0; + private AtomicLong memSize = new AtomicLong(0); + /** * Memory usage of all TVLists memory usage regardless of whether these TVLists are full, * including TEXT values. */ - private long tvListRamCost = 0; + private AtomicLong tvListRamCost = new AtomicLong(0); private int seriesNumber = 0; @@ -110,13 +113,13 @@ protected AbstractMemTable() { this.database = null; this.dataRegionId = null; - this.memTableMap = new HashMap<>(); + this.memTableMap = new ConcurrentHashMap<>(); } protected AbstractMemTable(String database, String dataRegionId) { this.database = database; this.dataRegionId = dataRegionId; - this.memTableMap = new HashMap<>(); + this.memTableMap = new ConcurrentHashMap<>(); } protected AbstractMemTable( @@ -142,13 +145,15 @@ IDeviceID deviceId, List<IMeasurementSchema> schemaList) { IWritableMemChunkGroup memChunkGroup = memTableMap.computeIfAbsent(deviceId, k -> new WritableMemChunkGroup()); - for (IMeasurementSchema schema : schemaList) { - if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) { - seriesNumber++; - totalPointsNumThreshold += avgSeriesPointNumThreshold; + synchronized (memChunkGroup) { + for (IMeasurementSchema schema : schemaList) { + if (schema != null && !memChunkGroup.contains(schema.getMeasurementId())) { + seriesNumber++; + totalPointsNumThreshold += avgSeriesPointNumThreshold; + } } + return memChunkGroup; } - return memChunkGroup; } private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet( @@ -199,7 +204,7 @@ dataTypes.add(schema.getType()); } } - memSize += MemUtils.getRowRecordSize(dataTypes, values); + memSize.addAndGet(MemUtils.getRowRecordSize(dataTypes, values)); write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); int pointsInserted = @@ -246,7 +251,7 @@ if (schemaList.isEmpty()) { return; } - memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values); + memSize.addAndGet(MemUtils.getAlignedRowRecordSize(dataTypes, values)); writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); int pointsInserted = insertRowNode.getMeasurements().length - insertRowNode.getFailedMeasurementNumber(); @@ -270,7 +275,7 @@ throws WriteProcessException { try { writeTabletNode(insertTabletNode, start, end); - memSize += MemUtils.getTabletSize(insertTabletNode, start, end); + memSize.addAndGet(MemUtils.getTabletSize(insertTabletNode, start, end)); int pointsInserted = (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) * (end - start); @@ -296,7 +301,7 @@ throws WriteProcessException { try { writeAlignedTablet(insertTabletNode, start, end); - memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end); + memSize.addAndGet(MemUtils.getAlignedTabletSize(insertTabletNode, start, end)); int pointsInserted = (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) * (end - start); @@ -436,7 +441,7 @@ @Override public long memSize() { - return memSize; + return memSize.get(); } @Override @@ -450,11 +455,11 @@ @Override public void clear() { memTableMap.clear(); - memSize = 0; + memSize = new AtomicLong(0); seriesNumber = 0; totalPointsNum = 0; totalPointsNumThreshold = 0; - tvListRamCost = 0; + tvListRamCost = new AtomicLong(0); maxPlanIndex = 0; minPlanIndex = 0; } @@ -531,27 +536,27 @@ @Override public void addTVListRamCost(long cost) { - this.tvListRamCost += cost; + this.tvListRamCost.addAndGet(cost); } @Override public void releaseTVListRamCost(long cost) { - this.tvListRamCost -= cost; + this.tvListRamCost.addAndGet(-cost); } @Override public long getTVListsRamCost() { - return tvListRamCost; + return tvListRamCost.get(); } @Override public void addTextDataSize(long textDataSize) { - this.memSize += textDataSize; + this.memSize.addAndGet(textDataSize); } @Override public void releaseTextDataSize(long textDataSize) { - this.memSize -= textDataSize; + this.memSize.addAndGet(-textDataSize); } @Override @@ -634,8 +639,8 @@ return; } buffer.putInt(seriesNumber); - buffer.putLong(memSize); - buffer.putLong(tvListRamCost); + buffer.putLong(memSize.get()); + buffer.putLong(tvListRamCost.get()); buffer.putLong(totalPointsNum); buffer.putLong(totalPointsNumThreshold); buffer.putLong(maxPlanIndex); @@ -653,8 +658,8 @@ public void deserialize(DataInputStream stream) throws IOException { seriesNumber = stream.readInt(); - memSize = stream.readLong(); - tvListRamCost = stream.readLong(); + memSize = new AtomicLong(stream.readLong()); + tvListRamCost = new AtomicLong(stream.readLong()); totalPointsNum = stream.readLong(); totalPointsNumThreshold = stream.readLong(); maxPlanIndex = stream.readLong();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java index 70b0f90..b8ec268 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTable.java
@@ -19,8 +19,8 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class PrimitiveMemTable extends AbstractMemTable { // this constructor only used when deserialize @@ -39,7 +39,7 @@ @Override public IMemTable copy() { - Map<IDeviceID, IWritableMemChunkGroup> newMap = new HashMap<>(getMemTableMap()); + Map<IDeviceID, IWritableMemChunkGroup> newMap = new ConcurrentHashMap<>(getMemTableMap()); return new PrimitiveMemTable(getDatabase(), getDataRegionId(), newMap); }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 953606e..6a20269 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -139,7 +139,7 @@ private volatile boolean shouldClose; /** working memtable. */ - private IMemTable workMemTable; + private volatile IMemTable workMemTable; /** last flush time to flush the working memtable. */ private long lastWorkMemtableFlushTime; @@ -239,12 +239,16 @@ throws WriteProcessException { if (workMemTable == null) { - long startTime = System.nanoTime(); - createNewWorkingMemTable(); - // recordCreateMemtableBlockCost - costsForMetrics[0] += System.nanoTime() - startTime; - WritingMetrics.getInstance() - .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); + synchronized (this) { + if (workMemTable == null) { + long startTime = System.nanoTime(); + createNewWorkingMemTable(); + // recordCreateMemtableBlockCost + costsForMetrics[0] += System.nanoTime() - startTime; + WritingMetrics.getInstance() + .recordActiveMemTableCount(dataRegionInfo.getDataRegion().getDataRegionId(), 1); + } + } } long[] memIncrements = null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java index b06ed63..9ebbdbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorInfo.java
@@ -21,6 +21,8 @@ import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo; +import java.util.concurrent.atomic.AtomicLong; + /** The TsFileProcessorInfo records the memory cost of this TsFileProcessor. */ public class TsFileProcessorInfo { @@ -28,13 +30,13 @@ private final DataRegionInfo dataRegionInfo; /** memory occupation of unsealed TsFileResource, ChunkMetadata, WAL */ - private long memCost; + private AtomicLong memCost; private final TsFileProcessorInfoMetrics metrics; public TsFileProcessorInfo(DataRegionInfo dataRegionInfo) { this.dataRegionInfo = dataRegionInfo; - this.memCost = 0L; + this.memCost = new AtomicLong(0); this.metrics = new TsFileProcessorInfoMetrics(dataRegionInfo.getDataRegion().getDatabaseName(), this); MetricService.getInstance().addMetricSet(metrics); @@ -42,25 +44,25 @@ /** called in each insert */ public void addTSPMemCost(long cost) { - memCost += cost; + memCost.addAndGet(cost); dataRegionInfo.addStorageGroupMemCost(cost); } /** called when meet exception */ public void releaseTSPMemCost(long cost) { dataRegionInfo.releaseStorageGroupMemCost(cost); - memCost -= cost; + memCost.addAndGet(-cost); } /** called when closing TSP */ public void clear() { - dataRegionInfo.releaseStorageGroupMemCost(memCost); - memCost = 0L; + dataRegionInfo.releaseStorageGroupMemCost(memCost.get()); + memCost.set(0); MetricService.getInstance().removeMetricSet(metrics); } /** get memCost */ public long getMemCost() { - return memCost; + return memCost.get(); } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index c1a71d3..6248026 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -59,28 +59,30 @@ @Override public boolean writeWithFlushCheck(long insertTime, Object objectValue) { - switch (schema.getType()) { - case BOOLEAN: - putBoolean(insertTime, (boolean) objectValue); - break; - case INT32: - putInt(insertTime, (int) objectValue); - break; - case INT64: - putLong(insertTime, (long) objectValue); - break; - case FLOAT: - putFloat(insertTime, (float) objectValue); - break; - case DOUBLE: - putDouble(insertTime, (double) objectValue); - break; - case TEXT: - return putBinaryWithFlushCheck(insertTime, (Binary) objectValue); - default: - throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + synchronized (this) { + switch (schema.getType()) { + case BOOLEAN: + putBoolean(insertTime, (boolean) objectValue); + break; + case INT32: + putInt(insertTime, (int) objectValue); + break; + case INT64: + putLong(insertTime, (long) objectValue); + break; + case FLOAT: + putFloat(insertTime, (float) objectValue); + break; + case DOUBLE: + putDouble(insertTime, (double) objectValue); + break; + case TEXT: + return putBinaryWithFlushCheck(insertTime, (Binary) objectValue); + default: + throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); + } + return false; } - return false; } @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java index 707eace..2245808 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java
@@ -29,18 +29,18 @@ import java.io.DataInputStream; import java.io.IOException; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; public class WritableMemChunkGroup implements IWritableMemChunkGroup { private Map<String, IWritableMemChunk> memChunkMap; public WritableMemChunkGroup() { - memChunkMap = new HashMap<>(); + memChunkMap = new ConcurrentHashMap<>(); } @Override