/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.merge.selector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * MaxFileMergeFileSelector selects as many files as possible from the given seqFiles and
 * unseqFiles so that they can be merged without exceeding the given memory budget. It always
 * assumes that only one timeseries is queried at a time, to maximize the number of files merged.
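 * <p>
 * A minimal usage sketch (the {@code resource} and {@code budget} values below are illustrative
 * placeholders, not taken from a real configuration):
 * <pre>
 *   MergeResource resource = ...;                // seqFiles and unseqFiles to select from
 *   long budget = 512 * 1024 * 1024L;            // memory budget in bytes
 *   IMergeFileSelector selector = new MaxFileMergeFileSelector(resource, budget);
 *   List[] selected = selector.select();         // [selectedSeqFiles, selectedUnseqFiles]
 * </pre>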
*/
public class MaxFileMergeFileSelector implements IMergeFileSelector {
private static final Logger logger = LoggerFactory.getLogger(MaxFileMergeFileSelector.class);
private static final String LOG_FILE_COST = "Memory cost of file {} is {}";
MergeResource resource;
long totalCost;
private long memoryBudget;
private long maxSeqFileCost;
// the number of timeseries being queried at the same time
int concurrentMergeNum = 1;
/**
* Total metadata size of each file.
*/
private Map<TsFileResource, Long> fileMetaSizeMap = new HashMap<>();
/**
* Maximum memory cost of querying a timeseries in each file.
*/
private Map<TsFileResource, Long> maxSeriesQueryCostMap = new HashMap<>();
List<TsFileResource> selectedUnseqFiles;
List<TsFileResource> selectedSeqFiles;
private Collection<Integer> tmpSelectedSeqFiles;
private long tempMaxSeqFileCost;
private boolean[] seqSelected;
private int seqSelectedNum;
public MaxFileMergeFileSelector(MergeResource resource, long memoryBudget) {
this.resource = resource;
this.memoryBudget = memoryBudget;
}
  /**
   * Select merge candidates from seqFiles and unseqFiles under the given memoryBudget. This
   * process iteratively takes the next unseqFile from unseqFiles, adds it and its overlapping
   * seqFiles as newly-added candidates, and computes their estimated memory cost. If the current
   * cost plus the new cost is still under the budget, the unseqFile and the seqFiles are accepted
   * as candidates; otherwise the process moves on to the next iteration.
   * <p>
   * The memory cost of a file is calculated in two ways.
   * <p>
   * The rough estimation: for a seqFile, the size of its metadata is used. In the worst case the
   * file contains only one timeseries, so all of its metadata will be loaded into memory together
   * with at most one actual data chunk (which is negligible), and writing the timeseries into a
   * new file generates metadata of a similar size. Therefore the total estimation for all
   * seqFiles is the sum of their metadata sizes (generated when writing new chunks) plus the
   * largest single metadata size (loaded when reading a timeseries from the seqFiles). For an
   * unseqFile, the merge reader may read all chunks of a series to perform a merge read, so the
   * whole file may be loaded into memory and the file's length is used as the maximum estimation.
   * <p>
   * The tight estimation: based on the rough estimation, the file's metadata is scanned to count
   * the number of chunks of each series; the series with the most chunks in the file is found and
   * its chunk proportion is used to refine the rough estimation.
   * <p>
   * The rough estimation is performed first; if no candidates can be found with it, the selection
   * is run again with the tight estimation.
   *
   * @return two lists of TsFileResource, the former containing the selected seqFiles and the
   * latter the selected unseqFiles, or an empty array if no proper candidates fit the budget.
   */
@Override
public List[] select() throws MergeException {
long startTime = System.currentTimeMillis();
try {
logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles",
resource.getSeqFiles().size(), resource.getUnseqFiles().size());
select(false);
if (selectedUnseqFiles.isEmpty()) {
select(true);
}
resource.setSeqFiles(selectedSeqFiles);
resource.setUnseqFiles(selectedUnseqFiles);
resource.removeOutdatedSeqReaders();
if (selectedUnseqFiles.isEmpty()) {
logger.info("No merge candidates are found");
return new List[0];
}
} catch (IOException e) {
throw new MergeException(e);
}
if (logger.isInfoEnabled()) {
logger.info("Selected merge candidates, {} seqFiles, {} unseqFiles, total memory cost {}, "
+ "time consumption {}ms",
selectedSeqFiles.size(), selectedUnseqFiles.size(), totalCost,
System.currentTimeMillis() - startTime);
}
return new List[]{selectedSeqFiles, selectedUnseqFiles};
}
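  /**
   * Runs one selection pass. When {@code useTightBound} is false, the loose estimation (metadata
   * size for seqFiles, file length for unseqFiles) is used; when it is true, the tighter
   * chunk-proportion based estimation is used instead. The results are collected into
   * {@code selectedSeqFiles} and {@code selectedUnseqFiles}, and the pass aborts early once the
   * configured selection time budget is exceeded.
   */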
void select(boolean useTightBound) throws IOException {
tmpSelectedSeqFiles = new HashSet<>();
seqSelected = new boolean[resource.getSeqFiles().size()];
seqSelectedNum = 0;
selectedSeqFiles = new ArrayList<>();
selectedUnseqFiles = new ArrayList<>();
maxSeqFileCost = 0;
tempMaxSeqFileCost = 0;
totalCost = 0;
int unseqIndex = 0;
long startTime = System.currentTimeMillis();
long timeConsumption = 0;
long timeLimit = IoTDBDescriptor.getInstance().getConfig().getMergeFileSelectionTimeBudget();
if (timeLimit < 0) {
timeLimit = Long.MAX_VALUE;
}
while (unseqIndex < resource.getUnseqFiles().size() && timeConsumption < timeLimit) {
// select next unseq files
TsFileResource unseqFile = resource.getUnseqFiles().get(unseqIndex);
if (seqSelectedNum != resource.getSeqFiles().size() && !UpgradeUtils
.isNeedUpgrade(unseqFile)) {
selectOverlappedSeqFiles(unseqFile);
}
boolean isClosed = checkClosed(unseqFile);
if (!isClosed) {
tmpSelectedSeqFiles.clear();
unseqIndex++;
timeConsumption = System.currentTimeMillis() - startTime;
continue;
}
tempMaxSeqFileCost = maxSeqFileCost;
long newCost = useTightBound ? calculateTightMemoryCost(unseqFile, tmpSelectedSeqFiles,
startTime, timeLimit) :
calculateLooseMemoryCost(unseqFile, tmpSelectedSeqFiles, startTime, timeLimit);
updateSelectedFiles(newCost, unseqFile);
tmpSelectedSeqFiles.clear();
unseqIndex++;
timeConsumption = System.currentTimeMillis() - startTime;
}
for (int i = 0; i < seqSelected.length; i++) {
if (seqSelected[i]) {
selectedSeqFiles.add(resource.getSeqFiles().get(i));
}
}
}
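  /**
   * Accepts {@code unseqFile} and the temporarily selected seqFiles as merge candidates if adding
   * {@code newCost} keeps the total cost under the memory budget; otherwise the temporary
   * selection is simply discarded by the caller.
   */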
private void updateSelectedFiles(long newCost, TsFileResource unseqFile) {
    // a newCost of Long.MAX_VALUE marks an aborted (timed-out) estimation; checking for it
    // explicitly also prevents overflow in totalCost + newCost
    if (newCost != Long.MAX_VALUE && totalCost + newCost < memoryBudget) {
selectedUnseqFiles.add(unseqFile);
maxSeqFileCost = tempMaxSeqFileCost;
for (Integer seqIdx : tmpSelectedSeqFiles) {
seqSelected[seqIdx] = true;
seqSelectedNum++;
}
totalCost += newCost;
logger.debug("Adding a new unseqFile {} and seqFiles {} as candidates, new cost {}, total"
+ " cost {}",
unseqFile, tmpSelectedSeqFiles, newCost, totalCost);
}
}
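  /**
   * Returns true only if {@code unseqFile} and every temporarily selected seqFile are closed,
   * because files that are still being written cannot be merged.
   */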
private boolean checkClosed(TsFileResource unseqFile) {
boolean isClosed = unseqFile.isClosed();
if (!isClosed) {
return false;
}
for (Integer seqIdx : tmpSelectedSeqFiles) {
if (!resource.getSeqFiles().get(seqIdx).isClosed()) {
isClosed = false;
break;
}
}
return isClosed;
}
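  /**
   * For each device in {@code unseqFile}, scans the seqFiles in order and records into
   * {@code tmpSelectedSeqFiles} the indices of the not-yet-selected seqFiles whose time range for
   * that device overlaps the unseqFile's range. The scan of a device stops at the first seqFile
   * whose end time is not earlier than the unseqFile's end time, since later seqFiles cannot
   * overlap it.
   */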
private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
int tmpSelectedNum = 0;
    for (Entry<String, Integer> deviceIndexEntry : unseqFile.getDeviceToIndexMap().entrySet()) {
      String deviceId = deviceIndexEntry.getKey();
      int deviceIndex = deviceIndexEntry.getValue();
long unseqStartTime = unseqFile.getStartTime(deviceIndex);
long unseqEndTime = unseqFile.getEndTime(deviceIndex);
boolean noMoreOverlap = false;
for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
TsFileResource seqFile = resource.getSeqFiles().get(i);
if (seqSelected[i] || !seqFile.getDeviceToIndexMap().containsKey(deviceId)) {
continue;
}
long seqEndTime = seqFile.getEndTime(deviceId);
if (unseqEndTime <= seqEndTime) {
// the unseqFile overlaps current seqFile
tmpSelectedSeqFiles.add(i);
tmpSelectedNum++;
// the device of the unseqFile can not merge with later seqFiles
noMoreOverlap = true;
} else if (unseqStartTime <= seqEndTime) {
// the device of the unseqFile may merge with later seqFiles
// and the unseqFile overlaps current seqFile
tmpSelectedSeqFiles.add(i);
tmpSelectedNum++;
}
}
if (tmpSelectedNum + seqSelectedNum == resource.getSeqFiles().size()) {
break;
}
}
}
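  /**
   * Estimates the memory cost of merging {@code tmpSelectedUnseqFile} with the temporarily
   * selected seqFiles. The unseqFile contributes its full measured cost. Among the seqFiles only
   * the single largest read cost is counted, since only one seqFile is read at a time, but every
   * seqFile also contributes the metadata that will be regenerated when its data is rewritten.
   * Returns Long.MAX_VALUE if the selection time budget is exceeded.
   */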
private long calculateMemoryCost(TsFileResource tmpSelectedUnseqFile,
Collection<Integer> tmpSelectedSeqFiles, IFileQueryMemMeasurement unseqMeasurement,
IFileQueryMemMeasurement seqMeasurement, long startTime, long timeLimit) throws IOException {
long cost = 0;
Long fileCost = unseqMeasurement.measure(tmpSelectedUnseqFile);
cost += fileCost;
for (Integer seqFileIdx : tmpSelectedSeqFiles) {
TsFileResource seqFile = resource.getSeqFiles().get(seqFileIdx);
fileCost = seqMeasurement.measure(seqFile);
if (fileCost > tempMaxSeqFileCost) {
// only one file will be read at the same time, so only the largest one is recorded here
cost -= tempMaxSeqFileCost;
cost += fileCost;
tempMaxSeqFileCost = fileCost;
}
// but writing data into a new file may generate the same amount of metadata in memory
cost += calculateMetadataSize(seqFile);
long timeConsumption = System.currentTimeMillis() - startTime;
if (timeConsumption > timeLimit) {
return Long.MAX_VALUE;
}
}
return cost;
}
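  /**
   * Rough estimation: an unseqFile costs its whole file length and a seqFile costs its total
   * metadata size.
   */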
private long calculateLooseMemoryCost(TsFileResource tmpSelectedUnseqFile,
Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
TsFileResource::getTsFileSize, this::calculateMetadataSize, startTime, timeLimit);
}
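  /**
   * Tight estimation: the rough per-file measurement is scaled by the chunk proportion of the
   * series with the most chunks in the file.
   */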
private long calculateTightMemoryCost(TsFileResource tmpSelectedUnseqFile,
Collection<Integer> tmpSelectedSeqFiles, long startTime, long timeLimit) throws IOException {
return calculateMemoryCost(tmpSelectedUnseqFile, tmpSelectedSeqFiles,
this::calculateTightUnseqMemoryCost, this::calculateTightSeqMemoryCost, startTime,
timeLimit);
}
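  /**
   * Returns the total metadata size of the file, caching the result in {@code fileMetaSizeMap}.
   */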
private long calculateMetadataSize(TsFileResource seqFile) throws IOException {
Long cost = fileMetaSizeMap.get(seqFile);
if (cost == null) {
cost = MergeUtils.getFileMetaSize(seqFile, resource.getFileReader(seqFile));
fileMetaSizeMap.put(seqFile, cost);
logger.debug(LOG_FILE_COST, seqFile, cost);
}
return cost;
}
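  /**
   * Scales the given per-file measurement by maxChunkNum / totalChunkNum, i.e. the chunk
   * proportion of the series with the most chunks, caching the result in
   * {@code maxSeriesQueryCostMap}.
   */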
private long calculateTightFileMemoryCost(TsFileResource seqFile,
IFileQueryMemMeasurement measurement)
throws IOException {
Long cost = maxSeriesQueryCostMap.get(seqFile);
if (cost == null) {
long[] chunkNums = MergeUtils
.findTotalAndLargestSeriesChunkNum(seqFile, resource.getFileReader(seqFile));
long totalChunkNum = chunkNums[0];
long maxChunkNum = chunkNums[1];
cost = measurement.measure(seqFile) * maxChunkNum / totalChunkNum;
maxSeriesQueryCostMap.put(seqFile, cost);
logger.debug(LOG_FILE_COST, seqFile, cost);
}
return cost;
}
  // this method traverses all ChunkMetadata to find out which series has the most chunks and uses
  // its proportion among all series to get a maximum estimation
private long calculateTightSeqMemoryCost(TsFileResource seqFile) throws IOException {
long singleSeriesCost = calculateTightFileMemoryCost(seqFile, this::calculateMetadataSize);
long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
long maxCost = calculateMetadataSize(seqFile);
return multiSeriesCost > maxCost ? maxCost : multiSeriesCost;
}
// this method traverses all ChunkMetadata to find out which series has the most chunks and uses
// its proportion among all series to get a maximum estimation
private long calculateTightUnseqMemoryCost(TsFileResource unseqFile) throws IOException {
long singleSeriesCost = calculateTightFileMemoryCost(unseqFile, TsFileResource::getTsFileSize);
long multiSeriesCost = concurrentMergeNum * singleSeriesCost;
long maxCost = unseqFile.getTsFileSize();
return multiSeriesCost > maxCost ? maxCost : multiSeriesCost;
}
@Override
public int getConcurrentMergeNum() {
return concurrentMergeNum;
}
}