| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl; |
| |
| import org.apache.iotdb.commons.conf.IoTDBConstant; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.exception.MergeException; |
| import org.apache.iotdb.db.service.metrics.CompactionMetrics; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ICompactionSelector; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.ICrossSpaceSelector; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.DeviceInfo; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCrossCompactionTaskResource; |
| import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.TsFileResourceCandidate; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
| import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; |
| import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; |
| import org.apache.iotdb.tsfile.exception.StopReadTsFileByInterruptException; |
| import org.apache.iotdb.tsfile.file.metadata.IDeviceID; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.stream.Collectors; |
| |
| public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector { |
| private static final Logger LOGGER = |
| LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); |
| private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| |
| protected String logicalStorageGroupName; |
| protected String dataRegionId; |
| protected long timePartition; |
| protected TsFileManager tsFileManager; |
| |
| private static boolean hasPrintedLog = false; |
| |
| private final long memoryBudget; |
| private final int maxCrossCompactionFileNum; |
| private final long maxCrossCompactionFileSize; |
| |
| private final AbstractCrossSpaceEstimator compactionEstimator; |
| |
| public RewriteCrossSpaceCompactionSelector( |
| String logicalStorageGroupName, |
| String dataRegionId, |
| long timePartition, |
| TsFileManager tsFileManager) { |
| this.logicalStorageGroupName = logicalStorageGroupName; |
| this.dataRegionId = dataRegionId; |
| this.timePartition = timePartition; |
| this.tsFileManager = tsFileManager; |
| this.memoryBudget = |
| (long) |
| ((double) SystemInfo.getInstance().getMemorySizeForCompaction() |
| / IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() |
| * config.getUsableCompactionMemoryProportion()); |
| this.maxCrossCompactionFileNum = |
| IoTDBDescriptor.getInstance().getConfig().getFileLimitPerCrossTask(); |
| this.maxCrossCompactionFileSize = |
| IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileSize(); |
| |
| this.compactionEstimator = |
| (AbstractCrossSpaceEstimator) |
| ICompactionSelector.getCompactionEstimator( |
| IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer(), false); |
| } |
| |
| /** |
| * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget. This process |
| * iteratively adds the next unseqFile from unseqFiles and its overlapping seqFiles as newly-added |
| * candidates and computes their estimated memory cost. If the current cost pluses the new cost is |
| * still under the budget, accept the unseqFile and the seqFiles as candidates, otherwise go to |
| * the next iteration. The memory cost of a file is calculated in two ways: The rough estimation: |
| * for a seqFile, the size of its metadata is used for estimation. Since in the worst case, the |
| * file only contains one timeseries and all its metadata will be loaded into memory with at most |
| * one actual data chunk (which is negligible) and writing the timeseries into a new file generate |
| * metadata of the similar size, so the size of all seqFiles' metadata (generated when writing new |
| * chunks) pluses the largest one (loaded when reading a timeseries from the seqFiles) is the |
| * total estimation of all seqFiles; for an unseqFile, since the merge reader may read all chunks |
| * of a series to perform a merge read, the whole file may be loaded into memory, so we use the |
| * file's length as the maximum estimation. The tight estimation: based on the rough estimation, |
| * we scan the file's metadata to count the number of chunks for each series, find the series |
| * which have the most chunks in the file and use its chunk proportion to refine the rough |
| * estimation. The rough estimation is performed first, if no candidates can be found using rough |
| * estimation, we run the selection again with tight estimation. |
| * |
| * @return two lists of TsFileResource, the former is selected seqFiles and the latter is selected |
| * unseqFiles or an empty array if there are no proper candidates by the budget. |
| * @throws MergeException in task resources selection. |
| */ |
| @SuppressWarnings({"squid:S1163", "squid:S1143"}) |
| public CrossCompactionTaskResource selectOneTaskResources(CrossSpaceCompactionCandidate candidate) |
| throws MergeException { |
| if (candidate.getSeqFiles().isEmpty() || candidate.getUnseqFiles().isEmpty()) { |
| return new CrossCompactionTaskResource(); |
| } |
| try { |
| LOGGER.debug( |
| "Selecting cross compaction task resources from {} seqFile, {} unseqFiles", |
| candidate.getSeqFiles().size(), |
| candidate.getUnseqFiles().size()); |
| |
| return executeTaskResourceSelection(candidate); |
| } catch (Exception e) { |
| if (e instanceof StopReadTsFileByInterruptException || Thread.interrupted()) { |
| Thread.currentThread().interrupt(); |
| return new CrossCompactionTaskResource(); |
| } |
| throw new MergeException(e); |
| } finally { |
| compactionEstimator.cleanup(); |
| } |
| } |
| |
| public InsertionCrossCompactionTaskResource selectOneInsertionTask( |
| CrossSpaceCompactionCandidate candidate) throws MergeException { |
| if (candidate.getUnseqFileCandidates().isEmpty()) { |
| return new InsertionCrossCompactionTaskResource(); |
| } |
| InsertionCrossSpaceCompactionSelector insertionCrossSpaceCompactionSelector = |
| new InsertionCrossSpaceCompactionSelector(candidate); |
| try { |
| LOGGER.debug( |
| "Selecting insertion cross compaction task resources from {} seqFile, {} unseqFiles", |
| candidate.getSeqFiles().size(), |
| candidate.getUnseqFiles().size()); |
| InsertionCrossCompactionTaskResource result = |
| insertionCrossSpaceCompactionSelector.executeInsertionCrossSpaceCompactionTaskSelection(); |
| if (result.isValid()) { |
| return result; |
| } |
| } catch (IOException e) { |
| throw new MergeException(e); |
| } |
| return new InsertionCrossCompactionTaskResource(); |
| } |
| |
| private boolean isAllFileCandidateValid(List<TsFileResourceCandidate> tsFileResourceCandidates) { |
| for (TsFileResourceCandidate candidate : tsFileResourceCandidates) { |
| if (!candidate.isValidCandidate) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * In a preset time (30 seconds), for each unseqFile, find the list of seqFiles that overlap with |
| * it and have not been selected by the file selector of this compaction task. After finding each |
| * unseqFile and its corresponding overlap seqFile list, estimate the additional memory overhead |
| * that may be added by compacting them (preferably using the loop estimate), and if it does not |
| * exceed the memory overhead preset by the system for the compaction thread, put them into the |
| * selectedSeqFiles and selectedUnseqFiles. |
| * |
| * @throws IOException in prepare next split |
| */ |
| @SuppressWarnings("squid:S135") |
| private CrossCompactionTaskResource executeTaskResourceSelection( |
| CrossSpaceCompactionCandidate candidate) throws IOException { |
| CrossCompactionTaskResource taskResource = new CrossCompactionTaskResource(); |
| |
| while (candidate.hasNextSplit()) { |
| CrossSpaceCompactionCandidate.CrossCompactionTaskResourceSplit split = candidate.nextSplit(); |
| TsFileResource unseqFile = split.unseqFile.resource; |
| List<TsFileResource> targetSeqFiles = |
| split.seqFiles.stream().map(c -> c.resource).collect(Collectors.toList()); |
| |
| if (!split.atLeastOneSeqFileSelected) { |
| LOGGER.debug("Unseq file {} does not overlap with any seq files.", unseqFile); |
| TsFileResourceCandidate latestSealedSeqFile = |
| getLatestSealedSeqFile(candidate.getSeqFileCandidates()); |
| if (latestSealedSeqFile == null) { |
| break; |
| } |
| if (!latestSealedSeqFile.selected) { |
| targetSeqFiles.add(latestSealedSeqFile.resource); |
| latestSealedSeqFile.markAsSelected(); |
| } |
| } |
| |
| List<TsFileResource> newSelectedSeqResources = new ArrayList<>(taskResource.getSeqFiles()); |
| newSelectedSeqResources.addAll(targetSeqFiles); |
| List<TsFileResource> newSelectedUnseqResources = |
| new ArrayList<>(taskResource.getUnseqFiles()); |
| newSelectedUnseqResources.add(unseqFile); |
| |
| long memoryCost = |
| compactionEstimator.estimateCrossCompactionMemory( |
| newSelectedSeqResources, newSelectedUnseqResources); |
| if (!canAddToTaskResource(taskResource, unseqFile, targetSeqFiles, memoryCost)) { |
| break; |
| } |
| taskResource.putResources(unseqFile, targetSeqFiles, memoryCost); |
| LOGGER.debug( |
| "Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total cost {}", |
| unseqFile, |
| targetSeqFiles, |
| memoryCost, |
| taskResource.getTotalMemoryCost()); |
| } |
| taskResource.sortSeqFiles(candidate.getSeqFiles()); |
| return taskResource; |
| } |
| |
| private TsFileResourceCandidate getLatestSealedSeqFile( |
| List<TsFileResourceCandidate> seqResourceCandidateList) { |
| for (int i = seqResourceCandidateList.size() - 1; i >= 0; i--) { |
| TsFileResourceCandidate seqResourceCandidate = seqResourceCandidateList.get(i); |
| if (seqResourceCandidate.resource.isClosed()) { |
| // We must select the latest sealed and valid seq file to compact with, in order to avoid |
| // overlapping of the new compacted files with the subsequent seq files. |
| if (seqResourceCandidate.isValidCandidate) { |
| LOGGER.debug( |
| "Select one valid seq file {} for nonOverlap unseq file to compact with.", |
| seqResourceCandidate.resource); |
| return seqResourceCandidate; |
| } |
| break; |
| } |
| } |
| return null; |
| } |
| |
| // TODO: (xingtanzjr) need to confirm whether we should strictly guarantee the conditions |
| // If we guarantee the condition strictly, the smallest collection of cross task resource may not |
| // satisfied |
| @SuppressWarnings("squid:S1135") |
| private boolean canAddToTaskResource( |
| CrossCompactionTaskResource taskResource, |
| TsFileResource unseqFile, |
| List<TsFileResource> seqFiles, |
| long memoryCost) |
| throws IOException { |
| if (memoryCost == -1) { |
| // there is file been deleted during selection |
| return false; |
| } |
| TsFileNameGenerator.TsFileName unseqFileName = |
| TsFileNameGenerator.getTsFileName(unseqFile.getTsFile().getName()); |
| // we add a hard limit for cross compaction that selected unseqFile should reach a certain size |
| // or be compacted in inner |
| // space at least once. This is used to make to improve the priority of inner compaction and |
| // avoid too much cross compaction with small files. |
| if (unseqFile.getTsFileSize() < config.getTargetCompactionFileSize() |
| && unseqFileName.getInnerCompactionCnt() < config.getMinCrossCompactionUnseqFileLevel()) { |
| return false; |
| } |
| |
| long totalFileSize = unseqFile.getTsFileSize(); |
| for (TsFileResource f : seqFiles) { |
| if (f.getTsFileSize() >= config.getTargetCompactionFileSize() * 1.5) { |
| // to avoid serious write amplification caused by cross space compaction, we restrict that |
| // seq files are no longer be compacted when the size reaches the threshold. |
| return false; |
| } |
| totalFileSize += f.getTsFileSize(); |
| } |
| |
| // currently, we must allow at least one unseqFile be selected to handle the situation that |
| // an unseqFile has huge time range but few data points. |
| // IMPORTANT: this logic is opposite to previous level control |
| if (taskResource.getUnseqFiles().isEmpty()) { |
| return true; |
| } |
| |
| return taskResource.getTotalFileNums() + 1 + seqFiles.size() <= maxCrossCompactionFileNum |
| && taskResource.getTotalFileSize() + totalFileSize <= maxCrossCompactionFileSize |
| && memoryCost < memoryBudget; |
| } |
| |
| private boolean canSubmitCrossTask( |
| List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) { |
| return !sequenceFileList.isEmpty() && !unsequenceFileList.isEmpty(); |
| } |
| |
| /** |
| * This method creates a specific file selector according to the file selection strategy of |
| * crossSpace compaction, uses the file selector to select all unseqFiles and seqFiles to be |
| * compacted under the time partition of the data region, and creates a compaction task for them. |
| * The task is put into the compactionTaskQueue of the {@link CompactionTaskManager}. |
| * |
| * @return Returns whether the file was found and submits the merge task |
| */ |
| @Override |
| public List<CrossCompactionTaskResource> selectCrossSpaceTask( |
| List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFilelist) { |
| return selectCrossSpaceTask(sequenceFileList, unsequenceFilelist, false); |
| } |
| |
| public List<CrossCompactionTaskResource> selectInsertionCrossSpaceTask( |
| List<TsFileResource> sequenceFileList, List<TsFileResource> unsequenceFileList) { |
| return selectCrossSpaceTask(sequenceFileList, unsequenceFileList, true); |
| } |
| |
| @SuppressWarnings({"squid:S1135", "squid:S2696"}) |
| public List<CrossCompactionTaskResource> selectCrossSpaceTask( |
| List<TsFileResource> sequenceFileList, |
| List<TsFileResource> unsequenceFileList, |
| boolean isInsertionTask) { |
| // TODO: (xingtanzjr) need to confirm what this ttl is used for |
| long startTime = System.currentTimeMillis(); |
| long ttlLowerBound = System.currentTimeMillis() - Long.MAX_VALUE; |
| // we record the variable `candidate` here is used for selecting more than one |
| // CrossCompactionTaskResources in this method. |
| // Add read lock for candidate source files to avoid being deleted during the selection. |
| CrossSpaceCompactionCandidate candidate = |
| new CrossSpaceCompactionCandidate(sequenceFileList, unsequenceFileList, ttlLowerBound); |
| try { |
| CrossCompactionTaskResource taskResources; |
| if (isInsertionTask) { |
| taskResources = selectOneInsertionTask(candidate); |
| } else { |
| taskResources = selectOneTaskResources(candidate); |
| } |
| String sgDataRegionId = logicalStorageGroupName + "-" + dataRegionId; |
| if (!taskResources.isValid()) { |
| if (!hasPrintedLog) { |
| LOGGER.info( |
| "{} [{}] Total source files: {} seqFiles, {} unseqFiles. " |
| + "Candidate source files: {} seqFiles, {} unseqFiles. " |
| + "Cannot select any files because they do not meet the conditions " |
| + "or may be occupied by other compaction threads.", |
| isInsertionTask ? "InsertionCrossSpaceCompaction" : "CrossSpaceCompaction", |
| sgDataRegionId, |
| sequenceFileList.size(), |
| unsequenceFileList.size(), |
| candidate.getSeqFiles().size(), |
| candidate.getUnseqFiles().size()); |
| hasPrintedLog = true; |
| } |
| return Collections.emptyList(); |
| } |
| long timeCost = System.currentTimeMillis() - startTime; |
| LOGGER.info( |
| "{} [{}] Total source files: {} seqFiles, {} unseqFiles. " |
| + "Candidate source files: {} seqFiles, {} unseqFiles. " |
| + "Selected source files: {} seqFiles, " |
| + "{} unseqFiles, estimated memory cost {} MB, " |
| + "total selected file size is {} MB, " |
| + "total selected seq file size is {} MB, " |
| + "total selected unseq file size is {} MB, " |
| + "time consumption {}ms.", |
| sgDataRegionId, |
| isInsertionTask ? "InsertionCrossSpaceCompaction" : "CrossSpaceCompaction", |
| sequenceFileList.size(), |
| unsequenceFileList.size(), |
| candidate.getSeqFiles().size(), |
| candidate.getUnseqFiles().size(), |
| taskResources.getSeqFiles().size(), |
| taskResources.getUnseqFiles().size(), |
| (float) (taskResources.getTotalMemoryCost()) / 1024 / 1024, |
| (float) (taskResources.getTotalFileSize()) / 1024 / 1024, |
| taskResources.getTotalSeqFileSize() / 1024 / 1024, |
| taskResources.getTotalUnseqFileSize() / 1024 / 1024, |
| timeCost); |
| CompactionMetrics.getInstance() |
| .updateCompactionTaskSelectionTimeCost( |
| isInsertionTask ? CompactionTaskType.INSERTION : CompactionTaskType.CROSS, timeCost); |
| hasPrintedLog = false; |
| return Collections.singletonList(taskResources); |
| |
| } catch (MergeException e) { |
| // This exception may be caused by drop database |
| if (!tsFileManager.isAllowCompaction()) { |
| return Collections.emptyList(); |
| } |
| LOGGER.error("{} cannot select file for cross space compaction", logicalStorageGroupName, e); |
| } |
| return Collections.emptyList(); |
| } |
| |
| public static class InsertionCrossSpaceCompactionSelector { |
| |
| private List<TsFileResourceCandidate> seqFiles; |
| private List<TsFileResourceCandidate> unseqFiles; |
| |
| public InsertionCrossSpaceCompactionSelector(CrossSpaceCompactionCandidate candidate) { |
| seqFiles = candidate.getSeqFileCandidates(); |
| unseqFiles = candidate.getUnseqFileCandidates(); |
| } |
| |
| private InsertionCrossCompactionTaskResource executeInsertionCrossSpaceCompactionTaskSelection() |
| throws IOException { |
| InsertionCrossCompactionTaskResource result = new InsertionCrossCompactionTaskResource(); |
| if (unseqFiles.isEmpty()) { |
| return result; |
| } |
| if (seqFiles.isEmpty()) { |
| result.toInsertUnSeqFile = unseqFiles.get(0).resource; |
| // ensure the target position is the head of seq space |
| result.targetFileTimestamp = |
| Math.min(System.currentTimeMillis(), getTimestampInFileName(unseqFiles.get(0))); |
| } else { |
| for (TsFileResourceCandidate unseqFile : unseqFiles) { |
| // skip unseq file which is overlapped with files in seq space |
| if (!unseqFile.resource.isInsertionCompactionTaskCandidate()) { |
| continue; |
| } |
| result = selectCurrentUnSeqFile(unseqFile); |
| if (result.isValid()) { |
| break; |
| } |
| } |
| } |
| // select the first unseq file to exclude other CrossSpaceCompactionTask |
| // or InsertionCrossSpaceCompactionTask in current time partition |
| TsFileResourceCandidate firstUnseqFile = unseqFiles.get(0); |
| result.firstUnSeqFileInParitition = firstUnseqFile.resource; |
| return result; |
| } |
| |
| private InsertionCrossCompactionTaskResource selectCurrentUnSeqFile( |
| TsFileResourceCandidate unseqFile) throws IOException { |
| int previousSeqFileIndex = 0; |
| int nextSeqFileIndex = seqFiles.size(); |
| InsertionCrossCompactionTaskResource result = new InsertionCrossCompactionTaskResource(); |
| |
| boolean hasPreviousSeqFile = false; |
| for (DeviceInfo unseqDeviceInfo : unseqFile.getDevices()) { |
| IDeviceID deviceId = unseqDeviceInfo.deviceId; |
| long startTimeOfUnSeqDevice = unseqDeviceInfo.startTime; |
| long endTimeOfUnSeqDevice = unseqDeviceInfo.endTime; |
| for (int i = 0; i < seqFiles.size(); i++) { |
| TsFileResourceCandidate seqFile = seqFiles.get(i); |
| if (seqFile.unsealed()) { |
| nextSeqFileIndex = Math.min(nextSeqFileIndex, i); |
| } |
| if (!seqFile.containsDevice(deviceId)) { |
| continue; |
| } |
| DeviceInfo seqDeviceInfo = seqFile.getDeviceInfoById(deviceId); |
| long startTimeOfSeqDevice = seqDeviceInfo.startTime; |
| long endTimeOfSeqDevice = seqDeviceInfo.endTime; |
| |
| // overlap |
| if (startTimeOfUnSeqDevice <= endTimeOfSeqDevice |
| && endTimeOfUnSeqDevice >= startTimeOfSeqDevice) { |
| unseqFile.resource.setInsertionCompactionTaskCandidate(false); |
| return result; |
| } |
| |
| if (startTimeOfUnSeqDevice > endTimeOfSeqDevice) { |
| previousSeqFileIndex = Math.max(previousSeqFileIndex, i); |
| hasPreviousSeqFile = true; |
| continue; |
| } |
| nextSeqFileIndex = Math.min(nextSeqFileIndex, i); |
| break; |
| } |
| } |
| |
| // select position to insert |
| if (hasPreviousSeqFile) { |
| boolean insertLastInSeqSpace = |
| nextSeqFileIndex == seqFiles.size() && previousSeqFileIndex == seqFiles.size() - 1; |
| if (insertLastInSeqSpace) { |
| TsFileResourceCandidate prev = seqFiles.get(previousSeqFileIndex); |
| long prevTimestamp = getTimestampInFileName(prev); |
| if (prev.isValidCandidate) { |
| result.prevSeqFile = prev.resource; |
| result.targetFileTimestamp = prevTimestamp + 1; |
| result.toInsertUnSeqFile = unseqFile.resource; |
| } |
| return result; |
| } |
| // insert the TsFileResource between 'prev' and 'next' in seq space |
| for (int i = previousSeqFileIndex; |
| i < Math.min(nextSeqFileIndex, seqFiles.size() - 1); |
| i++) { |
| TsFileResourceCandidate prev = seqFiles.get(i); |
| TsFileResourceCandidate next = seqFiles.get(i + 1); |
| if (prev.isValidCandidate && next.isValidCandidate) { |
| long prevTimestamp = getTimestampInFileName(prev); |
| long nextTimestamp = getTimestampInFileName(next); |
| if (nextTimestamp - prevTimestamp > 1) { |
| result.prevSeqFile = prev.resource; |
| result.nextSeqFile = next.resource; |
| result.targetFileTimestamp = |
| prevTimestamp + Math.max(1, (nextTimestamp - prevTimestamp) / 2); |
| result.toInsertUnSeqFile = unseqFile.resource; |
| break; |
| } |
| } |
| } |
| |
| } else { |
| // insert the TsFileResource to the head of seq space |
| TsFileResourceCandidate next = seqFiles.get(0); |
| long nextTimestamp = getTimestampInFileName(next); |
| if (nextTimestamp < 1) { |
| return result; |
| } |
| result.nextSeqFile = next.resource; |
| result.targetFileTimestamp = nextTimestamp / 2; |
| result.toInsertUnSeqFile = unseqFile.resource; |
| } |
| return result; |
| } |
| |
| private long getTimestampInFileName(TsFileResourceCandidate tsFileResourceCandidate) |
| throws IOException { |
| return TsFileNameGenerator.getTsFileName( |
| tsFileResourceCandidate.resource.getTsFile().getName()) |
| .getTime(); |
| } |
| } |
| } |