| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion; |
| |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.cluster.NodeStatus; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.consensus.DataRegionId; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.file.SystemFileFactory; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.schema.SchemaConstant; |
| import org.apache.iotdb.commons.service.metric.MetricService; |
| import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; |
| import org.apache.iotdb.commons.utils.CommonDateTimeUtils; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.commons.utils.TimePartitionUtils; |
| import org.apache.iotdb.consensus.ConsensusFactory; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.exception.BatchProcessException; |
| import org.apache.iotdb.db.exception.DataRegionException; |
| import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; |
| import org.apache.iotdb.db.exception.LoadFileException; |
| import org.apache.iotdb.db.exception.TsFileProcessorException; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.exception.WriteProcessRejectException; |
| import org.apache.iotdb.db.exception.query.OutOfTTLException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.exception.quota.ExceedQuotaException; |
| import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener; |
| import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; |
| import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; |
| import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; |
| import org.apache.iotdb.db.service.SettleService; |
| import org.apache.iotdb.db.service.metrics.CompactionMetrics; |
| import org.apache.iotdb.db.service.metrics.FileMetrics; |
| import org.apache.iotdb.db.service.metrics.WritingMetrics; |
| import org.apache.iotdb.db.storageengine.StorageEngine; |
| import org.apache.iotdb.db.storageengine.buffer.BloomFilterCache; |
| import org.apache.iotdb.db.storageengine.buffer.ChunkCache; |
| import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.recover.CompactionRecoverManager; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleSummary; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; |
| import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; |
| import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessorInfo; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; |
| import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; |
| import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; |
| import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.VersionController; |
| import org.apache.iotdb.db.storageengine.dataregion.utils.validate.TsFileValidator; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.SealedTsFileRecoverPerformer; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.recover.file.UnsealedTsFileRecoverPerformer; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; |
| import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener; |
| import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; |
| import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionInfo; |
| import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager; |
| import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; |
| import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; |
| import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; |
| import org.apache.iotdb.db.utils.DateTimeUtils; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IDeviceID; |
| import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| import org.apache.iotdb.tsfile.fileSystem.FSType; |
| import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| import org.apache.iotdb.tsfile.utils.FSUtils; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Phaser; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR; |
| import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.SEQUENCE_TSFILE; |
| import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.UNSEQUENCE_TSFILE; |
| import static org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.BROKEN_SUFFIX; |
| import static org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.RESOURCE_SUFFIX; |
| import static org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource.TEMP_SUFFIX; |
| import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; |
| |
| /** |
| * For sequence data, a {@link DataRegion} has several {@link TsFileProcessor}s, of which only |
| * one {@link TsFileProcessor} is in the working status at a time. <br> |
| * |
| * <p>There are two situations that set the working {@link TsFileProcessor} to the closing |
| * status:<br> |
| * |
| * <p>(1) when data is inserted into the {@link TsFileProcessor} and its shouldFlush() (or |
| * shouldClose()) returns true;<br> |
| * |
| * <p>(2) when someone calls syncCloseAllWorkingTsFileProcessors() (so far, only the flush |
| * command from the CLI calls this method).<br> |
| * |
| * <p>Unsequence data follows a similar process. |
| * |
| * <p>When a sequence {@link TsFileProcessor} is submitted to be flushed, the |
| * updateLatestFlushTimeCallback() method will be called as a callback.<br> |
| * |
| * <p>When a {@link TsFileProcessor} is closed, the closeUnsealedTsFileProcessorCallBack() method |
| * will be called as a callback. |
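| * |
| * <p>A minimal illustrative flow (hypothetical call sites; both methods exist in this class): |
| * |
| * <pre>{@code |
| * DataRegion region = ...; // obtained from the StorageEngine |
| * region.insert(insertRowNode); // may mark the working TsFileProcessor as closing (case 1) |
| * region.syncCloseAllWorkingTsFileProcessors(); // e.g. the flush command from the CLI (case 2) |
| * }</pre> |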
| */ |
| public class DataRegion implements IDataRegionForQuery { |
| |
| private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); |
| |
| /** |
| * All chunks newly generated by a merge have version number 0, so the merged Modification |
| * file version is set to 1 so that it takes effect. |
| */ |
| private static final int MERGE_MOD_START_VERSION_NUM = 1; |
| |
| private static final Logger logger = LoggerFactory.getLogger(DataRegion.class); |
| |
| /** |
| * A read-write lock for guaranteeing concurrency safety when accessing all fields in this class |
| * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor, |
| * closing(Un)SequenceTsFileProcessor, latestTimeForEachDevice, and |
| * partitionLatestFlushedTimeForEachDevice) |
| */ |
| private final ReadWriteLock insertLock = new ReentrantReadWriteLock(); |
| /** Condition to safely delete data region. */ |
| private final Condition deletedCondition = insertLock.writeLock().newCondition(); |
| /** Data region has been deleted or not. */ |
| private volatile boolean deleted = false; |
| /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */ |
| private final Object closeStorageGroupCondition = new Object(); |
| /** |
| * Prevents a tsfileResource from being changed (e.g., from unsealed to sealed) while a read is executed. |
| */ |
| private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock(); |
| /** time partition id in the database -> working sequence {@link TsFileProcessor} for this time partition. */ |
| private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>(); |
| /** time partition id in the database -> working unsequence {@link TsFileProcessor} for this time partition. */ |
| private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>(); |
| |
| /** sequence {@link TsFileProcessor}s which are closing. */ |
| private final Set<TsFileProcessor> closingSequenceTsFileProcessor = ConcurrentHashMap.newKeySet(); |
| |
| /** unsequence {@link TsFileProcessor}s which are closing. */ |
| private final Set<TsFileProcessor> closingUnSequenceTsFileProcessor = |
| ConcurrentHashMap.newKeySet(); |
| |
| /** data region id. */ |
| private final String dataRegionId; |
| /** database name. */ |
| private final String databaseName; |
| /** database system directory. */ |
| private File storageGroupSysDir; |
| /** manage seqFileList and unSeqFileList. */ |
| private final TsFileManager tsFileManager; |
| |
| /** manage tsFileResource degrade. */ |
| private final TsFileResourceManager tsFileResourceManager = TsFileResourceManager.getInstance(); |
| |
| /** |
| * time partition id -> version controller which assigns a version for each MemTable and |
| * deletion/update such that after they are persisted, the order of insertions, deletions and |
| * updates can be re-determined. Will be empty if there are no MemTables in memory. |
| */ |
| private final HashMap<Long, VersionController> timePartitionIdVersionControllerMap = |
| new HashMap<>(); |
| /** |
| * When the data in a database is older than dataTTL, it is considered invalid and will be |
| * eventually removed. |
| */ |
| private long dataTTL = Long.MAX_VALUE; |
| /** File system factory (local or hdfs). */ |
| private final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); |
| /** File flush policy. */ |
| private TsFileFlushPolicy fileFlushPolicy; |
| /** |
| * The max file version in each partition. By recording this, if several IoTDB instances have |
| * the same file-closing policy and their ingestion is identical, then files of the same version |
| * in different IoTDB instances will contain identical data, which is convenient for data |
| * comparison across instances. partition number -> max version number |
| */ |
| private Map<Long, Long> partitionMaxFileVersions = new ConcurrentHashMap<>(); |
| /** database info for mem control. */ |
| private final DataRegionInfo dataRegionInfo = new DataRegionInfo(this); |
| /** whether this data region is ready after recovery. */ |
| private boolean isReady = false; |
| /** close file listeners. */ |
| private List<CloseFileListener> customCloseFileListeners = Collections.emptyList(); |
| /** flush listeners. */ |
| private List<FlushListener> customFlushListeners = Collections.emptyList(); |
| |
| private ILastFlushTimeMap lastFlushTimeMap; |
| |
| /** |
| * Records which method currently holds the insertWriteLock of this data region; it is an empty |
| * string if no one holds the insertWriteLock. |
| */ |
| private String insertWriteLockHolder = ""; |
| |
| private final AtomicBoolean isCompactionSelecting = new AtomicBoolean(false); |
| |
| private static final QueryResourceMetricSet QUERY_RESOURCE_METRIC_SET = |
| QueryResourceMetricSet.getInstance(); |
| |
| private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = |
| PerformanceOverviewMetrics.getInstance(); |
| |
| /** |
| * Construct a data region processor. |
| * |
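| * <p>A construction sketch (illustrative arguments only): |
| * |
| * <pre>{@code |
| * DataRegion region = new DataRegion(systemDir, "1", fileFlushPolicy, "root.sg1"); |
| * }</pre> |
| * |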
| * @param systemDir system dir path |
| * @param dataRegionId data region id e.g. 1 |
| * @param fileFlushPolicy file flush policy |
| * @param databaseName database name e.g. root.sg1 |
| */ |
| public DataRegion( |
| String systemDir, String dataRegionId, TsFileFlushPolicy fileFlushPolicy, String databaseName) |
| throws DataRegionException { |
| this.dataRegionId = dataRegionId; |
| this.databaseName = databaseName; |
| this.fileFlushPolicy = fileFlushPolicy; |
| |
| storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, dataRegionId); |
| this.tsFileManager = |
| new TsFileManager(databaseName, dataRegionId, storageGroupSysDir.getPath()); |
| if (storageGroupSysDir.mkdirs()) { |
| logger.info( |
| "Database system directory {} doesn't exist, created it", storageGroupSysDir.getPath()); |
| } else if (!storageGroupSysDir.exists()) { |
| logger.error("create database system Directory {} failed", storageGroupSysDir.getPath()); |
| } |
| |
| lastFlushTimeMap = new HashLastFlushTimeMap(); |
| |
| // recover tsfiles unless the consensus protocol is ratis and the storage engine is not ready |
| if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS) |
| && !StorageEngine.getInstance().isAllSgReady()) { |
| logger.debug( |
| "Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.", |
| databaseName, |
| dataRegionId); |
| for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) { |
| File dataRegionFolder = |
| fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId); |
| try { |
| fsFactory.deleteDirectory(dataRegionFolder.getPath()); |
| } catch (IOException e) { |
| logger.error( |
| "Exception occurs when deleting data region folder for {}-{}", |
| databaseName, |
| dataRegionId, |
| e); |
| } |
| if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) { |
| dataRegionFolder.mkdirs(); |
| } |
| } |
| } else { |
| recover(); |
| } |
| |
| MetricService.getInstance().addMetricSet(new DataRegionMetrics(this)); |
| } |
| |
| @TestOnly |
| public DataRegion(String databaseName, String id) { |
| this.databaseName = databaseName; |
| this.dataRegionId = id; |
| this.tsFileManager = new TsFileManager(databaseName, id, ""); |
| this.partitionMaxFileVersions = new HashMap<>(); |
| partitionMaxFileVersions.put(0L, 0L); |
| } |
| |
| @Override |
| public String getDatabaseName() { |
| return databaseName; |
| } |
| |
| public boolean isReady() { |
| return isReady; |
| } |
| |
| public void setReady(boolean ready) { |
| isReady = ready; |
| } |
| |
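| // Groups resources by time partition, e.g. (illustrative) resources whose partitions are |
| // {0, 0, 1} are grouped as {0 -> [r1, r2], 1 -> [r3]}. |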
| private Map<Long, List<TsFileResource>> splitResourcesByPartition( |
| List<TsFileResource> resources) { |
| Map<Long, List<TsFileResource>> ret = new TreeMap<>(); |
| for (TsFileResource resource : resources) { |
| ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource); |
| } |
| return ret; |
| } |
| |
| /** This class is used to store the recovery context. */ |
| private class DataRegionRecoveryContext { |
| /** number of files to be recovered. */ |
| private final long numOfFilesToRecover; |
| |
| /** number of already recovered files. */ |
| private long recoveredFilesNum; |
| /** last recovery log time. */ |
| private long lastLogTime; |
| |
| /** recover performers of unsealed TsFiles. */ |
| private final List<UnsealedTsFileRecoverPerformer> recoverPerformers = new ArrayList<>(); |
| |
| public DataRegionRecoveryContext(long numOfFilesToRecover) { |
| this.numOfFilesToRecover = numOfFilesToRecover; |
| this.recoveredFilesNum = 0; |
| this.lastLogTime = System.currentTimeMillis(); |
| } |
| |
| public void incrementRecoveredFilesNum() { |
| recoveredFilesNum++; |
| if (recoveredFilesNum < numOfFilesToRecover) { |
| if (System.currentTimeMillis() - lastLogTime > config.getRecoveryLogIntervalInMs()) { |
| logger.info( |
| "The TsFiles of data region {}[{}] has recovered {}/{}.", |
| databaseName, |
| dataRegionId, |
| recoveredFilesNum, |
| numOfFilesToRecover); |
| lastLogTime = System.currentTimeMillis(); |
| } |
| } else { |
| logger.info( |
| "The TsFiles of data region {}[{}] has recovered" + " {}/{}.", |
| databaseName, |
| dataRegionId, |
| numOfFilesToRecover, |
| numOfFilesToRecover); |
| } |
| } |
| } |
| |
| /** Recover from files. */ |
| @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning |
| private void recover() throws DataRegionException { |
| try { |
| recoverCompaction(); |
| } catch (Exception e) { |
| // signal wal recover manager to recover this region's files |
| WALRecoverManager.getInstance() |
| .getAllDataRegionScannedLatch() |
| .countDownWithException(e.getMessage()); |
| throw new DataRegionException(e); |
| } |
| |
| try { |
| // collect candidate TsFiles from the sequence and unsequence data directories |
| List<TsFileResource> tmpSeqTsFiles = |
| getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders()); |
| List<TsFileResource> tmpUnseqTsFiles = |
| getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders()); |
| |
| // split by partition so that we can find the last file of each partition and decide to |
| // close it or not |
| DataRegionRecoveryContext dataRegionRecoveryContext = |
| new DataRegionRecoveryContext((long) tmpSeqTsFiles.size() + tmpUnseqTsFiles.size()); |
| Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles = |
| splitResourcesByPartition(tmpSeqTsFiles); |
| Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles = |
| splitResourcesByPartition(tmpUnseqTsFiles); |
| // submit unsealed TsFiles to recover |
| List<WALRecoverListener> recoverListeners = new ArrayList<>(); |
| for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) { |
| // tsFiles without resource file are unsealed |
| for (TsFileResource resource : value) { |
| if (resource.resourceFileExists()) { |
| FileMetrics.getInstance() |
| .addTsFile( |
| resource.getDatabaseName(), |
| resource.getDataRegionId(), |
| resource.getTsFile().length(), |
| true, |
| resource.getTsFile().getName()); |
| if (resource.getModFile().exists()) { |
| FileMetrics.getInstance().increaseModFileNum(1); |
| FileMetrics.getInstance().increaseModFileSize(resource.getModFile().getSize()); |
| } |
| } |
| } |
| while (!value.isEmpty()) { |
| TsFileResource tsFileResource = value.get(value.size() - 1); |
| if (tsFileResource.resourceFileExists()) { |
| break; |
| } else { |
| value.remove(value.size() - 1); |
| WALRecoverListener recoverListener = |
| recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, true); |
| if (recoverListener != null) { |
| recoverListeners.add(recoverListener); |
| } |
| } |
| } |
| } |
| for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) { |
| // tsFiles without resource file are unsealed |
| for (TsFileResource resource : value) { |
| if (resource.resourceFileExists()) { |
| FileMetrics.getInstance() |
| .addTsFile( |
| resource.getDatabaseName(), |
| resource.getDataRegionId(), |
| resource.getTsFile().length(), |
| false, |
| resource.getTsFile().getName()); |
| } |
| if (resource.getModFile().exists()) { |
| FileMetrics.getInstance().increaseModFileNum(1); |
| FileMetrics.getInstance().increaseModFileSize(resource.getModFile().getSize()); |
| } |
| } |
| while (!value.isEmpty()) { |
| TsFileResource tsFileResource = value.get(value.size() - 1); |
| if (tsFileResource.resourceFileExists()) { |
| break; |
| } else { |
| value.remove(value.size() - 1); |
| WALRecoverListener recoverListener = |
| recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false); |
| if (recoverListener != null) { |
| recoverListeners.add(recoverListener); |
| } |
| } |
| } |
| } |
| // signal wal recover manager to recover this region's files |
| WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown(); |
| // recover sealed TsFiles |
| if (!partitionTmpSeqTsFiles.isEmpty() || !partitionTmpUnseqTsFiles.isEmpty()) { |
| long latestPartitionId = Long.MIN_VALUE; |
| if (!partitionTmpSeqTsFiles.isEmpty()) { |
| latestPartitionId = |
| ((TreeMap<Long, List<TsFileResource>>) partitionTmpSeqTsFiles).lastKey(); |
| } |
| if (!partitionTmpUnseqTsFiles.isEmpty()) { |
| latestPartitionId = |
| Math.max( |
| latestPartitionId, |
| ((TreeMap<Long, List<TsFileResource>>) partitionTmpUnseqTsFiles).lastKey()); |
| } |
| for (Entry<Long, List<TsFileResource>> partitionFiles : partitionTmpSeqTsFiles.entrySet()) { |
| recoverFilesInPartition( |
| partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), true); |
| } |
| for (Entry<Long, List<TsFileResource>> partitionFiles : |
| partitionTmpUnseqTsFiles.entrySet()) { |
| recoverFilesInPartition( |
| partitionFiles.getKey(), dataRegionRecoveryContext, partitionFiles.getValue(), false); |
| } |
| if (config.isEnableSeparateData()) { |
| TimePartitionManager.getInstance() |
| .registerTimePartitionInfo( |
| new TimePartitionInfo( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| latestPartitionId, |
| false, |
| Long.MAX_VALUE, |
| lastFlushTimeMap.getMemSize(latestPartitionId))); |
| } |
| } |
| // wait until all unsealed TsFiles have been recovered |
| for (WALRecoverListener recoverListener : recoverListeners) { |
| if (recoverListener.waitForResult() == WALRecoverListener.Status.FAILURE) { |
| logger.error( |
| "Fail to recover unsealed TsFile {}, skip it.", |
| recoverListener.getFilePath(), |
| recoverListener.getCause()); |
| } |
| // update DataRegionRecoveryContext |
| dataRegionRecoveryContext.incrementRecoveredFilesNum(); |
| } |
| // recover unsealed TsFiles; sort to make sure the last flush time is not replaced by earlier files |
| dataRegionRecoveryContext.recoverPerformers.sort( |
| (p1, p2) -> |
| compareFileName( |
| p1.getTsFileResource().getTsFile(), p2.getTsFileResource().getTsFile())); |
| for (UnsealedTsFileRecoverPerformer recoverPerformer : |
| dataRegionRecoveryContext.recoverPerformers) { |
| recoverUnsealedTsFileCallBack(recoverPerformer); |
| } |
| for (TsFileResource resource : tsFileManager.getTsFileList(true)) { |
| long partitionNum = resource.getTimePartition(); |
| updatePartitionFileVersion(partitionNum, resource.getVersion()); |
| } |
| for (TsFileResource resource : tsFileManager.getTsFileList(false)) { |
| long partitionNum = resource.getTimePartition(); |
| updatePartitionFileVersion(partitionNum, resource.getVersion()); |
| } |
| } catch (IOException e) { |
| // signal wal recover manager to recover this region's files |
| WALRecoverManager.getInstance() |
| .getAllDataRegionScannedLatch() |
| .countDownWithException(e.getMessage()); |
| throw new DataRegionException(e); |
| } |
| |
| initCompactionSchedule(); |
| |
| if (StorageEngine.getInstance().isAllSgReady()) { |
| logger.info("The data region {}[{}] is created successfully", databaseName, dataRegionId); |
| } else { |
| logger.info("The data region {}[{}] is recovered successfully", databaseName, dataRegionId); |
| } |
| } |
| |
| private void updateLastFlushTime(TsFileResource resource, boolean isSeq) { |
| long timePartitionId = resource.getTimePartition(); |
| Map<IDeviceID, Long> endTimeMap = new HashMap<>(); |
| for (IDeviceID deviceId : resource.getDevices()) { |
| long endTime = resource.getEndTime(deviceId); |
| endTimeMap.put(deviceId, endTime); |
| } |
| if (config.isEnableSeparateData()) { |
| lastFlushTimeMap.updateMultiDeviceFlushedTime(timePartitionId, endTimeMap); |
| } |
| if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { |
| lastFlushTimeMap.updateMultiDeviceGlobalFlushedTime(endTimeMap); |
| } |
| } |
| |
| public void initCompactionSchedule() { |
| if (!config.isEnableSeqSpaceCompaction() |
| && !config.isEnableUnseqSpaceCompaction() |
| && !config.isEnableCrossSpaceCompaction()) { |
| return; |
| } |
| CompactionScheduleTaskManager.getInstance().registerDataRegion(this); |
| } |
| |
| private void recoverCompaction() { |
| CompactionRecoverManager compactionRecoverManager = |
| new CompactionRecoverManager(tsFileManager, databaseName, dataRegionId); |
| compactionRecoverManager.recoverInnerSpaceCompaction(true); |
| compactionRecoverManager.recoverInnerSpaceCompaction(false); |
| compactionRecoverManager.recoverCrossSpaceCompaction(); |
| } |
| |
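| // Records the max observed file version per partition; e.g. (illustrative values) observing |
| // file version 7 in partition 5 raises the recorded max for partition 5 from 6 to 7. |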
| public void updatePartitionFileVersion(long partitionNum, long fileVersion) { |
| partitionMaxFileVersions.compute( |
| partitionNum, |
| (key, oldVersion) -> |
| (oldVersion == null || fileVersion > oldVersion) ? fileVersion : oldVersion); |
| } |
| |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private List<TsFileResource> getAllFiles(List<String> folders) |
| throws IOException, DataRegionException { |
| // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition |
| Map<String, File> tsFilePartitionPath2File = new HashMap<>(); |
| for (String baseDir : folders) { |
| File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId); |
| if (!fileFolder.exists()) { |
| continue; |
| } |
| // some TsFileResource may be being persisted when the system crashed, try recovering such |
| // resources |
| continueFailedRenames(fileFolder, TEMP_SUFFIX); |
| |
| File[] subFiles = fileFolder.listFiles(); |
| if (subFiles != null) { |
| for (File partitionFolder : subFiles) { |
| if (!partitionFolder.isDirectory()) { |
| logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath()); |
| } else { |
| // some TsFileResource may be being persisted when the system crashed, try recovering |
| // such resources |
| continueFailedRenames(partitionFolder, TEMP_SUFFIX); |
| String partitionName = partitionFolder.getName(); |
| File[] tsFilesInThisFolder = |
| fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX); |
| for (File f : tsFilesInThisFolder) { |
| String tsFilePartitionPath = partitionName + File.separator + f.getName(); |
| tsFilePartitionPath2File.put(tsFilePartitionPath, f); |
| } |
| } |
| } |
| } |
| } |
| |
| List<File> sortedFiles = new ArrayList<>(tsFilePartitionPath2File.values()); |
| sortedFiles.sort(this::compareFileName); |
| |
| long currentTime = System.currentTimeMillis(); |
| List<TsFileResource> ret = new ArrayList<>(); |
| for (File f : sortedFiles) { |
| checkTsFileTime(f, currentTime); |
| ret.add(new TsFileResource(f)); |
| } |
| return ret; |
| } |
| |
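| // e.g. (illustrative) a leftover "foo.tsfile.resource" + TEMP_SUFFIX file is deleted when |
| // "foo.tsfile.resource" already exists, and is renamed back to it otherwise. |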
| private void continueFailedRenames(File fileFolder, String suffix) throws IOException { |
| File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix); |
| if (files != null) { |
| for (File tempResource : files) { |
| File originResource = fsFactory.getFile(tempResource.getPath().replace(suffix, "")); |
| if (originResource.exists()) { |
| Files.delete(tempResource.toPath()); |
| } else { |
| Files.move(tempResource.toPath(), originResource.toPath()); |
| } |
| } |
| } |
| } |
| |
| /** Check that the tsfile's time is not larger than the current system time. */ |
| private void checkTsFileTime(File tsFile, long currentTime) throws DataRegionException { |
| String[] items = tsFile.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR); |
| long fileTime = Long.parseLong(items[0]); |
| if (fileTime > currentTime) { |
| throw new DataRegionException( |
| String.format( |
| "data region %s[%s] is down, because the time of tsfile %s is larger than system current time, " |
| + "file time is %d while system current time is %d, please check it.", |
| databaseName, dataRegionId, tsFile.getAbsolutePath(), fileTime, currentTime)); |
| } |
| } |
| |
| /** submit unsealed TsFile to WALRecoverManager. */ |
| private WALRecoverListener recoverUnsealedTsFile( |
| TsFileResource unsealedTsFile, DataRegionRecoveryContext context, boolean isSeq) { |
| UnsealedTsFileRecoverPerformer recoverPerformer = |
| new UnsealedTsFileRecoverPerformer(unsealedTsFile, isSeq, context.recoverPerformers::add); |
| // remember to close UnsealedTsFileRecoverPerformer |
| return WALRecoverManager.getInstance().addRecoverPerformer(recoverPerformer); |
| } |
| |
| private void recoverUnsealedTsFileCallBack(UnsealedTsFileRecoverPerformer recoverPerformer) { |
| try { |
| TsFileResource tsFileResource = recoverPerformer.getTsFileResource(); |
| boolean isSeq = recoverPerformer.isSequence(); |
| if (!recoverPerformer.canWrite()) { |
| // cannot write, just close it |
| try { |
| tsFileResource.close(); |
| } catch (IOException e) { |
| logger.error("Fail to close TsFile {} when recovering", tsFileResource.getTsFile(), e); |
| } |
| if (!TsFileValidator.getInstance().validateTsFile(tsFileResource)) { |
| tsFileResource.remove(); |
| return; |
| } |
| updateLastFlushTime(tsFileResource, isSeq); |
| tsFileResourceManager.registerSealedTsFileResource(tsFileResource); |
| FileMetrics.getInstance() |
| .addTsFile( |
| tsFileResource.getDatabaseName(), |
| tsFileResource.getDataRegionId(), |
| tsFileResource.getTsFile().length(), |
| recoverPerformer.isSequence(), |
| tsFileResource.getTsFile().getName()); |
| } else { |
| // the last file is not closed, continue writing to it |
| RestorableTsFileIOWriter writer = recoverPerformer.getWriter(); |
| long timePartitionId = tsFileResource.getTimePartition(); |
| TimePartitionManager.getInstance() |
| .updateAfterOpeningTsFileProcessor( |
| new DataRegionId(Integer.parseInt(dataRegionId)), timePartitionId); |
| TsFileProcessor tsFileProcessor = |
| new TsFileProcessor( |
| dataRegionId, |
| dataRegionInfo, |
| tsFileResource, |
| this::closeUnsealedTsFileProcessorCallBack, |
| this::flushCallback, |
| isSeq, |
| writer); |
| if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null |
| && workUnsequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null) { |
| WritingMetrics.getInstance().recordActiveTimePartitionCount(1); |
| } |
| if (isSeq) { |
| workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); |
| } else { |
| workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor); |
| } |
| tsFileResource.setProcessor(tsFileProcessor); |
| tsFileResource.removeResourceFile(); |
| tsFileProcessor.setTimeRangeId(timePartitionId); |
| writer.makeMetadataVisible(); |
| TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo); |
| tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); |
| this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor); |
| // get chunkMetadata size |
| long chunkMetadataSize = 0; |
| for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) { |
| for (List<ChunkMetadata> metadatas : metaMap.values()) { |
| for (ChunkMetadata chunkMetadata : metadatas) { |
| chunkMetadataSize += chunkMetadata.getRetainedSizeInBytes(); |
| } |
| } |
| } |
| tsFileProcessorInfo.addTSPMemCost(chunkMetadataSize); |
| } |
| tsFileManager.add(tsFileResource, recoverPerformer.isSequence()); |
| } catch (Throwable e) { |
| logger.error( |
| "Fail to recover unsealed TsFile {}, skip it.", |
| recoverPerformer.getTsFileAbsolutePath(), |
| e); |
| } |
| } |
| |
| /** recover sealed TsFile. */ |
| private void recoverSealedTsFiles( |
| TsFileResource sealedTsFile, DataRegionRecoveryContext context, boolean isSeq) { |
| try (SealedTsFileRecoverPerformer recoverPerformer = |
| new SealedTsFileRecoverPerformer(sealedTsFile)) { |
| recoverPerformer.recover(); |
| sealedTsFile.close(); |
| tsFileManager.add(sealedTsFile, isSeq); |
| tsFileResourceManager.registerSealedTsFileResource(sealedTsFile); |
| } catch (Throwable e) { |
| logger.error("Fail to recover sealed TsFile {}, skip it.", sealedTsFile.getTsFilePath(), e); |
| } finally { |
| // update recovery context |
| context.incrementRecoveredFilesNum(); |
| } |
| } |
| |
| private void recoverFilesInPartition( |
| long partitionId, |
| DataRegionRecoveryContext context, |
| List<TsFileResource> resourceList, |
| boolean isSeq) { |
| for (TsFileResource tsFileResource : resourceList) { |
| recoverSealedTsFiles(tsFileResource, context, isSeq); |
| } |
| if (config.isEnableSeparateData()) { |
| if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(partitionId)) { |
| TimePartitionManager.getInstance() |
| .registerTimePartitionInfo( |
| new TimePartitionInfo( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| partitionId, |
| false, |
| Long.MAX_VALUE, |
| lastFlushTimeMap.getMemSize(partitionId))); |
| } |
| for (TsFileResource tsFileResource : resourceList) { |
| updateLastFlushTime(tsFileResource, isSeq); |
| } |
| TimePartitionManager.getInstance() |
| .updateAfterFlushing( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| partitionId, |
| System.currentTimeMillis(), |
| lastFlushTimeMap.getMemSize(partitionId), |
| false); |
| } |
| } |
| |
| // ({systemTime}-{versionNum}-{mergeCnt}-{unseqCompactionCnt}.tsfile) |
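| // e.g. (illustrative names) "100-3-0-0.tsfile" and "100-5-0-0.tsfile" share the same time |
| // (100), so the comparison falls back to the version numbers (3 < 5). |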
| private int compareFileName(File o1, File o2) { |
| String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR); |
| String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR); |
| long ver1 = Long.parseLong(items1[0]); |
| long ver2 = Long.parseLong(items2[0]); |
| int cmp = Long.compare(ver1, ver2); |
| if (cmp == 0) { |
| return Long.compare(Long.parseLong(items1[1]), Long.parseLong(items2[1])); |
| } else { |
| return cmp; |
| } |
| } |
| |
| /** |
| * insert one row of data. |
| * |
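| * <p>A minimal call sketch (hypothetical caller): |
| * |
| * <pre>{@code |
| * dataRegion.insert(insertRowNode); // throws OutOfTTLException if the row is older than the TTL |
| * }</pre> |
| * |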
| * @param insertRowNode one row of data |
| */ |
| public void insert(InsertRowNode insertRowNode) throws WriteProcessException { |
| // reject insertions that are out of ttl |
| if (!isAlive(insertRowNode.getTime())) { |
| throw new OutOfTTLException( |
| insertRowNode.getTime(), (CommonDateTimeUtils.currentTime() - dataTTL)); |
| } |
| StorageEngine.blockInsertionIfReject(null); |
| long startTime = System.nanoTime(); |
| writeLock("InsertRow"); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); |
| try { |
| if (deleted) { |
| return; |
| } |
| // init map |
| long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); |
| |
| if (config.isEnableSeparateData() |
| && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) { |
| TimePartitionManager.getInstance() |
| .registerTimePartitionInfo( |
| new TimePartitionInfo( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| timePartitionId, |
| true, |
| Long.MAX_VALUE, |
| 0)); |
| } |
| |
| boolean isSequence = |
| config.isEnableSeparateData() |
| && insertRowNode.getTime() |
| > lastFlushTimeMap.getFlushedTime(timePartitionId, insertRowNode.getDeviceID()); |
| |
| Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new HashMap<>(); |
| |
| // insert to sequence or unSequence file |
| insertToTsFileProcessor( |
| insertRowNode, isSequence, timePartitionId, tsFileProcessorMapForFlushing); |
| |
| // check memtable size and may asyncTryToFlush the work memtable |
| for (Map.Entry<TsFileProcessor, Boolean> entry : tsFileProcessorMapForFlushing.entrySet()) { |
| if (entry.getKey().shouldFlush()) { |
| fileFlushPolicy.apply(this, entry.getKey(), entry.getValue()); |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * Insert a tablet (rows belonging to the same device) into this database. |
| * |
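| * <p>For example (illustrative values): if the device's last flushed time is 100, a sorted |
| * tablet with times {50, 80, 120, 130} is written as an unsequence part {50, 80} and a |
| * sequence part {120, 130}. |
| * |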
| * @throws BatchProcessException if some of the rows failed to be inserted |
| */ |
| @SuppressWarnings({"squid:S3776", "squid:S6541"}) // Suppress high Cognitive Complexity warning |
| public void insertTablet(InsertTabletNode insertTabletNode) |
| throws BatchProcessException, WriteProcessException { |
| StorageEngine.blockInsertionIfReject(null); |
| long startTime = System.nanoTime(); |
| writeLock("insertTablet"); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); |
| try { |
| if (deleted) { |
| return; |
| } |
| TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; |
| Arrays.fill(results, RpcUtils.SUCCESS_STATUS); |
| boolean noFailure = true; |
| |
| /* |
| * assume that batch has been sorted by client |
| */ |
| int loc = 0; |
| while (loc < insertTabletNode.getRowCount()) { |
| long currTime = insertTabletNode.getTimes()[loc]; |
| // skip points that do not satisfy TTL |
| if (!isAlive(currTime)) { |
| results[loc] = |
| RpcUtils.getStatus( |
| TSStatusCode.OUT_OF_TTL, |
| String.format( |
| "Insertion time [%s] is less than ttl time bound [%s]", |
| DateTimeUtils.convertLongToDate(currTime), |
| DateTimeUtils.convertLongToDate( |
| CommonDateTimeUtils.currentTime() - dataTTL))); |
| loc++; |
| noFailure = false; |
| } else { |
| break; |
| } |
| } |
| // loc points at the first legal position |
| if (loc == insertTabletNode.getRowCount()) { |
| throw new OutOfTTLException( |
| insertTabletNode.getTimes()[insertTabletNode.getTimes().length - 1], |
| (CommonDateTimeUtils.currentTime() - dataTTL)); |
| } |
| // before is first start point |
| int before = loc; |
| // before time partition |
| long beforeTimePartition = |
| TimePartitionUtils.getTimePartitionId(insertTabletNode.getTimes()[before]); |
| // init map |
| |
| if (config.isEnableSeparateData() |
| && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(beforeTimePartition)) { |
| TimePartitionManager.getInstance() |
| .registerTimePartitionInfo( |
| new TimePartitionInfo( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| beforeTimePartition, |
| true, |
| Long.MAX_VALUE, |
| 0)); |
| } |
| |
| long lastFlushTime = |
| config.isEnableSeparateData() |
| ? lastFlushTimeMap.getFlushedTime(beforeTimePartition, insertTabletNode.getDeviceID()) |
| : Long.MAX_VALUE; |
| |
| // if is sequence |
| boolean isSequence = false; |
| while (loc < insertTabletNode.getRowCount()) { |
| long time = insertTabletNode.getTimes()[loc]; |
| // always in some time partition |
| // judge if we should insert sequence |
| if (!isSequence && time > lastFlushTime) { |
| // insert into unsequence and then start sequence |
| noFailure = |
| insertTabletToTsFileProcessor( |
| insertTabletNode, before, loc, false, results, beforeTimePartition) |
| && noFailure; |
| before = loc; |
| isSequence = true; |
| } |
| loc++; |
| } |
| |
| // do not forget last part |
| if (before < loc) { |
| noFailure = |
| insertTabletToTsFileProcessor( |
| insertTabletNode, before, loc, isSequence, results, beforeTimePartition) |
| && noFailure; |
| } |
| startTime = System.nanoTime(); |
| tryToUpdateInsertTabletLastCache(insertTabletNode); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime); |
| |
| if (!noFailure) { |
| throw new BatchProcessException(results); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * Check whether the time falls in TTL. |
| * |
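| * <p>For example (illustrative): with a dataTTL of one day, a timestamp more than one day |
| * older than the current system time is out of TTL and will be rejected. |
| * |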
| * @return whether the given time falls in ttl |
| */ |
| private boolean isAlive(long time) { |
| return dataTTL == Long.MAX_VALUE || (CommonDateTimeUtils.currentTime() - time) <= dataTTL; |
| } |
| |
| /** |
| * Insert a batch into a tsfile processor; thread safety must be guaranteed by the caller. The |
| * rows to be inserted are in the range [start, end). A null value in the column values will be |
| * replaced by the subsequent non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5} |
| * |
| * @param insertTabletNode insert a tablet of a device |
| * @param sequence whether the data is sequence |
| * @param start start index of rows to be inserted in insertTabletNode |
| * @param end end index of rows to be inserted in insertTabletNode |
| * @param results result array |
| * @param timePartitionId time partition id |
| * @return false if any failure occurs when inserting the tablet, true otherwise |
| */ |
| private boolean insertTabletToTsFileProcessor( |
| InsertTabletNode insertTabletNode, |
| int start, |
| int end, |
| boolean sequence, |
| TSStatus[] results, |
| long timePartitionId) { |
| // return when start >= end or all measurements failed |
| if (start >= end || insertTabletNode.allMeasurementFailed()) { |
| return true; |
| } |
| |
| TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); |
| if (tsFileProcessor == null) { |
| for (int i = start; i < end; i++) { |
| results[i] = |
| RpcUtils.getStatus( |
| TSStatusCode.INTERNAL_SERVER_ERROR, |
| "can not create TsFileProcessor, timePartitionId: " + timePartitionId); |
| } |
| return false; |
| } |
| |
| try { |
| tsFileProcessor.insertTablet(insertTabletNode, start, end, results); |
| } catch (WriteProcessRejectException e) { |
| logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); |
| return false; |
| } catch (WriteProcessException e) { |
| logger.error("insert to TsFileProcessor error ", e); |
| return false; |
| } |
| |
| // check memtable size and may async try to flush the work memtable |
| if (tsFileProcessor.shouldFlush()) { |
| fileFlushPolicy.apply(this, tsFileProcessor, sequence); |
| } |
| return true; |
| } |
| |
| private void tryToUpdateInsertTabletLastCache(InsertTabletNode node) { |
| if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable() |
| || (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) |
| && node.isSyncFromLeaderWhenUsingIoTConsensus())) { |
| // disable updating last cache on follower |
| return; |
| } |
| long latestFlushedTime = lastFlushTimeMap.getGlobalFlushedTime(node.getDeviceID()); |
| String[] measurements = node.getMeasurements(); |
| MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas(); |
| String[] rawMeasurements = new String[measurements.length]; |
| for (int i = 0; i < measurements.length; i++) { |
| if (measurementSchemas[i] != null) { |
| // get raw measurement rather than alias |
| rawMeasurements[i] = measurementSchemas[i].getMeasurementId(); |
| } else { |
| rawMeasurements[i] = measurements[i]; |
| } |
| } |
| DataNodeSchemaCache.getInstance() |
| .updateLastCache( |
| getDatabaseName(), |
| node.getDevicePath(), |
| rawMeasurements, |
| node.getMeasurementSchemas(), |
| node.isAligned(), |
| node::composeLastTimeValuePair, |
| index -> node.getColumns()[index] != null, |
| true, |
| latestFlushedTime); |
| } |
| |
| private void insertToTsFileProcessor( |
| InsertRowNode insertRowNode, |
| boolean sequence, |
| long timePartitionId, |
| Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing) |
| throws WriteProcessException { |
| if (insertRowNode.allMeasurementFailed()) { |
| return; |
| } |
| TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); |
| if (tsFileProcessor == null) { |
| return; |
| } |
| long[] costsForMetrics = new long[4]; |
| tsFileProcessor.insert(insertRowNode, costsForMetrics); |
| PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); |
| |
| tsFileProcessorMapForFlushing.put(tsFileProcessor, sequence); |
| |
| if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { |
| if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) |
| && insertRowNode.isSyncFromLeaderWhenUsingIoTConsensus())) { |
| return; |
| } |
| // disable updating last cache on follower |
| long startTime = System.nanoTime(); |
| tryToUpdateInsertRowLastCache(insertRowNode); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime); |
| } |
| } |
| |
| private void tryToUpdateInsertRowLastCache(InsertRowNode node) { |
| long latestFlushedTime = lastFlushTimeMap.getGlobalFlushedTime(node.getDeviceID()); |
| String[] measurements = node.getMeasurements(); |
| MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas(); |
| String[] rawMeasurements = new String[measurements.length]; |
| for (int i = 0; i < measurements.length; i++) { |
| if (measurementSchemas[i] != null) { |
| // get raw measurement rather than alias |
| rawMeasurements[i] = measurementSchemas[i].getMeasurementId(); |
| } else { |
| rawMeasurements[i] = measurements[i]; |
| } |
| } |
| DataNodeSchemaCache.getInstance() |
| .updateLastCache( |
| getDatabaseName(), |
| node.getDevicePath(), |
| rawMeasurements, |
| node.getMeasurementSchemas(), |
| node.isAligned(), |
| node::composeTimeValuePair, |
| index -> node.getValues()[index] != null, |
| true, |
| latestFlushedTime); |
| } |
| |
| private void insertToTsFileProcessors( |
| InsertRowsNode insertRowsNode, boolean[] areSequence, long[] timePartitionIds) { |
| List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>(); |
| long[] costsForMetrics = new long[4]; |
| Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new HashMap<>(); |
| for (int i = 0; i < areSequence.length; i++) { |
| InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i); |
| if (insertRowNode.allMeasurementFailed()) { |
| continue; |
| } |
| TsFileProcessor tsFileProcessor = |
| getOrCreateTsFileProcessor(timePartitionIds[i], areSequence[i]); |
| if (tsFileProcessor == null) { |
| continue; |
| } |
| tsFileProcessorMapForFlushing.put(tsFileProcessor, areSequence[i]); |
| try { |
| tsFileProcessor.insert(insertRowNode, costsForMetrics); |
| } catch (WriteProcessException e) { |
| insertRowsNode.getResults().put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); |
| } |
| executedInsertRowNodeList.add(insertRowNode); |
| } |
| |
| // check memtable size and may asyncTryToFlush the work memtable |
| for (Map.Entry<TsFileProcessor, Boolean> entry : tsFileProcessorMapForFlushing.entrySet()) { |
| if (entry.getKey().shouldFlush()) { |
| fileFlushPolicy.apply(this, entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(costsForMetrics[0]); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(costsForMetrics[1]); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(costsForMetrics[2]); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(costsForMetrics[3]); |
| |
| if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { |
| if ((config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) |
| && insertRowsNode.isSyncFromLeaderWhenUsingIoTConsensus())) { |
| return; |
| } |
| // disable updating last cache on follower |
| long startTime = System.nanoTime(); |
| tryToUpdateInsertRowsLastCache(executedInsertRowNodeList); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleUpdateLastCacheCost(System.nanoTime() - startTime); |
| } |
| } |
| |
| private void tryToUpdateInsertRowsLastCache(List<InsertRowNode> nodeList) { |
| DataNodeSchemaCache.getInstance().takeReadLock(); |
| try { |
| for (InsertRowNode node : nodeList) { |
| long latestFlushedTime = lastFlushTimeMap.getGlobalFlushedTime(node.getDeviceID()); |
| String[] measurements = node.getMeasurements(); |
| MeasurementSchema[] measurementSchemas = node.getMeasurementSchemas(); |
| String[] rawMeasurements = new String[measurements.length]; |
| for (int i = 0; i < measurements.length; i++) { |
| if (measurementSchemas[i] != null) { |
| // get raw measurement rather than alias |
| rawMeasurements[i] = measurementSchemas[i].getMeasurementId(); |
| } else { |
| rawMeasurements[i] = measurements[i]; |
| } |
| } |
| DataNodeSchemaCache.getInstance() |
| .updateLastCacheWithoutLock( |
| getDatabaseName(), |
| node.getDevicePath(), |
| rawMeasurements, |
| node.getMeasurementSchemas(), |
| node.isAligned(), |
| node::composeTimeValuePair, |
| index -> node.getValues()[index] != null, |
| true, |
| latestFlushedTime); |
| } |
| } finally { |
| DataNodeSchemaCache.getInstance().releaseReadLock(); |
| } |
| } |
| |
| /** |
| * The WAL module uses this method to flush a memTable. |
| * |
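| * <p>A usage sketch (hypothetical caller): |
| * |
| * <pre>{@code |
| * boolean submitted = dataRegion.submitAFlushTask(timePartitionId, true, memTable); |
| * }</pre> |
| * |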
| * @return true if the flush task is submitted successfully |
| */ |
| public boolean submitAFlushTask(long timeRangeId, boolean sequence, IMemTable memTable) { |
| writeLock("submitAFlushTask"); |
| try { |
| if (memTable.getFlushStatus() != FlushStatus.WORKING) { |
| return false; |
| } |
| |
| TsFileProcessor tsFileProcessor; |
| if (sequence) { |
| tsFileProcessor = workSequenceTsFileProcessors.get(timeRangeId); |
| } else { |
| tsFileProcessor = workUnsequenceTsFileProcessors.get(timeRangeId); |
| } |
| // only submit when tsFileProcessor exists and memTables are same |
| boolean shouldSubmit = |
| tsFileProcessor != null && tsFileProcessor.getWorkMemTable() == memTable; |
| if (shouldSubmit) { |
| fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); |
| } |
| return shouldSubmit; |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * The memory control module uses this method to flush a memTable. |
| * |
| * @param tsFileProcessor tsfile processor whose memTable is to be flushed |
| */ |
| public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) { |
| if (closingSequenceTsFileProcessor.contains(tsFileProcessor) |
| || closingUnSequenceTsFileProcessor.contains(tsFileProcessor) |
| || tsFileProcessor.alreadyMarkedClosing()) { |
| return; |
| } |
| writeLock("submitAFlushTaskWhenShouldFlush"); |
| try { |
| // check memtable size and may asyncTryToFlush the work memtable |
| if (tsFileProcessor.shouldFlush()) { |
| fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) { |
| TsFileProcessor tsFileProcessor = null; |
| int retryCnt = 0; |
| do { |
| try { |
| if (IoTDBDescriptor.getInstance().getConfig().isQuotaEnable()) { |
| if (!DataNodeSpaceQuotaManager.getInstance().checkRegionDisk(databaseName)) { |
| throw new ExceedQuotaException( |
| "Unable to continue writing data, because the space allocated to the database " |
| + databaseName |
| + " has already used the upper limit", |
| TSStatusCode.SPACE_QUOTA_EXCEEDED.getStatusCode()); |
| } |
| } |
| if (sequence) { |
| tsFileProcessor = |
| getOrCreateTsFileProcessorIntern(timeRangeId, workSequenceTsFileProcessors, true); |
| } else { |
| tsFileProcessor = |
| getOrCreateTsFileProcessorIntern(timeRangeId, workUnsequenceTsFileProcessors, false); |
| } |
| } catch (DiskSpaceInsufficientException e) { |
| logger.error( |
| "disk space is insufficient when creating TsFile processor, change system mode to read-only", |
| e); |
| CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly); |
| break; |
| } catch (IOException e) { |
| if (retryCnt < 3) { |
| logger.warn("meet IOException when creating TsFileProcessor, retry it again", e); |
| retryCnt++; |
| } else { |
| logger.error( |
| "meet IOException when creating TsFileProcessor, change system mode to error", e); |
| CommonDescriptor.getInstance().getConfig().handleUnrecoverableError(); |
| break; |
| } |
| } catch (ExceedQuotaException e) { |
| logger.error(e.getMessage()); |
| break; |
| } |
| } while (tsFileProcessor == null); |
| return tsFileProcessor; |
| } |
| |
| /** |
| * Get the processor from the map, flushing the oldest processor if necessary. |
| * |
| * @param timeRangeId time partition range |
| * @param tsFileProcessorTreeMap tsFileProcessorTreeMap |
| * @param sequence whether the processor is sequence or not |
| */ |
| private TsFileProcessor getOrCreateTsFileProcessorIntern( |
| long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, boolean sequence) |
| throws IOException, DiskSpaceInsufficientException { |
| |
| TsFileProcessor res = tsFileProcessorTreeMap.get(timeRangeId); |
| if (null == res) { |
| // build new processor, memory control module will control the number of memtables |
| TimePartitionManager.getInstance() |
| .updateAfterOpeningTsFileProcessor( |
| new DataRegionId(Integer.parseInt(dataRegionId)), timeRangeId); |
| res = newTsFileProcessor(sequence, timeRangeId); |
| if (workSequenceTsFileProcessors.get(timeRangeId) == null |
| && workUnsequenceTsFileProcessors.get(timeRangeId) == null) { |
| WritingMetrics.getInstance().recordActiveTimePartitionCount(1); |
| } |
| tsFileProcessorTreeMap.put(timeRangeId, res); |
| tsFileManager.add(res.getTsFileResource(), sequence); |
| } |
| |
| return res; |
| } |
| |
| private TsFileProcessor newTsFileProcessor(boolean sequence, long timePartitionId) |
| throws IOException, DiskSpaceInsufficientException { |
| long version = |
| partitionMaxFileVersions.compute( |
| timePartitionId, (key, oldVersion) -> (oldVersion == null ? 1 : oldVersion + 1)); |
| String filePath = |
| TsFileNameGenerator.generateNewTsFilePathWithMkdir( |
| sequence, |
| databaseName, |
| dataRegionId, |
| timePartitionId, |
| System.currentTimeMillis(), |
| version, |
| 0, |
| 0); |
| |
| return getTsFileProcessor(sequence, filePath, timePartitionId); |
| } |
| |
| private TsFileProcessor getTsFileProcessor( |
| boolean sequence, String filePath, long timePartitionId) throws IOException { |
| TsFileProcessor tsFileProcessor = |
| new TsFileProcessor( |
| databaseName + FILE_NAME_SEPARATOR + dataRegionId, |
| fsFactory.getFileWithParent(filePath), |
| dataRegionInfo, |
| this::closeUnsealedTsFileProcessorCallBack, |
| this::flushCallback, |
| sequence); |
| |
| TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(dataRegionInfo); |
| tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); |
| this.dataRegionInfo.initTsFileProcessorInfo(tsFileProcessor); |
| |
| tsFileProcessor.addCloseFileListeners(customCloseFileListeners); |
| tsFileProcessor.addFlushListeners(customFlushListeners); |
| tsFileProcessor.setTimeRangeId(timePartitionId); |
| |
| return tsFileProcessor; |
| } |
| |
| /** |
| * Create a new tsfile name |
| * |
| * @return file name |
| */ |
| private String getNewTsFileName(long timePartitionId) { |
| long version = |
| partitionMaxFileVersions.compute( |
| timePartitionId, (key, oldVersion) -> (oldVersion == null ? 1 : oldVersion + 1)); |
| return getNewTsFileName(System.currentTimeMillis(), version, 0, 0); |
| } |
| |
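| // The generated name typically encodes the time, version, and the two compaction counts, |
| // e.g. "1690000000000-1-0-0.tsfile" (assuming the default TsFileNameGenerator format). |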
| private String getNewTsFileName(long time, long version, int mergeCnt, int unseqCompactionCnt) { |
| return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt); |
| } |
| |
| /** |
| * close one tsfile processor |
| * |
| * @param sequence whether this tsfile processor is sequence or not |
| * @param tsFileProcessor tsfile processor |
| */ |
| public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { |
| synchronized (closeStorageGroupCondition) { |
| try { |
| asyncCloseOneTsFileProcessor(sequence, tsFileProcessor); |
| long startTime = System.currentTimeMillis(); |
| while (closingSequenceTsFileProcessor.contains(tsFileProcessor) |
| || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { |
| closeStorageGroupCondition.wait(60_000); |
| if (System.currentTimeMillis() - startTime > 60_000) { |
| logger.warn( |
| "{} has spent {}s to wait for closing one tsfile.", |
| databaseName + "-" + this.dataRegionId, |
| (System.currentTimeMillis() - startTime) / 1000); |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| logger.error( |
| "syncCloseOneTsFileProcessor error occurs while waiting for closing the storage " |
| + "group {}", |
| databaseName + "-" + dataRegionId, |
| e); |
| } |
| } |
| } |
| |
| /** |
| * close one tsfile processor, thread-safety should be ensured by caller |
| * |
| * @param sequence whether this tsfile processor is sequence or not |
| * @param tsFileProcessor tsfile processor |
| */ |
| public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { |
| // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed. |
| // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes. |
| if (closingSequenceTsFileProcessor.contains(tsFileProcessor) |
| || closingUnSequenceTsFileProcessor.contains(tsFileProcessor) |
| || tsFileProcessor.alreadyMarkedClosing()) { |
| return CompletableFuture.completedFuture(null); |
| } |
| logger.info( |
| "Async close tsfile: {}", |
| tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath()); |
| Future<?> future; |
| if (sequence) { |
| closingSequenceTsFileProcessor.add(tsFileProcessor); |
| future = tsFileProcessor.asyncClose(); |
| |
| workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); |
| // if unsequence files don't contain this time range id, we should remove its version |
| // controller |
| if (!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) { |
| timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId()); |
| } |
| } else { |
| closingUnSequenceTsFileProcessor.add(tsFileProcessor); |
| future = tsFileProcessor.asyncClose(); |
| |
| workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); |
| // if sequence files don't contain this time range id, we should remove its version |
| // controller |
| if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) { |
| timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId()); |
| } |
| } |
| if (workSequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null |
| && workUnsequenceTsFileProcessors.get(tsFileProcessor.getTimeRangeId()) == null) { |
| WritingMetrics.getInstance().recordActiveTimePartitionCount(-1); |
| } |
| return future; |
| } |
| |
| /** |
| * delete this data region's folder under the system directory (data/system/databases) |
| * |
| * @param systemDir system dir |
| */ |
| public void deleteFolder(String systemDir) { |
| logger.info( |
| "{} will close all files for deleting data folder {}", |
| databaseName + "-" + dataRegionId, |
| systemDir); |
| writeLock("deleteFolder"); |
| try { |
| File dataRegionSystemFolder = |
| SystemFileFactory.INSTANCE.getFile( |
| systemDir + File.separator + databaseName, dataRegionId); |
| org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent( |
| dataRegionSystemFolder); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** close all tsfile resources */ |
| public void closeAllResources() { |
| for (TsFileResource tsFileResource : tsFileManager.getTsFileList(false)) { |
| try { |
| tsFileResource.close(); |
| } catch (IOException e) { |
| logger.error("Cannot close a TsFileResource {}", tsFileResource, e); |
| } |
| } |
| for (TsFileResource tsFileResource : tsFileManager.getTsFileList(true)) { |
| try { |
| tsFileResource.close(); |
| } catch (IOException e) { |
| logger.error("Cannot close a TsFileResource {}", tsFileResource, e); |
| } |
| } |
| } |
| |
| /** delete all data files in this data region */ |
| public void syncDeleteDataFiles() { |
| logger.info( |
| "{} will close all files for deleting data files", databaseName + "-" + dataRegionId); |
| writeLock("syncDeleteDataFiles"); |
| try { |
| |
| syncCloseAllWorkingTsFileProcessors(); |
| // normally, mergingModification only needs to be closed after a merge task finishes; |
| // we close it here just for IT tests. |
| closeAllResources(); |
| List<TsFileResource> tsFileResourceList = tsFileManager.getTsFileList(true); |
| tsFileResourceList.addAll(tsFileManager.getTsFileList(false)); |
| tsFileResourceList.forEach( |
| x -> { |
| FileMetrics.getInstance().deleteTsFile(x.isSeq(), Collections.singletonList(x)); |
| if (x.getModFile().exists()) { |
| FileMetrics.getInstance().decreaseModFileNum(1); |
| FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize()); |
| } |
| }); |
| deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders()); |
| this.workSequenceTsFileProcessors.clear(); |
| this.workUnsequenceTsFileProcessors.clear(); |
| this.tsFileManager.clear(); |
| lastFlushTimeMap.clearFlushedTime(); |
| lastFlushTimeMap.clearGlobalFlushedTime(); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| private void deleteAllSGFolders(List<String> folder) { |
| for (String tsfilePath : folder) { |
| File dataRegionDataFolder = |
| fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionId); |
| if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) { |
| try { |
| fsFactory.deleteDirectory(dataRegionDataFolder.getPath()); |
| } catch (IOException e) { |
| logger.error("Fail to delete data region folder {}", dataRegionDataFolder); |
| } |
| } else { |
| if (dataRegionDataFolder.exists()) { |
| org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent( |
| dataRegionDataFolder); |
| } |
| } |
| } |
| } |
| |
| /** Iterate each TsFile and try to lock and remove those out of TTL. */ |
| public synchronized void checkFilesTTL() { |
| if (dataTTL == Long.MAX_VALUE) { |
| logger.debug("{}: TTL not set, ignore the check", databaseName + "-" + dataRegionId); |
| return; |
| } |
| long ttlLowerBound = CommonDateTimeUtils.currentTime() - dataTTL; |
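| // e.g. with millisecond precision and a TTL of three days, ttlLowerBound is now - 259_200_000; |
| // closed files whose data all predates this bound become removal candidates |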
| logger.debug( |
| "{}: TTL removing files before {}", |
| databaseName + "-" + dataRegionId, |
| new Date(ttlLowerBound)); |
| |
| // copy the lists to avoid concurrent modification during deletion |
| List<TsFileResource> seqFiles = new ArrayList<>(tsFileManager.getTsFileList(true)); |
| List<TsFileResource> unseqFiles = new ArrayList<>(tsFileManager.getTsFileList(false)); |
| |
| for (TsFileResource tsFileResource : seqFiles) { |
| checkFileTTL(tsFileResource, ttlLowerBound, true); |
| } |
| for (TsFileResource tsFileResource : unseqFiles) { |
| checkFileTTL(tsFileResource, ttlLowerBound, false); |
| } |
| } |
| |
| private void checkFileTTL(TsFileResource resource, long ttlLowerBound, boolean isSeq) { |
| if (!resource.isClosed() || !resource.isDeleted() && resource.stillLives(ttlLowerBound)) { |
| return; |
| } |
| // Try to set the resource to DELETED status and return if it failed |
| if (!resource.setStatus(TsFileResourceStatus.DELETED)) { |
| return; |
| } |
| tsFileManager.remove(resource, isSeq); |
| // ensure that the file is not used by any queries |
| resource.writeLock(); |
| try { |
| // try to delete physical data file |
| resource.remove(); |
| FileMetrics.getInstance().deleteTsFile(isSeq, Collections.singletonList(resource)); |
| logger.info( |
| "Removed a file {} before {} by ttl ({} {})", |
| resource.getTsFilePath(), |
| new Date(ttlLowerBound), |
| dataTTL, |
| CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); |
| } finally { |
| resource.writeUnlock(); |
| } |
| } |
| |
| public void timedFlushSeqMemTable() { |
| int count = 0; |
| writeLock("timedFlushSeqMemTable"); |
| try { |
| // only check sequence tsfiles' memtables |
| List<TsFileProcessor> tsFileProcessors = |
| new ArrayList<>(workSequenceTsFileProcessors.values()); |
| long timeLowerBound = System.currentTimeMillis() - config.getSeqMemtableFlushInterval(); |
| for (TsFileProcessor tsFileProcessor : tsFileProcessors) { |
| if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) { |
| logger.info( |
| "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]", |
| tsFileProcessor.getTimeRangeId(), |
| databaseName, |
| dataRegionId); |
| fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); |
| count++; |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId, count); |
| } |
| |
| public void timedFlushUnseqMemTable() { |
| int count = 0; |
| writeLock("timedFlushUnseqMemTable"); |
| try { |
| // only check unsequence tsfiles' memtables |
| List<TsFileProcessor> tsFileProcessors = |
| new ArrayList<>(workUnsequenceTsFileProcessors.values()); |
| long timeLowerBound = System.currentTimeMillis() - config.getUnseqMemtableFlushInterval(); |
| |
| for (TsFileProcessor tsFileProcessor : tsFileProcessors) { |
| if (tsFileProcessor.getWorkMemTableUpdateTime() < timeLowerBound) { |
| logger.info( |
| "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in database {}[{}]", |
| tsFileProcessor.getTimeRangeId(), |
| databaseName, |
| dataRegionId); |
| fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); |
| count++; |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| WritingMetrics.getInstance().recordTimedFlushMemTableCount(dataRegionId, count); |
| } |
| |
| /** This method blocks until all tsfile processors are closed. */ |
| public void syncCloseAllWorkingTsFileProcessors() { |
| try { |
| List<Future<?>> tsFileProcessorsClosingFutures = asyncCloseAllWorkingTsFileProcessors(); |
| long startTime = System.currentTimeMillis(); |
| while (!closingSequenceTsFileProcessor.isEmpty() |
| || !closingUnSequenceTsFileProcessor.isEmpty()) { |
| synchronized (closeStorageGroupCondition) { |
| // double check to avoid unnecessary waiting |
| if (!closingSequenceTsFileProcessor.isEmpty() |
| || !closingUnSequenceTsFileProcessor.isEmpty()) { |
| closeStorageGroupCondition.wait(60_000); |
| } |
| } |
| if (System.currentTimeMillis() - startTime > 60_000) { |
| logger.warn( |
| "{} has spent {}s to wait for closing all TsFiles.", |
| databaseName + "-" + this.dataRegionId, |
| (System.currentTimeMillis() - startTime) / 1000); |
| } |
| } |
| for (Future<?> f : tsFileProcessorsClosingFutures) { |
| if (f != null) { |
| f.get(); |
| } |
| } |
| } catch (InterruptedException | ExecutionException e) { |
| logger.error( |
| "CloseFileNodeCondition error occurs while waiting for closing the storage " + "group {}", |
| databaseName + "-" + dataRegionId, |
| e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** close all working tsfile processors */ |
| public List<Future<?>> asyncCloseAllWorkingTsFileProcessors() { |
| writeLock("asyncCloseAllWorkingTsFileProcessors"); |
| List<Future<?>> futures = new ArrayList<>(); |
| try { |
| logger.info("async force close all files in database: {}", databaseName + "-" + dataRegionId); |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workSequenceTsFileProcessors.values())) { |
| futures.add(asyncCloseOneTsFileProcessor(true, tsFileProcessor)); |
| } |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workUnsequenceTsFileProcessors.values())) { |
| futures.add(asyncCloseOneTsFileProcessor(false, tsFileProcessor)); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| return futures; |
| } |
| |
| /** force close all working tsfile processors */ |
| public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException { |
| writeLock("forceCloseAllWorkingTsFileProcessors"); |
| try { |
| logger.info("force close all processors in database: {}", databaseName + "-" + dataRegionId); |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workSequenceTsFileProcessors.values())) { |
| tsFileProcessor.putMemTableBackAndClose(); |
| } |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workUnsequenceTsFileProcessors.values())) { |
| tsFileProcessor.putMemTableBackAndClose(); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** used for the query engine */ |
| @Override |
| public QueryDataSource query( |
| List<PartialPath> pathList, |
| String singleDeviceId, |
| QueryContext context, |
| Filter globalTimeFilter, |
| List<Long> timePartitions) |
| throws QueryProcessException { |
| try { |
| List<TsFileResource> seqResources = |
| getFileResourceListForQuery( |
| tsFileManager.getTsFileList(true, timePartitions, globalTimeFilter), |
| pathList, |
| singleDeviceId, |
| context, |
| globalTimeFilter, |
| true); |
| List<TsFileResource> unseqResources = |
| getFileResourceListForQuery( |
| tsFileManager.getTsFileList(false, timePartitions, globalTimeFilter), |
| pathList, |
| singleDeviceId, |
| context, |
| globalTimeFilter, |
| false); |
| |
| QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, seqResources.size()); |
| QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, unseqResources.size()); |
| |
| QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources); |
| dataSource.setDataTTL(dataTTL); |
| return dataSource; |
| } catch (MetadataException e) { |
| throw new QueryProcessException(e); |
| } |
| } |
| |
| /** lock the read lock of the insert lock */ |
| @Override |
| public void readLock() { |
| // apply the read lock of the SG insert lock to stay consistent with memtables being written concurrently |
| insertLock.readLock().lock(); |
| // apply read lock for TsFileResource list |
| tsFileManager.readLock(); |
| } |
| |
| /** unlock the read lock of insert lock */ |
| @Override |
| public void readUnlock() { |
| tsFileManager.readUnlock(); |
| insertLock.readLock().unlock(); |
| } |
| |
| /** lock the write lock of the insert lock */ |
| public void writeLock(String holder) { |
| insertLock.writeLock().lock(); |
| insertWriteLockHolder = holder; |
| } |
| |
| /** unlock the write lock of the insert lock */ |
| public void writeUnlock() { |
| insertWriteLockHolder = ""; |
| insertLock.writeLock().unlock(); |
| } |
| |
| /** |
| * @param tsFileResources includes sealed and unsealed tsfile resources |
| * @return the tsfile resources satisfying the query; unsealed resources are filled with memory |
| * data and the ChunkMetadataList of data on disk |
| */ |
| private List<TsFileResource> getFileResourceListForQuery( |
| Collection<TsFileResource> tsFileResources, |
| List<PartialPath> pathList, |
| String singleDeviceId, |
| QueryContext context, |
| Filter globalTimeFilter, |
| boolean isSeq) |
| throws MetadataException { |
| |
| if (context.isDebug()) { |
| DEBUG_LOGGER.info( |
| "Path: {}, get tsfile list: {} isSeq: {} timefilter: {}", |
| pathList, |
| tsFileResources, |
| isSeq, |
| (globalTimeFilter == null ? "null" : globalTimeFilter)); |
| } |
| |
| List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); |
| |
| long timeLowerBound = |
| dataTTL != Long.MAX_VALUE ? CommonDateTimeUtils.currentTime() - dataTTL : Long.MIN_VALUE; |
| context.setQueryTimeLowerBound(timeLowerBound); |
| |
| for (TsFileResource tsFileResource : tsFileResources) { |
| if (!tsFileResource.isSatisfied( |
| singleDeviceId == null ? null : new PlainDeviceID(singleDeviceId), |
| globalTimeFilter, |
| isSeq, |
| dataTTL, |
| context.isDebug())) { |
| continue; |
| } |
| closeQueryLock.readLock().lock(); |
| try { |
| if (tsFileResource.isClosed()) { |
| tsfileResourcesForQuery.add(tsFileResource); |
| } else { |
| tsFileResource.getProcessor().query(pathList, context, tsfileResourcesForQuery); |
| } |
| } catch (IOException e) { |
| throw new MetadataException(e); |
| } finally { |
| closeQueryLock.readLock().unlock(); |
| } |
| } |
| return tsfileResourcesForQuery; |
| } |
| |
| /** Separate tsfiles in TsFileManager into sealedList and unsealedList. */ |
| private void separateTsFile( |
| List<TsFileResource> sealedResource, |
| List<TsFileResource> unsealedResource, |
| long startTime, |
| long endTime) { |
| tsFileManager |
| .getTsFileList(true, startTime, endTime) |
| .forEach( |
| tsFileResource -> { |
| if (tsFileResource.isClosed()) { |
| sealedResource.add(tsFileResource); |
| } else { |
| unsealedResource.add(tsFileResource); |
| } |
| }); |
| tsFileManager |
| .getTsFileList(false, startTime, endTime) |
| .forEach( |
| tsFileResource -> { |
| if (tsFileResource.isClosed()) { |
| sealedResource.add(tsFileResource); |
| } else { |
| unsealedResource.add(tsFileResource); |
| } |
| }); |
| } |
| |
| /** |
| * @param pattern Must be a pattern starting with a precise device path |
| * @param startTime start timestamp of the deletion range (inclusive) |
| * @param endTime end timestamp of the deletion range (inclusive) |
| * @param searchIndex search index of the delete operation, recorded in the WAL |
| * @throws IOException when settling is in progress or the deletion cannot be persisted |
| */ |
| public void deleteByDevice(PartialPath pattern, long startTime, long endTime, long searchIndex) |
| throws IOException { |
| if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) { |
| throw new IOException( |
| "Delete failed. " + "Please do not delete until the old files settled."); |
| } |
| // TODO: how to avoid partial deletion? |
| // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened |
| // mod files in mergingModification, sequenceFileList, and unsequenceFileList |
| writeLock("delete"); |
| |
| boolean hasReleasedLock = false; |
| |
| try { |
| DataNodeSchemaCache.getInstance().invalidateLastCache(pattern); |
| Set<PartialPath> devicePaths = new HashSet<>(pattern.getDevicePathPattern()); |
| // write log to impacted working TsFileProcessors |
| List<WALFlushListener> walListeners = |
| logDeletionInWAL(startTime, endTime, searchIndex, pattern); |
| |
| for (WALFlushListener walFlushListener : walListeners) { |
| if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { |
| logger.error("Fail to log delete to wal.", walFlushListener.getCause()); |
| throw walFlushListener.getCause(); |
| } |
| } |
| |
| Deletion deletion = new Deletion(pattern, MERGE_MOD_START_VERSION_NUM, startTime, endTime); |
| |
| List<TsFileResource> sealedTsFileResource = new ArrayList<>(); |
| List<TsFileResource> unsealedTsFileResource = new ArrayList<>(); |
| separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime, endTime); |
| // deviceMatchInfo is used to filter matched deviceIds in the TsFileResource; an entry in |
| // deviceMatchInfo means that device has matched the pattern |
| Set<String> deviceMatchInfo = new HashSet<>(); |
| deleteDataInFiles(unsealedTsFileResource, deletion, devicePaths, deviceMatchInfo); |
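| // sealed files can be modified via their mods files without the region write lock, so the |
| // lock is released early to avoid blocking new writes |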
| writeUnlock(); |
| hasReleasedLock = true; |
| |
| deleteDataInFiles(sealedTsFileResource, deletion, devicePaths, deviceMatchInfo); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } finally { |
| if (!hasReleasedLock) { |
| writeUnlock(); |
| } |
| } |
| } |
| |
| public void deleteDataDirectly( |
| PartialPath pathToDelete, long startTime, long endTime, long searchIndex) throws IOException { |
| logger.info( |
| "{} will delete data files directly for deleting data between {} and {}", |
| databaseName + "-" + dataRegionId, |
| startTime, |
| endTime); |
| |
| writeLock("deleteDataDirect"); |
| boolean releasedLock = false; |
| |
| try { |
| DataNodeSchemaCache.getInstance().invalidateLastCacheInDataRegion(getDatabaseName()); |
| // write log to impacted working TsFileProcessors |
| List<WALFlushListener> walListeners = |
| logDeletionInWAL(startTime, endTime, searchIndex, pathToDelete); |
| |
| for (WALFlushListener walFlushListener : walListeners) { |
| if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { |
| logger.error("Fail to log delete to wal.", walFlushListener.getCause()); |
| throw walFlushListener.getCause(); |
| } |
| } |
| List<TsFileResource> sealedTsFileResource = new ArrayList<>(); |
| List<TsFileResource> unsealedTsFileResource = new ArrayList<>(); |
| separateTsFile(sealedTsFileResource, unsealedTsFileResource, startTime, endTime); |
| deleteDataDirectlyInFile(unsealedTsFileResource, pathToDelete, startTime, endTime); |
| writeUnlock(); |
| releasedLock = true; |
| deleteDataDirectlyInFile(sealedTsFileResource, pathToDelete, startTime, endTime); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } finally { |
| if (!releasedLock) { |
| writeUnlock(); |
| } |
| } |
| } |
| |
| private List<WALFlushListener> logDeletionInWAL( |
| long startTime, long endTime, long searchIndex, PartialPath path) { |
| List<WALFlushListener> walFlushListeners = new ArrayList<>(); |
| if (config.getWalMode() == WALMode.DISABLE) { |
| return walFlushListeners; |
| } |
| DeleteDataNode deleteDataNode = |
| new DeleteDataNode(new PlanNodeId(""), Collections.singletonList(path), startTime, endTime); |
| deleteDataNode.setSearchIndex(searchIndex); |
| for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { |
| if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, entry.getKey())) { |
| WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode); |
| walFlushListeners.add(walFlushListener); |
| } |
| } |
| for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { |
| if (TimePartitionUtils.satisfyPartitionId(startTime, endTime, entry.getKey())) { |
| WALFlushListener walFlushListener = entry.getValue().logDeleteDataNodeInWAL(deleteDataNode); |
| walFlushListeners.add(walFlushListener); |
| } |
| } |
| return walFlushListeners; |
| } |
| |
| private boolean canSkipDelete( |
| TsFileResource tsFileResource, |
| Set<PartialPath> devicePaths, |
| long deleteStart, |
| long deleteEnd, |
| Set<String> deviceMatchInfo) { |
| long fileStartTime = tsFileResource.getTimeIndex().getMinStartTime(); |
| long fileEndTime = tsFileResource.getTimeIndex().getMaxEndTime(); |
| |
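| // a file can be skipped when its time range does not overlap the deletion range, e.g. a |
| // sealed file covering [100, 200] is untouched by a deletion of [300, 400] |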
| for (PartialPath device : devicePaths) { |
| long deviceStartTime, deviceEndTime; |
| if (device.hasWildcard()) { |
| if (!tsFileResource.isClosed() && fileEndTime == Long.MIN_VALUE) { |
| // unsealed seq file |
| if (deleteEnd < fileStartTime) { |
| // the file's time range does not overlap the deletion |
| return true; |
| } |
| } else { |
| if (deleteEnd < fileStartTime || deleteStart > fileEndTime) { |
| // the file's time range does not overlap the deletion |
| return true; |
| } |
| } |
| if (databaseName.contentEquals(device.getDevice())) { |
| return false; |
| } |
| Pair<Long, Long> startAndEndTime = |
| tsFileResource.getPossibleStartTimeAndEndTime( |
| device, |
| deviceMatchInfo.stream().map(PlainDeviceID::new).collect(Collectors.toSet())); |
| if (startAndEndTime == null) { |
| continue; |
| } |
| deviceStartTime = startAndEndTime.getLeft(); |
| deviceEndTime = startAndEndTime.getRight(); |
| } else { |
| // TODO: DELETE |
| IDeviceID deviceId = new PlainDeviceID(device.getFullPath()); |
| if (tsFileResource.definitelyNotContains(deviceId)) { |
| // resource does not contain this device |
| continue; |
| } |
| deviceStartTime = tsFileResource.getStartTime(deviceId); |
| deviceEndTime = tsFileResource.getEndTime(deviceId); |
| } |
| |
| if (!tsFileResource.isClosed() && deviceEndTime == Long.MIN_VALUE) { |
| // unsealed seq file |
| if (deleteEnd >= deviceStartTime) { |
| return false; |
| } |
| } else { |
| // sealed file or unsealed unseq file |
| if (deleteEnd >= deviceStartTime && deleteStart <= deviceEndTime) { |
| // the device's time range overlaps the deletion |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| // suppress the warning about catching Throwable |
| @SuppressWarnings("java:S1181") |
| private void deleteDataInFiles( |
| Collection<TsFileResource> tsFileResourceList, |
| Deletion deletion, |
| Set<PartialPath> devicePaths, |
| Set<String> deviceMatchInfo) |
| throws IOException { |
| for (TsFileResource tsFileResource : tsFileResourceList) { |
| if (canSkipDelete( |
| tsFileResource, |
| devicePaths, |
| deletion.getStartTime(), |
| deletion.getEndTime(), |
| deviceMatchInfo)) { |
| continue; |
| } |
| |
| ModificationFile modFile = tsFileResource.getModFile(); |
| if (tsFileResource.isClosed()) { |
| long originSize = -1; |
| synchronized (modFile) { |
| try { |
| originSize = modFile.getSize(); |
| // delete data in sealed file |
| if (tsFileResource.isCompacting()) { |
| // we have to set modification offset to MAX_VALUE, as the offset of source chunk may |
| // change after compaction |
| deletion.setFileOffset(Long.MAX_VALUE); |
| // write deletion into compaction modification file |
| tsFileResource.getCompactionModFile().write(deletion); |
| // write deletion into modification file to enable read during compaction |
| modFile.write(deletion); |
| // remember to close mod file |
| tsFileResource.getCompactionModFile().close(); |
| modFile.close(); |
| } else { |
| deletion.setFileOffset(tsFileResource.getTsFileSize()); |
| // write deletion into modification file |
| boolean modFileExists = modFile.exists(); |
| |
| modFile.write(deletion); |
| |
| // remember to close mod file |
| modFile.close(); |
| |
| // if the mod file is larger than 1M, compact it |
| modFile.compact(); |
| |
| if (!modFileExists) { |
| FileMetrics.getInstance().increaseModFileNum(1); |
| } |
| |
| // The file size may be smaller than the original file, so the increment here may be |
| // negative |
| FileMetrics.getInstance().increaseModFileSize(modFile.getSize() - originSize); |
| } |
| } catch (Throwable t) { |
| if (originSize != -1) { |
| modFile.truncate(originSize); |
| } |
| throw t; |
| } |
| logger.info( |
| "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.", |
| deletion.getPath(), |
| deletion.getStartTime(), |
| deletion.getEndTime(), |
| modFile.getFilePath()); |
| } |
| } else { |
| // delete data in memory of unsealed file |
| tsFileResource.getProcessor().deleteDataInMemory(deletion, devicePaths); |
| } |
| } |
| } |
| |
| private void deleteDataDirectlyInFile( |
| List<TsFileResource> tsfileResourceList, |
| PartialPath pathToDelete, |
| long startTime, |
| long endTime) |
| throws IOException { |
| List<TsFileResource> deletedByMods = new ArrayList<>(); |
| List<TsFileResource> deletedByFiles = new ArrayList<>(); |
| separateTsFileToDelete( |
| new HashSet<>(pathToDelete.getDevicePathPattern()), |
| tsfileResourceList, |
| deletedByMods, |
| deletedByFiles, |
| startTime, |
| endTime); |
| Deletion deletion = new Deletion(pathToDelete, MERGE_MOD_START_VERSION_NUM, startTime, endTime); |
| // can be deleted by mods. |
| for (TsFileResource tsFileResource : deletedByMods) { |
| ModificationFile modFile = tsFileResource.getModFile(); |
| if (tsFileResource.isClosed()) { |
| long originSize = -1; |
| synchronized (modFile) { |
| try { |
| originSize = modFile.getSize(); |
| // delete data in sealed file |
| if (tsFileResource.isCompacting()) { |
| // we have to set modification offset to MAX_VALUE, as the offset of source chunk |
| // may change after compaction |
| deletion.setFileOffset(Long.MAX_VALUE); |
| // write deletion into compaction modification file |
| tsFileResource.getCompactionModFile().write(deletion); |
| // write deletion into modification file to enable read during compaction |
| modFile.write(deletion); |
| // remember to close mod file |
| tsFileResource.getCompactionModFile().close(); |
| modFile.close(); |
| } else { |
| deletion.setFileOffset(tsFileResource.getTsFileSize()); |
| // write deletion into modification file |
| boolean modFileExists = modFile.exists(); |
| |
| modFile.write(deletion); |
| |
| // remember to close mod file |
| modFile.close(); |
| |
| // if the mod file is larger than 1M, compact it |
| modFile.compact(); |
| |
| if (!modFileExists) { |
| FileMetrics.getInstance().increaseModFileNum(1); |
| } |
| |
| // The file size may be smaller than the original file, so the increment here may be |
| // negative |
| FileMetrics.getInstance().increaseModFileSize(modFile.getSize() - originSize); |
| } |
| } catch (Throwable t) { |
| if (originSize != -1) { |
| modFile.truncate(originSize); |
| } |
| throw t; |
| } |
| logger.info( |
| "[Deletion] Deletion with path:{}, time:{}-{} written into mods file:{}.", |
| deletion.getPath(), |
| deletion.getStartTime(), |
| deletion.getEndTime(), |
| modFile.getFilePath()); |
| } |
| } else { |
| // delete data in memory of unsealed file |
| tsFileResource |
| .getProcessor() |
| .deleteDataInMemory(deletion, new HashSet<>(pathToDelete.getDevicePathPattern())); |
| } |
| } |
| |
| // can be deleted by files |
| for (TsFileResource tsFileResource : deletedByFiles) { |
| tsFileManager.remove(tsFileResource, tsFileResource.isSeq()); |
| tsFileResource.writeLock(); |
| try { |
| FileMetrics.getInstance() |
| .deleteTsFile(tsFileResource.isSeq(), Collections.singletonList(tsFileResource)); |
| if (tsFileResource.getModFile().exists()) { |
| FileMetrics.getInstance().decreaseModFileNum(1); |
| FileMetrics.getInstance().decreaseModFileSize(tsFileResource.getModFile().getSize()); |
| } |
| tsFileResource.remove(); |
| logger.info("Remove tsfile {} directly when delete data", tsFileResource.getTsFilePath()); |
| } finally { |
| tsFileResource.writeUnlock(); |
| } |
| } |
| } |
| |
| private void separateTsFileToDelete( |
| Set<PartialPath> pathToDelete, |
| List<TsFileResource> tsFileResourceList, |
| List<TsFileResource> deletedByMods, |
| List<TsFileResource> deletedByFiles, |
| long startTime, |
| long endTime) { |
| Set<String> deviceMatchInfo = new HashSet<>(); |
| for (TsFileResource file : tsFileResourceList) { |
| long fileStartTime = file.getTimeIndex().getMinStartTime(); |
| long fileEndTime = file.getTimeIndex().getMaxEndTime(); |
| |
| if (!canSkipDelete(file, pathToDelete, startTime, endTime, deviceMatchInfo)) { |
| if (startTime <= fileStartTime |
| && endTime >= fileEndTime |
| && file.isClosed() |
| && file.setStatus(TsFileResourceStatus.DELETED)) { |
| deletedByFiles.add(file); |
| } else { |
| deletedByMods.add(file); |
| } |
| } |
| } |
| } |
| |
| private void flushCallback( |
| TsFileProcessor processor, Map<IDeviceID, Long> updateMap, long systemFlushTime) { |
| if (config.isEnableSeparateData() |
| && CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { |
| // Update both partitionLastFlushTime and globalLastFlushTime |
| lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap); |
| } else { |
| // otherwise update partitionLastFlushTime only |
| lastFlushTimeMap.updateMultiDeviceFlushedTime(processor.getTimeRangeId(), updateMap); |
| } |
| |
| if (config.isEnableSeparateData()) { |
| TimePartitionManager.getInstance() |
| .updateAfterFlushing( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| processor.getTimeRangeId(), |
| systemFlushTime, |
| lastFlushTimeMap.getMemSize(processor.getTimeRangeId()), |
| workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null); |
| } |
| } |
| |
| /** Put the memtable back to the MemTablePool and make the metadata in writer visible */ |
| // TODO please consider concurrency with read and insert method. |
| private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor tsFileProcessor) |
| throws TsFileProcessorException { |
| boolean isEmptyFile = |
| tsFileProcessor.isEmpty() || tsFileProcessor.getTsFileResource().isEmpty(); |
| boolean isValidateTsFileFailed = false; |
| if (!isEmptyFile) { |
| isValidateTsFileFailed = |
| !TsFileValidator.getInstance().validateTsFile(tsFileProcessor.getTsFileResource()); |
| } |
| closeQueryLock.writeLock().lock(); |
| try { |
| tsFileProcessor.close(); |
| if (isEmptyFile) { |
| tsFileProcessor.getTsFileResource().remove(); |
| } else if (isValidateTsFileFailed) { |
| String tsFilePath = tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath(); |
| renameAndHandleError(tsFilePath, tsFilePath + BROKEN_SUFFIX); |
| renameAndHandleError( |
| tsFilePath + RESOURCE_SUFFIX, tsFilePath + RESOURCE_SUFFIX + BROKEN_SUFFIX); |
| } else { |
| tsFileResourceManager.registerSealedTsFileResource(tsFileProcessor.getTsFileResource()); |
| } |
| } finally { |
| closeQueryLock.writeLock().unlock(); |
| } |
| if (isEmptyFile || isValidateTsFileFailed) { |
| tsFileManager.remove(tsFileProcessor.getTsFileResource(), tsFileProcessor.isSequence()); |
| } |
| |
| // closingSequenceTsFileProcessor is a thread-safe collection. |
| |
| synchronized (closeStorageGroupCondition) { |
| if (closingSequenceTsFileProcessor.contains(tsFileProcessor)) { |
| closingSequenceTsFileProcessor.remove(tsFileProcessor); |
| } else { |
| closingUnSequenceTsFileProcessor.remove(tsFileProcessor); |
| } |
| closeStorageGroupCondition.notifyAll(); |
| } |
| if (!isValidateTsFileFailed) { |
| TsFileResource tsFileResource = tsFileProcessor.getTsFileResource(); |
| FileMetrics.getInstance() |
| .addTsFile( |
| tsFileResource.getDatabaseName(), |
| tsFileResource.getDataRegionId(), |
| tsFileResource.getTsFileSize(), |
| tsFileProcessor.isSequence(), |
| tsFileResource.getTsFile().getName()); |
| } |
| } |
| |
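| // Only one compaction-selection pass may run per region at a time; concurrent callers return |
| // 0 immediately via the compare-and-set guard below. |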
| public int executeCompaction() throws InterruptedException { |
| if (!isCompactionSelecting.compareAndSet(false, true)) { |
| return 0; |
| } |
| try { |
| int trySubmitCount = 0; |
| List<Long> timePartitions = new ArrayList<>(tsFileManager.getTimePartitions()); |
| // Sort the time partition from largest to smallest |
| timePartitions.sort(Comparator.reverseOrder()); |
| |
| CompactionScheduleSummary summary = new CompactionScheduleSummary(); |
| if (IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction()) { |
| trySubmitCount += executeInsertionCompaction(timePartitions); |
| summary.incrementSubmitTaskNum(CompactionTaskType.INSERTION, trySubmitCount); |
| } |
| if (summary.getSubmitInsertionCrossSpaceCompactionTaskNum() == 0) { |
| // the variable is named trySubmitCount because a task submitted to the queue may still be |
| // evicted due to its low priority |
| for (long timePartition : timePartitions) { |
| CompactionScheduler.sharedLockCompactionSelection(); |
| try { |
| trySubmitCount += |
| CompactionScheduler.scheduleCompaction(tsFileManager, timePartition, summary); |
| } catch (InterruptedException e) { |
| throw e; |
| } catch (Throwable e) { |
| logger.error("Meet error in compaction schedule.", e); |
| } finally { |
| CompactionScheduler.sharedUnlockCompactionSelection(); |
| } |
| } |
| } |
| if (summary.hasSubmitTask()) { |
| CompactionMetrics.getInstance().updateCompactionTaskSelectionNum(summary); |
| } |
| return trySubmitCount; |
| } finally { |
| isCompactionSelecting.set(false); |
| } |
| } |
| |
| protected int executeInsertionCompaction(List<Long> timePartitions) throws InterruptedException { |
| int trySubmitCount = 0; |
| CompactionScheduler.sharedLockCompactionSelection(); |
| try { |
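| // repeat selection rounds until a full pass submits no task; the phaser waits for each |
| // round's submitted tasks to finish before the next pass starts |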
| while (true) { |
| int currentSubmitCount = 0; |
| Phaser insertionTaskPhaser = new Phaser(1); |
| for (long timePartition : timePartitions) { |
| currentSubmitCount += |
| CompactionScheduler.scheduleInsertionCompaction( |
| tsFileManager, timePartition, insertionTaskPhaser); |
| } |
| trySubmitCount += currentSubmitCount; |
| insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive()); |
| if (currentSubmitCount != 0) { |
| continue; |
| } |
| break; |
| } |
| } catch (InterruptedException e) { |
| throw e; |
| } catch (Throwable e) { |
| logger.error("Meet error in compaction schedule.", e); |
| } finally { |
| CompactionScheduler.sharedUnlockCompactionSelection(); |
| } |
| return trySubmitCount; |
| } |
| |
| /** |
| * After settling a tsfile, we need to do two things: (1) move the new tsfile to the correct |
| * folder, including deleting its old mods file; (2) update the relevant in-memory data of the |
| * old tsfile, e.g. TsFileSequenceReader, {@link #tsFileManager}, cache, etc. |
| */ |
| private void settleTsFileCallBack( |
| TsFileResource oldTsFileResource, List<TsFileResource> newTsFileResources) |
| throws WriteProcessException { |
| oldTsFileResource.readUnlock(); |
| oldTsFileResource.writeLock(); |
| try { |
| TsFileAndModSettleTool.moveNewTsFile(oldTsFileResource, newTsFileResources); |
| if (TsFileAndModSettleTool.getInstance().recoverSettleFileMap.size() != 0) { |
| TsFileAndModSettleTool.getInstance() |
| .recoverSettleFileMap |
| .remove(oldTsFileResource.getTsFile().getAbsolutePath()); |
| } |
| // clear caches, including the chunk cache, timeseriesMetadata cache and bloom filter cache |
| operateClearCache(); |
| |
| // the old tsfile may have been deleted during settling because all of its data was deleted |
| if (!oldTsFileResource.getTsFile().exists()) { |
| tsFileManager.remove(oldTsFileResource, oldTsFileResource.isSeq()); |
| } |
| FileReaderManager.getInstance().closeFileAndRemoveReader(oldTsFileResource.getTsFilePath()); |
| oldTsFileResource.setSettleTsFileCallBack(null); |
| SettleService.getINSTANCE().getFilesToBeSettledCount().addAndGet(-1); |
| } catch (IOException e) { |
| logger.error("Exception to move new tsfile in settling", e); |
| throw new WriteProcessException( |
| "Meet error when settling file: " + oldTsFileResource.getTsFile().getAbsolutePath(), e); |
| } finally { |
| oldTsFileResource.writeUnlock(); |
| } |
| } |
| |
| public static void operateClearCache() { |
| ChunkCache.getInstance().clear(); |
| TimeSeriesMetadataCache.getInstance().clear(); |
| BloomFilterCache.getInstance().clear(); |
| } |
| |
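| // e.g. "root.db-1" -> Optional.of("root.db"); a name without '-' is returned unchanged, and |
| // names under the system database yield Optional.empty() |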
| public static Optional<String> getNonSystemDatabaseName(String databaseName) { |
| if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) { |
| return Optional.empty(); |
| } |
| int lastIndex = databaseName.lastIndexOf("-"); |
| if (lastIndex == -1) { |
| lastIndex = databaseName.length(); |
| } |
| return Optional.of(databaseName.substring(0, lastIndex)); |
| } |
| |
| public Optional<String> getNonSystemDatabaseName() { |
| return getNonSystemDatabaseName(databaseName); |
| } |
| |
| /** Merge file under this database processor */ |
| public int compact() { |
| writeLock("merge"); |
| CompactionScheduler.exclusiveLockCompactionSelection(); |
| try { |
| return executeCompaction(); |
| } catch (InterruptedException ignored) { |
| return 0; |
| } finally { |
| CompactionScheduler.exclusiveUnlockCompactionSelection(); |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * Load a new tsfile to unsequence dir. |
| * |
| * <p>Then, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice. |
| * |
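| * <p>A minimal (hypothetical) invocation: {@code loadNewTsFile(resource, true, false)} loads |
| * {@code resource} into the unsequence space and deletes the original file afterwards. |
| * |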
| * @param newTsFileResource tsfile resource @UsedBy load external tsfile module |
| * @param deleteOriginFile whether to delete origin tsfile |
| * @param isGeneratedByPipe whether the load tsfile request is generated by pipe |
| */ |
| public void loadNewTsFile( |
| TsFileResource newTsFileResource, boolean deleteOriginFile, boolean isGeneratedByPipe) |
| throws LoadFileException { |
| File tsfileToBeInserted = newTsFileResource.getTsFile(); |
| long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); |
| |
| if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)) { |
| throw new LoadFileException( |
| "tsfile validate failed, " + newTsFileResource.getTsFile().getName()); |
| } |
| |
| writeLock("loadNewTsFile"); |
| try { |
| newTsFileResource.setSeq(false); |
| String newFileName = |
| getNewTsFileName( |
| System.currentTimeMillis(), |
| getAndSetNewVersion(newFilePartitionId, newTsFileResource), |
| 0, |
| 0); |
| |
| if (!newFileName.equals(tsfileToBeInserted.getName())) { |
| logger.info( |
| "TsFile {} must be renamed to {} for loading into the unsequence list.", |
| tsfileToBeInserted.getName(), |
| newFileName); |
| newTsFileResource.setFile( |
| fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName)); |
| } |
| loadTsFileToUnSequence( |
| tsfileToBeInserted, newTsFileResource, newFilePartitionId, deleteOriginFile); |
| |
| PipeInsertionDataNodeListener.getInstance() |
| .listenToTsFile(dataRegionId, newTsFileResource, true, isGeneratedByPipe); |
| |
| FileMetrics.getInstance() |
| .addTsFile( |
| newTsFileResource.getDatabaseName(), |
| newTsFileResource.getDataRegionId(), |
| newTsFileResource.getTsFile().length(), |
| false, |
| newTsFileResource.getTsFile().getName()); |
| |
| if (config.isEnableSeparateData()) { |
| final DataRegionId dataRegionId = new DataRegionId(Integer.parseInt(this.dataRegionId)); |
| final long timePartitionId = newTsFileResource.getTimePartition(); |
| if (!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) { |
| TimePartitionManager.getInstance() |
| .registerTimePartitionInfo( |
| new TimePartitionInfo( |
| dataRegionId, |
| timePartitionId, |
| false, |
| Long.MAX_VALUE, |
| lastFlushTimeMap.getMemSize(timePartitionId))); |
| } |
| updateLastFlushTime(newTsFileResource); |
| TimePartitionManager.getInstance() |
| .updateAfterFlushing( |
| dataRegionId, |
| timePartitionId, |
| System.currentTimeMillis(), |
| lastFlushTimeMap.getMemSize(timePartitionId), |
| false); |
| } |
| |
| logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName); |
| } catch (DiskSpaceInsufficientException e) { |
| logger.error( |
| "Failed to append the tsfile {} to database processor {} because the disk space is insufficient.", |
| tsfileToBeInserted.getAbsolutePath(), |
| tsfileToBeInserted.getParentFile().getName()); |
| throw new LoadFileException(e); |
| } finally { |
| writeUnlock(); |
| DataNodeSchemaCache.getInstance().invalidateLastCacheInDataRegion(databaseName); |
| } |
| } |
| |
| /** |
| * Set the version in "partition" to "version" if "version" is larger than the current version. |
| */ |
| public void setPartitionFileVersionToMax(long partition, long version) { |
| partitionMaxFileVersions.compute( |
| partition, (prt, oldVer) -> computeMaxVersion(oldVer, version)); |
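| // e.g. a current version of 5 stays 5 when 3 arrives, and becomes 7 when 7 arrives |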
| } |
| |
| private long computeMaxVersion(Long oldVersion, Long newVersion) { |
| if (oldVersion == null) { |
| return newVersion; |
| } |
| return Math.max(oldVersion, newVersion); |
| } |
| |
| /** |
| * If the historical versions of a file are a subset of the given file's, (close and) remove it |
| * to reduce unnecessary merges. Only used when the file sender and the receiver share the same |
| * file close policy. Warning: DO NOT REMOVE |
| */ |
| @SuppressWarnings("unused") |
| public void removeFullyOverlapFiles(TsFileResource resource) { |
| writeLock("removeFullyOverlapFiles"); |
| try { |
| Iterator<TsFileResource> iterator = tsFileManager.getIterator(true); |
| removeFullyOverlapFiles(resource, iterator, true); |
| |
| iterator = tsFileManager.getIterator(false); |
| removeFullyOverlapFiles(resource, iterator, false); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| private void removeFullyOverlapFiles( |
| TsFileResource newTsFile, Iterator<TsFileResource> iterator, boolean isSeq) { |
| while (iterator.hasNext()) { |
| TsFileResource existingTsFile = iterator.next(); |
| // if we fail to lock the file, it is being queried or merged; we do not wait for it to |
| // become free and just leave it to the next merge |
| if (newTsFile.isPlanRangeCovers(existingTsFile) |
| && !newTsFile.getTsFile().equals(existingTsFile.getTsFile()) |
| && existingTsFile.tryWriteLock()) { |
| logger.info( |
| "{} is covered by {}: [{}, {}], [{}, {}], remove it", |
| existingTsFile, |
| newTsFile, |
| existingTsFile.minPlanIndex, |
| existingTsFile.maxPlanIndex, |
| newTsFile.minPlanIndex, |
| newTsFile.maxPlanIndex); |
| try { |
| removeFullyOverlapFile(existingTsFile, iterator, isSeq); |
| } catch (Exception e) { |
| logger.error( |
| "Something gets wrong while removing FullyOverlapFiles: {}", |
| existingTsFile.getTsFile().getAbsolutePath(), |
| e); |
| } finally { |
| existingTsFile.writeUnlock(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Remove the given {@link TsFileResource}. If the corresponding {@link TsFileProcessor} is in |
| * the working status, close it before removing the related resource files; closing a tsfile |
| * may be time-consuming. |
| */ |
| private void removeFullyOverlapFile( |
| TsFileResource tsFileResource, Iterator<TsFileResource> iterator, boolean isSeq) { |
| logger.info( |
| "Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed()); |
| if (!tsFileResource.isClosed()) { |
| try { |
| // also remove the TsFileProcessor if the overlapped file is not closed |
| long timePartition = tsFileResource.getTimePartition(); |
| Map<Long, TsFileProcessor> fileProcessorMap = |
| isSeq ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors; |
| TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition); |
| if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) { |
| // have to take some time to close the tsFileProcessor |
| tsFileProcessor.syncClose(); |
| fileProcessorMap.remove(timePartition); |
| } |
| } catch (Exception e) { |
| logger.error("Cannot close {}", tsFileResource, e); |
| } |
| } |
| tsFileManager.remove(tsFileResource, isSeq); |
| iterator.remove(); |
| tsFileResource.remove(); |
| } |
| |
| private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) { |
| long version = |
| partitionMaxFileVersions.compute( |
| timePartitionId, (key, oldVersion) -> (oldVersion == null ? 1 : oldVersion + 1)); |
| tsFileResource.setVersion(version); |
| return version; |
| } |
| |
| /** |
| * Update latest time in latestTimeForEachDevice and |
| * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module. |
| */ |
| protected void updateLastFlushTime(TsFileResource newTsFileResource) { |
| for (IDeviceID device : newTsFileResource.getDevices()) { |
| long endTime = newTsFileResource.getEndTime(device); |
| long timePartitionId = TimePartitionUtils.getTimePartitionId(endTime); |
| if (config.isEnableSeparateData()) { |
| lastFlushTimeMap.updateOneDeviceFlushedTime(timePartitionId, device, endTime); |
| } |
| if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) { |
| lastFlushTimeMap.updateOneDeviceGlobalFlushedTime(device, endTime); |
| } |
| } |
| } |
| |
| /** |
| * Load the tsfile into the unsequence space. |
| * |
| * @param tsFileToLoad tsfile to be moved or copied into the data directory |
| * @param tsFileResource tsfile resource to be loaded |
| * @param filePartitionId the partition id of the new file |
| * @param deleteOriginFile whether to delete the original file |
| * @return whether the file was loaded successfully @UsedBy sync module, load external tsfile module. |
| */ |
| private boolean loadTsFileToUnSequence( |
| File tsFileToLoad, |
| TsFileResource tsFileResource, |
| long filePartitionId, |
| boolean deleteOriginFile) |
| throws LoadFileException, DiskSpaceInsufficientException { |
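| // the target path follows the tier layout {tierFolder}/{database}/{regionId}/{partitionId}/{fileName} |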
| File targetFile; |
| targetFile = |
| fsFactory.getFile( |
| TierManager.getInstance().getNextFolderForTsFile(0, false), |
| databaseName |
| + File.separatorChar |
| + dataRegionId |
| + File.separatorChar |
| + filePartitionId |
| + File.separator |
| + tsFileResource.getTsFile().getName()); |
| tsFileResource.setFile(targetFile); |
| if (tsFileManager.contains(tsFileResource, false)) { |
| logger.error("The file {} has already been loaded in unsequence list", tsFileResource); |
| return false; |
| } |
| |
| logger.info( |
| "Load tsfile in unsequence list, move file from {} to {}", |
| tsFileToLoad.getAbsolutePath(), |
| targetFile.getAbsolutePath()); |
| |
| // move file from sync dir to data dir |
| if (!targetFile.getParentFile().exists()) { |
| targetFile.getParentFile().mkdirs(); |
| } |
| try { |
| if (deleteOriginFile) { |
| FileUtils.moveFile(tsFileToLoad, targetFile); |
| } else { |
| Files.copy(tsFileToLoad.toPath(), targetFile.toPath()); |
| } |
| } catch (IOException e) { |
| logger.error( |
| "File renaming failed when loading tsfile. Origin: {}, Target: {}", |
| tsFileToLoad.getAbsolutePath(), |
| targetFile.getAbsolutePath(), |
| e); |
| throw new LoadFileException( |
| String.format( |
| "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s", |
| tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage())); |
| } |
| |
| File resourceFileToLoad = fsFactory.getFile(tsFileToLoad.getAbsolutePath() + RESOURCE_SUFFIX); |
| File targetResourceFile = fsFactory.getFile(targetFile.getAbsolutePath() + RESOURCE_SUFFIX); |
| try { |
| if (deleteOriginFile) { |
| FileUtils.moveFile(resourceFileToLoad, targetResourceFile); |
| } else { |
| Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath()); |
| } |
| |
| } catch (IOException e) { |
| logger.error( |
| "File renaming failed when loading .resource file. Origin: {}, Target: {}", |
| resourceFileToLoad.getAbsolutePath(), |
| targetResourceFile.getAbsolutePath(), |
| e); |
| throw new LoadFileException( |
| String.format( |
| "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s", |
| resourceFileToLoad.getAbsolutePath(), |
| targetResourceFile.getAbsolutePath(), |
| e.getMessage())); |
| } |
| |
| File modFileToLoad = |
| fsFactory.getFile(tsFileToLoad.getAbsolutePath() + ModificationFile.FILE_SUFFIX); |
| if (modFileToLoad.exists()) { |
| // when successfully loaded, the filepath of the resource will be changed to the IoTDB data |
| // dir, so we can add a suffix to find the old modification file. |
| File targetModFile = |
| fsFactory.getFile(targetFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX); |
| try { |
| Files.deleteIfExists(targetModFile.toPath()); |
| } catch (IOException e) { |
| logger.warn("Cannot delete localModFile {}", targetModFile, e); |
| } |
| try { |
| if (deleteOriginFile) { |
| FileUtils.moveFile(modFileToLoad, targetModFile); |
| } else { |
| Files.copy(modFileToLoad.toPath(), targetModFile.toPath()); |
| } |
| } catch (IOException e) { |
| logger.error( |
| "File renaming failed when loading .mod file. Origin: {}, Target: {}", |
| modFileToLoad.getAbsolutePath(), |
| targetModFile.getAbsolutePath(), |
| e); |
| throw new LoadFileException( |
| String.format( |
| "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s", |
| modFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e.getMessage())); |
| } finally { |
| // ModFile will be updated during the next call to `getModFile` |
| tsFileResource.setModFile(null); |
| } |
| } |
| |
| // register the resource so that its time index can be degraded to save memory |
| TsFileResourceManager.getInstance().registerSealedTsFileResource(tsFileResource); |
| |
| tsFileManager.add(tsFileResource, false); |
| |
| return true; |
| } |
| |
| /** |
| * Get all working sequence tsfile processors |
| * |
| * @return all working sequence tsfile processors |
| */ |
| public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() { |
| return workSequenceTsFileProcessors.values(); |
| } |
| |
| public boolean removeTsFile(File fileToBeRemoved) { |
| TsFileResource tsFileResourceToBeRemoved = unloadTsFileInside(fileToBeRemoved); |
| if (tsFileResourceToBeRemoved == null) { |
| return false; |
| } |
| tsFileResourceToBeRemoved.writeLock(); |
| try { |
| tsFileResourceToBeRemoved.remove(); |
| logger.info("Remove tsfile {} successfully.", tsFileResourceToBeRemoved.getTsFile()); |
| } finally { |
| tsFileResourceToBeRemoved.writeUnlock(); |
| } |
| return true; |
| } |
| |
| /** |
| * Unload tsfile and move it to the target directory if it exists. |
| * |
| * <p>Firstly, unload the TsFileResource from sequenceFileList/unSequenceFileList. |
| * |
| * <p>Secondly, move the tsfile and .resource file to the target directory. |
| * |
| * @param fileToBeUnloaded tsfile to be unloaded |
| * @return whether the file to be unloaded exists. @UsedBy load external tsfile module. |
| */ |
| public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) throws IOException { |
| TsFileResource tsFileResourceToBeMoved = unloadTsFileInside(fileToBeUnloaded); |
| if (tsFileResourceToBeMoved == null) { |
| return false; |
| } |
| tsFileResourceToBeMoved.writeLock(); |
| try { |
| tsFileResourceToBeMoved.moveTo(targetDir); |
| logger.info( |
| "Move tsfile {} to target dir {} successfully.", |
| tsFileResourceToBeMoved.getTsFile(), |
| targetDir.getPath()); |
| } finally { |
| tsFileResourceToBeMoved.writeUnlock(); |
| } |
| return true; |
| } |
| |
| private TsFileResource unloadTsFileInside(File fileToBeUnloaded) { |
| writeLock("unloadTsFileInside"); |
| TsFileResource unloadedTsFileResource = null; |
| try { |
| Iterator<TsFileResource> sequenceIterator = tsFileManager.getIterator(true); |
| while (sequenceIterator.hasNext()) { |
| TsFileResource sequenceResource = sequenceIterator.next(); |
| if (sequenceResource.getTsFile().getName().equals(fileToBeUnloaded.getName())) { |
| unloadedTsFileResource = sequenceResource; |
| tsFileManager.remove(unloadedTsFileResource, true); |
| FileMetrics.getInstance() |
| .deleteTsFile(true, Collections.singletonList(unloadedTsFileResource)); |
| break; |
| } |
| } |
| if (unloadedTsFileResource == null) { |
| Iterator<TsFileResource> unsequenceIterator = tsFileManager.getIterator(false); |
| while (unsequenceIterator.hasNext()) { |
| TsFileResource unsequenceResource = unsequenceIterator.next(); |
| if (unsequenceResource.getTsFile().getName().equals(fileToBeUnloaded.getName())) { |
| unloadedTsFileResource = unsequenceResource; |
| tsFileManager.remove(unloadedTsFileResource, false); |
| FileMetrics.getInstance() |
| .deleteTsFile(false, Collections.singletonList(unloadedTsFileResource)); |
| break; |
| } |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| return unloadedTsFileResource; |
| } |
| |
| /** |
| * Get all working unsequence tsfile processors |
| * |
| * @return all working unsequence tsfile processors |
| */ |
| public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() { |
| return workUnsequenceTsFileProcessors.values(); |
| } |
| |
| public void setDataTTLWithTimePrecisionCheck(long dataTTL) { |
| if (dataTTL != Long.MAX_VALUE) { |
| dataTTL = |
| CommonDateTimeUtils.convertMilliTimeWithPrecision( |
| dataTTL, CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); |
| } |
| this.dataTTL = dataTTL; |
| } |
| |
| public void setDataTTL(long dataTTL) { |
| this.dataTTL = dataTTL; |
| } |
| |
| public List<TsFileResource> getSequenceFileList() { |
| return tsFileManager.getTsFileList(true); |
| } |
| |
| public List<TsFileResource> getUnSequenceFileList() { |
| return tsFileManager.getTsFileList(false); |
| } |
| |
| public String getDataRegionId() { |
| return dataRegionId; |
| } |
| |
| /** |
| * Get the storage group path combined with the dataRegionId. |
| * |
| * @return data region path, like root.sg1/0 |
| */ |
| public String getStorageGroupPath() { |
| return databaseName + File.separator + dataRegionId; |
| } |
| |
| /** |
| * Check if the data of "tsFileResource" all exist locally by comparing planIndexes in the |
| * partition of "partitionNumber". This is available only when the IoTDB instances which generated |
| * "tsFileResource" have the same plan indexes as the local one. |
| * |
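| * <p>For example, if a local file in the same partition has a max plan index of 120 while |
| * "tsFileResource" has a max plan index of 100, the incoming file is considered covered and |
| * this method returns true (the numbers are illustrative). |
| * |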
| * @return true if any local file in the same partition contains plans with indexes greater than |
| * the max plan index of "tsFileResource", otherwise false. |
| */ |
| public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) { |
| // examine working processors first as they have the largest plan indexes |
| return isFileAlreadyExistInWorking( |
| tsFileResource, partitionNum, getWorkSequenceTsFileProcessors()) |
| || isFileAlreadyExistInWorking( |
| tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors()) |
| || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileList()) |
| || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getUnSequenceFileList()); |
| } |
| |
| private boolean isFileAlreadyExistInClosed( |
| TsFileResource tsFileResource, long partitionNum, Collection<TsFileResource> existingFiles) { |
| for (TsFileResource resource : existingFiles) { |
| if (resource.getTimePartition() == partitionNum |
| && resource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex()) { |
| logger.info( |
| "{} is covered by a closed file {}: [{}, {}] [{}, {}]", |
| tsFileResource, |
| resource, |
| tsFileResource.minPlanIndex, |
| tsFileResource.maxPlanIndex, |
| resource.minPlanIndex, |
| resource.maxPlanIndex); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private boolean isFileAlreadyExistInWorking( |
| TsFileResource tsFileResource, |
| long partitionNum, |
| Collection<TsFileProcessor> workingProcessors) { |
| for (TsFileProcessor workingProcessor : workingProcessors) { |
| if (workingProcessor.getTimeRangeId() == partitionNum) { |
| TsFileResource workResource = workingProcessor.getTsFileResource(); |
| boolean isCovered = workResource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex(); |
| if (isCovered) { |
| logger.info( |
| "{} is covered by a working file {}: [{}, {}] [{}, {}]", |
| tsFileResource, |
| workResource, |
| tsFileResource.minPlanIndex, |
| tsFileResource.maxPlanIndex, |
| workResource.minPlanIndex, |
| workResource.maxPlanIndex); |
| } |
| return isCovered; |
| } |
| } |
| return false; |
| } |
| |
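| /** |
| * Disallow new compactions on this data region and block until all of its currently running |
| * compaction tasks have finished. |
| */ |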
| public void abortCompaction() { |
| tsFileManager.setAllowCompaction(false); |
| CompactionScheduleTaskManager.getInstance().unregisterDataRegion(this); |
| List<AbstractCompactionTask> runningTasks = |
| CompactionTaskManager.getInstance().abortCompaction(databaseName + "-" + dataRegionId); |
| while (CompactionTaskManager.getInstance().isAnyTaskInListStillRunning(runningTasks)) { |
| try { |
| TimeUnit.MILLISECONDS.sleep(10); |
| } catch (InterruptedException e) { |
| logger.error("Thread get interrupted when waiting compaction to finish", e); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| isCompactionSelecting.set(false); |
| } |
| |
| public TsFileManager getTsFileResourceManager() { |
| return tsFileManager; |
| } |
| |
| /** |
| * Insert a batch of rows that all belong to one device. |
| * |
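| * <p>Rows whose timestamps are outside the TTL are skipped and recorded per row index in |
| * {@code insertRowsOfOneDeviceNode.getResults()}; if any row fails, a BatchProcessException is |
| * thrown after the remaining rows have been attempted. |
| * |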
| * @param insertRowsOfOneDeviceNode batch of rows belonging to one device |
| */ |
| public void insert(InsertRowsOfOneDeviceNode insertRowsOfOneDeviceNode) |
| throws WriteProcessException, BatchProcessException { |
| StorageEngine.blockInsertionIfReject(null); |
| long startTime = System.nanoTime(); |
| writeLock("InsertRowsOfOneDevice"); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); |
| try { |
| if (deleted) { |
| return; |
| } |
| Map<TsFileProcessor, Boolean> tsFileProcessorMapForFlushing = new HashMap<>(); |
| for (int i = 0; i < insertRowsOfOneDeviceNode.getInsertRowNodeList().size(); i++) { |
| InsertRowNode insertRowNode = insertRowsOfOneDeviceNode.getInsertRowNodeList().get(i); |
| if (!isAlive(insertRowNode.getTime())) { |
| // this part of the data does not need to be written, as it can no longer be queried, |
| // or the sub-plan has already been executed and we are retrying other sub-plans |
| insertRowsOfOneDeviceNode |
| .getResults() |
| .put( |
| i, |
| RpcUtils.getStatus( |
| TSStatusCode.OUT_OF_TTL.getStatusCode(), |
| String.format( |
| "Insertion time [%s] is less than ttl time bound [%s]", |
| DateTimeUtils.convertLongToDate(insertRowNode.getTime()), |
| DateTimeUtils.convertLongToDate( |
| CommonDateTimeUtils.currentTime() - dataTTL)))); |
| continue; |
| } |
| // compute the time partition and register it in the flush-time map on first use |
| long timePartitionId = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); |
| |
| if (config.isEnableSeparateData() |
| && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId)) { |
| TimePartitionManager.getInstance() |
| .registerTimePartitionInfo( |
| new TimePartitionInfo( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| timePartitionId, |
| true, |
| Long.MAX_VALUE, |
| 0)); |
| } |
| |
| boolean isSequence = |
| config.isEnableSeparateData() |
| && insertRowNode.getTime() |
| > lastFlushTimeMap.getFlushedTime(timePartitionId, insertRowNode.getDeviceID()); |
| |
| // insert to sequence or unSequence file |
| try { |
| insertToTsFileProcessor( |
| insertRowNode, isSequence, timePartitionId, tsFileProcessorMapForFlushing); |
| } catch (WriteProcessException e) { |
| insertRowsOfOneDeviceNode |
| .getResults() |
| .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); |
| } |
| } |
| // check memtable size and trigger an async flush of the working memtable if needed |
| for (Map.Entry<TsFileProcessor, Boolean> entry : tsFileProcessorMapForFlushing.entrySet()) { |
| if (entry.getKey().shouldFlush()) { |
| fileFlushPolicy.apply(this, entry.getKey(), entry.getValue()); |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| if (!insertRowsOfOneDeviceNode.getResults().isEmpty()) { |
| throw new BatchProcessException("Partial failed inserting rows of one device"); |
| } |
| } |
| |
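| /** |
| * Insert a batch of rows that may belong to different devices. Rows whose timestamps are |
| * outside the TTL are skipped and recorded per row index in {@code insertRowsNode.getResults()}; |
| * if any row fails, a BatchProcessException is thrown. |
| * |
| * @param insertRowsNode batch of rows to insert |
| */ |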
| public void insert(InsertRowsNode insertRowsNode) |
| throws BatchProcessException, WriteProcessRejectException { |
| StorageEngine.blockInsertionIfReject(null); |
| long startTime = System.nanoTime(); |
| writeLock("InsertRows"); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleLockCost(System.nanoTime() - startTime); |
| try { |
| if (deleted) { |
| return; |
| } |
| boolean[] areSequence = new boolean[insertRowsNode.getInsertRowNodeList().size()]; |
| long[] timePartitionIds = new long[insertRowsNode.getInsertRowNodeList().size()]; |
| for (int i = 0; i < insertRowsNode.getInsertRowNodeList().size(); i++) { |
| InsertRowNode insertRowNode = insertRowsNode.getInsertRowNodeList().get(i); |
| if (!isAlive(insertRowNode.getTime())) { |
| insertRowsNode |
| .getResults() |
| .put( |
| i, |
| RpcUtils.getStatus( |
| TSStatusCode.OUT_OF_TTL.getStatusCode(), |
| String.format( |
| "Insertion time [%s] is less than ttl time bound [%s]", |
| DateTimeUtils.convertLongToDate(insertRowNode.getTime()), |
| DateTimeUtils.convertLongToDate( |
| CommonDateTimeUtils.currentTime() - dataTTL)))); |
| continue; |
| } |
| // compute the time partition and register it in the flush-time map on first use |
| timePartitionIds[i] = TimePartitionUtils.getTimePartitionId(insertRowNode.getTime()); |
| |
| if (config.isEnableSeparateData() |
| && !lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionIds[i])) { |
| TimePartitionManager.getInstance() |
| .registerTimePartitionInfo( |
| new TimePartitionInfo( |
| new DataRegionId(Integer.parseInt(dataRegionId)), |
| timePartitionIds[i], |
| true, |
| Long.MAX_VALUE, |
| 0)); |
| } |
| areSequence[i] = |
| config.isEnableSeparateData() |
| && insertRowNode.getTime() |
| > lastFlushTimeMap.getFlushedTime( |
| timePartitionIds[i], insertRowNode.getDeviceID()); |
| } |
| insertToTsFileProcessors(insertRowsNode, areSequence, timePartitionIds); |
| if (!insertRowsNode.getResults().isEmpty()) { |
| throw new BatchProcessException("Partial failed inserting rows"); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * Insert a batch of tablets that belong to multiple devices. |
| * |
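| * <p>Failures are recorded per tablet index in {@code insertMultiTabletsNode.getResults()}. A |
| * sub-status of WRITE_PROCESS_REJECT is propagated immediately so that the consensus layer can |
| * retry the batch. |
| * |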
| * @param insertMultiTabletsNode batch of tablets belonging to multiple devices |
| */ |
| public void insertTablets(InsertMultiTabletsNode insertMultiTabletsNode) |
| throws BatchProcessException { |
| for (int i = 0; i < insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) { |
| InsertTabletNode insertTabletNode = insertMultiTabletsNode.getInsertTabletNodeList().get(i); |
| try { |
| insertTablet(insertTabletNode); |
| } catch (WriteProcessException e) { |
| insertMultiTabletsNode |
| .getResults() |
| .put(i, RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); |
| } catch (BatchProcessException e) { |
| // record the first failing sub-status of this tablet |
| TSStatus firstStatus = null; |
| for (TSStatus status : e.getFailingStatus()) { |
| if (firstStatus == null |
| && status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| firstStatus = status; |
| } |
| // return WRITE_PROCESS_REJECT directly for the consensus retry logic |
| if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { |
| insertMultiTabletsNode.getResults().put(i, status); |
| throw new BatchProcessException("Rejected inserting multi tablets"); |
| } |
| } |
| insertMultiTabletsNode.getResults().put(i, firstStatus); |
| } |
| } |
| |
| if (!insertMultiTabletsNode.getResults().isEmpty()) { |
| throw new BatchProcessException("Partial failed inserting multi tablets"); |
| } |
| } |
| |
| /** @return the disk space occupied by this data region, unit is MB */ |
| public long countRegionDiskSize() { |
| AtomicLong diskSize = new AtomicLong(0); |
| TierManager.getInstance() |
| .getAllLocalFilesFolders() |
| .forEach( |
| folder -> { |
| folder = folder + File.separator + databaseName + File.separator + dataRegionId; |
| countFolderDiskSize(folder, diskSize); |
| }); |
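| // diskSize has been accumulated in bytes; convert to MB |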
| return diskSize.get() / 1024 / 1024; |
| } |
| |
| /** |
| * @param folder the folder's path |
| * @param diskSize accumulator for the disk space occupied by this folder, in bytes |
| */ |
| private void countFolderDiskSize(String folder, AtomicLong diskSize) { |
| File file = FSFactoryProducer.getFSFactory().getFile(folder); |
| File[] allFile = file.listFiles(); |
| if (allFile == null) { |
| return; |
| } |
| for (File f : allFile) { |
| if (f.isFile()) { |
| diskSize.addAndGet(f.length()); |
| } else if (f.isDirectory()) { |
| countFolderDiskSize(f.getAbsolutePath(), diskSize); |
| } |
| } |
| } |
| |
| public void addSettleFilesToList( |
| List<TsFileResource> seqResourcesToBeSettled, |
| List<TsFileResource> unseqResourcesToBeSettled, |
| List<String> tsFilePaths) { |
| if (tsFilePaths.isEmpty()) { |
| for (TsFileResource resource : tsFileManager.getTsFileList(true)) { |
| if (!resource.isClosed()) { |
| continue; |
| } |
| resource.setSettleTsFileCallBack(this::settleTsFileCallBack); |
| seqResourcesToBeSettled.add(resource); |
| } |
| for (TsFileResource resource : tsFileManager.getTsFileList(false)) { |
| if (!resource.isClosed()) { |
| continue; |
| } |
| resource.setSettleTsFileCallBack(this::settleTsFileCallBack); |
| unseqResourcesToBeSettled.add(resource); |
| } |
| } else { |
| for (String tsFilePath : tsFilePaths) { |
| File fileToBeSettled = new File(tsFilePath); |
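| // a tsfile lies under .../{sequence|unsequence}/<database>/<dataRegionId>/<timePartition>/, |
| // so the fourth parent directory distinguishes sequence files from unsequence ones |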
| if ("sequence" |
| .equals( |
| fileToBeSettled |
| .getParentFile() |
| .getParentFile() |
| .getParentFile() |
| .getParentFile() |
| .getName())) { |
| for (TsFileResource resource : tsFileManager.getTsFileList(true)) { |
| if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) { |
| resource.setSettleTsFileCallBack(this::settleTsFileCallBack); |
| seqResourcesToBeSettled.add(resource); |
| break; |
| } |
| } |
| } else { |
| for (TsFileResource resource : tsFileManager.getTsFileList(false)) { |
| if (resource.getTsFile().getAbsolutePath().equals(tsFilePath)) { |
| resource.setSettleTsFileCallBack(this::settleTsFileCallBack); |
| unseqResourcesToBeSettled.add(resource); |
| break; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| public void setCustomCloseFileListeners(List<CloseFileListener> customCloseFileListeners) { |
| this.customCloseFileListeners = customCloseFileListeners; |
| } |
| |
| public void setCustomFlushListeners(List<FlushListener> customFlushListeners) { |
| this.customFlushListeners = customFlushListeners; |
| } |
| |
| public void setAllowCompaction(boolean allowCompaction) { |
| this.tsFileManager.setAllowCompaction(allowCompaction); |
| } |
| |
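| /** Callback invoked by a TsFileProcessor after its TsFile has been closed. */ |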
| @FunctionalInterface |
| public interface CloseTsFileCallBack { |
| |
| void call(TsFileProcessor caller) throws TsFileProcessorException, IOException; |
| } |
| |
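| /** Callback for updating per-device flushed times when a TsFileProcessor flushes or closes. */ |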
| @FunctionalInterface |
| public interface UpdateEndTimeCallBack { |
| |
| void call(TsFileProcessor caller, Map<IDeviceID, Long> updateMap, long systemFlushTime); |
| } |
| |
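| /** Callback invoked when settling replaces an old TsFileResource with the settled ones. */ |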
| @FunctionalInterface |
| public interface SettleTsFileCallBack { |
| |
| void call(TsFileResource oldTsFileResource, List<TsFileResource> newTsFileResources) |
| throws WriteProcessException; |
| } |
| |
| public List<Long> getTimePartitions() { |
| return new ArrayList<>(partitionMaxFileVersions.keySet()); |
| } |
| |
| public Long getLatestTimePartition() { |
| return getTimePartitions().stream().max(Long::compareTo).orElse(0L); |
| } |
| |
| public String getInsertWriteLockHolder() { |
| return insertWriteLockHolder; |
| } |
| |
| /** This method can only be used when the data region uses IoT consensus. */ |
| public IWALNode getWALNode() { |
| if (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS)) { |
| throw new UnsupportedOperationException(); |
| } |
| // the identifier should be the same as the one used in the getTsFileProcessor method |
| return WALManager.getInstance() |
| .applyForWALNode(databaseName + FILE_NAME_SEPARATOR + dataRegionId); |
| } |
| |
| /** Wait until this data region has been successfully deleted. */ |
| public void waitForDeleted() { |
| writeLock("waitForDeleted"); |
| try { |
| // await in a loop to guard against spurious wakeups |
| while (!deleted) { |
| deletedCondition.await(); |
| } |
| FileMetrics.getInstance().deleteRegion(databaseName, dataRegionId); |
| } catch (InterruptedException e) { |
| logger.error("Interrupted When waiting for data region deleted."); |
| Thread.currentThread().interrupt(); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** Release all threads waiting for this data region to be successfully deleted. */ |
| public void markDeleted() { |
| writeLock("markDeleted"); |
| try { |
| deleted = true; |
| deletedCondition.signalAll(); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /* Be careful: the thread that calls this method may not hold the write lock! */ |
| public void degradeFlushTimeMap(long timePartitionId) { |
| lastFlushTimeMap.degradeLastFlushTime(timePartitionId); |
| } |
| |
| public long getMemCost() { |
| return dataRegionInfo.getMemCost(); |
| } |
| |
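| /** Rename the origin file to the new name if it exists; failures are logged rather than thrown. */ |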
| private void renameAndHandleError(String originFileName, String newFileName) { |
| try { |
| File originFile = new File(originFileName); |
| if (originFile.exists()) { |
| Files.move(originFile.toPath(), Paths.get(newFileName)); |
| } |
| } catch (IOException e) { |
| logger.error("Failed to rename {} to {},", originFileName, newFileName, e); |
| } |
| } |
| |
| @Override |
| public long getDataTTL() { |
| return dataTTL; |
| } |
| |
| @TestOnly |
| public ILastFlushTimeMap getLastFlushTimeMap() { |
| return lastFlushTimeMap; |
| } |
| |
| @TestOnly |
| public TsFileManager getTsFileManager() { |
| return tsFileManager; |
| } |
| } |