| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.engine.storagegroup; |
| |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.exception.IllegalPathException; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.path.AlignedPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.conf.adapter.CompressionRatio; |
| import org.apache.iotdb.db.engine.StorageEngine; |
| import org.apache.iotdb.db.engine.flush.CloseFileListener; |
| import org.apache.iotdb.db.engine.flush.FlushListener; |
| import org.apache.iotdb.db.engine.flush.FlushManager; |
| import org.apache.iotdb.db.engine.flush.MemTableFlushTask; |
| import org.apache.iotdb.db.engine.flush.NotifyFlushMemTable; |
| import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunk; |
| import org.apache.iotdb.db.engine.memtable.AlignedWritableMemChunkGroup; |
| import org.apache.iotdb.db.engine.memtable.IMemTable; |
| import org.apache.iotdb.db.engine.modification.Deletion; |
| import org.apache.iotdb.db.engine.modification.Modification; |
| import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; |
| import org.apache.iotdb.db.engine.storagegroup.DataRegion.UpdateEndTimeCallBack; |
| import org.apache.iotdb.db.exception.TsFileProcessorException; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.exception.WriteProcessRejectException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.idtable.entry.DeviceIDFactory; |
| import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID; |
| import org.apache.iotdb.db.metadata.utils.ResourceByPathUtils; |
| import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode; |
| import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode; |
| import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode; |
| import org.apache.iotdb.db.qp.physical.crud.DeletePlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; |
| import org.apache.iotdb.db.query.context.QueryContext; |
| import org.apache.iotdb.db.rescon.MemTableManager; |
| import org.apache.iotdb.db.rescon.PrimitiveArrayManager; |
| import org.apache.iotdb.db.rescon.SystemInfo; |
| import org.apache.iotdb.db.sync.SyncService; |
| import org.apache.iotdb.db.sync.sender.manager.ISyncManager; |
| import org.apache.iotdb.db.utils.MemUtils; |
| import org.apache.iotdb.db.utils.datastructure.AlignedTVList; |
| import org.apache.iotdb.db.utils.datastructure.TVList; |
| import org.apache.iotdb.db.wal.WALManager; |
| import org.apache.iotdb.db.wal.node.IWALNode; |
| import org.apache.iotdb.db.wal.utils.listener.WALFlushListener; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.utils.Binary; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedDeque; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| @SuppressWarnings("java:S1135") // ignore todos |
| public class TsFileProcessor { |
| |
| /** logger for this class */ |
| private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class); |
| |
| /** storage group name of this tsfile */ |
| private final String storageGroupName; |
| |
| /** IoTDB config */ |
| private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| |
| /** whether memory control is enabled; read once from the config when the processor is created */ |
| private final boolean enableMemControl = config.isEnableMemControl(); |
| |
| /** storage group info for mem control */ |
| private StorageGroupInfo storageGroupInfo; |
| /** tsfile processor info for mem control */ |
| private TsFileProcessorInfo tsFileProcessorInfo; |
| |
| /** sync this object in query() and asyncTryToFlush() */ |
| private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>(); |
| |
| /** deletions paired with the flushing memtable that was last in the queue when they arrived */ |
| private List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>(); |
| |
| /** writer for restore tsfile and flushing */ |
| private RestorableTsFileIOWriter writer; |
| |
| /** tsfile resource for index this tsfile */ |
| private final TsFileResource tsFileResource; |
| |
| /** time range index to indicate this processor belongs to which time range */ |
| private long timeRangeId; |
| /** |
| * Whether the processor is in the queue of the FlushManager or being flushed by a flush thread. |
| */ |
| private volatile boolean managedByFlushManager; |
| |
| /** read-write lock, write-locked while deleting in-memory data. NOTE(review): by its name it presumably mutually excludes flush and query - confirm */ |
| private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock(); |
| /** |
| * It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true |
| * and its flushingMemTables are all flushed, then the flush thread will close this file.) |
| */ |
| private volatile boolean shouldClose; |
| |
| /** working memtable; null until the first insertion creates it */ |
| private IMemTable workMemTable; |
| |
| /** last flush time to flush the working memtable */ |
| private long lastWorkMemtableFlushTime; |
| |
| /** this callback is called before the workMemtable is added into the flushingMemTables. */ |
| private final UpdateEndTimeCallBack updateLatestFlushTimeCallback; |
| |
| /** wal node */ |
| private final IWALNode walNode; |
| |
| /** whether it's a sequence file or not */ |
| private final boolean sequence; |
| |
| /** total memtable size for mem control */ |
| private long totalMemTableSize; |
| |
| private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock"; |
| private static final String FLUSH_QUERY_WRITE_RELEASE = |
| "{}: {} get flushQueryLock write lock released"; |
| |
| /** close file listener */ |
| private List<CloseFileListener> closeFileListeners = new ArrayList<>(); |
| |
| /** flush file listener */ |
| private List<FlushListener> flushListeners = new ArrayList<>(); |
| |
| @SuppressWarnings("squid:S107") |
| TsFileProcessor( |
| String storageGroupName, |
| File tsfile, |
| StorageGroupInfo storageGroupInfo, |
| CloseFileListener closeTsFileCallback, |
| UpdateEndTimeCallBack updateLatestFlushTimeCallback, |
| boolean sequence) |
| throws IOException { |
| this.storageGroupName = storageGroupName; |
| this.tsFileResource = new TsFileResource(tsfile, this); |
| this.storageGroupInfo = storageGroupInfo; |
| this.writer = |
| new RestorableTsFileIOWriter( |
| tsfile, |
| (long) |
| (IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold() |
| * IoTDBDescriptor.getInstance() |
| .getConfig() |
| .getChunkMetadataSizeProportionInWrite())); |
| this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback; |
| this.sequence = sequence; |
| this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName); |
| flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE); |
| flushListeners.add(this.walNode); |
| closeFileListeners.add(closeTsFileCallback); |
| logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath()); |
| } |
| |
/**
 * Re-creates a processor on top of an already existing tsfile resource and writer.
 *
 * <p>Besides storing its arguments, the constructor applies for a WAL node for the storage group
 * and registers the default memtable flush listener, the WAL node and the supplied close
 * listener.
 *
 * @param storageGroupName name of the storage group this tsfile belongs to
 * @param storageGroupInfo storage group info used for memory control
 * @param tsFileResource resource of the tsfile being reopened
 * @param closeUnsealedTsFileProcessor listener registered as the first close-file listener
 * @param updateLatestFlushTimeCallback callback stored for later use by this processor
 * @param sequence whether the tsfile is a sequence file
 * @param writer writer to use for this tsfile
 */
@SuppressWarnings("java:S107") // ignore number of arguments
public TsFileProcessor(
    String storageGroupName,
    StorageGroupInfo storageGroupInfo,
    TsFileResource tsFileResource,
    CloseFileListener closeUnsealedTsFileProcessor,
    UpdateEndTimeCallBack updateLatestFlushTimeCallback,
    boolean sequence,
    RestorableTsFileIOWriter writer) {
  // plain state handed in by the caller
  this.storageGroupName = storageGroupName;
  this.storageGroupInfo = storageGroupInfo;
  this.tsFileResource = tsFileResource;
  this.writer = writer;
  this.sequence = sequence;
  this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;

  // WAL node for this storage group; it also acts as a flush listener
  this.walNode = WALManager.getInstance().applyForWALNode(storageGroupName);
  flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
  flushListeners.add(this.walNode);

  closeFileListeners.add(closeUnsealedTsFileProcessor);
  logger.info("reopen a tsfile processor {}", tsFileResource.getTsFile());
}
| |
/**
 * Writes the single row carried by an InsertRowPlan into the working memtable.
 *
 * <p>Steps, in order: lazily create the working memtable; when memory control is enabled,
 * account the estimated memory cost; log the plan to the WAL and wait for the result; insert the
 * row into the memtable; update the start time (and, for unsequence files, the end time) and the
 * plan indexes of the tsfile resource. When the WAL step fails, the memory accounted by this call
 * is rolled back before the failure is rethrown.
 *
 * @param insertRowPlan physical plan of insertion
 * @throws WriteProcessException e.g. when memory accounting fails or the WAL write fails
 */
public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException {
  if (workMemTable == null) {
    createNewWorkingMemTable();
  }

  // only assigned when memory control is enabled, so non-null implies enableMemControl
  long[] accountedCost = null;
  if (enableMemControl) {
    String devicePath = insertRowPlan.getDevicePath().getFullPath();
    String[] measurements = insertRowPlan.getMeasurements();
    TSDataType[] dataTypes = insertRowPlan.getDataTypes();
    Object[] values = insertRowPlan.getValues();
    accountedCost =
        insertRowPlan.isAligned()
            ? checkAlignedMemCostAndAddToTspInfo(devicePath, measurements, dataTypes, values)
            : checkMemCostAndAddToTspInfo(devicePath, measurements, dataTypes, values);
  }

  try {
    WALFlushListener listener = walNode.log(workMemTable.getMemTableId(), insertRowPlan);
    boolean walFailed = listener.waitForResult() == WALFlushListener.Status.FAILURE;
    if (walFailed) {
      throw listener.getCause();
    }
  } catch (Exception e) {
    // undo the memory accounting of this call before reporting the WAL failure
    if (accountedCost != null) {
      rollbackMemoryInfo(accountedCost);
    }
    String message =
        String.format(
            "%s: %s write WAL failed",
            storageGroupName, tsFileResource.getTsFile().getAbsolutePath());
    throw new WriteProcessException(message, e);
  }

  if (insertRowPlan.isAligned()) {
    workMemTable.insertAlignedRow(insertRowPlan);
  } else {
    workMemTable.insert(insertRowPlan);
  }

  String device = insertRowPlan.getDeviceID().toStringID();
  long time = insertRowPlan.getTime();
  // update start time of this memtable
  tsFileResource.updateStartTime(device, time);
  // for sequence tsfile, we update the endTime only when the file is prepared to be closed.
  // for unsequence tsfile, we have to update the endTime for each insertion.
  if (!sequence) {
    tsFileResource.updateEndTime(device, time);
  }
  tsFileResource.updatePlanIndexes(insertRowPlan.getIndex());
}
| |
| /** |
| * insert data in an InsertRowNode into the workingMemtable. |
| * |
| * @param insertRowNode physical plan of insertion |
| */ |
| public void insert(InsertRowNode insertRowNode) throws WriteProcessException { |
| |
| if (workMemTable == null) { |
| createNewWorkingMemTable(); |
| } |
| |
| long[] memIncrements = null; |
| if (enableMemControl) { |
| if (insertRowNode.isAligned()) { |
| memIncrements = |
| checkAlignedMemCostAndAddToTspInfo( |
| insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(), |
| insertRowNode.getDataTypes(), insertRowNode.getValues()); |
| } else { |
| memIncrements = |
| checkMemCostAndAddToTspInfo( |
| insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(), |
| insertRowNode.getDataTypes(), insertRowNode.getValues()); |
| } |
| } |
| |
| try { |
| WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode); |
| if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { |
| throw walFlushListener.getCause(); |
| } |
| } catch (Exception e) { |
| if (enableMemControl && memIncrements != null) { |
| rollbackMemoryInfo(memIncrements); |
| } |
| throw new WriteProcessException( |
| String.format( |
| "%s: %s write WAL failed", |
| storageGroupName, tsFileResource.getTsFile().getAbsolutePath()), |
| e); |
| } |
| |
| if (insertRowNode.isAligned()) { |
| workMemTable.insertAlignedRow(insertRowNode); |
| } else { |
| workMemTable.insert(insertRowNode); |
| } |
| |
| // update start time of this memtable |
| tsFileResource.updateStartTime( |
| insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime()); |
| // for sequence tsfile, we update the endTime only when the file is prepared to be closed. |
| // for unsequence tsfile, we have to update the endTime for each insertion. |
| if (!sequence) { |
| tsFileResource.updateEndTime( |
| insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime()); |
| } |
| // tsFileResource.updatePlanIndexes(insertRowNode.getIndex()); |
| } |
| |
| /** |
| * insert batch data of insertTabletPlan into the workingMemtable. The rows to be inserted are in |
| * the range [start, end). Null value in each column values will be replaced by the subsequent |
| * non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5} |
| * |
| * @param insertTabletPlan insert a tablet of a device |
| * @param start start index of rows to be inserted in insertTabletPlan |
| * @param end end index of rows to be inserted in insertTabletPlan |
| * @param results result array |
| */ |
| public void insertTablet( |
| InsertTabletPlan insertTabletPlan, int start, int end, TSStatus[] results) |
| throws WriteProcessException { |
| |
| if (workMemTable == null) { |
| createNewWorkingMemTable(); |
| } |
| |
| long[] memIncrements = null; |
| try { |
| if (enableMemControl) { |
| if (insertTabletPlan.isAligned()) { |
| memIncrements = |
| checkAlignedMemCostAndAddToTsp( |
| insertTabletPlan.getDevicePath().getFullPath(), |
| insertTabletPlan.getMeasurements(), |
| insertTabletPlan.getDataTypes(), |
| insertTabletPlan.getColumns(), |
| start, |
| end); |
| } else { |
| memIncrements = |
| checkMemCostAndAddToTspInfo( |
| insertTabletPlan.getDevicePath().getFullPath(), |
| insertTabletPlan.getMeasurements(), |
| insertTabletPlan.getDataTypes(), |
| insertTabletPlan.getColumns(), |
| start, |
| end); |
| } |
| } |
| } catch (WriteProcessException e) { |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage()); |
| } |
| throw new WriteProcessException(e); |
| } |
| |
| try { |
| WALFlushListener walFlushListener = |
| walNode.log(workMemTable.getMemTableId(), insertTabletPlan, start, end); |
| if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { |
| throw walFlushListener.getCause(); |
| } |
| } catch (Exception e) { |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| if (enableMemControl && memIncrements != null) { |
| rollbackMemoryInfo(memIncrements); |
| } |
| throw new WriteProcessException(e); |
| } |
| |
| try { |
| if (insertTabletPlan.isAligned()) { |
| workMemTable.insertAlignedTablet(insertTabletPlan, start, end); |
| } else { |
| workMemTable.insertTablet(insertTabletPlan, start, end); |
| } |
| } catch (WriteProcessException e) { |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| throw new WriteProcessException(e); |
| } |
| |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.SUCCESS_STATUS; |
| } |
| tsFileResource.updateStartTime( |
| insertTabletPlan.getDeviceID().toStringID(), insertTabletPlan.getTimes()[start]); |
| |
| // for sequence tsfile, we update the endTime only when the file is prepared to be closed. |
| // for unsequence tsfile, we have to update the endTime for each insertion. |
| if (!sequence) { |
| tsFileResource.updateEndTime( |
| insertTabletPlan.getDeviceID().toStringID(), insertTabletPlan.getTimes()[end - 1]); |
| } |
| tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex()); |
| } |
| |
| private void createNewWorkingMemTable() throws WriteProcessException { |
| workMemTable = MemTableManager.getInstance().getAvailableMemTable(storageGroupName); |
| walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath()); |
| } |
| |
| /** |
| * insert batch data of insertTabletPlan into the workingMemtable. The rows to be inserted are in |
| * the range [start, end). Null value in each column values will be replaced by the subsequent |
| * non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5} |
| * |
| * @param insertTabletNode insert a tablet of a device |
| * @param start start index of rows to be inserted in insertTabletPlan |
| * @param end end index of rows to be inserted in insertTabletPlan |
| * @param results result array |
| */ |
| public void insertTablet( |
| InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results) |
| throws WriteProcessException { |
| |
| if (workMemTable == null) { |
| createNewWorkingMemTable(); |
| } |
| |
| long[] memIncrements = null; |
| try { |
| if (enableMemControl) { |
| if (insertTabletNode.isAligned()) { |
| memIncrements = |
| checkAlignedMemCostAndAddToTsp( |
| insertTabletNode.getDevicePath().getFullPath(), |
| insertTabletNode.getMeasurements(), |
| insertTabletNode.getDataTypes(), |
| insertTabletNode.getColumns(), |
| start, |
| end); |
| } else { |
| memIncrements = |
| checkMemCostAndAddToTspInfo( |
| insertTabletNode.getDevicePath().getFullPath(), |
| insertTabletNode.getMeasurements(), |
| insertTabletNode.getDataTypes(), |
| insertTabletNode.getColumns(), |
| start, |
| end); |
| } |
| } |
| } catch (WriteProcessException e) { |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage()); |
| } |
| throw new WriteProcessException(e); |
| } |
| |
| try { |
| WALFlushListener walFlushListener = |
| walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end); |
| if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { |
| throw walFlushListener.getCause(); |
| } |
| } catch (Exception e) { |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| if (enableMemControl && memIncrements != null) { |
| rollbackMemoryInfo(memIncrements); |
| } |
| throw new WriteProcessException(e); |
| } |
| |
| try { |
| if (insertTabletNode.isAligned()) { |
| workMemTable.insertAlignedTablet(insertTabletNode, start, end); |
| } else { |
| workMemTable.insertTablet(insertTabletNode, start, end); |
| } |
| } catch (WriteProcessException e) { |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| throw new WriteProcessException(e); |
| } |
| |
| for (int i = start; i < end; i++) { |
| results[i] = RpcUtils.SUCCESS_STATUS; |
| } |
| tsFileResource.updateStartTime( |
| insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]); |
| |
| // for sequence tsfile, we update the endTime only when the file is prepared to be closed. |
| // for unsequence tsfile, we have to update the endTime for each insertion. |
| if (!sequence) { |
| tsFileResource.updateEndTime( |
| insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[end - 1]); |
| } |
| // TODO: PlanIndex |
| tsFileResource.updatePlanIndexes(0); |
| // tsFileResource.updatePlanIndexes(insertTabletPlan.getIndex()); |
| } |
| |
| @SuppressWarnings("squid:S3776") // high Cognitive Complexity |
| private long[] checkMemCostAndAddToTspInfo( |
| String deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values) |
| throws WriteProcessException { |
| // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 |
| long memTableIncrement = 0L; |
| long textDataIncrement = 0L; |
| long chunkMetadataIncrement = 0L; |
| // get device id |
| IDeviceID deviceID; |
| try { |
| deviceID = getDeviceID(deviceId); |
| } catch (IllegalPathException e) { |
| throw new WriteProcessException(e); |
| } |
| |
| for (int i = 0; i < dataTypes.length; i++) { |
| // skip failed Measurements |
| if (dataTypes[i] == null || measurements[i] == null) { |
| continue; |
| } |
| if (workMemTable.checkIfChunkDoesNotExist(deviceID, measurements[i])) { |
| // ChunkMetadataIncrement |
| chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]); |
| memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]); |
| } else { |
| // here currentChunkPointNum >= 1 |
| long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceID, measurements[i]); |
| memTableIncrement += |
| (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0 |
| ? TVList.tvListArrayMemCost(dataTypes[i]) |
| : 0; |
| } |
| // TEXT data mem size |
| if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { |
| textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); |
| } |
| } |
| updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); |
| return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; |
| } |
| |
| @SuppressWarnings("squid:S3776") // high Cognitive Complexity |
| private long[] checkAlignedMemCostAndAddToTspInfo( |
| String deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values) |
| throws WriteProcessException { |
| // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 |
| long memTableIncrement = 0L; |
| long textDataIncrement = 0L; |
| long chunkMetadataIncrement = 0L; |
| AlignedWritableMemChunk alignedMemChunk = null; |
| // get device id |
| IDeviceID deviceID; |
| try { |
| deviceID = getDeviceID(deviceId); |
| } catch (IllegalPathException e) { |
| throw new WriteProcessException(e); |
| } |
| |
| if (workMemTable.checkIfChunkDoesNotExist(deviceID, AlignedPath.VECTOR_PLACEHOLDER)) { |
| // ChunkMetadataIncrement |
| chunkMetadataIncrement += |
| ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) |
| * dataTypes.length; |
| memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes); |
| } else { |
| // here currentChunkPointNum >= 1 |
| long currentChunkPointNum = |
| workMemTable.getCurrentTVListSize(deviceID, AlignedPath.VECTOR_PLACEHOLDER); |
| memTableIncrement += |
| (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0 |
| ? AlignedTVList.alignedTvListArrayMemCost(dataTypes) |
| : 0; |
| alignedMemChunk = |
| ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceID)) |
| .getAlignedMemChunk(); |
| } |
| for (int i = 0; i < dataTypes.length; i++) { |
| // skip failed Measurements |
| if (dataTypes[i] == null || measurements[i] == null) { |
| continue; |
| } |
| // extending the column of aligned mem chunk |
| if (alignedMemChunk != null && !alignedMemChunk.containsMeasurement(measurements[i])) { |
| memTableIncrement += |
| (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) |
| * dataTypes[i].getDataTypeSize(); |
| } |
| // TEXT data mem size |
| if (dataTypes[i] == TSDataType.TEXT && values[i] != null) { |
| textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); |
| } |
| } |
| updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); |
| return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement}; |
| } |
| |
| private long[] checkMemCostAndAddToTspInfo( |
| String deviceId, |
| String[] measurements, |
| TSDataType[] dataTypes, |
| Object[] columns, |
| int start, |
| int end) |
| throws WriteProcessException { |
| if (start >= end) { |
| return new long[] {0, 0, 0}; |
| } |
| long[] memIncrements = new long[3]; // memTable, text, chunk metadata |
| |
| // get device id |
| IDeviceID deviceID; |
| try { |
| deviceID = getDeviceID(deviceId); |
| } catch (IllegalPathException e) { |
| throw new WriteProcessException(e); |
| } |
| |
| for (int i = 0; i < dataTypes.length; i++) { |
| // skip failed Measurements |
| if (dataTypes[i] == null || columns[i] == null || measurements[i] == null) { |
| continue; |
| } |
| updateMemCost(dataTypes[i], measurements[i], deviceID, start, end, memIncrements, columns[i]); |
| } |
| long memTableIncrement = memIncrements[0]; |
| long textDataIncrement = memIncrements[1]; |
| long chunkMetadataIncrement = memIncrements[2]; |
| updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); |
| return memIncrements; |
| } |
| |
| private long[] checkAlignedMemCostAndAddToTsp( |
| String deviceId, |
| String[] measurements, |
| TSDataType[] dataTypes, |
| Object[] columns, |
| int start, |
| int end) |
| throws WriteProcessException { |
| if (start >= end) { |
| return new long[] {0, 0, 0}; |
| } |
| long[] memIncrements = new long[3]; // memTable, text, chunk metadata |
| |
| // get device id |
| IDeviceID deviceID; |
| try { |
| deviceID = getDeviceID(deviceId); |
| } catch (IllegalPathException e) { |
| throw new WriteProcessException(e); |
| } |
| |
| updateAlignedMemCost(dataTypes, deviceID, measurements, start, end, memIncrements, columns); |
| long memTableIncrement = memIncrements[0]; |
| long textDataIncrement = memIncrements[1]; |
| long chunkMetadataIncrement = memIncrements[2]; |
| updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement); |
| return memIncrements; |
| } |
| |
| private void updateMemCost( |
| TSDataType dataType, |
| String measurement, |
| IDeviceID deviceId, |
| int start, |
| int end, |
| long[] memIncrements, |
| Object column) { |
| // memIncrements = [memTable, text, chunk metadata] respectively |
| |
| if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurement)) { |
| // ChunkMetadataIncrement |
| memIncrements[2] += ChunkMetadata.calculateRamSize(measurement, dataType); |
| memIncrements[0] += |
| ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) |
| * TVList.tvListArrayMemCost(dataType); |
| } else { |
| long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurement); |
| if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) { |
| memIncrements[0] += |
| ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) |
| * TVList.tvListArrayMemCost(dataType); |
| } else { |
| long acquireArray = |
| (end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE)) |
| / PrimitiveArrayManager.ARRAY_SIZE; |
| memIncrements[0] += |
| acquireArray == 0 ? 0 : acquireArray * TVList.tvListArrayMemCost(dataType); |
| } |
| } |
| // TEXT data size |
| if (dataType == TSDataType.TEXT) { |
| Binary[] binColumn = (Binary[]) column; |
| memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end); |
| } |
| } |
| |
| private void updateAlignedMemCost( |
| TSDataType[] dataTypes, |
| IDeviceID deviceId, |
| String[] measurementIds, |
| int start, |
| int end, |
| long[] memIncrements, |
| Object[] columns) { |
| AlignedWritableMemChunk vectorMemChunk = null; |
| // memIncrements = [memTable, text, chunk metadata] respectively |
| if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) { |
| // ChunkMetadataIncrement |
| memIncrements[2] += |
| dataTypes.length |
| * ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR); |
| memIncrements[0] += |
| ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) |
| * AlignedTVList.alignedTvListArrayMemCost(dataTypes); |
| } else { |
| int currentChunkPointNum = |
| (int) workMemTable.getCurrentTVListSize(deviceId, AlignedPath.VECTOR_PLACEHOLDER); |
| if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) { |
| memIncrements[0] += |
| ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1) |
| * AlignedTVList.alignedTvListArrayMemCost(dataTypes); |
| } else { |
| int acquireArray = |
| (end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE)) |
| / PrimitiveArrayManager.ARRAY_SIZE; |
| memIncrements[0] += |
| acquireArray == 0 |
| ? 0 |
| : acquireArray * AlignedTVList.alignedTvListArrayMemCost(dataTypes); |
| } |
| vectorMemChunk = |
| ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId)) |
| .getAlignedMemChunk(); |
| } |
| for (int i = 0; i < dataTypes.length; i++) { |
| TSDataType dataType = dataTypes[i]; |
| String measurement = measurementIds[i]; |
| Object column = columns[i]; |
| if (dataType == null || column == null || measurement == null) { |
| continue; |
| } |
| // extending the column of aligned mem chunk |
| if (vectorMemChunk != null && !vectorMemChunk.containsMeasurement(measurementIds[i])) { |
| memIncrements[0] += |
| (vectorMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1) |
| * dataType.getDataTypeSize(); |
| } |
| // TEXT data size |
| if (dataType == TSDataType.TEXT) { |
| Binary[] binColumn = (Binary[]) columns[i]; |
| memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end); |
| } |
| } |
| } |
| |
| private void updateMemoryInfo( |
| long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement) |
| throws WriteProcessException { |
| memTableIncrement += textDataIncrement; |
| storageGroupInfo.addStorageGroupMemCost(memTableIncrement); |
| tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement); |
| if (storageGroupInfo.needToReportToSystem()) { |
| try { |
| if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) { |
| StorageEngine.blockInsertionIfReject(this); |
| } |
| } catch (WriteProcessRejectException e) { |
| storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); |
| tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); |
| SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); |
| throw e; |
| } |
| } |
| workMemTable.addTVListRamCost(memTableIncrement); |
| workMemTable.addTextDataSize(textDataIncrement); |
| } |
| |
| private void rollbackMemoryInfo(long[] memIncrements) { |
| long memTableIncrement = memIncrements[0]; |
| long textDataIncrement = memIncrements[1]; |
| long chunkMetadataIncrement = memIncrements[2]; |
| |
| memTableIncrement += textDataIncrement; |
| storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); |
| tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); |
| SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); |
| workMemTable.releaseTVListRamCost(memTableIncrement); |
| workMemTable.releaseTextDataSize(textDataIncrement); |
| } |
| |
| /** |
|  * Applies a deletion to the in-memory data of this processor while holding the flush-query write |
|  * lock. <br> |
|  * |
|  * <p>For every given device the deletion (its path, start time and end time) is applied to the |
|  * working memtable, if there is one. If there are flushing memtables, the deletion is only |
|  * recorded together with the last of them. Finally the deletion is passed to every sync manager |
|  * of the data region. |
|  * |
|  * @param deletion the deletion to apply |
|  * @param devicePaths the devices the deletion is applied to in the working memtable |
|  */ |
| public void deleteDataInMemory(Deletion deletion, Set<PartialPath> devicePaths) { |
|   flushQueryLock.writeLock().lock(); |
|   if (logger.isDebugEnabled()) { |
|     logger.debug( |
|         FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); |
|   } |
|   try { |
|     if (workMemTable != null) { |
|       for (PartialPath devicePath : devicePaths) { |
|         workMemTable.delete( |
|             deletion.getPath(), devicePath, deletion.getStartTime(), deletion.getEndTime()); |
|       } |
|     } |
|     // flushing memTables are immutable, only record this deletion in these memTables for query |
|     if (!flushingMemTables.isEmpty()) { |
|       modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast())); |
|     } |
|     // forward the deletion to every sync manager of this data region |
|     for (ISyncManager manager : |
|         SyncService.getInstance() |
|             .getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) { |
|       manager.syncRealTimeDeletion(deletion); |
|     } |
|   } finally { |
|     flushQueryLock.writeLock().unlock(); |
|     if (logger.isDebugEnabled()) { |
|       logger.debug( |
|           FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName()); |
|     } |
|   } |
| } |
| |
| /** |
|  * Logs a delete plan into the WAL node under the id of the working memtable. |
|  * |
|  * @param deletePlan the plan to log |
|  * @return the listener returned by the WAL node for this log entry |
|  */ |
| WALFlushListener logDeleteInWAL(DeletePlan deletePlan) { |
|   return this.walNode.log(this.workMemTable.getMemTableId(), deletePlan); |
| } |
| |
| /** |
|  * Logs a delete data node into the WAL node under the id of the working memtable. |
|  * |
|  * @param deleteDataNode the node to log |
|  * @return the listener returned by the WAL node for this log entry |
|  */ |
| WALFlushListener logDeleteDataNodeInWAL(DeleteDataNode deleteDataNode) { |
|   return this.walNode.log(this.workMemTable.getMemTableId(), deleteDataNode); |
| } |
| |
| /** @return the TsFileResource of the file this processor works on */ |
| public TsFileResource getTsFileResource() { |
|   return this.tsFileResource; |
| } |
| |
| public boolean shouldFlush() { |
| if (workMemTable == null) { |
| return false; |
| } |
| if (workMemTable.shouldFlush()) { |
| logger.info( |
| "The memtable size {} of tsfile {} reaches the mem control threshold", |
| workMemTable.memSize(), |
| tsFileResource.getTsFile().getAbsolutePath()); |
| return true; |
| } |
| if (!enableMemControl && workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) { |
| logger.info( |
| "The memtable size {} of tsfile {} reaches the threshold", |
| workMemTable.memSize(), |
| tsFileResource.getTsFile().getAbsolutePath()); |
| return true; |
| } |
| if (workMemTable.reachTotalPointNumThreshold()) { |
| logger.info( |
| "The avg series points num {} of tsfile {} reaches the threshold", |
| workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(), |
| tsFileResource.getTsFile().getAbsolutePath()); |
| return true; |
| } |
| return false; |
| } |
| |
| /** @return the memtable size threshold read from the configuration */ |
| private long getMemtableSizeThresholdBasedOnSeriesNum() { |
|   return this.config.getMemtableSizeThreshold(); |
| } |
| |
| /** |
|  * Checks whether the TsFile has reached the size threshold configured for its kind (sequence or |
|  * unsequence) and logs when it has. |
|  * |
|  * @return true if the file size is greater than or equal to the threshold |
|  */ |
| public boolean shouldClose() { |
|   final long currentSize = tsFileResource.getTsFileSize(); |
|   final long sizeLimit; |
|   if (sequence) { |
|     sizeLimit = config.getSeqTsFileSize(); |
|   } else { |
|     sizeLimit = config.getUnSeqTsFileSize(); |
|   } |
| |
|   final boolean limitReached = currentSize >= sizeLimit; |
|   if (limitReached) { |
|     logger.info( |
|         "{} fileSize {} >= fileSizeThreshold {}", |
|         tsFileResource.getTsFilePath(), |
|         currentSize, |
|         sizeLimit); |
|   } |
|   return limitReached; |
| } |
| |
| void syncClose() { |
| logger.info( |
| "Sync close file: {}, will firstly async close it", |
| tsFileResource.getTsFile().getAbsolutePath()); |
| if (shouldClose) { |
| return; |
| } |
| synchronized (flushingMemTables) { |
| try { |
| asyncClose(); |
| logger.info("Start to wait until file {} is closed", tsFileResource); |
| long startTime = System.currentTimeMillis(); |
| while (!flushingMemTables.isEmpty()) { |
| flushingMemTables.wait(60_000); |
| if (System.currentTimeMillis() - startTime > 60_000 && !flushingMemTables.isEmpty()) { |
| logger.warn( |
| "{} has spent {}s for waiting flushing one memtable; {} left (first: {}). FlushingManager info: {}", |
| this.tsFileResource.getTsFile().getAbsolutePath(), |
| (System.currentTimeMillis() - startTime) / 1000, |
| flushingMemTables.size(), |
| flushingMemTables.getFirst(), |
| FlushManager.getInstance()); |
| } |
| } |
| } catch (InterruptedException e) { |
| logger.error( |
| "{}: {} wait close interrupted", |
| storageGroupName, |
| tsFileResource.getTsFile().getName(), |
| e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| logger.info("File {} is closed synchronously", tsFileResource.getTsFile().getAbsolutePath()); |
| } |
| |
| /** async close one tsfile, register and close it by another thread */ |
| void asyncClose() { |
| flushQueryLock.writeLock().lock(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); |
| } |
| try { |
| |
| if (logger.isInfoEnabled()) { |
| if (workMemTable != null) { |
| logger.info( |
| "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile " |
| + "size: {}, plan index: [{}, {}]", |
| storageGroupName, |
| tsFileResource.getTsFile().getAbsolutePath(), |
| workMemTable.memSize(), |
| tsFileResource.getTsFileSize(), |
| workMemTable.getMinPlanIndex(), |
| workMemTable.getMaxPlanIndex()); |
| } else { |
| logger.info( |
| "{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}", |
| storageGroupName, |
| tsFileResource.getTsFile().getAbsolutePath(), |
| tsFileResource.getTsFileSize()); |
| } |
| } |
| |
| if (shouldClose) { |
| return; |
| } |
| // when a flush thread serves this TsFileProcessor (because the processor is submitted by |
| // registerTsFileProcessor()), the thread will seal the corresponding TsFile and |
| // execute other cleanup works if "shouldClose == true and flushingMemTables is empty". |
| |
| // To ensure there must be a flush thread serving this processor after the field `shouldClose` |
| // is set true, we need to generate a NotifyFlushMemTable as a signal task and submit it to |
| // the FlushManager. |
| |
| // we have to add the memtable into flushingList first and then set the shouldClose tag. |
| // see https://issues.apache.org/jira/browse/IOTDB-510 |
| IMemTable tmpMemTable = |
| workMemTable == null || workMemTable.memSize() == 0 |
| ? new NotifyFlushMemTable() |
| : workMemTable; |
| |
| try { |
| // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke |
| // flushing memTable in System module. |
| addAMemtableIntoFlushingList(tmpMemTable); |
| for (ISyncManager syncManager : |
| SyncService.getInstance() |
| .getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) { |
| syncManager.syncRealTimeTsFile(tsFileResource.getTsFile()); |
| } |
| logger.info("Memtable {} has been added to flushing list", tmpMemTable); |
| shouldClose = true; |
| } catch (Exception e) { |
| logger.error( |
| "{}: {} async close failed, because", |
| storageGroupName, |
| tsFileResource.getTsFile().getName(), |
| e); |
| } |
| } finally { |
| flushQueryLock.writeLock().unlock(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName()); |
| } |
| } |
| } |
| |
| /** |
|  * Hands the working memtable (or a NotifyFlushMemTable if there is none) to |
|  * addAMemtableIntoFlushingList under the flush-query write lock, then waits on that memtable |
|  * until it is no longer contained in the flushing list. A warning is logged for every 60 seconds |
|  * of waiting. |
|  * |
|  * <p>TODO if the flushing thread is too fast, the wait() on the memtable may never wakeup. Tips: |
|  * I am trying to solve this issue by checking whether the table exist before wait() |
|  * |
|  * @throws IOException if adding the memtable into the flushing list fails |
|  */ |
| public void syncFlush() throws IOException { |
|   IMemTable memTableToAwait; |
|   flushQueryLock.writeLock().lock(); |
|   if (logger.isDebugEnabled()) { |
|     logger.debug( |
|         FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); |
|   } |
|   try { |
|     if (workMemTable == null) { |
|       memTableToAwait = new NotifyFlushMemTable(); |
|     } else { |
|       memTableToAwait = workMemTable; |
|     } |
|     if (logger.isDebugEnabled() && memTableToAwait.isSignalMemTable()) { |
|       logger.debug( |
|           "{}: {} add a signal memtable into flushing memtable list when sync flush", |
|           storageGroupName, |
|           tsFileResource.getTsFile().getName()); |
|     } |
|     addAMemtableIntoFlushingList(memTableToAwait); |
|   } finally { |
|     flushQueryLock.writeLock().unlock(); |
|     if (logger.isDebugEnabled()) { |
|       logger.debug( |
|           FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName()); |
|     } |
|   } |
| |
|   synchronized (memTableToAwait) { |
|     try { |
|       long lastWarnMillis = System.currentTimeMillis(); |
|       // the list is re-checked before every wait, so an already finished flush is not waited for |
|       while (flushingMemTables.contains(memTableToAwait)) { |
|         memTableToAwait.wait(1000); |
| |
|         if ((System.currentTimeMillis() - lastWarnMillis) > 60_000) { |
|           logger.warn( |
|               "has waited for synced flushing a memtable in {} for 60 seconds.", |
|               this.tsFileResource.getTsFile().getAbsolutePath()); |
|           lastWarnMillis = System.currentTimeMillis(); |
|         } |
|       } |
|     } catch (InterruptedException interrupted) { |
|       logger.error( |
|           "{}: {} wait flush finished meets error", |
|           storageGroupName, |
|           tsFileResource.getTsFile().getName(), |
|           interrupted); |
|       // keep the interrupt status visible to the caller |
|       Thread.currentThread().interrupt(); |
|     } |
|   } |
| } |
| |
| /** |
|  * Hands the working memtable, if there is one, to addAMemtableIntoFlushingList while holding the |
|  * flush-query write lock. Any exception is logged and not propagated. |
|  */ |
| public void asyncFlush() { |
|   flushQueryLock.writeLock().lock(); |
|   if (logger.isDebugEnabled()) { |
|     logger.debug( |
|         FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); |
|   } |
|   try { |
|     // nothing to flush without a working memtable |
|     if (workMemTable != null) { |
|       logger.info( |
|           "Async flush a memtable to tsfile: {}", tsFileResource.getTsFile().getAbsolutePath()); |
|       addAMemtableIntoFlushingList(workMemTable); |
|     } |
|   } catch (Exception failure) { |
|     logger.error( |
|         "{}: {} add a memtable into flushing list failed", |
|         storageGroupName, |
|         tsFileResource.getTsFile().getName(), |
|         failure); |
|   } finally { |
|     flushQueryLock.writeLock().unlock(); |
|     if (logger.isDebugEnabled()) { |
|       logger.debug( |
|           FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName()); |
|     } |
|   } |
| } |
| |
| /** |
|  * Moves the given memtable into the flushing queue, sets the current working memtable to null and |
|  * registers this processor in the FlushManager. |
|  * |
|  * <p>A memtable that is not a signal memtable is skipped (with a warning) when |
|  * updateLatestFlushTimeCallback returns false for this processor or when the memtable size is 0. |
|  * |
|  * @param tobeFlushed the memtable to enqueue |
|  * @throws IOException declared by this method; NOTE(review): which call raises it is not visible |
|  *     here |
|  */ |
| private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException { |
|   if (!tobeFlushed.isSignalMemTable()) { |
|     // the callback is evaluated first; the size is only checked when the callback succeeds |
|     boolean nothingToFlush = |
|         !updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0; |
|     if (nothingToFlush) { |
|       logger.warn( |
|           "This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}", |
|           storageGroupName, |
|           tsFileResource.getTsFile().getName(), |
|           tobeFlushed.getMemTableMap()); |
|       return; |
|     } |
|   } |
| |
|   for (FlushListener listener : flushListeners) { |
|     listener.onMemTableFlushStarted(tobeFlushed); |
|   } |
| |
|   if (enableMemControl) { |
|     SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); |
|   } |
|   flushingMemTables.addLast(tobeFlushed); |
|   if (logger.isDebugEnabled()) { |
|     logger.debug( |
|         "{}: {} Memtable (signal = {}) is added into the flushing Memtable, queue size = {}", |
|         storageGroupName, |
|         tsFileResource.getTsFile().getName(), |
|         tobeFlushed.isSignalMemTable(), |
|         flushingMemTables.size()); |
|   } |
| |
|   // signal memtables do not contribute to the accumulated memtable size |
|   if (!tobeFlushed.isSignalMemTable()) { |
|     totalMemTableSize += tobeFlushed.memSize(); |
|   } |
|   workMemTable = null; |
|   lastWorkMemtableFlushTime = System.currentTimeMillis(); |
|   FlushManager.getInstance().registerTsFileProcessor(this); |
| } |
| |
| /** put back the memtable to MemTablePool and make metadata in writer visible */ |
| private void releaseFlushedMemTable(IMemTable memTable) { |
| flushQueryLock.writeLock().lock(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName()); |
| } |
| try { |
| writer.makeMetadataVisible(); |
| if (!flushingMemTables.remove(memTable)) { |
| logger.warn( |
| "{}: {} put the memtable (signal={}) out of flushingMemtables but it is not in the queue.", |
| storageGroupName, |
| tsFileResource.getTsFile().getName(), |
| memTable.isSignalMemTable()); |
| } else if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: {} memtable (signal={}) is removed from the queue. {} left.", |
| storageGroupName, |
| tsFileResource.getTsFile().getName(), |
| memTable.isSignalMemTable(), |
| flushingMemTables.size()); |
| } |
| memTable.release(); |
| MemTableManager.getInstance().decreaseMemtableNumber(); |
| if (enableMemControl) { |
| // reset the mem cost in StorageGroupProcessorInfo |
| storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost()); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "[mem control] {}: {} flush finished, try to reset system memcost, " |
| + "flushing memtable list size: {}", |
| storageGroupName, |
| tsFileResource.getTsFile().getName(), |
| flushingMemTables.size()); |
| } |
| // report to System |
| SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); |
| SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost()); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: {} flush finished, remove a memtable from flushing list, " |
| + "flushing memtable list size: {}", |
| storageGroupName, |
| tsFileResource.getTsFile().getName(), |
| flushingMemTables.size()); |
| } |
| } catch (Exception e) { |
| logger.error("{}: {}", storageGroupName, tsFileResource.getTsFile().getName(), e); |
| } finally { |
| flushQueryLock.writeLock().unlock(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName()); |
| } |
| } |
| } |
| |
| /** |
|  * Releases the flushing resources of the memtable while holding its monitor, then wakes up every |
|  * thread waiting on the memtable. |
|  * |
|  * @param memTable the memtable whose flush has finished |
|  */ |
| private void syncReleaseFlushedMemTable(IMemTable memTable) { |
|   synchronized (memTable) { |
|     this.releaseFlushedMemTable(memTable); |
|     // wake up threads waiting on this memtable, e.g. in syncFlush() |
|     memTable.notifyAll(); |
|     if (logger.isDebugEnabled()) { |
|       logger.debug( |
|           "{}: {} released a memtable (signal={}), flushingMemtables size ={}", |
|           storageGroupName, |
|           tsFileResource.getTsFile().getName(), |
|           memTable.isSignalMemTable(), |
|           flushingMemTables.size()); |
|     } |
|   } |
| } |
| |
| /** |
|  * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of |
|  * the flush manager pool. |
|  * |
|  * <p>The method runs in four phases: |
|  * |
|  * <ol> |
|  *   <li>flush the memtable through a MemTableFlushTask, unless it is a signal memtable; if that |
|  *       fails while the writer is still set, the unrecoverable-error handler is invoked, the |
|  *       writer is reset and closed, the close listeners are called and the method returns; |
|  *   <li>notify the flush listeners; |
|  *   <li>write the deletions recorded for this memtable into the modification file and release the |
|  *       memtable; |
|  *   <li>if the processor is marked as closing and no flushing memtable is left, end the file. The |
|  *       attempt is retried after each of the first three failures; the next failure invokes the |
|  *       unrecoverable-error handler. |
|  * </ol> |
|  */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| public void flushOneMemTable() { |
|   IMemTable memTableToFlush = flushingMemTables.getFirst(); |
| |
|   // signal memtable only may appear when calling asyncClose() |
|   if (!memTableToFlush.isSignalMemTable()) { |
|     try { |
|       // remember the current writer position so that a failed flush can be truncated by reset() |
|       writer.mark(); |
|       MemTableFlushTask flushTask = |
|           new MemTableFlushTask(memTableToFlush, writer, storageGroupName); |
|       flushTask.syncFlushMemTable(); |
|     } catch (Throwable e) { |
|       if (writer == null) { |
|         logger.info( |
|             "{}: {} is closed during flush, abandon flush task", |
|             storageGroupName, |
|             tsFileResource.getTsFile().getAbsolutePath()); |
|         synchronized (flushingMemTables) { |
|           flushingMemTables.notifyAll(); |
|         } |
|         // NOTE(review): there is no return here, so the code after this try block still runs for |
|         // this memtable - confirm that this is intended |
|       } else { |
|         logger.error( |
|             "{}: {} meet error when flushing a memtable, change system mode to error", |
|             storageGroupName, |
|             tsFileResource.getTsFile().getAbsolutePath(), |
|             e); |
|         CommonDescriptor.getInstance().getConfig().handleUnrecoverableError(); |
|         try { |
|           logger.error( |
|               "{}: {} IOTask meets error, truncate the corrupted data", |
|               storageGroupName, |
|               tsFileResource.getTsFile().getAbsolutePath(), |
|               e); |
|           writer.reset(); |
|         } catch (IOException e1) { |
|           logger.error( |
|               "{}: {} Truncate corrupted data meets error", |
|               storageGroupName, |
|               tsFileResource.getTsFile().getAbsolutePath(), |
|               e1); |
|         } |
|         // release resource |
|         try { |
|           syncReleaseFlushedMemTable(memTableToFlush); |
|           // make sure no query will search this file |
|           tsFileResource.setTimeIndex(config.getTimeIndexLevel().getTimeIndex()); |
|           // this callback method will register this empty tsfile into TsFileManager |
|           for (CloseFileListener closeFileListener : closeFileListeners) { |
|             closeFileListener.onClosed(this); |
|           } |
|           // close writer |
|           writer.close(); |
|           writer = null; |
|           // wake up threads waiting on the flushing list, e.g. in syncClose() |
|           synchronized (flushingMemTables) { |
|             flushingMemTables.notifyAll(); |
|           } |
|         } catch (Exception e1) { |
|           logger.error( |
|               "{}: {} Release resource meets error", |
|               storageGroupName, |
|               tsFileResource.getTsFile().getAbsolutePath(), |
|               e1); |
|         } |
|         return; |
|       } |
|     } |
|   } |
| |
|   for (FlushListener flushListener : flushListeners) { |
|     flushListener.onMemTableFlushed(memTableToFlush); |
|   } |
| |
|   // Take the lock BEFORE entering the try block: if the acquisition itself failed inside the |
|   // try, the finally block would unlock a lock this thread does not hold and throw an |
|   // IllegalMonitorStateException. This also matches how the other methods of this class lock. |
|   flushQueryLock.writeLock().lock(); |
|   try { |
|     Iterator<Pair<Modification, IMemTable>> iterator = modsToMemtable.iterator(); |
|     while (iterator.hasNext()) { |
|       Pair<Modification, IMemTable> entry = iterator.next(); |
|       // only the deletions recorded for the memtable that has just been flushed are written |
|       if (entry.right.equals(memTableToFlush)) { |
|         entry.left.setFileOffset(tsFileResource.getTsFileSize()); |
|         this.tsFileResource.getModFile().write(entry.left); |
|         tsFileResource.getModFile().close(); |
|         iterator.remove(); |
|         logger.info( |
|             "[Deletion] Deletion with path: {}, time:{}-{} written when flush memtable", |
|             entry.left.getPath(), |
|             ((Deletion) (entry.left)).getStartTime(), |
|             ((Deletion) (entry.left)).getEndTime()); |
|       } |
|     } |
|   } catch (IOException e) { |
|     logger.error( |
|         "Meet error when writing into ModificationFile file of {} ", |
|         tsFileResource.getTsFile().getAbsolutePath(), |
|         e); |
|   } finally { |
|     flushQueryLock.writeLock().unlock(); |
|   } |
| |
|   if (logger.isDebugEnabled()) { |
|     logger.debug( |
|         "{}: {} try get lock to release a memtable (signal={})", |
|         storageGroupName, |
|         tsFileResource.getTsFile().getAbsolutePath(), |
|         memTableToFlush.isSignalMemTable()); |
|   } |
| |
|   // for sync flush |
|   syncReleaseFlushedMemTable(memTableToFlush); |
| |
|   // retry to avoid unnecessary read-only mode |
|   int retryCnt = 0; |
|   // the loop ends once endFile() has set the writer to null, or through the break below |
|   while (shouldClose && flushingMemTables.isEmpty() && writer != null) { |
|     try { |
|       writer.mark(); |
|       updateCompressionRatio(memTableToFlush); |
|       if (logger.isDebugEnabled()) { |
|         logger.debug( |
|             "{}: {} flushingMemtables is empty and will close the file", |
|             storageGroupName, |
|             tsFileResource.getTsFile().getAbsolutePath()); |
|       } |
|       endFile(); |
|       if (logger.isDebugEnabled()) { |
|         logger.debug("{} flushingMemtables is clear", storageGroupName); |
|       } |
|     } catch (Exception e) { |
|       logger.error( |
|           "{}: {} marking or ending file meet error", |
|           storageGroupName, |
|           tsFileResource.getTsFile().getAbsolutePath(), |
|           e); |
|       // truncate broken metadata |
|       try { |
|         writer.reset(); |
|       } catch (IOException e1) { |
|         logger.error( |
|             "{}: {} truncate corrupted data meets error", |
|             storageGroupName, |
|             tsFileResource.getTsFile().getAbsolutePath(), |
|             e1); |
|       } |
|       // retry or set read-only |
|       if (retryCnt < 3) { |
|         logger.warn( |
|             "{} meet error when flush FileMetadata to {}, retry it again", |
|             storageGroupName, |
|             tsFileResource.getTsFile().getAbsolutePath(), |
|             e); |
|         retryCnt++; |
|         continue; |
|       } else { |
|         logger.error( |
|             "{} meet error when flush FileMetadata to {}, change system mode to error", |
|             storageGroupName, |
|             tsFileResource.getTsFile().getAbsolutePath(), |
|             e); |
|         CommonDescriptor.getInstance().getConfig().handleUnrecoverableError(); |
|         break; |
|       } |
|     } |
|     // for sync close |
|     if (logger.isDebugEnabled()) { |
|       logger.debug( |
|           "{}: {} try to get flushingMemtables lock.", |
|           storageGroupName, |
|           tsFileResource.getTsFile().getAbsolutePath()); |
|     } |
|     synchronized (flushingMemTables) { |
|       flushingMemTables.notifyAll(); |
|     } |
|   } |
| } |
| |
| private void updateCompressionRatio(IMemTable memTableToFlush) { |
| try { |
| double compressionRatio = ((double) totalMemTableSize) / writer.getPos(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "The compression ratio of tsfile {} is {}, totalMemTableSize: {}, the file size: {}", |
| writer.getFile().getAbsolutePath(), |
| compressionRatio, |
| totalMemTableSize, |
| writer.getPos()); |
| } |
| if (compressionRatio == 0 && !memTableToFlush.isSignalMemTable()) { |
| logger.error( |
| "{} The compression ratio of tsfile {} is 0, totalMemTableSize: {}, the file size: {}", |
| storageGroupName, |
| writer.getFile().getAbsolutePath(), |
| totalMemTableSize, |
| writer.getPos()); |
| } |
| CompressionRatio.getInstance().updateRatio(compressionRatio); |
| } catch (IOException e) { |
| logger.error( |
| "{}: {} update compression ratio failed", |
| storageGroupName, |
| tsFileResource.getTsFile().getName(), |
| e); |
| } |
| } |
| |
| /** end file and write some meta */ |
| private void endFile() throws IOException, TsFileProcessorException { |
| logger.info("Start to end file {}", tsFileResource); |
| long closeStartTime = System.currentTimeMillis(); |
| writer.endFile(); |
| tsFileResource.serialize(); |
| for (ISyncManager syncManager : |
| SyncService.getInstance() |
| .getOrCreateSyncManager(storageGroupInfo.getDataRegion().getDataRegionId())) { |
| syncManager.syncRealTimeResource(tsFileResource.getTsFile()); |
| } |
| logger.info("Ended file {}", tsFileResource); |
| |
| // remove this processor from Closing list in StorageGroupProcessor, |
| // mark the TsFileResource closed, no need writer anymore |
| for (CloseFileListener closeFileListener : closeFileListeners) { |
| closeFileListener.onClosed(this); |
| } |
| |
| if (enableMemControl) { |
| tsFileProcessorInfo.clear(); |
| storageGroupInfo.closeTsFileProcessorAndReportToSystem(this); |
| } |
| if (logger.isInfoEnabled()) { |
| long closeEndTime = System.currentTimeMillis(); |
| logger.info( |
| "Storage group {} close the file {}, TsFile size is {}, " |
| + "time consumption of flushing metadata is {}ms", |
| storageGroupName, |
| tsFileResource.getTsFile().getAbsoluteFile(), |
| writer.getFile().length(), |
| closeEndTime - closeStartTime); |
| } |
| |
| writer = null; |
| } |
| |
| /** @return the current value of the managedByFlushManager flag */ |
| public boolean isManagedByFlushManager() { |
|   return this.managedByFlushManager; |
| } |
| |
| /** Sets the managedByFlushManager flag to the given value. */ |
| public void setManagedByFlushManager(boolean managedByFlushManager) { |
| this.managedByFlushManager = managedByFlushManager; |
| } |
| |
| /** |
|  * Sync method: checks under the flush-query write lock whether there is a working memtable. |
|  * |
|  * @return true if the working memtable is not null |
|  */ |
| public boolean isMemtableNotNull() { |
|   flushQueryLock.writeLock().lock(); |
|   try { |
|     final boolean hasWorkMemTable = workMemTable != null; |
|     return hasWorkMemTable; |
|   } finally { |
|     flushQueryLock.writeLock().unlock(); |
|   } |
| } |
| |
| /** |
|  * Close this tsfile by closing its TsFileResource. |
|  * |
|  * @throws TsFileProcessorException wrapping the IOException raised while closing the resource |
|  */ |
| public void close() throws TsFileProcessorException { |
|   try { |
|     // when closing resource file, its corresponding mod file is also closed. |
|     this.tsFileResource.close(); |
|   } catch (IOException ioFailure) { |
|     throw new TsFileProcessorException(ioFailure); |
|   } |
| } |
| |
| /** @return the number of memtables currently in the flushing list */ |
| public int getFlushingMemTableSize() { |
|   return this.flushingMemTables.size(); |
| } |
| |
| /** @return the writer of this processor; it is null once the file has been ended or closed */ |
| RestorableTsFileIOWriter getWriter() { |
|   return this.writer; |
| } |
| |
| /** @return the name of the storage group this processor belongs to */ |
| public String getStorageGroupName() { |
|   return this.storageGroupName; |
| } |
| |
| /** |
|  * Collects, for every selected path, the read-only chunks held in memory (from the flushing |
|  * memtables, signal memtables excluded, and from the working memtable) together with the chunk |
|  * metadata visible through the writer. The collection runs under the flush-query read lock. If |
|  * anything was found, a TsFileResource built from the two maps is appended to the result list. |
|  * |
|  * <p>A QueryProcessException or MetadataException stops the collection and is only logged; what |
|  * was collected before is still used. |
|  * |
|  * @param seriesPaths selected paths |
|  * @param context query context; its query time lower bound is passed to the memtables |
|  * @param tsfileResourcesForQuery list the resulting resource is appended to |
|  * @throws IOException declared by this method; NOTE(review): presumably raised while reading |
|  *     memtables or metadata - confirm |
|  */ |
| public void query( |
|     List<PartialPath> seriesPaths, |
|     QueryContext context, |
|     List<TsFileResource> tsfileResourcesForQuery) |
|     throws IOException { |
|   Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap = new HashMap<>(); |
|   Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new HashMap<>(); |
| |
|   flushQueryLock.readLock().lock(); |
|   try { |
|     for (PartialPath seriesPath : seriesPaths) { |
|       List<ReadOnlyMemChunk> memChunksOfPath = new ArrayList<>(); |
|       for (IMemTable flushingMemTable : flushingMemTables) { |
|         // signal memtables are skipped |
|         if (!flushingMemTable.isSignalMemTable()) { |
|           ReadOnlyMemChunk flushingChunk = |
|               flushingMemTable.query( |
|                   seriesPath, context.getQueryTimeLowerBound(), modsToMemtable); |
|           if (flushingChunk != null) { |
|             memChunksOfPath.add(flushingChunk); |
|           } |
|         } |
|       } |
|       if (workMemTable != null) { |
|         ReadOnlyMemChunk workingChunk = |
|             workMemTable.query(seriesPath, context.getQueryTimeLowerBound(), null); |
|         if (workingChunk != null) { |
|           memChunksOfPath.add(workingChunk); |
|         } |
|       } |
| |
|       List<IChunkMetadata> metadataOfPath = |
|           ResourceByPathUtils.getResourceInstance(seriesPath) |
|               .getVisibleMetadataListFromWriter(writer, tsFileResource, context); |
| |
|       // a path is only recorded when memory chunks or chunk metadata exist for it |
|       if (!(memChunksOfPath.isEmpty() && metadataOfPath.isEmpty())) { |
|         pathToReadOnlyMemChunkMap.put(seriesPath, memChunksOfPath); |
|         pathToChunkMetadataListMap.put(seriesPath, metadataOfPath); |
|       } |
|     } |
|   } catch (QueryProcessException | MetadataException queryFailure) { |
|     logger.error( |
|         "{}: {} get ReadOnlyMemChunk has error", |
|         storageGroupName, |
|         tsFileResource.getTsFile().getName(), |
|         queryFailure); |
|   } finally { |
|     flushQueryLock.readLock().unlock(); |
|     if (logger.isDebugEnabled()) { |
|       logger.debug( |
|           "{}: {} release flushQueryLock", |
|           storageGroupName, |
|           tsFileResource.getTsFile().getName()); |
|     } |
|   } |
| |
|   if (!(pathToReadOnlyMemChunkMap.isEmpty() && pathToChunkMetadataListMap.isEmpty())) { |
|     tsfileResourcesForQuery.add( |
|         new TsFileResource( |
|             pathToReadOnlyMemChunkMap, pathToChunkMetadataListMap, tsFileResource)); |
|   } |
| } |
| |
| /** @return the time range id stored in this processor */ |
| public long getTimeRangeId() { |
|   return this.timeRangeId; |
| } |
| |
| /** Stores the given time range id in this processor. */ |
| public void setTimeRangeId(long timeRangeId) { |
| this.timeRangeId = timeRangeId; |
| } |
| |
| /** release resource of a memtable */ |
| public void putMemTableBackAndClose() throws TsFileProcessorException { |
| if (workMemTable != null) { |
| workMemTable.release(); |
| workMemTable = null; |
| } |
| try { |
| writer.close(); |
| } catch (IOException e) { |
| throw new TsFileProcessorException(e); |
| } |
| } |
| |
| /** @return the memory bookkeeping object of this processor */ |
| public TsFileProcessorInfo getTsFileProcessorInfo() { |
|   return this.tsFileProcessorInfo; |
| } |
| |
| /** Replaces the TsFileProcessorInfo held by this processor with the given one. */ |
| public void setTsFileProcessorInfo(TsFileProcessorInfo tsFileProcessorInfo) { |
| this.tsFileProcessorInfo = tsFileProcessorInfo; |
| } |
| |
| /** @return the TVLists ram cost of the working memtable, or 0 if there is no working memtable */ |
| public long getWorkMemTableRamCost() { |
|   if (workMemTable == null) { |
|     return 0; |
|   } |
|   return workMemTable.getTVListsRamCost(); |
| } |
| |
| /** |
|  * @return the created time of the working memtable, or Long.MAX_VALUE if workMemTable is null |
|  */ |
| public long getWorkMemTableCreatedTime() { |
|   if (workMemTable == null) { |
|     return Long.MAX_VALUE; |
|   } |
|   return workMemTable.getCreatedTime(); |
| } |
| |
| /** |
|  * @return the time, in milliseconds, at which a memtable was last put into the flushing list |
|  */ |
| public long getLastWorkMemtableFlushTime() { |
|   return this.lastWorkMemtableFlushTime; |
| } |
| |
| /** @return the value of the sequence flag of this processor */ |
| public boolean isSequence() { |
|   return this.sequence; |
| } |
| |
| public void setWorkMemTableShouldFlush() { |
| workMemTable.setShouldFlush(); |
| } |
| |
| /** Registers one more listener that is informed about memtable flushes. */ |
| public void addFlushListener(FlushListener listener) { |
|   this.flushListeners.add(listener); |
| } |
| |
| /** Registers one more listener that is informed when this file is closed. */ |
| public void addCloseFileListener(CloseFileListener listener) { |
|   this.closeFileListeners.add(listener); |
| } |
| |
| /** Registers all given listeners that are informed about memtable flushes. */ |
| public void addFlushListeners(Collection<FlushListener> listeners) { |
|   this.flushListeners.addAll(listeners); |
| } |
| |
| /** Registers all given listeners that are informed when this file is closed. */ |
| public void addCloseFileListeners(Collection<CloseFileListener> listeners) { |
|   this.closeFileListeners.addAll(listeners); |
| } |
| |
| /** Asks the data region of the storage group to submit a flush task for this processor. */ |
| public void submitAFlushTask() { |
|   storageGroupInfo.getDataRegion().submitAFlushTaskWhenShouldFlush(this); |
| } |
| |
| /** @return true if this processor has already been marked as closing */ |
| public boolean alreadyMarkedClosing() { |
|   return this.shouldClose; |
| } |
| |
| private IDeviceID getDeviceID(String deviceId) throws IllegalPathException { |
| try { |
| return DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)); |
| } catch (IllegalPathException e) { |
| logger.error("device id is illegal"); |
| throw e; |
| } |
| } |
| |
| /** @return the working memtable, which may be null; for tests only */ |
| @TestOnly |
| public IMemTable getWorkMemTable() { |
|   return this.workMemTable; |
| } |
| |
| /** @return the deque of flushing memtables itself, not a copy; for tests only */ |
| @TestOnly |
| public ConcurrentLinkedDeque<IMemTable> getFlushingMemTable() { |
|   return this.flushingMemTables; |
| } |
| } |