| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.engine.merge.task; |
| |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.engine.compaction.TsFileManagement; |
| import org.apache.iotdb.db.engine.merge.manage.MergeContext; |
| import org.apache.iotdb.db.engine.merge.manage.MergeManager; |
| import org.apache.iotdb.db.engine.merge.manage.MergeResource; |
| import org.apache.iotdb.db.engine.merge.recover.MergeLogger; |
| import org.apache.iotdb.db.engine.merge.selector.IMergePathSelector; |
| import org.apache.iotdb.db.engine.merge.selector.NaivePathSelector; |
| import org.apache.iotdb.db.engine.modification.Modification; |
| import org.apache.iotdb.db.engine.storagegroup.TsFileResource; |
| import org.apache.iotdb.db.metadata.PartialPath; |
| import org.apache.iotdb.db.utils.MergeUtils; |
| import org.apache.iotdb.db.utils.MergeUtils.MetaListEntry; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.read.TimeValuePair; |
| import org.apache.iotdb.tsfile.read.TsFileSequenceReader; |
| import org.apache.iotdb.tsfile.read.common.BatchData; |
| import org.apache.iotdb.tsfile.read.common.Chunk; |
| import org.apache.iotdb.tsfile.read.reader.IPointReader; |
| import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; |
| import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; |
| import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; |
| import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; |
| import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.PriorityQueue; |
| import java.util.TreeMap; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import static org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint; |
| import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair; |
| import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData; |
| |
| public class MergeMultiChunkTask { |
| |
| private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class); |
| private static int minChunkPointNum = |
| IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold(); |
| |
| private MergeLogger mergeLogger; |
| private List<PartialPath> unmergedSeries; |
| |
| private String taskName; |
| private MergeResource resource; |
| private TimeValuePair[] currTimeValuePairs; |
| private boolean fullMerge; |
| |
| private MergeContext mergeContext; |
| |
| private AtomicInteger mergedChunkNum = new AtomicInteger(); |
| private AtomicInteger unmergedChunkNum = new AtomicInteger(); |
| private int mergedSeriesCnt; |
| private double progress; |
| |
| private int concurrentMergeSeriesNum; |
| private List<PartialPath> currMergingPaths = new ArrayList<>(); |
| // need to be cleared every device |
| private final Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>> |
| measurementChunkMetadataListMapIteratorCache = |
| new TreeMap<>( |
| (o1, o2) -> |
| TsFileManagement.compareFileName( |
| new File(o1.getFileName()), new File(o2.getFileName()))); |
| // need to be cleared every device |
| private final Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> |
| chunkMetadataListCacheForMerge = |
| new TreeMap<>( |
| (o1, o2) -> |
| TsFileManagement.compareFileName( |
| new File(o1.getFileName()), new File(o2.getFileName()))); |
| |
| private String storageGroupName; |
| |
| public MergeMultiChunkTask( |
| MergeContext context, |
| String taskName, |
| MergeLogger mergeLogger, |
| MergeResource mergeResource, |
| boolean fullMerge, |
| List<PartialPath> unmergedSeries, |
| int concurrentMergeSeriesNum, |
| String storageGroupName) { |
| this.mergeContext = context; |
| this.taskName = taskName; |
| this.mergeLogger = mergeLogger; |
| this.resource = mergeResource; |
| this.fullMerge = fullMerge; |
| this.unmergedSeries = unmergedSeries; |
| this.concurrentMergeSeriesNum = concurrentMergeSeriesNum; |
| this.storageGroupName = storageGroupName; |
| } |
| |
| void mergeSeries() throws IOException { |
| if (logger.isInfoEnabled()) { |
| logger.info("{} starts to merge {} series", taskName, unmergedSeries.size()); |
| } |
| long startTime = System.currentTimeMillis(); |
| for (TsFileResource seqFile : resource.getSeqFiles()) { |
| mergeContext.getUnmergedChunkStartTimes().put(seqFile, new HashMap<>()); |
| } |
| // merge each series and write data into each seqFile's corresponding temp merge file |
| List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(unmergedSeries); |
| for (List<PartialPath> pathList : devicePaths) { |
| // TODO: use statistics of queries to better rearrange series |
| IMergePathSelector pathSelector = new NaivePathSelector(pathList, concurrentMergeSeriesNum); |
| while (pathSelector.hasNext()) { |
| currMergingPaths = pathSelector.next(); |
| mergePaths(); |
| resource.clearChunkWriterCache(); |
| if (Thread.interrupted()) { |
| logger.info("MergeMultiChunkTask {} aborted", taskName); |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| mergedSeriesCnt += currMergingPaths.size(); |
| logMergeProgress(); |
| } |
| measurementChunkMetadataListMapIteratorCache.clear(); |
| chunkMetadataListCacheForMerge.clear(); |
| } |
| if (logger.isInfoEnabled()) { |
| logger.info( |
| "{} all series are merged after {}ms", taskName, System.currentTimeMillis() - startTime); |
| } |
| mergeLogger.logAllTsEnd(); |
| } |
| |
| private void logMergeProgress() { |
| if (logger.isInfoEnabled()) { |
| double newProgress = 100 * mergedSeriesCnt / (double) (unmergedSeries.size()); |
| if (newProgress - progress >= 10.0) { |
| progress = newProgress; |
| logger.info("{} has merged {}% series", taskName, progress); |
| } |
| } |
| } |
| |
| public String getProgress() { |
| return String.format("Processed %d/%d series", mergedSeriesCnt, unmergedSeries.size()); |
| } |
| |
| private void mergePaths() throws IOException { |
| mergeLogger.logTSStart(currMergingPaths); |
| IPointReader[] unseqReaders = resource.getUnseqReaders(currMergingPaths); |
| currTimeValuePairs = new TimeValuePair[currMergingPaths.size()]; |
| for (int i = 0; i < currMergingPaths.size(); i++) { |
| if (unseqReaders[i].hasNextTimeValuePair()) { |
| currTimeValuePairs[i] = unseqReaders[i].currentTimeValuePair(); |
| } |
| } |
| |
| for (int i = 0; i < resource.getSeqFiles().size(); i++) { |
| pathsMergeOneFile(i, unseqReaders); |
| |
| if (Thread.interrupted()) { |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| } |
| mergeLogger.logTSEnd(); |
| } |
| |
| private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders) throws IOException { |
| TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx); |
| String deviceId = currMergingPaths.get(0).getDevice(); |
| long currDeviceMinTime = currTsFile.getStartTime(deviceId); |
| // all paths in one call are from the same device |
| if (currDeviceMinTime == Long.MAX_VALUE) { |
| return; |
| } |
| |
| for (PartialPath path : currMergingPaths) { |
| mergeContext.getUnmergedChunkStartTimes().get(currTsFile).put(path, new ArrayList<>()); |
| } |
| |
| // if this TsFile receives data later than fileLimitTime, it will overlap the next TsFile, |
| // which is forbidden |
| for (TimeValuePair timeValuePair : currTimeValuePairs) { |
| if (timeValuePair != null && timeValuePair.getTimestamp() < currDeviceMinTime) { |
| currDeviceMinTime = timeValuePair.getTimestamp(); |
| } |
| } |
| boolean isLastFile = seqFileIdx + 1 == resource.getSeqFiles().size(); |
| |
| TsFileSequenceReader fileSequenceReader = resource.getFileReader(currTsFile); |
| List<Modification>[] modifications = new List[currMergingPaths.size()]; |
| List<ChunkMetadata>[] seqChunkMeta = new List[currMergingPaths.size()]; |
| Iterator<Map<String, List<ChunkMetadata>>> measurementChunkMetadataListMapIterator = |
| measurementChunkMetadataListMapIteratorCache.computeIfAbsent( |
| fileSequenceReader, |
| (tsFileSequenceReader -> { |
| try { |
| return tsFileSequenceReader.getMeasurementChunkMetadataListMapIterator(deviceId); |
| } catch (IOException e) { |
| logger.error( |
| "unseq compaction task {}, getMeasurementChunkMetadataListMapIterator meets error. iterator create failed.", |
| taskName, |
| e); |
| return null; |
| } |
| })); |
| if (measurementChunkMetadataListMapIterator == null) { |
| return; |
| } |
| |
| String lastSensor = currMergingPaths.get(currMergingPaths.size() - 1).getMeasurement(); |
| String currSensor = null; |
| Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap = new TreeMap<>(); |
| // find all sensor to merge in order, if exceed, then break |
| while (currSensor == null || currSensor.compareTo(lastSensor) < 0) { |
| measurementChunkMetadataListMap = |
| chunkMetadataListCacheForMerge.computeIfAbsent( |
| fileSequenceReader, tsFileSequenceReader -> new TreeMap<>()); |
| // if empty, get measurementChunkMetadataList block to use later |
| if (measurementChunkMetadataListMap.isEmpty()) { |
| // if do not have more sensor, just break |
| if (measurementChunkMetadataListMapIterator.hasNext()) { |
| measurementChunkMetadataListMap.putAll(measurementChunkMetadataListMapIterator.next()); |
| } else { |
| break; |
| } |
| } |
| |
| Iterator<Entry<String, List<ChunkMetadata>>> measurementChunkMetadataListEntryIterator = |
| measurementChunkMetadataListMap.entrySet().iterator(); |
| while (measurementChunkMetadataListEntryIterator.hasNext()) { |
| Entry<String, List<ChunkMetadata>> measurementChunkMetadataListEntry = |
| measurementChunkMetadataListEntryIterator.next(); |
| currSensor = measurementChunkMetadataListEntry.getKey(); |
| |
| // fill modifications and seqChunkMetas to be used later |
| for (int i = 0; i < currMergingPaths.size(); i++) { |
| if (currMergingPaths.get(i).getMeasurement().equals(currSensor)) { |
| modifications[i] = resource.getModifications(currTsFile, currMergingPaths.get(i)); |
| seqChunkMeta[i] = measurementChunkMetadataListEntry.getValue(); |
| modifyChunkMetaData(seqChunkMeta[i], modifications[i]); |
| for (ChunkMetadata chunkMetadata : seqChunkMeta[i]) { |
| resource.updateStartTime(currTsFile, deviceId, chunkMetadata.getStartTime()); |
| resource.updateEndTime(currTsFile, deviceId, chunkMetadata.getEndTime()); |
| } |
| |
| if (Thread.interrupted()) { |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| break; |
| } |
| } |
| |
| // current sensor larger than last needed sensor, just break out to outer loop |
| if (currSensor.compareTo(lastSensor) > 0) { |
| break; |
| } else { |
| measurementChunkMetadataListEntryIterator.remove(); |
| } |
| } |
| } |
| // update measurementChunkMetadataListMap |
| chunkMetadataListCacheForMerge.put(fileSequenceReader, measurementChunkMetadataListMap); |
| |
| List<Integer> unskippedPathIndices = filterNoDataPaths(seqChunkMeta, seqFileIdx); |
| |
| if (unskippedPathIndices.isEmpty()) { |
| return; |
| } |
| |
| RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile); |
| for (PartialPath path : currMergingPaths) { |
| MeasurementSchema schema = resource.getSchema(path); |
| mergeFileWriter.addSchema(path, schema); |
| } |
| // merge unseq data with seq data in this file or small chunks in this file into a larger chunk |
| mergeFileWriter.startChunkGroup(deviceId); |
| boolean dataWritten = |
| mergeChunks( |
| seqChunkMeta, |
| isLastFile, |
| fileSequenceReader, |
| unseqReaders, |
| mergeFileWriter, |
| currTsFile); |
| if (dataWritten) { |
| mergeFileWriter.endChunkGroup(); |
| mergeLogger.logFilePosition(mergeFileWriter.getFile()); |
| currTsFile.updateStartTime(deviceId, currDeviceMinTime); |
| } |
| } |
| |
| private List<Integer> filterNoDataPaths(List[] seqChunkMeta, int seqFileIdx) { |
| // if the last seqFile does not contains this series but the unseqFiles do, data of this |
| // series should also be written into a new chunk |
| List<Integer> ret = new ArrayList<>(); |
| for (int i = 0; i < currMergingPaths.size(); i++) { |
| if (seqChunkMeta[i] == null |
| || seqChunkMeta[i].isEmpty() |
| && !(seqFileIdx + 1 == resource.getSeqFiles().size() |
| && currTimeValuePairs[i] != null)) { |
| continue; |
| } |
| ret.add(i); |
| } |
| return ret; |
| } |
| |
| @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning |
| private boolean mergeChunks( |
| List<ChunkMetadata>[] seqChunkMeta, |
| boolean isLastFile, |
| TsFileSequenceReader reader, |
| IPointReader[] unseqReaders, |
| RestorableTsFileIOWriter mergeFileWriter, |
| TsFileResource currFile) |
| throws IOException { |
| int[] ptWrittens = new int[seqChunkMeta.length]; |
| int mergeChunkSubTaskNum = |
| IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum(); |
| MetaListEntry[] metaListEntries = new MetaListEntry[currMergingPaths.size()]; |
| PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum]; |
| |
| // if merge path is smaller than mergeChunkSubTaskNum, will use merge path number. |
| // so thread are not wasted. |
| if (currMergingPaths.size() < mergeChunkSubTaskNum) { |
| mergeChunkSubTaskNum = currMergingPaths.size(); |
| } |
| |
| for (int i = 0; i < mergeChunkSubTaskNum; i++) { |
| chunkIdxHeaps[i] = new PriorityQueue<>(); |
| } |
| int idx = 0; |
| for (int i = 0; i < currMergingPaths.size(); i++) { |
| chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i); |
| if (seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()) { |
| continue; |
| } |
| |
| MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]); |
| entry.next(); |
| metaListEntries[i] = entry; |
| idx++; |
| ptWrittens[i] = 0; |
| } |
| |
| mergedChunkNum.set(0); |
| unmergedChunkNum.set(0); |
| |
| List<Future<Void>> futures = new ArrayList<>(); |
| for (int i = 0; i < mergeChunkSubTaskNum; i++) { |
| futures.add( |
| MergeManager.getINSTANCE() |
| .submitChunkSubTask( |
| new MergeChunkHeapTask( |
| chunkIdxHeaps[i], |
| metaListEntries, |
| ptWrittens, |
| reader, |
| mergeFileWriter, |
| unseqReaders, |
| currFile, |
| isLastFile, |
| i))); |
| |
| if (Thread.interrupted()) { |
| Thread.currentThread().interrupt(); |
| return false; |
| } |
| } |
| for (int i = 0; i < mergeChunkSubTaskNum; i++) { |
| try { |
| futures.get(i).get(); |
| } catch (InterruptedException e) { |
| logger.error("MergeChunkHeapTask interrupted", e); |
| Thread.currentThread().interrupt(); |
| return false; |
| } catch (ExecutionException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| // add merge and unmerged chunk statistic |
| mergeContext |
| .getMergedChunkCnt() |
| .compute( |
| currFile, |
| (tsFileResource, anInt) -> |
| anInt == null ? mergedChunkNum.get() : anInt + mergedChunkNum.get()); |
| mergeContext |
| .getUnmergedChunkCnt() |
| .compute( |
| currFile, |
| (tsFileResource, anInt) -> |
| anInt == null ? unmergedChunkNum.get() : anInt + unmergedChunkNum.get()); |
| |
| return mergedChunkNum.get() > 0; |
| } |
| |
| /** |
| * merge a sequence chunk SK |
| * |
| * <p>1. no need to write the chunk to .merge file when: isn't full merge & there isn't unclosed |
| * chunk before & SK is big enough & SK isn't overflowed & SK isn't modified |
| * |
| * <p> |
| * |
| * <p>2. write SK to .merge.file without compressing when: is full merge & there isn't unclosed |
| * chunk before & SK is big enough & SK isn't overflowed & SK isn't modified |
| * |
| * <p>3. other cases: need to unCompress the chunk and write 3.1 SK isn't overflowed 3.2 SK is |
| * overflowed |
| */ |
| @SuppressWarnings("java:S2445") // avoid writing the same writer concurrently |
| private int mergeChunkV2( |
| ChunkMetadata currMeta, |
| boolean chunkOverflowed, |
| boolean chunkTooSmall, |
| Chunk chunk, |
| int lastUnclosedChunkPoint, |
| int pathIdx, |
| TsFileIOWriter mergeFileWriter, |
| IPointReader unseqReader, |
| IChunkWriter chunkWriter, |
| TsFileResource currFile) |
| throws IOException { |
| |
| int unclosedChunkPoint = lastUnclosedChunkPoint; |
| boolean chunkModified = |
| (currMeta.getDeleteIntervalList() != null && !currMeta.getDeleteIntervalList().isEmpty()); |
| |
| // no need to write the chunk to .merge file |
| if (!fullMerge |
| && lastUnclosedChunkPoint == 0 |
| && !chunkTooSmall |
| && !chunkOverflowed |
| && !chunkModified) { |
| unmergedChunkNum.incrementAndGet(); |
| mergeContext |
| .getUnmergedChunkStartTimes() |
| .get(currFile) |
| .get(currMergingPaths.get(pathIdx)) |
| .add(currMeta.getStartTime()); |
| return 0; |
| } |
| |
| // write SK to .merge.file without compressing |
| if (fullMerge |
| && lastUnclosedChunkPoint == 0 |
| && !chunkTooSmall |
| && !chunkOverflowed |
| && !chunkModified) { |
| synchronized (mergeFileWriter) { |
| mergeFileWriter.writeChunk(chunk, currMeta); |
| } |
| mergeContext.incTotalPointWritten(currMeta.getNumOfPoints()); |
| mergeContext.incTotalChunkWritten(); |
| mergedChunkNum.incrementAndGet(); |
| return 0; |
| } |
| |
| // 3.1 SK isn't overflowed, just uncompress and write sequence chunk |
| if (!chunkOverflowed) { |
| unclosedChunkPoint += MergeUtils.writeChunkWithoutUnseq(chunk, chunkWriter); |
| mergedChunkNum.incrementAndGet(); |
| } else { |
| // 3.2 SK is overflowed, uncompress sequence chunk and merge with unseq chunk, then write |
| unclosedChunkPoint += |
| writeChunkWithUnseq(chunk, chunkWriter, unseqReader, currMeta.getEndTime(), pathIdx); |
| mergedChunkNum.incrementAndGet(); |
| } |
| |
| // update points written statistics |
| mergeContext.incTotalPointWritten((long) unclosedChunkPoint - lastUnclosedChunkPoint); |
| if (minChunkPointNum > 0 && unclosedChunkPoint >= minChunkPointNum |
| || unclosedChunkPoint > 0 && minChunkPointNum < 0) { |
| // the new chunk's size is large enough and it should be flushed |
| synchronized (mergeFileWriter) { |
| chunkWriter.writeToFileWriter(mergeFileWriter); |
| } |
| unclosedChunkPoint = 0; |
| } |
| return unclosedChunkPoint; |
| } |
| |
| private int writeRemainingUnseq( |
| IChunkWriter chunkWriter, IPointReader unseqReader, long timeLimit, int pathIdx) |
| throws IOException { |
| int ptWritten = 0; |
| while (currTimeValuePairs[pathIdx] != null |
| && currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) { |
| writeTVPair(currTimeValuePairs[pathIdx], chunkWriter); |
| ptWritten++; |
| unseqReader.nextTimeValuePair(); |
| currTimeValuePairs[pathIdx] = |
| unseqReader.hasNextTimeValuePair() ? unseqReader.currentTimeValuePair() : null; |
| } |
| return ptWritten; |
| } |
| |
| private int writeChunkWithUnseq( |
| Chunk chunk, |
| IChunkWriter chunkWriter, |
| IPointReader unseqReader, |
| long chunkLimitTime, |
| int pathIdx) |
| throws IOException { |
| int cnt = 0; |
| ChunkReader chunkReader = new ChunkReader(chunk, null); |
| while (chunkReader.hasNextSatisfiedPage()) { |
| BatchData batchData = chunkReader.nextPageData(); |
| cnt += mergeWriteBatch(batchData, chunkWriter, unseqReader, pathIdx); |
| } |
| cnt += writeRemainingUnseq(chunkWriter, unseqReader, chunkLimitTime, pathIdx); |
| return cnt; |
| } |
| |
| private int mergeWriteBatch( |
| BatchData batchData, IChunkWriter chunkWriter, IPointReader unseqReader, int pathIdx) |
| throws IOException { |
| int cnt = 0; |
| for (int i = 0; i < batchData.length(); i++) { |
| long time = batchData.getTimeByIndex(i); |
| // merge data in batch and data in unseqReader |
| boolean overwriteSeqPoint = false; |
| // unseq point.time <= sequence point.time, write unseq point |
| while (currTimeValuePairs[pathIdx] != null |
| && currTimeValuePairs[pathIdx].getTimestamp() <= time) { |
| writeTVPair(currTimeValuePairs[pathIdx], chunkWriter); |
| if (currTimeValuePairs[pathIdx].getTimestamp() == time) { |
| overwriteSeqPoint = true; |
| } |
| unseqReader.nextTimeValuePair(); |
| currTimeValuePairs[pathIdx] = |
| unseqReader.hasNextTimeValuePair() ? unseqReader.currentTimeValuePair() : null; |
| cnt++; |
| } |
| // unseq point.time > sequence point.time, write seq point |
| if (!overwriteSeqPoint) { |
| writeBatchPoint(batchData, i, chunkWriter); |
| cnt++; |
| } |
| } |
| return cnt; |
| } |
| |
| public class MergeChunkHeapTask implements Callable<Void> { |
| |
| private PriorityQueue<Integer> chunkIdxHeap; |
| private MetaListEntry[] metaListEntries; |
| private int[] ptWrittens; |
| private TsFileSequenceReader reader; |
| private RestorableTsFileIOWriter mergeFileWriter; |
| private IPointReader[] unseqReaders; |
| private TsFileResource currFile; |
| private boolean isLastFile; |
| private int taskNum; |
| |
| private int totalSeriesNum; |
| |
| public MergeChunkHeapTask( |
| PriorityQueue<Integer> chunkIdxHeap, |
| MetaListEntry[] metaListEntries, |
| int[] ptWrittens, |
| TsFileSequenceReader reader, |
| RestorableTsFileIOWriter mergeFileWriter, |
| IPointReader[] unseqReaders, |
| TsFileResource currFile, |
| boolean isLastFile, |
| int taskNum) { |
| this.chunkIdxHeap = chunkIdxHeap; |
| this.metaListEntries = metaListEntries; |
| this.ptWrittens = ptWrittens; |
| this.reader = reader; |
| this.mergeFileWriter = mergeFileWriter; |
| this.unseqReaders = unseqReaders; |
| this.currFile = currFile; |
| this.isLastFile = isLastFile; |
| this.taskNum = taskNum; |
| this.totalSeriesNum = chunkIdxHeap.size(); |
| } |
| |
| @Override |
| public Void call() throws Exception { |
| mergeChunkHeap(); |
| return null; |
| } |
| |
| @SuppressWarnings("java:S2445") // avoid reading the same reader concurrently |
| private void mergeChunkHeap() throws IOException { |
| while (!chunkIdxHeap.isEmpty()) { |
| int pathIdx = chunkIdxHeap.poll(); |
| PartialPath path = currMergingPaths.get(pathIdx); |
| MeasurementSchema measurementSchema = resource.getSchema(path); |
| IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema); |
| if (Thread.interrupted()) { |
| Thread.currentThread().interrupt(); |
| return; |
| } |
| |
| if (metaListEntries[pathIdx] != null) { |
| MetaListEntry metaListEntry = metaListEntries[pathIdx]; |
| ChunkMetadata currMeta = metaListEntry.current(); |
| boolean isLastChunk = !metaListEntry.hasNext(); |
| boolean chunkOverflowed = |
| MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta); |
| boolean chunkTooSmall = |
| MergeUtils.isChunkTooSmall( |
| ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum); |
| |
| Chunk chunk; |
| synchronized (reader) { |
| chunk = reader.readMemChunk(currMeta); |
| } |
| ptWrittens[pathIdx] = |
| mergeChunkV2( |
| currMeta, |
| chunkOverflowed, |
| chunkTooSmall, |
| chunk, |
| ptWrittens[pathIdx], |
| pathIdx, |
| mergeFileWriter, |
| unseqReaders[pathIdx], |
| chunkWriter, |
| currFile); |
| |
| if (!isLastChunk) { |
| metaListEntry.next(); |
| chunkIdxHeap.add(pathIdx); |
| continue; |
| } |
| } |
| // this only happens when the seqFiles do not contain this series, otherwise the remaining |
| // data will be merged with the last chunk in the seqFiles |
| if (isLastFile && currTimeValuePairs[pathIdx] != null) { |
| ptWrittens[pathIdx] += |
| writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], Long.MAX_VALUE, pathIdx); |
| mergedChunkNum.incrementAndGet(); |
| } |
| // the last merged chunk may still be smaller than the threshold, flush it anyway |
| if (ptWrittens[pathIdx] > 0) { |
| synchronized (mergeFileWriter) { |
| chunkWriter.writeToFileWriter(mergeFileWriter); |
| } |
| } |
| } |
| } |
| |
| public String getStorageGroupName() { |
| return storageGroupName; |
| } |
| |
| public String getTaskName() { |
| return taskName + "_" + taskNum; |
| } |
| |
| public String getProgress() { |
| return String.format( |
| "Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(), totalSeriesNum); |
| } |
| } |
| } |