| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.engine.storagegroup; |
| |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBConstant; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.conf.directories.DirectoryManager; |
| import org.apache.iotdb.db.engine.StorageEngine; |
| import org.apache.iotdb.db.engine.compaction.CompactionMergeTaskPoolManager; |
| import org.apache.iotdb.db.engine.compaction.TsFileManagement; |
| import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement; |
| import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; |
| import org.apache.iotdb.db.engine.flush.CloseFileListener; |
| import org.apache.iotdb.db.engine.flush.FlushListener; |
| import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy; |
| import org.apache.iotdb.db.engine.merge.manage.MergeManager; |
| import org.apache.iotdb.db.engine.merge.task.RecoverMergeTask; |
| import org.apache.iotdb.db.engine.modification.Deletion; |
| import org.apache.iotdb.db.engine.modification.ModificationFile; |
| import org.apache.iotdb.db.engine.querycontext.QueryDataSource; |
| import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex; |
| import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine; |
| import org.apache.iotdb.db.engine.trigger.executor.TriggerEvent; |
| import org.apache.iotdb.db.engine.upgrade.UpgradeCheckStatus; |
| import org.apache.iotdb.db.engine.upgrade.UpgradeLog; |
| import org.apache.iotdb.db.engine.version.SimpleFileVersionController; |
| import org.apache.iotdb.db.engine.version.VersionController; |
| import org.apache.iotdb.db.exception.BatchProcessException; |
| import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; |
| import org.apache.iotdb.db.exception.LoadFileException; |
| import org.apache.iotdb.db.exception.StorageGroupProcessorException; |
| import org.apache.iotdb.db.exception.TriggerExecutionException; |
| import org.apache.iotdb.db.exception.TsFileProcessorException; |
| import org.apache.iotdb.db.exception.WriteProcessException; |
| import org.apache.iotdb.db.exception.WriteProcessRejectException; |
| import org.apache.iotdb.db.exception.metadata.IllegalPathException; |
| import org.apache.iotdb.db.exception.metadata.MetadataException; |
| import org.apache.iotdb.db.exception.query.OutOfTTLException; |
| import org.apache.iotdb.db.exception.query.QueryProcessException; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.metadata.mnode.IMNode; |
| import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; |
| import org.apache.iotdb.db.qp.physical.crud.DeletePlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; |
| import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; |
| import org.apache.iotdb.db.query.context.QueryContext; |
| import org.apache.iotdb.db.query.control.QueryFileManager; |
| import org.apache.iotdb.db.service.IoTDB; |
| import org.apache.iotdb.db.utils.CopyOnReadLinkedList; |
| import org.apache.iotdb.db.utils.MmapUtil; |
| import org.apache.iotdb.db.utils.TestOnly; |
| import org.apache.iotdb.db.utils.UpgradeUtils; |
| import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.service.rpc.thrift.TSStatus; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| import org.apache.iotdb.tsfile.utils.Pair; |
| import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; |
| import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.MappedByteBuffer; |
| import java.nio.file.Files; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.Deque; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; |
| import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX; |
| import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX; |
| import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; |
| |
| /** |
| * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one |
| * TsFileProcessor in the working status. <br> |
| * |
| * <p>There are two situations to set the working TsFileProcessor to closing status:<br> |
| * |
| * <p>(1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or |
| * shouldClose())<br> |
| * |
| * <p>(2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from |
| * cli will call this method)<br> |
| * |
 * <p>UnSequence data follows a similar process as above.
| * |
| * <p>When a sequence TsFileProcessor is submitted to be flushed, the |
| * updateLatestFlushTimeCallback() method will be called as a callback.<br> |
| * |
| * <p>When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be |
| * called as a callback. |
| */ |
| public class StorageGroupProcessor { |
| |
  public static final String MERGING_MODIFICATION_FILE_NAME = "merge.mods";
  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
  private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG");

  /**
   * All newly generated chunks after merge have version number 0, so we set the merged
   * Modification file version to 1 so that its deletions take effect on them.
   */
  private static final int MERGE_MOD_START_VERSION_NUM = 1;

  private static final Logger logger = LoggerFactory.getLogger(StorageGroupProcessor.class);
  /** indicating the file to be loaded already exists locally. */
  private static final int POS_ALREADY_EXIST = -2;
  /** indicating the file to be loaded overlaps with some existing files. */
  private static final int POS_OVERLAP = -3;

  private final boolean enableMemControl = config.isEnableMemControl();
  /**
   * a read write lock for guaranteeing concurrent safety when accessing all fields in this class
   * (i.e., schema, (un)sequenceFileList, work(un)SequenceTsFileProcessor,
   * closing(Un)SequenceTsFileProcessor, latestTimeForEachDevice, and
   * partitionLatestFlushedTimeForEachDevice)
   */
  private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
  /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */
  private final Object closeStorageGroupCondition = new Object();
  /**
   * avoid some tsfileResource is changed (e.g., from unsealed to sealed) when a query is executed.
   */
  private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock();
  /** time partition id in the storage group -> tsFileProcessor for this time partition */
  private final TreeMap<Long, TsFileProcessor> workSequenceTsFileProcessors = new TreeMap<>();
  /** time partition id in the storage group -> tsFileProcessor for this time partition */
  private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>();
  /** compactionMergeWorking is used to wait for the last compaction to be done. */
  private volatile boolean compactionMergeWorking = false;
  // upgrading sequence TsFile resource list
  private List<TsFileResource> upgradeSeqFileList = new LinkedList<>();

  /** sequence tsfile processors which are closing */
  private CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor =
      new CopyOnReadLinkedList<>();

  // upgrading unsequence TsFile resource list
  private List<TsFileResource> upgradeUnseqFileList = new LinkedList<>();

  /** unsequence tsfile processors which are closing */
  private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor =
      new CopyOnReadLinkedList<>();

  // number of old-version TsFiles (seq + unseq) still awaiting upgrade
  private AtomicInteger upgradeFileCount = new AtomicInteger();
  /*
   * time partition id -> (device -> global latest timestamp of each device).
   * latestTimeForEachDevice caches non-flushed changes upon timestamps of each device, and is
   * used to update partitionLatestFlushedTimeForEachDevice when a flush is issued.
   */
  private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
  /**
   * time partition id -> map, which contains device -> largest timestamp of the latest memtable to
   * be submitted to asyncTryToFlush. partitionLatestFlushedTimeForEachDevice determines whether a
   * data point should be put into a sequential file or an unsequential file. Data of some device
   * with timestamp less than or equals to the device's latestFlushedTime should go into an
   * unsequential file.
   */
  private Map<Long, Map<String, Long>> partitionLatestFlushedTimeForEachDevice = new HashMap<>();

  /** used to record the latest flush time while upgrading and inserting */
  private Map<Long, Map<String, Long>> newlyFlushedPartitionLatestFlushedTimeForEachDevice =
      new HashMap<>();
  /**
   * global mapping of device -> largest timestamp of the latest memtable to be submitted to
   * asyncTryToFlush. globalLatestFlushedTimeForEachDevice is utilized to maintain the global
   * latestFlushedTime of devices and will be updated along with
   * partitionLatestFlushedTimeForEachDevice
   */
  private Map<String, Long> globalLatestFlushedTimeForEachDevice = new HashMap<>();

  /** virtual storage group id */
  private String virtualStorageGroupId;

  /** logical storage group name */
  private String logicalStorageGroupName;

  /** storage group system directory */
  private File storageGroupSysDir;

  /** manage seqFileList and unSeqFileList */
  private TsFileManagement tsFileManagement;
  /**
   * time partition id -> version controller which assigns a version for each MemTable and
   * deletion/update such that after they are persisted, the order of insertions, deletions and
   * updates can be re-determined. Will be empty if there are no MemTables in memory.
   */
  private HashMap<Long, VersionController> timePartitionIdVersionControllerMap = new HashMap<>();
  /**
   * when the data in a storage group is older than dataTTL, it is considered invalid and will be
   * eventually removed.
   */
  private long dataTTL = Long.MAX_VALUE;

  /** file system factory (local or hdfs) */
  private FSFactory fsFactory = FSFactoryProducer.getFSFactory();

  /** file flush policy */
  private TsFileFlushPolicy fileFlushPolicy;

  /**
   * The max file versions in each partition. By recording this, if several IoTDB instances have the
   * same policy of closing file and their ingestion is identical, then files of the same version in
   * different IoTDB instance will have identical data, providing convenience for data comparison
   * across different instances. partition number -> max version number
   */
  private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();

  /** storage group info for mem control */
  private StorageGroupInfo storageGroupInfo = new StorageGroupInfo(this);
  /**
   * Record the device number of the last TsFile in each storage group, which is applied to
   * initialize the array size of DeviceTimeIndex. It is reasonable to assume that the adjacent
   * files should have similar numbers of devices. Default value: INIT_ARRAY_SIZE = 64
   */
  private int deviceNumInLastClosedTsFile = DeviceTimeIndex.INIT_ARRAY_SIZE;

  /** whether it's ready from recovery */
  private boolean isReady = false;

  /** close file listeners */
  private List<CloseFileListener> customCloseFileListeners = Collections.emptyList();

  /** flush listeners */
  private List<FlushListener> customFlushListeners = Collections.emptyList();

  // each WAL buffer pair uses half of the configured WAL buffer size per buffer
  private static final int WAL_BUFFER_SIZE =
      IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;

  private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>();

  // total number of direct byte buffers this processor has allocated (pooled + lent out)
  private int currentWalPoolSize = 0;

  // This field avoids a race: when one writer releases a ByteBuffer back to the pool and the next
  // writer has already arrived, the trim thread might get the lock first, find the pool non-empty,
  // and free the memory — forcing the next writer to allocate it again. So the free-memory
  // strategy is: only trim when the expected size is less than the current pool size, the pool is
  // not empty, AND the time since the pool became non-empty exceeds
  // DEFAULT_POOL_TRIM_INTERVAL_MILLIS.
  private long timeWhenPoolNotEmpty = Long.MAX_VALUE;

  /**
   * records which method is holding the insertWriteLock in this SG; it will be an empty string if
   * no one holds the insertWriteLock
   */
  private String insertWriteLockHolder = "";
| |
| /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */ |
| public ByteBuffer[] getWalDirectByteBuffer() { |
| ByteBuffer[] res = new ByteBuffer[2]; |
| synchronized (walByteBufferPool) { |
| long startTime = System.nanoTime(); |
| int MAX_WAL_BYTEBUFFER_NUM = |
| config.getConcurrentWritingTimePartition() |
| * config.getMaxWalBytebufferNumForEachPartition(); |
| while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) { |
| try { |
| walByteBufferPool.wait(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| logger.error( |
| "getDirectByteBuffer occurs error while waiting for DirectByteBuffer" + "group {}-{}", |
| logicalStorageGroupName, |
| virtualStorageGroupId, |
| e); |
| } |
| logger.info( |
| "Waiting {} ms for wal direct byte buffer.", |
| (System.nanoTime() - startTime) / 1_000_000); |
| } |
| // If the queue is not empty, it must have at least two. |
| if (!walByteBufferPool.isEmpty()) { |
| res[0] = walByteBufferPool.pollFirst(); |
| res[1] = walByteBufferPool.pollFirst(); |
| } else { |
| // if the queue is empty and current size is less than MAX_BYTEBUFFER_NUM |
| // we can construct another two more new byte buffer |
| currentWalPoolSize += 2; |
| res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE); |
| res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE); |
| } |
| // if the pool is empty, set the time back to MAX_VALUE |
| if (walByteBufferPool.isEmpty()) { |
| timeWhenPoolNotEmpty = Long.MAX_VALUE; |
| } |
| } |
| return res; |
| } |
| |
| /** put the byteBuffer back to pool */ |
| public void releaseWalBuffer(ByteBuffer[] byteBuffers) { |
| for (ByteBuffer byteBuffer : byteBuffers) { |
| byteBuffer.clear(); |
| } |
| synchronized (walByteBufferPool) { |
| // if the pool is empty before, update the time |
| if (walByteBufferPool.isEmpty()) { |
| timeWhenPoolNotEmpty = System.nanoTime(); |
| } |
| walByteBufferPool.addLast(byteBuffers[0]); |
| walByteBufferPool.addLast(byteBuffers[1]); |
| walByteBufferPool.notifyAll(); |
| } |
| } |
| |
| /** trim the size of the pool and release the memory of needless direct byte buffer */ |
| private void trimTask() { |
| synchronized (walByteBufferPool) { |
| int expectedSize = |
| (workSequenceTsFileProcessors.size() + workUnsequenceTsFileProcessors.size()) * 2; |
| // the unit is ms |
| long poolNotEmptyIntervalInMS = (System.nanoTime() - timeWhenPoolNotEmpty) / 1_000_000; |
| // only when the expected size less than the current pool size |
| // and the pool is not empty and the time interval since the pool is not empty is larger than |
| // 10s |
| // we will trim the size to expectedSize until the pool is empty |
| while (expectedSize < currentWalPoolSize |
| && !walByteBufferPool.isEmpty() |
| && poolNotEmptyIntervalInMS >= config.getWalPoolTrimIntervalInMS()) { |
| MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast()); |
| MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast()); |
| currentWalPoolSize -= 2; |
| } |
| } |
| } |
| |
  /**
   * Construct a storage group processor: create the system directory, pick the TsFileManagement
   * implementation from the configured compaction strategy, schedule the periodic WAL buffer pool
   * trim task, and recover state from files on disk.
   *
   * @param systemDir system dir path
   * @param virtualStorageGroupId virtual storage group id e.g. 1
   * @param fileFlushPolicy file flush policy
   * @param logicalStorageGroupName logical storage group name e.g. root.sg1
   * @throws StorageGroupProcessorException if recovery from existing files fails
   */
  public StorageGroupProcessor(
      String systemDir,
      String virtualStorageGroupId,
      TsFileFlushPolicy fileFlushPolicy,
      String logicalStorageGroupName)
      throws StorageGroupProcessorException {
    this.virtualStorageGroupId = virtualStorageGroupId;
    this.logicalStorageGroupName = logicalStorageGroupName;
    this.fileFlushPolicy = fileFlushPolicy;

    storageGroupSysDir = SystemFileFactory.INSTANCE.getFile(systemDir, virtualStorageGroupId);
    // mkdirs() returns true only if the directory was actually created by this call
    if (storageGroupSysDir.mkdirs()) {
      logger.info(
          "Storage Group system Directory {} doesn't exist, create it",
          storageGroupSysDir.getPath());
    } else if (!storageGroupSysDir.exists()) {
      logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath());
    }
    // the configured compaction strategy decides which TsFileManagement implementation to use
    this.tsFileManagement =
        IoTDBDescriptor.getInstance()
            .getConfig()
            .getCompactionStrategy()
            .getTsFileManagement(logicalStorageGroupName, storageGroupSysDir.getAbsolutePath());

    // periodically shrink the WAL direct buffer pool.
    // NOTE(review): this scheduler is never stored or shut down here; confirm it is terminated
    // elsewhere, otherwise its thread (and the trim task) outlives this processor.
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    executorService.scheduleWithFixedDelay(
        this::trimTask,
        config.getWalPoolTrimIntervalInMS(),
        config.getWalPoolTrimIntervalInMS(),
        TimeUnit.MILLISECONDS);
    recover();
  }
| |
  /** @return the logical storage group name, e.g. root.sg1 */
  public String getLogicalStorageGroupName() {
    return logicalStorageGroupName;
  }

  /** @return whether recovery of this storage group has completed */
  public boolean isReady() {
    return isReady;
  }

  /** @param ready whether recovery of this storage group has completed */
  public void setReady(boolean ready) {
    isReady = ready;
  }
| |
| private Map<Long, List<TsFileResource>> splitResourcesByPartition( |
| List<TsFileResource> resources) { |
| Map<Long, List<TsFileResource>> ret = new HashMap<>(); |
| for (TsFileResource resource : resources) { |
| ret.computeIfAbsent(resource.getTimePartition(), l -> new ArrayList<>()).add(resource); |
| } |
| return ret; |
| } |
| |
  /**
   * Recover the storage group from files on disk: collect TsFiles (separating old-version files
   * that need upgrading), replay their WALs via recoverTsFiles, resume an interrupted merge,
   * recover compaction, rebuild per-partition max file versions, and restore the per-device
   * latest-time maps from the sealed sequence files.
   */
  private void recover() throws StorageGroupProcessorException {
    logger.info(
        String.format(
            "start recovering virtual storage group %s[%s]",
            logicalStorageGroupName, virtualStorageGroupId));

    try {
      // collect candidate TsFiles from sequential and unsequential data directory;
      // each pair is (current-version files, old-version files awaiting upgrade)
      Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
          getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
      List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
      List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
      upgradeSeqFileList.addAll(oldSeqTsFiles);
      Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
          getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
      List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
      List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
      upgradeUnseqFileList.addAll(oldUnseqTsFiles);

      // remember how many old-version files still need upgrading
      if (upgradeSeqFileList.size() + upgradeUnseqFileList.size() != 0) {
        upgradeFileCount.set(upgradeSeqFileList.size() + upgradeUnseqFileList.size());
      }

      // split by partition so that we can find the last file of each partition and decide to
      // close it or not
      Map<Long, List<TsFileResource>> partitionTmpSeqTsFiles =
          splitResourcesByPartition(tmpSeqTsFiles);
      Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
          splitResourcesByPartition(tmpUnseqTsFiles);
      for (List<TsFileResource> value : partitionTmpSeqTsFiles.values()) {
        recoverTsFiles(value, true);
      }
      for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
        recoverTsFiles(value, false);
      }

      // resume a merge that was interrupted by the shutdown, using a unique task name
      String taskName =
          logicalStorageGroupName + "-" + virtualStorageGroupId + "-" + System.currentTimeMillis();
      File mergingMods =
          SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, MERGING_MODIFICATION_FILE_NAME);
      if (mergingMods.exists()) {
        this.tsFileManagement.mergingModification = new ModificationFile(mergingMods.getPath());
      }
      RecoverMergeTask recoverMergeTask =
          new RecoverMergeTask(
              new ArrayList<>(tsFileManagement.getTsFileList(true)),
              tsFileManagement.getTsFileList(false),
              storageGroupSysDir.getPath(),
              tsFileManagement::mergeEndAction,
              taskName,
              IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(),
              logicalStorageGroupName);
      logger.info(
          "{} - {} a RecoverMergeTask {} starts...",
          logicalStorageGroupName,
          virtualStorageGroupId,
          taskName);
      recoverMergeTask.recoverMerge(
          IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
      if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
        // NOTE(review): delete() result is ignored; a leftover merge.mods would be re-read
        // on the next restart
        mergingMods.delete();
      }
      recoverCompaction();
      // rebuild the per-partition max file version from all known files, including upgrades
      for (TsFileResource resource : tsFileManagement.getTsFileList(true)) {
        long partitionNum = resource.getTimePartition();
        updatePartitionFileVersion(partitionNum, resource.getVersion());
      }
      for (TsFileResource resource : tsFileManagement.getTsFileList(false)) {
        long partitionNum = resource.getTimePartition();
        updatePartitionFileVersion(partitionNum, resource.getVersion());
      }
      for (TsFileResource resource : upgradeSeqFileList) {
        long partitionNum = resource.getTimePartition();
        updatePartitionFileVersion(partitionNum, resource.getVersion());
      }
      for (TsFileResource resource : upgradeUnseqFileList) {
        long partitionNum = resource.getTimePartition();
        updatePartitionFileVersion(partitionNum, resource.getVersion());
      }
      updateLatestFlushedTime();
    } catch (IOException | MetadataException e) {
      throw new StorageGroupProcessorException(e);
    }

    // seed the latest-time maps with the device end times recorded in sealed sequence files
    List<TsFileResource> seqTsFileResources = tsFileManagement.getTsFileList(true);
    for (TsFileResource resource : seqTsFileResources) {
      long timePartitionId = resource.getTimePartition();
      Map<String, Long> endTimeMap = new HashMap<>();
      for (String deviceId : resource.getDevices()) {
        long endTime = resource.getEndTime(deviceId);
        endTimeMap.put(deviceId, endTime);
      }
      latestTimeForEachDevice
          .computeIfAbsent(timePartitionId, l -> new HashMap<>())
          .putAll(endTimeMap);
      partitionLatestFlushedTimeForEachDevice
          .computeIfAbsent(timePartitionId, id -> new HashMap<>())
          .putAll(endTimeMap);
      globalLatestFlushedTimeForEachDevice.putAll(endTimeMap);
    }

    // NOTE(review): timePartitionIdVersionControllerMap is only populated by
    // updateLatestFlushedTime() for upgraded files at this point — confirm that restricting
    // the initial compaction to those partitions is intended
    if (IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()
        && seqTsFileResources.size() > 0) {
      for (long timePartitionId : timePartitionIdVersionControllerMap.keySet()) {
        executeCompaction(
            timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
      }
    }

    logger.info(
        String.format(
            "the virtual storage group %s[%s] is recovered successfully",
            logicalStorageGroupName, virtualStorageGroupId));
  }
| |
| private void recoverCompaction() { |
| if (!CompactionMergeTaskPoolManager.getInstance().isTerminated()) { |
| compactionMergeWorking = true; |
| logger.info( |
| "{} - {} submit a compaction recover merge task", |
| logicalStorageGroupName, |
| virtualStorageGroupId); |
| try { |
| CompactionMergeTaskPoolManager.getInstance() |
| .submitTask( |
| logicalStorageGroupName, |
| tsFileManagement.new CompactionRecoverTask(this::closeCompactionMergeCallBack)); |
| } catch (RejectedExecutionException e) { |
| this.closeCompactionMergeCallBack(false, 0); |
| logger.error( |
| "{} - {} compaction submit task failed", |
| logicalStorageGroupName, |
| virtualStorageGroupId, |
| e); |
| } |
| } else { |
| logger.error( |
| "{} compaction pool not started ,recover failed", |
| logicalStorageGroupName + "-" + virtualStorageGroupId); |
| } |
| } |
| |
| private void updatePartitionFileVersion(long partitionNum, long fileVersion) { |
| long oldVersion = partitionMaxFileVersions.getOrDefault(partitionNum, 0L); |
| if (fileVersion > oldVersion) { |
| partitionMaxFileVersions.put(partitionNum, fileVersion); |
| } |
| } |
| |
  /**
   * use old seq files to update latestTimeForEachDevice, globalLatestFlushedTimeForEachDevice,
   * partitionLatestFlushedTimeForEachDevice and timePartitionIdVersionControllerMap. Each device's
   * recorded end time is applied to every time partition the file's data covers, and a version
   * controller (backed by a version file on disk) is created for each such partition if missing.
   */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  private void updateLatestFlushedTime() throws IOException {

    VersionController versionController =
        new SimpleFileVersionController(storageGroupSysDir.getPath());
    long currentVersion = versionController.currVersion();
    for (TsFileResource resource : upgradeSeqFileList) {
      for (String deviceId : resource.getDevices()) {
        long endTime = resource.getEndTime(deviceId);
        long endTimePartitionId = StorageEngine.getTimePartition(endTime);
        latestTimeForEachDevice
            .computeIfAbsent(endTimePartitionId, l -> new HashMap<>())
            .put(deviceId, endTime);
        globalLatestFlushedTimeForEachDevice.put(deviceId, endTime);

        // set all the covered partitions' LatestFlushedTime: an old file may span several
        // partitions, from the device's start-time partition up to its end-time partition
        long partitionId = StorageEngine.getTimePartition(resource.getStartTime(deviceId));
        while (partitionId <= endTimePartitionId) {
          partitionLatestFlushedTimeForEachDevice
              .computeIfAbsent(partitionId, l -> new HashMap<>())
              .put(deviceId, endTime);
          if (!timePartitionIdVersionControllerMap.containsKey(partitionId)) {
            // lazily create the partition's system directory and persist a version file so
            // the partition starts from the storage group's current version
            File directory =
                SystemFileFactory.INSTANCE.getFile(storageGroupSysDir, String.valueOf(partitionId));
            if (!directory.exists()) {
              directory.mkdirs();
            }
            File versionFile =
                SystemFileFactory.INSTANCE.getFile(
                    directory, SimpleFileVersionController.FILE_PREFIX + currentVersion);
            if (!versionFile.createNewFile()) {
              logger.warn("Version file {} has already been created ", versionFile);
            }
            timePartitionIdVersionControllerMap.put(
                partitionId,
                new SimpleFileVersionController(storageGroupSysDir.getPath(), partitionId));
          }
          partitionId++;
        }
      }
    }
  }
| |
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
  /**
   * Scans the given data folders for this storage group's TsFiles, finishing any renames that a
   * crash interrupted along the way.
   *
   * @param folders base data directories to scan (sequence or unsequence)
   * @return a pair of (current-version TsFile resources, old-version resources needing upgrade),
   *     each sorted by file name
   * @throws IOException if reading an old-version resource file fails
   */
  private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
      throws IOException {
    List<File> tsFiles = new ArrayList<>();
    List<File> upgradeFiles = new ArrayList<>();
    for (String baseDir : folders) {
      // layout: <baseDir>/<logicalStorageGroupName>/<virtualStorageGroupId>/<partition>/*.tsfile
      File fileFolder =
          fsFactory.getFile(
              baseDir + File.separator + logicalStorageGroupName, virtualStorageGroupId);
      if (!fileFolder.exists()) {
        continue;
      }

      // old version layout (files directly under the storage group folder):
      // some TsFileResource may be being persisted when the system crashed, try recovering such
      // resources
      continueFailedRenames(fileFolder, TEMP_SUFFIX);

      // some TsFiles were going to be replaced by the merged files when the system crashed and
      // the process was interrupted before the merged files could be named
      continueFailedRenames(fileFolder, MERGE_SUFFIX);

      File[] subFiles = fileFolder.listFiles();
      if (subFiles != null) {
        for (File partitionFolder : subFiles) {
          if (!partitionFolder.isDirectory()) {
            logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
          } else if (!partitionFolder.getName().equals(IoTDBConstant.UPGRADE_FOLDER_NAME)) {
            // a regular time-partition folder: finish interrupted persist/merge renames,
            // then collect its TsFiles
            continueFailedRenames(partitionFolder, TEMP_SUFFIX);

            continueFailedRenames(partitionFolder, MERGE_SUFFIX);

            Collections.addAll(
                tsFiles,
                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
          } else {
            // collect old TsFiles for upgrading
            Collections.addAll(
                upgradeFiles,
                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
          }
        }
      }
    }
    tsFiles.sort(this::compareFileName);
    List<TsFileResource> ret = new ArrayList<>();
    tsFiles.forEach(f -> ret.add(new TsFileResource(f)));
    upgradeFiles.sort(this::compareFileName);
    List<TsFileResource> upgradeRet = new ArrayList<>();
    for (File f : upgradeFiles) {
      TsFileResource fileResource = new TsFileResource(f);
      fileResource.setClosed(true);
      // old files are assumed sealed: the flush command should have been called before the
      // old IoTDB instance was shut down
      fileResource.deserializeFromOldFile();
      upgradeRet.add(fileResource);
    }
    return new Pair<>(ret, upgradeRet);
  }
| |
| private void continueFailedRenames(File fileFolder, String suffix) { |
| File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix); |
| if (files != null) { |
| for (File tempResource : files) { |
| File originResource = fsFactory.getFile(tempResource.getPath().replace(suffix, "")); |
| if (originResource.exists()) { |
| tempResource.delete(); |
| } else { |
| tempResource.renameTo(originResource); |
| } |
| } |
| } |
| } |
| |
  /**
   * Recover each TsFile in {@code tsFiles} (assumed already sorted by file name) and register it
   * in tsFileManagement. Only the last file of the list may stay open for continued writing; all
   * earlier files are sealed.
   *
   * @param tsFiles resources to recover, in file-name order
   * @param isSeq whether these files belong to the sequence space
   */
  private void recoverTsFiles(List<TsFileResource> tsFiles, boolean isSeq) {
    for (int i = 0; i < tsFiles.size(); i++) {
      TsFileResource tsFileResource = tsFiles.get(i);
      long timePartitionId = tsFileResource.getTimePartition();

      // the last argument tells the performer whether this file may be left writable
      TsFileRecoverPerformer recoverPerformer =
          new TsFileRecoverPerformer(
              logicalStorageGroupName
                  + File.separator
                  + virtualStorageGroupId
                  + FILE_NAME_SEPARATOR,
              tsFileResource,
              isSeq,
              i == tsFiles.size() - 1);

      RestorableTsFileIOWriter writer;
      try {
        // this tsfile is not zero level, no need to perform redo wal
        if (LevelCompactionTsFileManagement.getMergeLevel(tsFileResource.getTsFile()) > 0) {
          writer =
              recoverPerformer.recover(false, this::getWalDirectByteBuffer, this::releaseWalBuffer);
          if (writer.hasCrashed()) {
            // the compacted file is incomplete; hand it to the compaction recovery path
            tsFileManagement.addRecover(tsFileResource, isSeq);
          } else {
            tsFileResource.setClosed(true);
            tsFileManagement.add(tsFileResource, isSeq);
          }
          continue;
        } else {
          // zero-level file: redo its WAL while recovering
          writer =
              recoverPerformer.recover(true, this::getWalDirectByteBuffer, this::releaseWalBuffer);
        }
      } catch (StorageGroupProcessorException e) {
        // skip the broken file instead of failing the whole storage group recovery
        logger.warn(
            "Skip TsFile: {} because of error in recover: ", tsFileResource.getTsFilePath(), e);
        continue;
      }

      if (i != tsFiles.size() - 1 || !writer.canWrite()) {
        // not the last file or cannot write, just close it
        tsFileResource.setClosed(true);
      } else if (writer.canWrite()) {
        // the last file is not closed, continue writing to it via a new working processor
        TsFileProcessor tsFileProcessor;
        if (isSeq) {
          tsFileProcessor =
              new TsFileProcessor(
                  virtualStorageGroupId,
                  storageGroupInfo,
                  tsFileResource,
                  this::closeUnsealedTsFileProcessorCallBack,
                  this::updateLatestFlushTimeCallback,
                  true,
                  writer);
          if (enableMemControl) {
            TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
            tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
            this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
          }
          workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
        } else {
          tsFileProcessor =
              new TsFileProcessor(
                  virtualStorageGroupId,
                  storageGroupInfo,
                  tsFileResource,
                  this::closeUnsealedTsFileProcessorCallBack,
                  this::unsequenceFlushCallback,
                  false,
                  writer);
          if (enableMemControl) {
            TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
            tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
            this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
          }
          workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
        }
        tsFileResource.setProcessor(tsFileProcessor);
        // the .resource file will be regenerated when the file is sealed again
        tsFileResource.removeResourceFile();
        tsFileProcessor.setTimeRangeId(timePartitionId);
        writer.makeMetadataVisible();
        if (enableMemControl) {
          // get chunkMetadata size
          long chunkMetadataSize = 0;
          for (Map<String, List<ChunkMetadata>> metaMap : writer.getMetadatasForQuery().values()) {
            for (List<ChunkMetadata> metadatas : metaMap.values()) {
              for (ChunkMetadata chunkMetadata : metadatas) {
                chunkMetadataSize += chunkMetadata.calculateRamSize();
              }
            }
          }
          // account the recovered chunk metadata against the processor's memory budget
          tsFileProcessor.getTsFileProcessorInfo().addTSPMemCost(chunkMetadataSize);
        }
      }
      tsFileManagement.add(tsFileResource, isSeq);
    }
  }
| |
| // ({systemTime}-{versionNum}-{mergeNum}.tsfile) |
| private int compareFileName(File o1, File o2) { |
| String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR); |
| String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR); |
| long ver1 = Long.parseLong(items1[0]); |
| long ver2 = Long.parseLong(items2[0]); |
| int cmp = Long.compare(ver1, ver2); |
| if (cmp == 0) { |
| return Long.compare(Long.parseLong(items1[1]), Long.parseLong(items2[1])); |
| } else { |
| return cmp; |
| } |
| } |
| |
| /** |
| * insert one row of data |
| * |
| * @param insertRowPlan one row of data |
| */ |
| public void insert(InsertRowPlan insertRowPlan) |
| throws WriteProcessException, TriggerExecutionException { |
| // reject insertions that are out of ttl |
| if (!isAlive(insertRowPlan.getTime())) { |
| throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL)); |
| } |
| writeLock("InsertRow"); |
| try { |
| // init map |
| long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime()); |
| |
| partitionLatestFlushedTimeForEachDevice.computeIfAbsent( |
| timePartitionId, id -> new HashMap<>()); |
| |
| boolean isSequence = |
| insertRowPlan.getTime() |
| > partitionLatestFlushedTimeForEachDevice |
| .get(timePartitionId) |
| .getOrDefault(insertRowPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE); |
| |
| // is unsequence and user set config to discard out of order data |
| if (!isSequence |
| && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) { |
| return; |
| } |
| |
| latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); |
| |
| // fire trigger before insertion |
| TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, insertRowPlan); |
| // insert to sequence or unSequence file |
| insertToTsFileProcessor(insertRowPlan, isSequence, timePartitionId); |
| // fire trigger after insertion |
| TriggerEngine.fire(TriggerEvent.AFTER_INSERT, insertRowPlan); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * Insert a tablet (rows belonging to the same devices) into this storage group. |
| * |
| * @throws BatchProcessException if some of the rows failed to be inserted |
| */ |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| public void insertTablet(InsertTabletPlan insertTabletPlan) |
| throws BatchProcessException, TriggerExecutionException { |
| |
| writeLock("insertTablet"); |
| try { |
| TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()]; |
| Arrays.fill(results, RpcUtils.SUCCESS_STATUS); |
| boolean noFailure = true; |
| |
| /* |
| * assume that batch has been sorted by client |
| */ |
| int loc = 0; |
| while (loc < insertTabletPlan.getRowCount()) { |
| long currTime = insertTabletPlan.getTimes()[loc]; |
| // skip points that do not satisfy TTL |
| if (!isAlive(currTime)) { |
| results[loc] = |
| RpcUtils.getStatus( |
| TSStatusCode.OUT_OF_TTL_ERROR, |
| "time " + currTime + " in current line is out of TTL: " + dataTTL); |
| loc++; |
| noFailure = false; |
| } else { |
| break; |
| } |
| } |
| // loc pointing at first legal position |
| if (loc == insertTabletPlan.getRowCount()) { |
| throw new BatchProcessException(results); |
| } |
| |
| // fire trigger before insertion |
| final int firePosition = loc; |
| TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, insertTabletPlan, firePosition); |
| |
| // before is first start point |
| int before = loc; |
| // before time partition |
| long beforeTimePartition = |
| StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]); |
| // init map |
| long lastFlushTime = |
| partitionLatestFlushedTimeForEachDevice |
| .computeIfAbsent(beforeTimePartition, id -> new HashMap<>()) |
| .computeIfAbsent( |
| insertTabletPlan.getPrefixPath().getFullPath(), id -> Long.MIN_VALUE); |
| // if is sequence |
| boolean isSequence = false; |
| while (loc < insertTabletPlan.getRowCount()) { |
| long time = insertTabletPlan.getTimes()[loc]; |
| long curTimePartition = StorageEngine.getTimePartition(time); |
| // start next partition |
| if (curTimePartition != beforeTimePartition) { |
| // insert last time partition |
| if (isSequence |
| || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) { |
| noFailure = |
| insertTabletToTsFileProcessor( |
| insertTabletPlan, before, loc, isSequence, results, beforeTimePartition) |
| && noFailure; |
| } |
| // re initialize |
| before = loc; |
| beforeTimePartition = curTimePartition; |
| lastFlushTime = |
| partitionLatestFlushedTimeForEachDevice |
| .computeIfAbsent(beforeTimePartition, id -> new HashMap<>()) |
| .computeIfAbsent( |
| insertTabletPlan.getPrefixPath().getFullPath(), id -> Long.MIN_VALUE); |
| isSequence = false; |
| } |
| // still in this partition |
| else { |
| // judge if we should insert sequence |
| if (!isSequence && time > lastFlushTime) { |
| // insert into unsequence and then start sequence |
| if (!IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) { |
| noFailure = |
| insertTabletToTsFileProcessor( |
| insertTabletPlan, before, loc, false, results, beforeTimePartition) |
| && noFailure; |
| } |
| before = loc; |
| isSequence = true; |
| } |
| loc++; |
| } |
| } |
| |
| // do not forget last part |
| if (before < loc |
| && (isSequence |
| || !IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData())) { |
| noFailure = |
| insertTabletToTsFileProcessor( |
| insertTabletPlan, before, loc, isSequence, results, beforeTimePartition) |
| && noFailure; |
| } |
| long globalLatestFlushedTime = |
| globalLatestFlushedTimeForEachDevice.getOrDefault( |
| insertTabletPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE); |
| tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime); |
| |
| if (!noFailure) { |
| throw new BatchProcessException(results); |
| } |
| |
| // fire trigger after insertion |
| TriggerEngine.fire(TriggerEvent.AFTER_INSERT, insertTabletPlan, firePosition); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** @return whether the given time falls in ttl */ |
| private boolean isAlive(long time) { |
| return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL; |
| } |
| |
| /** |
| * insert batch to tsfile processor thread-safety that the caller need to guarantee The rows to be |
| * inserted are in the range [start, end) |
| * |
| * @param insertTabletPlan insert a tablet of a device |
| * @param sequence whether is sequence |
| * @param start start index of rows to be inserted in insertTabletPlan |
| * @param end end index of rows to be inserted in insertTabletPlan |
| * @param results result array |
| * @param timePartitionId time partition id |
| * @return false if any failure occurs when inserting the tablet, true otherwise |
| */ |
| private boolean insertTabletToTsFileProcessor( |
| InsertTabletPlan insertTabletPlan, |
| int start, |
| int end, |
| boolean sequence, |
| TSStatus[] results, |
| long timePartitionId) { |
| // return when start >= end |
| if (start >= end) { |
| return true; |
| } |
| |
| TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); |
| if (tsFileProcessor == null) { |
| for (int i = start; i < end; i++) { |
| results[i] = |
| RpcUtils.getStatus( |
| TSStatusCode.INTERNAL_SERVER_ERROR, |
| "can not create TsFileProcessor, timePartitionId: " + timePartitionId); |
| } |
| return false; |
| } |
| |
| try { |
| tsFileProcessor.insertTablet(insertTabletPlan, start, end, results); |
| } catch (WriteProcessRejectException e) { |
| logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); |
| return false; |
| } catch (WriteProcessException e) { |
| logger.error("insert to TsFileProcessor error ", e); |
| return false; |
| } |
| |
| latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>()); |
| // try to update the latest time of the device of this tsRecord |
| if (sequence |
| && latestTimeForEachDevice |
| .get(timePartitionId) |
| .getOrDefault(insertTabletPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE) |
| < insertTabletPlan.getTimes()[end - 1]) { |
| latestTimeForEachDevice |
| .get(timePartitionId) |
| .put( |
| insertTabletPlan.getPrefixPath().getFullPath(), insertTabletPlan.getTimes()[end - 1]); |
| } |
| |
| // check memtable size and may async try to flush the work memtable |
| if (tsFileProcessor.shouldFlush()) { |
| fileFlushPolicy.apply(this, tsFileProcessor, sequence); |
| } |
| return true; |
| } |
| |
  /**
   * Update the last-value cache for every measurement written by a tablet insertion. Vector
   * (aligned) measurements are skipped: only their column count is consumed.
   *
   * @param plan the executed tablet plan
   * @param latestFlushedTime the device's globally latest flushed time, forwarded to the cache
   */
  private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) {
    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
      return;
    }
    IMeasurementMNode[] mNodes = plan.getMeasurementMNodes();
    // columnIndex walks the value columns; it can run ahead of i when vectors are skipped
    int columnIndex = 0;
    for (int i = 0; i < mNodes.length; i++) {
      // Don't update cached last value for vector type
      if (mNodes[i] != null && plan.isAligned()) {
        // consume all value columns of the vector at once
        columnIndex += mNodes[i].getSchema().getValueMeasurementIdList().size();
      } else {
        // NOTE(review): getColumns() is indexed by i while the composed time/value pair uses
        // columnIndex; these stay equal only while no aligned column has been skipped above —
        // confirm behavior for plans mixing aligned and non-aligned measurements.
        if (plan.getColumns()[i] == null) {
          columnIndex++;
          continue;
        }
        // Update cached last value with high priority
        if (mNodes[i] != null) {
          // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
          // update last cache
          IoTDB.metaManager.updateLastCache(
              null, plan.composeLastTimeValuePair(columnIndex), true, latestFlushedTime, mNodes[i]);
        } else {
          // measurementMNodes[i] is null, use the path to update remote cache
          IoTDB.metaManager.updateLastCache(
              plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
              plan.composeLastTimeValuePair(columnIndex),
              true,
              latestFlushedTime,
              null);
        }
        columnIndex++;
      }
    }
  }
| |
| private void insertToTsFileProcessor( |
| InsertRowPlan insertRowPlan, boolean sequence, long timePartitionId) |
| throws WriteProcessException { |
| TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); |
| if (tsFileProcessor == null) { |
| return; |
| } |
| |
| tsFileProcessor.insert(insertRowPlan); |
| |
| // try to update the latest time of the device of this tsRecord |
| if (latestTimeForEachDevice |
| .get(timePartitionId) |
| .getOrDefault(insertRowPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE) |
| < insertRowPlan.getTime()) { |
| latestTimeForEachDevice |
| .get(timePartitionId) |
| .put(insertRowPlan.getPrefixPath().getFullPath(), insertRowPlan.getTime()); |
| } |
| |
| long globalLatestFlushTime = |
| globalLatestFlushedTimeForEachDevice.getOrDefault( |
| insertRowPlan.getPrefixPath().getFullPath(), Long.MIN_VALUE); |
| |
| tryToUpdateInsertLastCache(insertRowPlan, globalLatestFlushTime); |
| |
| // check memtable size and may asyncTryToFlush the work memtable |
| if (tsFileProcessor.shouldFlush()) { |
| fileFlushPolicy.apply(this, tsFileProcessor, sequence); |
| } |
| } |
| |
  /**
   * Update the last-value cache for every measurement written by a single-row insertion. Aligned
   * (vector) plans are skipped entirely.
   *
   * @param plan the executed row plan
   * @param latestFlushedTime the device's globally latest flushed time, forwarded to the cache
   */
  private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) {
    if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
      return;
    }
    IMeasurementMNode[] mNodes = plan.getMeasurementMNodes();
    int columnIndex = 0;
    for (IMeasurementMNode mNode : mNodes) {
      // Don't update cached last value for vector type
      if (!plan.isAligned()) {
        if (plan.getValues()[columnIndex] == null) {
          // nothing was written for this column
          columnIndex++;
          continue;
        }
        // Update cached last value with high priority
        if (mNode != null) {
          // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
          // update last cache
          IoTDB.metaManager.updateLastCache(
              null, plan.composeTimeValuePair(columnIndex), true, latestFlushedTime, mNode);
        } else {
          // measurement node is null, use the full path to update the remote cache
          IoTDB.metaManager.updateLastCache(
              plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
              plan.composeTimeValuePair(columnIndex),
              true,
              latestFlushedTime,
              null);
        }
        columnIndex++;
      }
    }
  }
| |
| /** |
| * mem control module use this method to flush memtable |
| * |
| * @param tsFileProcessor tsfile processor in which memtable to be flushed |
| */ |
| public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) { |
| writeLock("submitAFlushTaskWhenShouldFlush"); |
| try { |
| // check memtable size and may asyncTryToFlush the work memtable |
| if (tsFileProcessor.shouldFlush()) { |
| fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) { |
| TsFileProcessor tsFileProcessor = null; |
| try { |
| if (sequence) { |
| tsFileProcessor = |
| getOrCreateTsFileProcessorIntern(timeRangeId, workSequenceTsFileProcessors, true); |
| } else { |
| tsFileProcessor = |
| getOrCreateTsFileProcessorIntern(timeRangeId, workUnsequenceTsFileProcessors, false); |
| } |
| } catch (DiskSpaceInsufficientException e) { |
| logger.error( |
| "disk space is insufficient when creating TsFile processor, change system mode to read-only", |
| e); |
| IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); |
| } catch (IOException e) { |
| logger.error( |
| "meet IOException when creating TsFileProcessor, change system mode to read-only", e); |
| IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); |
| } |
| return tsFileProcessor; |
| } |
| |
| /** |
| * get processor from hashmap, flush oldest processor if necessary |
| * |
| * @param timeRangeId time partition range |
| * @param tsFileProcessorTreeMap tsFileProcessorTreeMap |
| * @param sequence whether is sequence or not |
| */ |
| private TsFileProcessor getOrCreateTsFileProcessorIntern( |
| long timeRangeId, TreeMap<Long, TsFileProcessor> tsFileProcessorTreeMap, boolean sequence) |
| throws IOException, DiskSpaceInsufficientException { |
| |
| TsFileProcessor res = tsFileProcessorTreeMap.get(timeRangeId); |
| |
| if (null == res) { |
| // we have to remove oldest processor to control the num of the memtables |
| // TODO: use a method to control the number of memtables |
| if (tsFileProcessorTreeMap.size() |
| >= IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition()) { |
| Map.Entry<Long, TsFileProcessor> processorEntry = tsFileProcessorTreeMap.firstEntry(); |
| logger.info( |
| "will close a {} TsFile because too many active partitions ({} > {}) in the storage group {},", |
| sequence, |
| tsFileProcessorTreeMap.size(), |
| IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition(), |
| logicalStorageGroupName); |
| asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue()); |
| } |
| |
| // build new processor |
| res = newTsFileProcessor(sequence, timeRangeId); |
| tsFileProcessorTreeMap.put(timeRangeId, res); |
| tsFileManagement.add(res.getTsFileResource(), sequence); |
| } |
| |
| return res; |
| } |
| |
| private TsFileProcessor newTsFileProcessor(boolean sequence, long timePartitionId) |
| throws IOException, DiskSpaceInsufficientException { |
| DirectoryManager directoryManager = DirectoryManager.getInstance(); |
| String baseDir = |
| sequence |
| ? directoryManager.getNextFolderForSequenceFile() |
| : directoryManager.getNextFolderForUnSequenceFile(); |
| fsFactory |
| .getFile(baseDir + File.separator + logicalStorageGroupName, virtualStorageGroupId) |
| .mkdirs(); |
| |
| String filePath = |
| baseDir |
| + File.separator |
| + logicalStorageGroupName |
| + File.separator |
| + virtualStorageGroupId |
| + File.separator |
| + timePartitionId |
| + File.separator |
| + getNewTsFileName(timePartitionId); |
| |
| return getTsFileProcessor(sequence, filePath, timePartitionId); |
| } |
| |
| private TsFileProcessor getTsFileProcessor( |
| boolean sequence, String filePath, long timePartitionId) throws IOException { |
| TsFileProcessor tsFileProcessor; |
| if (sequence) { |
| tsFileProcessor = |
| new TsFileProcessor( |
| logicalStorageGroupName + File.separator + virtualStorageGroupId, |
| fsFactory.getFileWithParent(filePath), |
| storageGroupInfo, |
| this::closeUnsealedTsFileProcessorCallBack, |
| this::updateLatestFlushTimeCallback, |
| true); |
| } else { |
| tsFileProcessor = |
| new TsFileProcessor( |
| logicalStorageGroupName + File.separator + virtualStorageGroupId, |
| fsFactory.getFileWithParent(filePath), |
| storageGroupInfo, |
| this::closeUnsealedTsFileProcessorCallBack, |
| this::unsequenceFlushCallback, |
| false); |
| } |
| |
| if (enableMemControl) { |
| TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo); |
| tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo); |
| this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor); |
| } |
| |
| tsFileProcessor.addCloseFileListeners(customCloseFileListeners); |
| tsFileProcessor.addFlushListeners(customFlushListeners); |
| tsFileProcessor.setTimeRangeId(timePartitionId); |
| |
| return tsFileProcessor; |
| } |
| |
| /** |
| * Create a new tsfile name |
| * |
| * @return file name |
| */ |
| private String getNewTsFileName(long timePartitionId) { |
| long version = partitionMaxFileVersions.getOrDefault(timePartitionId, 0L) + 1; |
| partitionMaxFileVersions.put(timePartitionId, version); |
| return getNewTsFileName(System.currentTimeMillis(), version, 0, 0); |
| } |
| |
  /** Delegate to {@link TsFileResource#getNewTsFileName} to compose a tsfile name. */
  private String getNewTsFileName(long time, long version, int mergeCnt, int unSeqMergeCnt) {
    return TsFileResource.getNewTsFileName(time, version, mergeCnt, unSeqMergeCnt);
  }
| |
| /** |
| * close one tsfile processor |
| * |
| * @param sequence whether this tsfile processor is sequence or not |
| * @param tsFileProcessor tsfile processor |
| */ |
| public void syncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { |
| synchronized (closeStorageGroupCondition) { |
| try { |
| asyncCloseOneTsFileProcessor(sequence, tsFileProcessor); |
| long startTime = System.currentTimeMillis(); |
| while (closingSequenceTsFileProcessor.contains(tsFileProcessor) |
| || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { |
| closeStorageGroupCondition.wait(60_000); |
| if (System.currentTimeMillis() - startTime > 60_000) { |
| logger.warn( |
| "{} has spent {}s to wait for closing one tsfile.", |
| logicalStorageGroupName + "-" + this.virtualStorageGroupId, |
| (System.currentTimeMillis() - startTime) / 1000); |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| logger.error( |
| "syncCloseOneTsFileProcessor error occurs while waiting for closing the storage " |
| + "group {}", |
| logicalStorageGroupName + "-" + virtualStorageGroupId, |
| e); |
| } |
| } |
| } |
| |
| /** |
| * close one tsfile processor, thread-safety should be ensured by caller |
| * |
| * @param sequence whether this tsfile processor is sequence or not |
| * @param tsFileProcessor tsfile processor |
| */ |
| public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { |
| // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed. |
| // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes. |
| if (closingSequenceTsFileProcessor.contains(tsFileProcessor) |
| || closingUnSequenceTsFileProcessor.contains(tsFileProcessor) |
| || tsFileProcessor.alreadyMarkedClosing()) { |
| return; |
| } |
| logger.info( |
| "Async close tsfile: {}", |
| tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath()); |
| if (sequence) { |
| closingSequenceTsFileProcessor.add(tsFileProcessor); |
| updateEndTimeMap(tsFileProcessor); |
| tsFileProcessor.asyncClose(); |
| |
| workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); |
| // if unsequence files don't contain this time range id, we should remove it's version |
| // controller |
| if (!workUnsequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) { |
| timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId()); |
| } |
| logger.info( |
| "close a sequence tsfile processor {}", |
| logicalStorageGroupName + "-" + virtualStorageGroupId); |
| } else { |
| closingUnSequenceTsFileProcessor.add(tsFileProcessor); |
| tsFileProcessor.asyncClose(); |
| |
| workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); |
| // if sequence files don't contain this time range id, we should remove it's version |
| // controller |
| if (!workSequenceTsFileProcessors.containsKey(tsFileProcessor.getTimeRangeId())) { |
| timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId()); |
| } |
| } |
| } |
| |
| /** |
| * delete the storageGroup's own folder in folder data/system/storage_groups |
| * |
| * @param systemDir system dir |
| */ |
| public void deleteFolder(String systemDir) { |
| logger.info( |
| "{} will close all files for deleting data folder {}", |
| logicalStorageGroupName + "-" + virtualStorageGroupId, |
| systemDir); |
| writeLock("deleteFolder"); |
| try { |
| syncCloseAllWorkingTsFileProcessors(); |
| File storageGroupFolder = |
| SystemFileFactory.INSTANCE.getFile(systemDir, virtualStorageGroupId); |
| if (storageGroupFolder.exists()) { |
| org.apache.iotdb.db.utils.FileUtils.deleteDirectory(storageGroupFolder); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** close all tsfile resource */ |
| public void closeAllResources() { |
| for (TsFileResource tsFileResource : tsFileManagement.getTsFileList(false)) { |
| try { |
| tsFileResource.close(); |
| } catch (IOException e) { |
| logger.error("Cannot close a TsFileResource {}", tsFileResource, e); |
| } |
| } |
| for (TsFileResource tsFileResource : tsFileManagement.getTsFileList(true)) { |
| try { |
| tsFileResource.close(); |
| } catch (IOException e) { |
| logger.error("Cannot close a TsFileResource {}", tsFileResource, e); |
| } |
| } |
| } |
| |
| /** release wal buffer */ |
| public void releaseWalDirectByteBufferPool() { |
| synchronized (walByteBufferPool) { |
| while (!walByteBufferPool.isEmpty()) { |
| MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeFirst()); |
| currentWalPoolSize--; |
| } |
| } |
| } |
| |
  /**
   * Close all working files and delete every TsFile (and related in-memory state) of this
   * storage group under every data folder. Holds the write lock for the whole operation.
   */
  public void syncDeleteDataFiles() {
    logger.info(
        "{} will close all files for deleting data files",
        logicalStorageGroupName + "-" + virtualStorageGroupId);
    writeLock("syncDeleteDataFiles");
    try {

      syncCloseAllWorkingTsFileProcessors();
      // normally, mergingModification is just need to be closed by after a merge task is finished.
      // we close it here just for IT test.
      if (this.tsFileManagement.mergingModification != null) {
        this.tsFileManagement.mergingModification.close();
      }

      closeAllResources();
      // collect every data folder (sequence and unsequence) that may contain this SG's files
      List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
      folder.addAll(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
      deleteAllSGFolders(folder);

      // wipe all in-memory bookkeeping so the storage group is effectively empty
      this.workSequenceTsFileProcessors.clear();
      this.workUnsequenceTsFileProcessors.clear();
      this.tsFileManagement.clear();
      this.partitionLatestFlushedTimeForEachDevice.clear();
      this.globalLatestFlushedTimeForEachDevice.clear();
      this.latestTimeForEachDevice.clear();
    } catch (IOException e) {
      // only mergingModification.close() above can throw here, so mergingModification is non-null
      logger.error(
          "Cannot close the mergingMod file {}",
          this.tsFileManagement.mergingModification.getFilePath(),
          e);
    } finally {
      writeUnlock();
    }
  }
| |
| private void deleteAllSGFolders(List<String> folder) { |
| for (String tsfilePath : folder) { |
| File storageGroupFolder = |
| fsFactory.getFile( |
| tsfilePath, logicalStorageGroupName + File.separator + virtualStorageGroupId); |
| if (storageGroupFolder.exists()) { |
| org.apache.iotdb.db.utils.FileUtils.deleteDirectory(storageGroupFolder); |
| } |
| } |
| } |
| |
| /** Iterate each TsFile and try to lock and remove those out of TTL. */ |
| public synchronized void checkFilesTTL() { |
| if (dataTTL == Long.MAX_VALUE) { |
| logger.debug( |
| "{}: TTL not set, ignore the check", |
| logicalStorageGroupName + "-" + virtualStorageGroupId); |
| return; |
| } |
| long ttlLowerBound = System.currentTimeMillis() - dataTTL; |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "{}: TTL removing files before {}", |
| logicalStorageGroupName + "-" + virtualStorageGroupId, |
| new Date(ttlLowerBound)); |
| } |
| |
| // copy to avoid concurrent modification of deletion |
| List<TsFileResource> seqFiles = new ArrayList<>(tsFileManagement.getTsFileList(true)); |
| List<TsFileResource> unseqFiles = new ArrayList<>(tsFileManagement.getTsFileList(false)); |
| |
| for (TsFileResource tsFileResource : seqFiles) { |
| checkFileTTL(tsFileResource, ttlLowerBound, true); |
| } |
| for (TsFileResource tsFileResource : unseqFiles) { |
| checkFileTTL(tsFileResource, ttlLowerBound, false); |
| } |
| } |
| |
  /**
   * Physically remove one TsFile if it is fully expired, guarding against concurrent merges and
   * queries via a mark-then-double-check protocol.
   *
   * @param resource the candidate file
   * @param ttlLowerBound timestamps at or before this bound are expired
   * @param isSeq whether the file belongs to the sequence space
   */
  private void checkFileTTL(TsFileResource resource, long ttlLowerBound, boolean isSeq) {
    // skip files that are merging, still open, or still contain live (non-expired) data
    if (resource.isMerging()
        || !resource.isClosed()
        || !resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
      return;
    }

    writeLock("checkFileTTL");
    try {
      // prevent new merges and queries from choosing this file
      resource.setDeleted(true);
      // the file may be chosen for merge after the last check and before writeLock()
      // double check to ensure the file is not used by a merge
      if (resource.isMerging()) {
        return;
      }

      // ensure that the file is not used by any queries
      if (resource.tryWriteLock()) {
        try {
          // physical removal
          resource.remove();
          if (logger.isInfoEnabled()) {
            logger.info(
                "Removed a file {} before {} by ttl ({}ms)",
                resource.getTsFilePath(),
                new Date(ttlLowerBound),
                dataTTL);
          }
          tsFileManagement.remove(resource, isSeq);
        } finally {
          resource.writeUnlock();
        }
      }
    } finally {
      writeUnlock();
    }
  }
| |
| public void timedFlushMemTable() { |
| writeLock("timedFlushMemTable"); |
| try { |
| // only check unsequence tsfiles' memtables |
| List<TsFileProcessor> tsFileProcessors = |
| new ArrayList<>(workUnsequenceTsFileProcessors.values()); |
| long timestampBaseline = System.currentTimeMillis() - config.getUnseqMemtableFlushInterval(); |
| for (TsFileProcessor tsFileProcessor : tsFileProcessors) { |
| if (tsFileProcessor.getWorkMemTableCreatedTime() < timestampBaseline) { |
| logger.info( |
| "Exceed flush interval, so flush work memtable of time partition {} in storage group {}[{}]", |
| tsFileProcessor.getTimeRangeId(), |
| logicalStorageGroupName, |
| virtualStorageGroupId); |
| fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
  /** This method will be blocked until all tsfile processors are closed. */
  public void syncCloseAllWorkingTsFileProcessors() {
    synchronized (closeStorageGroupCondition) {
      try {
        asyncCloseAllWorkingTsFileProcessors();
        long startTime = System.currentTimeMillis();
        // wait until both closing sets drain; the flush thread notifies the condition
        // object each time a close completes
        while (!closingSequenceTsFileProcessor.isEmpty()
            || !closingUnSequenceTsFileProcessor.isEmpty()) {
          // bounded wait so progress can be logged even if a notify is missed
          closeStorageGroupCondition.wait(60_000);
          if (System.currentTimeMillis() - startTime > 60_000) {
            logger.warn(
                "{} has spent {}s to wait for closing all TsFiles.",
                logicalStorageGroupName + "-" + this.virtualStorageGroupId,
                (System.currentTimeMillis() - startTime) / 1000);
          }
        }
      } catch (InterruptedException e) {
        logger.error(
            "CloseFileNodeCondition error occurs while waiting for closing the storage "
                + "group {}",
            logicalStorageGroupName + "-" + virtualStorageGroupId,
            e);
        // restore the interrupt flag for the caller
        Thread.currentThread().interrupt();
      }
    }
  }
| |
| /** close all working tsfile processors */ |
| public void asyncCloseAllWorkingTsFileProcessors() { |
| writeLock("asyncCloseAllWorkingTsFileProcessors"); |
| try { |
| logger.info( |
| "async force close all files in storage group: {}", |
| logicalStorageGroupName + "-" + virtualStorageGroupId); |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workSequenceTsFileProcessors.values())) { |
| asyncCloseOneTsFileProcessor(true, tsFileProcessor); |
| } |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workUnsequenceTsFileProcessors.values())) { |
| asyncCloseOneTsFileProcessor(false, tsFileProcessor); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** force close all working tsfile processors */ |
| public void forceCloseAllWorkingTsFileProcessors() throws TsFileProcessorException { |
| writeLock("forceCloseAllWorkingTsFileProcessors"); |
| try { |
| logger.info( |
| "force close all processors in storage group: {}", |
| logicalStorageGroupName + "-" + virtualStorageGroupId); |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workSequenceTsFileProcessors.values())) { |
| tsFileProcessor.putMemTableBackAndClose(); |
| } |
| // to avoid concurrent modification problem, we need a new array list |
| for (TsFileProcessor tsFileProcessor : |
| new ArrayList<>(workUnsequenceTsFileProcessors.values())) { |
| tsFileProcessor.putMemTableBackAndClose(); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| // TODO need a read lock, please consider the concurrency with flush manager threads. |
| /** |
| * build query data source by searching all tsfile which fit in query filter |
| * |
| * @param fullPath data path |
| * @param context query context |
| * @param timeFilter time filter |
| * @return query data source |
| */ |
| public QueryDataSource query( |
| PartialPath fullPath, |
| QueryContext context, |
| QueryFileManager filePathsManager, |
| Filter timeFilter) |
| throws QueryProcessException { |
| readLock(); |
| try { |
| List<TsFileResource> seqResources = |
| getFileResourceListForQuery( |
| tsFileManagement.getTsFileList(true), |
| upgradeSeqFileList, |
| fullPath, |
| context, |
| timeFilter, |
| true); |
| List<TsFileResource> unseqResources = |
| getFileResourceListForQuery( |
| tsFileManagement.getTsFileList(false), |
| upgradeUnseqFileList, |
| fullPath, |
| context, |
| timeFilter, |
| false); |
| QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources); |
| // used files should be added before mergeLock is unlocked, or they may be deleted by |
| // running merge |
| // is null only in tests |
| if (filePathsManager != null) { |
| filePathsManager.addUsedFilesForQuery(context.getQueryId(), dataSource); |
| } |
| dataSource.setDataTTL(dataTTL); |
| return dataSource; |
| } catch (MetadataException e) { |
| throw new QueryProcessException(e); |
| } finally { |
| readUnlock(); |
| } |
| } |
| |
  /**
   * Acquire the read locks needed by a reader: first the storage-group insert lock, then the
   * TsFileResource-list lock. Must be paired with {@link #readUnlock()}, which releases them in
   * reverse order.
   */
  public void readLock() {
    // apply read lock for SG insert lock to prevent inconsistent with concurrently writing memtable
    insertLock.readLock().lock();
    // apply read lock for TsFileResource list
    tsFileManagement.readLock();
  }
| |
  /** Release the locks taken by {@link #readLock()} in reverse acquisition order. */
  public void readUnlock() {
    tsFileManagement.readUnLock();
    insertLock.readLock().unlock();
  }
| |
  /**
   * Acquire the write lock of the insert lock and record who holds it.
   *
   * @param holder human-readable tag identifying the lock holder, kept for diagnostics
   */
  public void writeLock(String holder) {
    insertLock.writeLock().lock();
    insertWriteLockHolder = holder;
  }
| |
  /** Clear the recorded holder and release the write lock of the insert lock. */
  public void writeUnlock() {
    insertWriteLockHolder = "";
    insertLock.writeLock().unlock();
  }
| |
| /** |
| * @param tsFileResources includes sealed and unsealed tsfile resources |
| * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk |
| */ |
| private List<TsFileResource> getFileResourceListForQuery( |
| Collection<TsFileResource> tsFileResources, |
| List<TsFileResource> upgradeTsFileResources, |
| PartialPath fullPath, |
| QueryContext context, |
| Filter timeFilter, |
| boolean isSeq) |
| throws MetadataException { |
| String deviceId = fullPath.getDevice(); |
| |
| if (context.isDebug()) { |
| DEBUG_LOGGER.info( |
| "Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}", |
| deviceId, |
| fullPath.getMeasurement(), |
| tsFileResources, |
| isSeq, |
| (timeFilter == null ? "null" : timeFilter)); |
| } |
| |
| IMeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(fullPath); |
| |
| List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); |
| long ttlLowerBound = |
| dataTTL != Long.MAX_VALUE ? System.currentTimeMillis() - dataTTL : Long.MIN_VALUE; |
| context.setQueryTimeLowerBound(ttlLowerBound); |
| |
| // for upgrade files and old files must be closed |
| for (TsFileResource tsFileResource : upgradeTsFileResources) { |
| if (!tsFileResource.isSatisfied(deviceId, timeFilter, isSeq, dataTTL, context.isDebug())) { |
| continue; |
| } |
| closeQueryLock.readLock().lock(); |
| try { |
| tsfileResourcesForQuery.add(tsFileResource); |
| } finally { |
| closeQueryLock.readLock().unlock(); |
| } |
| } |
| |
| for (TsFileResource tsFileResource : tsFileResources) { |
| if (!tsFileResource.isSatisfied( |
| fullPath.getDevice(), timeFilter, isSeq, dataTTL, context.isDebug())) { |
| continue; |
| } |
| closeQueryLock.readLock().lock(); |
| try { |
| if (tsFileResource.isClosed()) { |
| tsfileResourcesForQuery.add(tsFileResource); |
| } else { |
| tsFileResource |
| .getUnsealedFileProcessor() |
| .query(deviceId, fullPath.getMeasurement(), schema, context, tsfileResourcesForQuery); |
| } |
| } catch (IOException e) { |
| throw new MetadataException(e); |
| } finally { |
| closeQueryLock.readLock().unlock(); |
| } |
| } |
| return tsfileResourcesForQuery; |
| } |
| |
| /** |
| * Delete data whose timestamp <= 'timestamp' and belongs to the time series |
| * deviceId.measurementId. |
| * |
| * @param path the timeseries path of the to be deleted. |
| * @param startTime the startTime of delete range. |
| * @param endTime the endTime of delete range. |
| */ |
| public void delete(PartialPath path, long startTime, long endTime, long planIndex) |
| throws IOException { |
| // If there are still some old version tsfiles, the delete won't succeeded. |
| if (upgradeFileCount.get() != 0) { |
| throw new IOException( |
| "Delete failed. " + "Please do not delete until the old files upgraded."); |
| } |
| // TODO: how to avoid partial deletion? |
| // FIXME: notice that if we may remove a SGProcessor out of memory, we need to close all opened |
| // mod files in mergingModification, sequenceFileList, and unsequenceFileList |
| writeLock("delete"); |
| |
| // record files which are updated so that we can roll back them in case of exception |
| List<ModificationFile> updatedModFiles = new ArrayList<>(); |
| |
| try { |
| Set<PartialPath> devicePaths = IoTDB.metaManager.getDevices(path.getDevicePath()); |
| for (PartialPath device : devicePaths) { |
| Long lastUpdateTime = null; |
| for (Map<String, Long> latestTimeMap : latestTimeForEachDevice.values()) { |
| Long curTime = latestTimeMap.get(device.getFullPath()); |
| if (curTime != null && (lastUpdateTime == null || lastUpdateTime < curTime)) { |
| lastUpdateTime = curTime; |
| } |
| } |
| |
| // delete Last cache record if necessary |
| tryToDeleteLastCache(device, path, startTime, endTime); |
| } |
| |
| // write log to impacted working TsFileProcessors |
| logDeletion(startTime, endTime, path); |
| |
| Deletion deletion = new Deletion(path, MERGE_MOD_START_VERSION_NUM, startTime, endTime); |
| if (tsFileManagement.mergingModification != null) { |
| tsFileManagement.mergingModification.write(deletion); |
| updatedModFiles.add(tsFileManagement.mergingModification); |
| } |
| |
| deleteDataInFiles( |
| tsFileManagement.getTsFileList(true), deletion, devicePaths, updatedModFiles, planIndex); |
| deleteDataInFiles( |
| tsFileManagement.getTsFileList(false), deletion, devicePaths, updatedModFiles, planIndex); |
| |
| } catch (Exception e) { |
| // roll back |
| for (ModificationFile modFile : updatedModFiles) { |
| modFile.abort(); |
| } |
| throw new IOException(e); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| private void logDeletion(long startTime, long endTime, PartialPath path) throws IOException { |
| long timePartitionStartId = StorageEngine.getTimePartition(startTime); |
| long timePartitionEndId = StorageEngine.getTimePartition(endTime); |
| if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { |
| DeletePlan deletionPlan = new DeletePlan(startTime, endTime, path); |
| for (Map.Entry<Long, TsFileProcessor> entry : workSequenceTsFileProcessors.entrySet()) { |
| if (timePartitionStartId <= entry.getKey() && entry.getKey() <= timePartitionEndId) { |
| entry.getValue().getLogNode().write(deletionPlan); |
| } |
| } |
| |
| for (Map.Entry<Long, TsFileProcessor> entry : workUnsequenceTsFileProcessors.entrySet()) { |
| if (timePartitionStartId <= entry.getKey() && entry.getKey() <= timePartitionEndId) { |
| entry.getValue().getLogNode().write(deletionPlan); |
| } |
| } |
| } |
| } |
| |
| private boolean canSkipDelete( |
| TsFileResource tsFileResource, |
| Set<PartialPath> devicePaths, |
| long deleteStart, |
| long deleteEnd) { |
| for (PartialPath device : devicePaths) { |
| String deviceId = device.getFullPath(); |
| long endTime = tsFileResource.getEndTime(deviceId); |
| if (endTime == Long.MIN_VALUE) { |
| return false; |
| } |
| |
| if (tsFileResource.isDeviceIdExist(deviceId) |
| && (deleteEnd >= tsFileResource.getStartTime(deviceId) && deleteStart <= endTime)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
  /**
   * Apply the deletion to every file in the list: record it in each impacted file's modification
   * file and, for unsealed files, also drop the matching data from memory.
   *
   * @param tsFileResourceList sealed and unsealed resources of one space (sequence or unsequence)
   * @param deletion the deletion to record
   * @param devicePaths devices possibly affected, used to skip files the deletion cannot touch
   * @param updatedModFiles collects every modification file written, for rollback on failure
   * @param planIndex plan index to record on each touched resource
   */
  private void deleteDataInFiles(
      Collection<TsFileResource> tsFileResourceList,
      Deletion deletion,
      Set<PartialPath> devicePaths,
      List<ModificationFile> updatedModFiles,
      long planIndex)
      throws IOException {
    for (TsFileResource tsFileResource : tsFileResourceList) {
      if (canSkipDelete(
          tsFileResource, devicePaths, deletion.getStartTime(), deletion.getEndTime())) {
        continue;
      }

      // record the file size at deletion time as the offset of the modification entry
      deletion.setFileOffset(tsFileResource.getTsFileSize());
      // write deletion into modification file
      tsFileResource.getModFile().write(deletion);
      // remember to close mod file
      tsFileResource.getModFile().close();
      logger.info(
          "[Deletion] Deletion with path:{}, time:{}-{} written into mods file.",
          deletion.getPath(),
          deletion.getStartTime(),
          deletion.getEndTime());

      tsFileResource.updatePlanIndexes(planIndex);

      // delete data in memory of unsealed file
      if (!tsFileResource.isClosed()) {
        TsFileProcessor tsfileProcessor = tsFileResource.getUnsealedFileProcessor();
        tsfileProcessor.deleteDataInMemory(deletion, devicePaths);
      }

      // add a record in case of rollback
      updatedModFiles.add(tsFileResource.getModFile());
    }
  }
| |
| private void tryToDeleteLastCache( |
| PartialPath deviceId, PartialPath originalPath, long startTime, long endTime) |
| throws WriteProcessException { |
| if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { |
| return; |
| } |
| try { |
| IMNode node = IoTDB.metaManager.getDeviceNode(deviceId); |
| |
| for (IMNode measurementNode : node.getChildren().values()) { |
| if (measurementNode != null |
| && originalPath.matchFullPath(measurementNode.getPartialPath())) { |
| TimeValuePair lastPair = ((IMeasurementMNode) measurementNode).getCachedLast(); |
| if (lastPair != null |
| && startTime <= lastPair.getTimestamp() |
| && lastPair.getTimestamp() <= endTime) { |
| ((IMeasurementMNode) measurementNode).resetCache(); |
| logger.info( |
| "[tryToDeleteLastCache] Last cache for path: {} is set to null", |
| measurementNode.getFullPath()); |
| } |
| } |
| } |
| } catch (MetadataException e) { |
| throw new WriteProcessException(e); |
| } |
| } |
| |
| /** |
| * when close an TsFileProcessor, update its EndTimeMap immediately |
| * |
| * @param tsFileProcessor processor to be closed |
| */ |
| private void updateEndTimeMap(TsFileProcessor tsFileProcessor) { |
| TsFileResource resource = tsFileProcessor.getTsFileResource(); |
| for (String deviceId : resource.getDevices()) { |
| resource.updateEndTime( |
| deviceId, latestTimeForEachDevice.get(tsFileProcessor.getTimeRangeId()).get(deviceId)); |
| } |
| } |
| |
  /**
   * Flush-finished callback for unsequence processors. Performs no bookkeeping and always reports
   * success; the processor argument is intentionally unused.
   */
  private boolean unsequenceFlushCallback(TsFileProcessor processor) {
    return true;
  }
| |
| private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) { |
| // update the largest timestamp in the last flushing memtable |
| Map<String, Long> curPartitionDeviceLatestTime = |
| latestTimeForEachDevice.get(processor.getTimeRangeId()); |
| |
| if (curPartitionDeviceLatestTime == null) { |
| logger.warn( |
| "Partition: {} does't have latest time for each device. " |
| + "No valid record is written into memtable. Flushing tsfile is: {}", |
| processor.getTimeRangeId(), |
| processor.getTsFileResource().getTsFile()); |
| return false; |
| } |
| |
| for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) { |
| partitionLatestFlushedTimeForEachDevice |
| .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>()) |
| .put(entry.getKey(), entry.getValue()); |
| updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice( |
| processor.getTimeRangeId(), entry.getKey(), entry.getValue()); |
| if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE) |
| < entry.getValue()) { |
| globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * update latest flush time for partition id |
| * |
| * @param partitionId partition id |
| * @param latestFlushTime lastest flush time |
| * @return true if update latest flush time success |
| */ |
| private boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) { |
| // update the largest timestamp in the last flushing memtable |
| Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice.get(partitionId); |
| |
| if (curPartitionDeviceLatestTime == null) { |
| logger.warn( |
| "Partition: {} does't have latest time for each device. " |
| + "No valid record is written into memtable. latest flush time is: {}", |
| partitionId, |
| latestFlushTime); |
| return false; |
| } |
| |
| for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) { |
| // set lastest flush time to latestTimeForEachDevice |
| entry.setValue(latestFlushTime); |
| |
| partitionLatestFlushedTimeForEachDevice |
| .computeIfAbsent(partitionId, id -> new HashMap<>()) |
| .put(entry.getKey(), entry.getValue()); |
| newlyFlushedPartitionLatestFlushedTimeForEachDevice |
| .computeIfAbsent(partitionId, id -> new HashMap<>()) |
| .put(entry.getKey(), entry.getValue()); |
| if (globalLatestFlushedTimeForEachDevice.getOrDefault(entry.getKey(), Long.MIN_VALUE) |
| < entry.getValue()) { |
| globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| return true; |
| } |
| |
| /** used for upgrading */ |
| public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice( |
| long partitionId, String deviceId, long time) { |
| newlyFlushedPartitionLatestFlushedTimeForEachDevice |
| .computeIfAbsent(partitionId, id -> new HashMap<>()) |
| .compute(deviceId, (k, v) -> v == null ? time : Math.max(v, time)); |
| } |
| |
  /** put the memtable back to the MemTablePool and make the metadata in writer visible */
  // TODO please consider concurrency with query and insert method.
  private void closeUnsealedTsFileProcessorCallBack(TsFileProcessor tsFileProcessor)
      throws TsFileProcessorException {
    // take the write side of the close-query lock so no query reads the file while it is sealed
    closeQueryLock.writeLock().lock();
    try {
      tsFileProcessor.close();
    } finally {
      closeQueryLock.writeLock().unlock();
    }
    // closingSequenceTsFileProcessor is a thread safety class.
    if (closingSequenceTsFileProcessor.contains(tsFileProcessor)) {
      closingSequenceTsFileProcessor.remove(tsFileProcessor);
    } else {
      closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
    }
    // wake up any thread waiting on closeStorageGroupCondition for files to finish closing
    synchronized (closeStorageGroupCondition) {
      closeStorageGroupCondition.notifyAll();
    }
    logger.info(
        "signal closing storage group condition in {}",
        logicalStorageGroupName + "-" + virtualStorageGroupId);

    // a freshly sealed file may enable a compaction in its time partition
    executeCompaction(
        tsFileProcessor.getTimeRangeId(),
        IoTDBDescriptor.getInstance().getConfig().isForceFullMerge());
  }
| |
  /**
   * Submit a compaction merge task for the given time partition, unless a merge is already running
   * for this storage group or the compaction pool has been terminated.
   *
   * <p>NOTE(review): the check-then-set on compactionMergeWorking is not atomic by itself —
   * presumably all callers run under locks or single-threaded callbacks; confirm.
   *
   * @param timePartition time partition whose file list should be compacted
   * @param fullMerge whether to force a full merge
   */
  private void executeCompaction(long timePartition, boolean fullMerge) {
    if (!compactionMergeWorking && !CompactionMergeTaskPoolManager.getInstance().isTerminated()) {
      compactionMergeWorking = true;
      logger.info(
          "{} submit a compaction merge task",
          logicalStorageGroupName + "-" + virtualStorageGroupId);
      try {
        // fork and filter current tsfile, then commit then to compaction merge
        tsFileManagement.forkCurrentFileList(timePartition);
        tsFileManagement.setForceFullMerge(fullMerge);
        CompactionMergeTaskPoolManager.getInstance()
            .submitTask(
                logicalStorageGroupName,
                tsFileManagement
                .new CompactionMergeTask(this::closeCompactionMergeCallBack, timePartition));
      } catch (IOException | RejectedExecutionException e) {
        // reset the working flag via the callback so future merges are not blocked forever
        this.closeCompactionMergeCallBack(false, timePartition);
        logger.error(
            "{} compaction submit task failed",
            logicalStorageGroupName + "-" + virtualStorageGroupId,
            e);
      }
    } else {
      logger.info(
          "{} last compaction merge task is working, skip current merge",
          logicalStorageGroupName + "-" + virtualStorageGroupId);
    }
  }
| |
| /** close compaction merge callback, to release some locks */ |
| private void closeCompactionMergeCallBack(boolean isMerge, long timePartitionId) { |
| if (isMerge && IoTDBDescriptor.getInstance().getConfig().isEnableContinuousCompaction()) { |
| executeCompaction( |
| timePartitionId, IoTDBDescriptor.getInstance().getConfig().isForceFullMerge()); |
| } else { |
| this.compactionMergeWorking = false; |
| } |
| } |
| |
| /** |
| * count all Tsfiles in the storage group which need to be upgraded |
| * |
| * @return total num of the tsfiles which need to be upgraded in the storage group |
| */ |
| public int countUpgradeFiles() { |
| return upgradeFileCount.get(); |
| } |
| |
| /** upgrade all files belongs to this storage group */ |
| public void upgrade() { |
| for (TsFileResource seqTsFileResource : upgradeSeqFileList) { |
| seqTsFileResource.setSeq(true); |
| seqTsFileResource.setUpgradeTsFileResourceCallBack(this::upgradeTsFileResourceCallBack); |
| seqTsFileResource.doUpgrade(); |
| } |
| for (TsFileResource unseqTsFileResource : upgradeUnseqFileList) { |
| unseqTsFileResource.setSeq(false); |
| unseqTsFileResource.setUpgradeTsFileResourceCallBack(this::upgradeTsFileResourceCallBack); |
| unseqTsFileResource.doUpgrade(); |
| } |
| } |
| |
  /**
   * Callback invoked when one old-version file finishes upgrading: propagate the upgraded
   * resources' device end times into the newly-flushed map, and once the last file is done, move
   * all upgraded resources into tsFileManagement and refresh
   * partitionLatestFlushedTimeForEachDevice from the newly-flushed map.
   */
  private void upgradeTsFileResourceCallBack(TsFileResource tsFileResource) {
    List<TsFileResource> upgradedResources = tsFileResource.getUpgradedResources();
    for (TsFileResource resource : upgradedResources) {
      long partitionId = resource.getTimePartition();
      resource
          .getDevices()
          .forEach(
              device ->
                  updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
                      partitionId, device, resource.getEndTime(device)));
    }
    upgradeFileCount.getAndAdd(-1);
    // load all upgraded resources in this sg to tsFileManagement
    if (upgradeFileCount.get() == 0) {
      writeLock("upgradeTsFileResourceCallBack");
      try {
        loadUpgradedResources(upgradeSeqFileList, true);
        loadUpgradedResources(upgradeUnseqFileList, false);
      } finally {
        writeUnlock();
      }
      // after upgrade complete, update partitionLatestFlushedTimeForEachDevice
      for (Entry<Long, Map<String, Long>> entry :
          newlyFlushedPartitionLatestFlushedTimeForEachDevice.entrySet()) {
        long timePartitionId = entry.getKey();
        Map<String, Long> latestFlushTimeForPartition =
            partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>());
        for (Entry<String, Long> endTimeMap : entry.getValue().entrySet()) {
          String device = endTimeMap.getKey();
          long endTime = endTimeMap.getValue();
          // only raise the flushed time, never lower it
          if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) {
            partitionLatestFlushedTimeForEachDevice
                .computeIfAbsent(timePartitionId, id -> new HashMap<>())
                .put(device, endTime);
          }
        }
      }
    }
  }
| |
| private void loadUpgradedResources(List<TsFileResource> resources, boolean isseq) { |
| if (resources.isEmpty()) { |
| return; |
| } |
| for (TsFileResource resource : resources) { |
| try { |
| UpgradeUtils.moveUpgradedFiles(resource); |
| tsFileManagement.addAll(resource.getUpgradedResources(), isseq); |
| // delete old TsFile and resource |
| resource.delete(); |
| Files.deleteIfExists( |
| fsFactory |
| .getFile(resource.getTsFile().toPath() + ModificationFile.FILE_SUFFIX) |
| .toPath()); |
| UpgradeLog.writeUpgradeLogFile( |
| resource.getTsFile().getAbsolutePath() + "," + UpgradeCheckStatus.UPGRADE_SUCCESS); |
| } catch (IOException e) { |
| logger.error("Unable to load {}, caused by ", resource, e); |
| } |
| } |
| // delete upgrade folder when it is empty |
| if (resources.get(0).getTsFile().getParentFile().isDirectory() |
| && resources.get(0).getTsFile().getParentFile().listFiles().length == 0) { |
| try { |
| Files.delete(resources.get(0).getTsFile().getParentFile().toPath()); |
| } catch (IOException e) { |
| logger.error( |
| "Delete upgrade folder {} failed, caused by ", |
| resources.get(0).getTsFile().getParentFile(), |
| e); |
| } |
| } |
| resources.clear(); |
| } |
| |
| /** |
| * merge file under this storage group processor |
| * |
| * @param isFullMerge whether this merge is a full merge or not |
| */ |
| public void merge(boolean isFullMerge) { |
| writeLock("merge"); |
| try { |
| for (long timePartitionId : partitionLatestFlushedTimeForEachDevice.keySet()) { |
| executeCompaction(timePartitionId, isFullMerge); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * Load a new tsfile to storage group processor. Tne file may have overlap with other files. |
| * |
| * <p>or unsequence list. |
| * |
| * <p>Secondly, execute the loading process by the type. |
| * |
| * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice. |
| * |
| * @param newTsFileResource tsfile resource @UsedBy sync module. |
| */ |
| public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws LoadFileException { |
| File tsfileToBeInserted = newTsFileResource.getTsFile(); |
| long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); |
| writeLock("loadNewTsFileForSync"); |
| try { |
| if (loadTsFileByType( |
| LoadTsFileType.LOAD_SEQUENCE, |
| tsfileToBeInserted, |
| newTsFileResource, |
| newFilePartitionId)) { |
| updateLatestTimeMap(newTsFileResource); |
| } |
| resetLastCacheWhenLoadingTsfile(newTsFileResource); |
| } catch (DiskSpaceInsufficientException e) { |
| logger.error( |
| "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", |
| tsfileToBeInserted.getAbsolutePath(), |
| tsfileToBeInserted.getParentFile().getName()); |
| IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); |
| throw new LoadFileException(e); |
| } catch (IllegalPathException e) { |
| logger.error( |
| "Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath()); |
| throw new LoadFileException(e); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| private void resetLastCacheWhenLoadingTsfile(TsFileResource newTsFileResource) |
| throws IllegalPathException { |
| for (String device : newTsFileResource.getDevices()) { |
| tryToDeleteLastCacheByDevice(new PartialPath(device)); |
| } |
| } |
| |
| private void tryToDeleteLastCacheByDevice(PartialPath deviceId) { |
| if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) { |
| return; |
| } |
| try { |
| IMNode node = IoTDB.metaManager.getDeviceNode(deviceId); |
| |
| for (IMNode measurementNode : node.getChildren().values()) { |
| if (measurementNode != null) { |
| ((IMeasurementMNode) measurementNode).resetCache(); |
| logger.debug( |
| "[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null", |
| measurementNode.getFullPath()); |
| } |
| } |
| } catch (MetadataException e) { |
| // the path doesn't cache in cluster mode now, ignore |
| } |
| } |
| |
| /** |
| * Load a new tsfile to storage group processor. Tne file may have overlap with other files. |
| * |
| * <p>that there has no file which is overlapping with the new file. |
| * |
| * <p>Firstly, determine the loading type of the file, whether it needs to be loaded in sequence |
| * list or unsequence list. |
| * |
| * <p>Secondly, execute the loading process by the type. |
| * |
| * <p>Finally, update the latestTimeForEachDevice and partitionLatestFlushedTimeForEachDevice. |
| * |
| * @param newTsFileResource tsfile resource @UsedBy load external tsfile module |
| */ |
| public void loadNewTsFile(TsFileResource newTsFileResource) throws LoadFileException { |
| File tsfileToBeInserted = newTsFileResource.getTsFile(); |
| long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); |
| writeLock("loadNewTsFile"); |
| try { |
| List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true); |
| |
| int insertPos = findInsertionPosition(newTsFileResource, newFilePartitionId, sequenceList); |
| String newFileName, renameInfo; |
| LoadTsFileType tsFileType; |
| |
| // loading tsfile by type |
| if (insertPos == POS_OVERLAP) { |
| newFileName = |
| getNewTsFileName( |
| System.currentTimeMillis(), |
| getAndSetNewVersion(newFilePartitionId, newTsFileResource), |
| 0, |
| 0); |
| renameInfo = IoTDBConstant.UNSEQUENCE_FLODER_NAME; |
| tsFileType = LoadTsFileType.LOAD_UNSEQUENCE; |
| newTsFileResource.setSeq(false); |
| } else { |
| // check whether the file name needs to be renamed. |
| newFileName = getFileNameForSequenceLoadingFile(insertPos, newTsFileResource, sequenceList); |
| renameInfo = IoTDBConstant.SEQUENCE_FLODER_NAME; |
| tsFileType = LoadTsFileType.LOAD_SEQUENCE; |
| newTsFileResource.setSeq(true); |
| } |
| |
| if (!newFileName.equals(tsfileToBeInserted.getName())) { |
| logger.info( |
| "TsFile {} must be renamed to {} for loading into the " + renameInfo + " list.", |
| tsfileToBeInserted.getName(), |
| newFileName); |
| newTsFileResource.setFile( |
| fsFactory.getFile(tsfileToBeInserted.getParentFile(), newFileName)); |
| } |
| loadTsFileByType(tsFileType, tsfileToBeInserted, newTsFileResource, newFilePartitionId); |
| resetLastCacheWhenLoadingTsfile(newTsFileResource); |
| |
| // update latest time map |
| updateLatestTimeMap(newTsFileResource); |
| long partitionNum = newTsFileResource.getTimePartition(); |
| updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion()); |
| } catch (DiskSpaceInsufficientException e) { |
| logger.error( |
| "Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.", |
| tsfileToBeInserted.getAbsolutePath(), |
| tsfileToBeInserted.getParentFile().getName()); |
| IoTDBDescriptor.getInstance().getConfig().setReadOnly(true); |
| throw new LoadFileException(e); |
| } catch (IllegalPathException e) { |
| logger.error( |
| "Failed to reset last cache when loading file {}", newTsFileResource.getTsFilePath()); |
| throw new LoadFileException(e); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
| /** |
| * Set the version in "partition" to "version" if "version" is larger than the current version. |
| */ |
| public void setPartitionFileVersionToMax(long partition, long version) { |
| partitionMaxFileVersions.compute( |
| partition, (prt, oldVer) -> computeMaxVersion(oldVer, version)); |
| } |
| |
| private long computeMaxVersion(Long oldVersion, Long newVersion) { |
| if (oldVersion == null) { |
| return newVersion; |
| } |
| return Math.max(oldVersion, newVersion); |
| } |
| |
| /** |
| * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them. |
| * |
| * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted |
| * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new |
| * file can be inserted between [i, i+1] |
| */ |
| private int findInsertionPosition( |
| TsFileResource newTsFileResource, |
| long newFilePartitionId, |
| List<TsFileResource> sequenceList) { |
| |
| int insertPos = -1; |
| |
| // find the position where the new file should be inserted |
| for (int i = 0; i < sequenceList.size(); i++) { |
| TsFileResource localFile = sequenceList.get(i); |
| long localPartitionId = Long.parseLong(localFile.getTsFile().getParentFile().getName()); |
| |
| if (newFilePartitionId > localPartitionId) { |
| insertPos = i; |
| continue; |
| } |
| |
| if (!localFile.isClosed() && localFile.getProcessor() != null) { |
| // we cannot compare two files by TsFileResource unless they are both closed |
| syncCloseOneTsFileProcessor(true, localFile.getProcessor()); |
| } |
| int fileComparison = compareTsFileDevices(newTsFileResource, localFile); |
| switch (fileComparison) { |
| case 0: |
| // some devices are newer but some devices are older, the two files overlap in general |
| return POS_OVERLAP; |
| case -1: |
| // all devices in localFile are newer than the new file, the new file can be |
| // inserted before localFile |
| return i - 1; |
| default: |
| // all devices in the local file are older than the new file, proceed to the next file |
| insertPos = i; |
| } |
| } |
| return insertPos; |
| } |
| |
| /** |
| * Compare each device in the two files to find the time relation of them. |
| * |
| * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than |
| * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A) |
| */ |
| private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) { |
| boolean hasPre = false, hasSubsequence = false; |
| for (String device : fileA.getDevices()) { |
| if (!fileB.getDevices().contains(device)) { |
| continue; |
| } |
| long startTimeA = fileA.getStartTime(device); |
| long endTimeA = fileA.getEndTime(device); |
| long startTimeB = fileB.getStartTime(device); |
| long endTimeB = fileB.getEndTime(device); |
| if (startTimeA > endTimeB) { |
| // A's data of the device is later than to the B's data |
| hasPre = true; |
| } else if (startTimeB > endTimeA) { |
| // A's data of the device is previous to the B's data |
| hasSubsequence = true; |
| } else { |
| // the two files overlap in the device |
| return 0; |
| } |
| } |
| if (hasPre && hasSubsequence) { |
| // some devices are newer but some devices are older, the two files overlap in general |
| return 0; |
| } |
| if (!hasPre && hasSubsequence) { |
| // all devices in B are newer than those in A |
| return -1; |
| } |
| // all devices in B are older than those in A |
| return 1; |
| } |
| |
| /** |
| * If the historical versions of a file is a sub-set of the given file's, (close and) remove it to |
| * reduce unnecessary merge. Only used when the file sender and the receiver share the same file |
| * close policy. Warning: DO NOT REMOVE |
| */ |
| @SuppressWarnings("unused") |
| public void removeFullyOverlapFiles(TsFileResource resource) { |
| writeLock("removeFullyOverlapFiles"); |
| try { |
| Iterator<TsFileResource> iterator = tsFileManagement.getIterator(true); |
| removeFullyOverlapFiles(resource, iterator, true); |
| |
| iterator = tsFileManagement.getIterator(false); |
| removeFullyOverlapFiles(resource, iterator, false); |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
  /**
   * Remove every file in the iterator whose plan-index range is fully covered by {@code newTsFile}.
   * A file is only removed when its write lock can be acquired immediately: if we fail to lock the
   * file, it is being queried or merged and we do not wait until it is free — we just leave it to
   * the next merge.
   */
  private void removeFullyOverlapFiles(
      TsFileResource newTsFile, Iterator<TsFileResource> iterator, boolean isSeq) {
    while (iterator.hasNext()) {
      TsFileResource existingTsFile = iterator.next();
      if (newTsFile.isPlanRangeCovers(existingTsFile)
          && !newTsFile.getTsFile().equals(existingTsFile.getTsFile())
          && existingTsFile.tryWriteLock()) {
        logger.info(
            "{} is covered by {}: [{}, {}], [{}, {}], remove it",
            existingTsFile,
            newTsFile,
            existingTsFile.minPlanIndex,
            existingTsFile.maxPlanIndex,
            newTsFile.minPlanIndex,
            newTsFile.maxPlanIndex);
        try {
          removeFullyOverlapFile(existingTsFile, iterator, isSeq);
        } catch (Exception e) {
          logger.error(
              "Something gets wrong while removing FullyOverlapFiles: {}",
              existingTsFile.getTsFile().getAbsolutePath(),
              e);
        } finally {
          existingTsFile.writeUnlock();
        }
      }
    }
  }
| |
| /** |
| * remove the given tsFileResource. If the corresponding tsFileProcessor is in the working status, |
| * close it before remove the related resource files. maybe time-consuming for closing a tsfile. |
| */ |
| private void removeFullyOverlapFile( |
| TsFileResource tsFileResource, Iterator<TsFileResource> iterator, boolean isSeq) { |
| logger.info( |
| "Removing a covered file {}, closed: {}", tsFileResource, tsFileResource.isClosed()); |
| if (!tsFileResource.isClosed()) { |
| try { |
| // also remove the TsFileProcessor if the overlapped file is not closed |
| long timePartition = tsFileResource.getTimePartition(); |
| Map<Long, TsFileProcessor> fileProcessorMap = |
| isSeq ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors; |
| TsFileProcessor tsFileProcessor = fileProcessorMap.get(timePartition); |
| if (tsFileProcessor != null && tsFileProcessor.getTsFileResource() == tsFileResource) { |
| // have to take some time to close the tsFileProcessor |
| tsFileProcessor.syncClose(); |
| fileProcessorMap.remove(timePartition); |
| } |
| } catch (Exception e) { |
| logger.error("Cannot close {}", tsFileResource, e); |
| } |
| } |
| tsFileManagement.remove(tsFileResource, isSeq); |
| iterator.remove(); |
| tsFileResource.remove(); |
| } |
| |
| /** |
| * Get an appropriate filename to ensure the order between files. The tsfile is named after |
| * ({systemTime}-{versionNum}-{in_space_compaction_num}-{cross_space_compaction_num}.tsfile). |
| * |
| * <p>The sorting rules for tsfile names @see {@link this#compareFileName}, we can restore the |
| * list based on the file name and ensure the correctness of the order, so there are three cases. |
| * |
| * <p>1. The tsfile is to be inserted in the first place of the list. Timestamp can be set to half |
| * of the timestamp value in the file name of the first tsfile in the list , and the version |
| * number will be updated to the largest number in this time partition. |
| * |
| * <p>2. The tsfile is to be inserted in the last place of the list. The file name is generated by |
| * the system according to the naming rules and returned. |
| * |
| * <p>3. This file is inserted between two files. The time stamp is the mean of the timestamps of |
| * the two files, the version number will be updated to the largest number in this time partition. |
| * |
| * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex + |
| * 1] |
| * @return appropriate filename |
| */ |
| private String getFileNameForSequenceLoadingFile( |
| int insertIndex, TsFileResource newTsFileResource, List<TsFileResource> sequenceList) |
| throws LoadFileException { |
| long timePartitionId = newTsFileResource.getTimePartition(); |
| long preTime, subsequenceTime; |
| |
| if (insertIndex == -1) { |
| preTime = 0L; |
| } else { |
| String preName = sequenceList.get(insertIndex).getTsFile().getName(); |
| preTime = Long.parseLong(preName.split(FILE_NAME_SEPARATOR)[0]); |
| } |
| if (insertIndex == tsFileManagement.size(true) - 1) { |
| subsequenceTime = preTime + ((System.currentTimeMillis() - preTime) << 1); |
| } else { |
| String subsequenceName = sequenceList.get(insertIndex + 1).getTsFile().getName(); |
| subsequenceTime = Long.parseLong(subsequenceName.split(FILE_NAME_SEPARATOR)[0]); |
| } |
| |
| long meanTime = preTime + ((subsequenceTime - preTime) >> 1); |
| if (insertIndex != tsFileManagement.size(true) - 1 && meanTime == subsequenceTime) { |
| throw new LoadFileException("can not load TsFile because of can not find suitable location"); |
| } |
| |
| return getNewTsFileName( |
| meanTime, getAndSetNewVersion(timePartitionId, newTsFileResource), 0, 0); |
| } |
| |
| private long getAndSetNewVersion(long timePartitionId, TsFileResource tsFileResource) { |
| long version = partitionMaxFileVersions.getOrDefault(timePartitionId, -1L) + 1; |
| partitionMaxFileVersions.put(timePartitionId, version); |
| tsFileResource.setVersion(version); |
| return version; |
| } |
| |
| /** |
| * Update latest time in latestTimeForEachDevice and |
| * partitionLatestFlushedTimeForEachDevice. @UsedBy sync module, load external tsfile module. |
| */ |
| private void updateLatestTimeMap(TsFileResource newTsFileResource) { |
| for (String device : newTsFileResource.getDevices()) { |
| long endTime = newTsFileResource.getEndTime(device); |
| long timePartitionId = StorageEngine.getTimePartition(endTime); |
| if (!latestTimeForEachDevice |
| .computeIfAbsent(timePartitionId, id -> new HashMap<>()) |
| .containsKey(device) |
| || latestTimeForEachDevice.get(timePartitionId).get(device) < endTime) { |
| latestTimeForEachDevice.get(timePartitionId).put(device, endTime); |
| } |
| |
| Map<String, Long> latestFlushTimeForPartition = |
| partitionLatestFlushedTimeForEachDevice.getOrDefault(timePartitionId, new HashMap<>()); |
| |
| if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) { |
| partitionLatestFlushedTimeForEachDevice |
| .computeIfAbsent(timePartitionId, id -> new HashMap<>()) |
| .put(device, endTime); |
| } |
| if (globalLatestFlushedTimeForEachDevice.getOrDefault(device, Long.MIN_VALUE) < endTime) { |
| globalLatestFlushedTimeForEachDevice.put(device, endTime); |
| } |
| } |
| } |
| |
| /** |
| * Execute the loading process by the type. |
| * |
| * @param type load type |
| * @param tsFileResource tsfile resource to be loaded |
| * @param filePartitionId the partition id of the new file |
| * @return load the file successfully @UsedBy sync module, load external tsfile module. |
| */ |
| private boolean loadTsFileByType( |
| LoadTsFileType type, File tsFileToLoad, TsFileResource tsFileResource, long filePartitionId) |
| throws LoadFileException, DiskSpaceInsufficientException { |
| File targetFile; |
| switch (type) { |
| case LOAD_UNSEQUENCE: |
| targetFile = |
| fsFactory.getFile( |
| DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), |
| logicalStorageGroupName |
| + File.separatorChar |
| + virtualStorageGroupId |
| + File.separatorChar |
| + filePartitionId |
| + File.separator |
| + tsFileResource.getTsFile().getName()); |
| tsFileResource.setFile(targetFile); |
| if (tsFileManagement.contains(tsFileResource, false)) { |
| logger.error("The file {} has already been loaded in unsequence list", tsFileResource); |
| return false; |
| } |
| tsFileManagement.add(tsFileResource, false); |
| logger.info( |
| "Load tsfile in unsequence list, move file from {} to {}", |
| tsFileToLoad.getAbsolutePath(), |
| targetFile.getAbsolutePath()); |
| break; |
| case LOAD_SEQUENCE: |
| targetFile = |
| fsFactory.getFile( |
| DirectoryManager.getInstance().getNextFolderForSequenceFile(), |
| logicalStorageGroupName |
| + File.separatorChar |
| + virtualStorageGroupId |
| + File.separatorChar |
| + filePartitionId |
| + File.separator |
| + tsFileResource.getTsFile().getName()); |
| tsFileResource.setFile(targetFile); |
| if (tsFileManagement.contains(tsFileResource, true)) { |
| logger.error("The file {} has already been loaded in sequence list", tsFileResource); |
| return false; |
| } |
| tsFileManagement.add(tsFileResource, true); |
| logger.info( |
| "Load tsfile in sequence list, move file from {} to {}", |
| tsFileToLoad.getAbsolutePath(), |
| targetFile.getAbsolutePath()); |
| break; |
| default: |
| throw new LoadFileException(String.format("Unsupported type of loading tsfile : %s", type)); |
| } |
| |
| // move file from sync dir to data dir |
| if (!targetFile.getParentFile().exists()) { |
| targetFile.getParentFile().mkdirs(); |
| } |
| try { |
| FileUtils.moveFile(tsFileToLoad, targetFile); |
| } catch (IOException e) { |
| logger.error( |
| "File renaming failed when loading tsfile. Origin: {}, Target: {}", |
| tsFileToLoad.getAbsolutePath(), |
| targetFile.getAbsolutePath(), |
| e); |
| throw new LoadFileException( |
| String.format( |
| "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s", |
| tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage())); |
| } |
| |
| File resourceFileToLoad = |
| fsFactory.getFile(tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); |
| File targetResourceFile = |
| fsFactory.getFile(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); |
| try { |
| FileUtils.moveFile(resourceFileToLoad, targetResourceFile); |
| } catch (IOException e) { |
| logger.error( |
| "File renaming failed when loading .resource file. Origin: {}, Target: {}", |
| resourceFileToLoad.getAbsolutePath(), |
| targetResourceFile.getAbsolutePath(), |
| e); |
| throw new LoadFileException( |
| String.format( |
| "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s", |
| resourceFileToLoad.getAbsolutePath(), |
| targetResourceFile.getAbsolutePath(), |
| e.getMessage())); |
| } |
| |
| File modFileToLoad = |
| fsFactory.getFile(tsFileToLoad.getAbsolutePath() + ModificationFile.FILE_SUFFIX); |
| if (modFileToLoad.exists()) { |
| // when successfully loaded, the filepath of the resource will be changed to the IoTDB data |
| // dir, so we can add a suffix to find the old modification file. |
| File targetModFile = |
| fsFactory.getFile(targetFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX); |
| try { |
| Files.deleteIfExists(targetFile.toPath()); |
| } catch (IOException e) { |
| logger.warn("Cannot delete localModFile {}", targetModFile, e); |
| } |
| try { |
| FileUtils.moveFile(modFileToLoad, targetModFile); |
| } catch (IOException e) { |
| logger.error( |
| "File renaming failed when loading .mod file. Origin: {}, Target: {}", |
| resourceFileToLoad.getAbsolutePath(), |
| targetModFile.getAbsolutePath(), |
| e); |
| throw new LoadFileException( |
| String.format( |
| "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s", |
| resourceFileToLoad.getAbsolutePath(), |
| targetModFile.getAbsolutePath(), |
| e.getMessage())); |
| } finally { |
| // ModFile will be updated during the next call to `getModFile` |
| tsFileResource.setModFile(null); |
| } |
| } |
| |
| updatePartitionFileVersion(filePartitionId, tsFileResource.getVersion()); |
| return true; |
| } |
| |
| /** |
| * Delete tsfile if it exists. |
| * |
| * <p>Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList. |
| * |
| * <p>Secondly, delete the tsfile and .resource file. |
| * |
| * @param tsfieToBeDeleted tsfile to be deleted |
| * @return whether the file to be deleted exists. @UsedBy sync module, load external tsfile |
| * module. |
| */ |
| public boolean deleteTsfile(File tsfieToBeDeleted) { |
| writeLock("deleteTsfile"); |
| TsFileResource tsFileResourceToBeDeleted = null; |
| try { |
| Iterator<TsFileResource> sequenceIterator = tsFileManagement.getIterator(true); |
| while (sequenceIterator.hasNext()) { |
| TsFileResource sequenceResource = sequenceIterator.next(); |
| if (sequenceResource.getTsFile().getName().equals(tsfieToBeDeleted.getName())) { |
| tsFileResourceToBeDeleted = sequenceResource; |
| tsFileManagement.remove(tsFileResourceToBeDeleted, true); |
| break; |
| } |
| } |
| if (tsFileResourceToBeDeleted == null) { |
| Iterator<TsFileResource> unsequenceIterator = tsFileManagement.getIterator(false); |
| while (unsequenceIterator.hasNext()) { |
| TsFileResource unsequenceResource = unsequenceIterator.next(); |
| if (unsequenceResource.getTsFile().getName().equals(tsfieToBeDeleted.getName())) { |
| tsFileResourceToBeDeleted = unsequenceResource; |
| tsFileManagement.remove(tsFileResourceToBeDeleted, false); |
| break; |
| } |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| if (tsFileResourceToBeDeleted == null) { |
| return false; |
| } |
| tsFileResourceToBeDeleted.writeLock(); |
| try { |
| tsFileResourceToBeDeleted.remove(); |
| logger.info("Delete tsfile {} successfully.", tsFileResourceToBeDeleted.getTsFile()); |
| } finally { |
| tsFileResourceToBeDeleted.writeUnlock(); |
| } |
| return true; |
| } |
| |
| /** |
| * get all working sequence tsfile processors |
| * |
| * @return all working sequence tsfile processors |
| */ |
| public Collection<TsFileProcessor> getWorkSequenceTsFileProcessors() { |
| return workSequenceTsFileProcessors.values(); |
| } |
| |
| /** |
| * Move tsfile to the target directory if it exists. |
| * |
| * <p>Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList. |
| * |
| * <p>Secondly, move the tsfile and .resource file to the target directory. |
| * |
| * @param fileToBeMoved tsfile to be moved |
| * @return whether the file to be moved exists. @UsedBy load external tsfile module. |
| */ |
| public boolean moveTsfile(File fileToBeMoved, File targetDir) { |
| writeLock("moveTsfile"); |
| TsFileResource tsFileResourceToBeMoved = null; |
| try { |
| Iterator<TsFileResource> sequenceIterator = tsFileManagement.getIterator(true); |
| while (sequenceIterator.hasNext()) { |
| TsFileResource sequenceResource = sequenceIterator.next(); |
| if (sequenceResource.getTsFile().getName().equals(fileToBeMoved.getName())) { |
| tsFileResourceToBeMoved = sequenceResource; |
| tsFileManagement.remove(tsFileResourceToBeMoved, true); |
| break; |
| } |
| } |
| if (tsFileResourceToBeMoved == null) { |
| Iterator<TsFileResource> unsequenceIterator = tsFileManagement.getIterator(false); |
| while (unsequenceIterator.hasNext()) { |
| TsFileResource unsequenceResource = unsequenceIterator.next(); |
| if (unsequenceResource.getTsFile().getName().equals(fileToBeMoved.getName())) { |
| tsFileResourceToBeMoved = unsequenceResource; |
| tsFileManagement.remove(tsFileResourceToBeMoved, false); |
| break; |
| } |
| } |
| } |
| } finally { |
| writeUnlock(); |
| } |
| if (tsFileResourceToBeMoved == null) { |
| return false; |
| } |
| tsFileResourceToBeMoved.writeLock(); |
| try { |
| tsFileResourceToBeMoved.moveTo(targetDir); |
| logger.info( |
| "Move tsfile {} to target dir {} successfully.", |
| tsFileResourceToBeMoved.getTsFile(), |
| targetDir.getPath()); |
| } finally { |
| tsFileResourceToBeMoved.writeUnlock(); |
| } |
| return true; |
| } |
| |
| /** |
| * get all working unsequence tsfile processors |
| * |
| * @return all working unsequence tsfile processors |
| */ |
| public Collection<TsFileProcessor> getWorkUnsequenceTsFileProcessors() { |
| return workUnsequenceTsFileProcessors.values(); |
| } |
| |
  /** Sets the data TTL of this storage group and immediately re-checks files against it. */
  public void setDataTTL(long dataTTL) {
    this.dataTTL = dataTTL;
    checkFilesTTL();
  }
| |
  /** @return the sequence file resources held by tsFileManagement */
  public List<TsFileResource> getSequenceFileTreeSet() {
    return tsFileManagement.getTsFileList(true);
  }
| |
  /** @return the unsequence file resources held by tsFileManagement */
  public List<TsFileResource> getUnSequenceFileList() {
    return tsFileManagement.getTsFileList(false);
  }
| |
  /** @return the id of this virtual storage group */
  public String getVirtualStorageGroupId() {
    return virtualStorageGroupId;
  }

  /** @return the StorageGroupInfo associated with this storage group */
  public StorageGroupInfo getStorageGroupInfo() {
    return storageGroupInfo;
  }
| |
| /** |
| * Check if the data of "tsFileResource" all exist locally by comparing planIndexes in the |
| * partition of "partitionNumber". This is available only when the IoTDB instances which generated |
| * "tsFileResource" have the same plan indexes as the local one. |
| * |
| * @return true if any file contains plans with indexes no less than the max plan index of |
| * "tsFileResource", otherwise false. |
| */ |
| public boolean isFileAlreadyExist(TsFileResource tsFileResource, long partitionNum) { |
| // examine working processor first as they have the largest plan index |
| return isFileAlreadyExistInWorking( |
| tsFileResource, partitionNum, getWorkSequenceTsFileProcessors()) |
| || isFileAlreadyExistInWorking( |
| tsFileResource, partitionNum, getWorkUnsequenceTsFileProcessors()) |
| || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getSequenceFileTreeSet()) |
| || isFileAlreadyExistInClosed(tsFileResource, partitionNum, getUnSequenceFileList()); |
| } |
| |
| private boolean isFileAlreadyExistInClosed( |
| TsFileResource tsFileResource, long partitionNum, Collection<TsFileResource> existingFiles) { |
| for (TsFileResource resource : existingFiles) { |
| if (resource.getTimePartition() == partitionNum |
| && resource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex()) { |
| logger.info( |
| "{} is covered by a closed file {}: [{}, {}] [{}, {}]", |
| tsFileResource, |
| resource, |
| tsFileResource.minPlanIndex, |
| tsFileResource.maxPlanIndex, |
| resource.minPlanIndex, |
| resource.maxPlanIndex); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private boolean isFileAlreadyExistInWorking( |
| TsFileResource tsFileResource, |
| long partitionNum, |
| Collection<TsFileProcessor> workingProcessors) { |
| for (TsFileProcessor workingProcesssor : workingProcessors) { |
| if (workingProcesssor.getTimeRangeId() == partitionNum) { |
| TsFileResource workResource = workingProcesssor.getTsFileResource(); |
| boolean isCovered = workResource.getMaxPlanIndex() > tsFileResource.getMaxPlanIndex(); |
| if (isCovered) { |
| logger.info( |
| "{} is covered by a working file {}: [{}, {}] [{}, {}]", |
| tsFileResource, |
| workResource, |
| tsFileResource.minPlanIndex, |
| tsFileResource.maxPlanIndex, |
| workResource.minPlanIndex, |
| workResource.maxPlanIndex); |
| } |
| return isCovered; |
| } |
| } |
| return false; |
| } |
| |
  /** remove all partitions that satisfy a filter. */
  public void removePartitions(TimePartitionFilter filter) {
    // this requires blocking all other activities
    writeLock("removePartitions");
    try {
      // abort ongoing compactions and merges
      CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
      MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
      // close all working files that should be removed
      removePartitions(filter, workSequenceTsFileProcessors.entrySet(), true);
      removePartitions(filter, workUnsequenceTsFileProcessors.entrySet(), false);

      // remove closed data files
      removePartitions(filter, tsFileManagement.getIterator(true), true);
      removePartitions(filter, tsFileManagement.getIterator(false), false);

    } finally {
      writeUnlock();
    }
  }
| |
| // may remove the processorEntrys |
| private void removePartitions( |
| TimePartitionFilter filter, |
| Set<Entry<Long, TsFileProcessor>> processorEntrys, |
| boolean sequence) { |
| for (Iterator<Entry<Long, TsFileProcessor>> iterator = processorEntrys.iterator(); |
| iterator.hasNext(); ) { |
| Entry<Long, TsFileProcessor> longTsFileProcessorEntry = iterator.next(); |
| long partitionId = longTsFileProcessorEntry.getKey(); |
| TsFileProcessor processor = longTsFileProcessorEntry.getValue(); |
| if (filter.satisfy(logicalStorageGroupName, partitionId)) { |
| processor.syncClose(); |
| iterator.remove(); |
| processor.getTsFileResource().remove(); |
| tsFileManagement.remove(processor.getTsFileResource(), sequence); |
| updateLatestFlushTimeToPartition(partitionId, Long.MIN_VALUE); |
| logger.debug( |
| "{} is removed during deleting partitions", |
| processor.getTsFileResource().getTsFilePath()); |
| } |
| } |
| } |
| |
| // may remove the iterator's data |
| private void removePartitions( |
| TimePartitionFilter filter, Iterator<TsFileResource> iterator, boolean sequence) { |
| while (iterator.hasNext()) { |
| TsFileResource tsFileResource = iterator.next(); |
| if (filter.satisfy(logicalStorageGroupName, tsFileResource.getTimePartition())) { |
| tsFileResource.remove(); |
| tsFileManagement.remove(tsFileResource, sequence); |
| updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(), Long.MIN_VALUE); |
| logger.debug("{} is removed during deleting partitions", tsFileResource.getTsFilePath()); |
| } |
| } |
| } |
| |
  /** @return the TsFileManagement (compaction file-list manager) of this storage group */
  public TsFileManagement getTsFileManagement() {
    return tsFileManagement;
  }
| |
| /** |
| * insert batch of rows belongs to one device |
| * |
| * @param insertRowsOfOneDevicePlan batch of rows belongs to one device |
| */ |
| public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan) |
| throws WriteProcessException, TriggerExecutionException { |
| writeLock("InsertRowsOfOneDevice"); |
| try { |
| boolean isSequence = false; |
| InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans(); |
| for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) { |
| |
| InsertRowPlan plan = rowPlans[i]; |
| if (!isAlive(plan.getTime()) || insertRowsOfOneDevicePlan.isExecuted(i)) { |
| // we do not need to write these part of data, as they can not be queried |
| // or the sub-plan has already been executed, we are retrying other sub-plans |
| continue; |
| } |
| // init map |
| long timePartitionId = StorageEngine.getTimePartition(plan.getTime()); |
| |
| partitionLatestFlushedTimeForEachDevice.computeIfAbsent( |
| timePartitionId, id -> new HashMap<>()); |
| // as the plans have been ordered, and we have get the write lock, |
| // So, if a plan is sequenced, then all the rest plans are sequenced. |
| // |
| if (!isSequence) { |
| isSequence = |
| plan.getTime() |
| > partitionLatestFlushedTimeForEachDevice |
| .get(timePartitionId) |
| .getOrDefault(plan.getPrefixPath().getFullPath(), Long.MIN_VALUE); |
| } |
| // is unsequence and user set config to discard out of order data |
| if (!isSequence |
| && IoTDBDescriptor.getInstance().getConfig().isEnableDiscardOutOfOrderData()) { |
| return; |
| } |
| latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); |
| |
| // fire trigger before insertion |
| TriggerEngine.fire(TriggerEvent.BEFORE_INSERT, plan); |
| // insert to sequence or unSequence file |
| insertToTsFileProcessor(plan, isSequence, timePartitionId); |
| // fire trigger before insertion |
| TriggerEngine.fire(TriggerEvent.AFTER_INSERT, plan); |
| } |
| } finally { |
| writeUnlock(); |
| } |
| } |
| |
  /** @return the max file version recorded for the given partition, or -1 if none is known */
  @TestOnly
  public long getPartitionMaxFileVersions(long partitionId) {
    return partitionMaxFileVersions.getOrDefault(partitionId, -1L);
  }
| |
  /** Replaces the custom listeners notified when a tsfile is closed. */
  public void setCustomCloseFileListeners(List<CloseFileListener> customCloseFileListeners) {
    this.customCloseFileListeners = customCloseFileListeners;
  }

  /** Replaces the custom listeners notified on flush events. */
  public void setCustomFlushListeners(List<FlushListener> customFlushListeners) {
    this.customFlushListeners = customFlushListeners;
  }
| |
  /** The two target spaces an externally loaded tsfile can be registered into. */
  private enum LoadTsFileType {
    LOAD_SEQUENCE,
    LOAD_UNSEQUENCE
  }

  /** Callback invoked with a TsFileProcessor when its tsfile is closed. */
  @FunctionalInterface
  public interface CloseTsFileCallBack {

    void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
  }

  /**
   * Callback invoked with a TsFileProcessor; presumably used to update the end time of a closing
   * tsfile — see registered implementations for the meaning of the boolean result.
   */
  @FunctionalInterface
  public interface UpdateEndTimeCallBack {

    boolean call(TsFileProcessor caller);
  }

  /** Callback invoked with a TsFileResource during tsfile upgrade. */
  @FunctionalInterface
  public interface UpgradeTsFileResourceCallBack {

    void call(TsFileResource caller);
  }

  /** Callback invoked when a compaction merge finishes for a time partition. */
  @FunctionalInterface
  public interface CloseCompactionMergeCallBack {

    void call(boolean isMergeExecutedInCurrentTask, long timePartitionId);
  }

  /** Predicate selecting time partitions of a storage group, e.g. for removePartitions. */
  @FunctionalInterface
  public interface TimePartitionFilter {

    boolean satisfy(String storageGroupName, long timePartitionId);
  }
| |
  /** @return the label recorded by the latest holder of the insert write lock */
  public String getInsertWriteLockHolder() {
    return insertWriteLockHolder;
  }
| } |