blob: 4e6f6fb6f71d24ec78f3b5ad1634ea37d84e0322 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.merge.manage;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.reader.resource.CachedUnseqResourceMergeReader;
import org.apache.iotdb.db.utils.MergeUtils;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
/**
* MergeResource manages files and caches of readers, writers, MeasurementSchemas and modifications
* to avoid unnecessary object creations and file openings.
*/
/**
 * MergeResource manages files and caches of readers, writers, MeasurementSchemas and modifications
 * to avoid unnecessary object creations and file openings.
 *
 * <p>This class is not thread-safe except for {@link #getChunkWriter(IMeasurementSchema)}, which
 * is backed by a {@link ConcurrentHashMap}; the other caches are plain {@link HashMap}s.
 */
public class MergeResource {

  private List<TsFileResource> seqFiles;
  private List<TsFileResource> unseqFiles;

  private Map<TsFileResource, TsFileSequenceReader> fileReaderCache = new HashMap<>();
  private Map<TsFileResource, RestorableTsFileIOWriter> fileWriterCache = new HashMap<>();
  private Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>();
  // per file: device -> Pair<startTime, endTime> of the data merged so far
  private Map<TsFileResource, Map<String, Pair<Long, Long>>> startEndTimeCache = new HashMap<>();
  // NOTE(review): holds one entry per timeseries; may be memory-heavy for very large schemas
  private Map<PartialPath, IMeasurementSchema> measurementSchemaMap = new HashMap<>();
  private Map<IMeasurementSchema, IChunkWriter> chunkWriterCache = new ConcurrentHashMap<>();

  // files whose data all precedes this bound are excluded from the merge
  private long timeLowerBound = Long.MIN_VALUE;

  private boolean cacheDeviceMeta = false;

  public MergeResource(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
    this.seqFiles = seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
    this.unseqFiles =
        unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
  }

  public MergeResource(
      Collection<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, long timeLowerBound) {
    this.timeLowerBound = timeLowerBound;
    this.seqFiles = seqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
    this.unseqFiles =
        unseqFiles.stream().filter(this::filterResource).collect(Collectors.toList());
  }

  /** If returns true, it means the resource participates in the merge. */
  private boolean filterResource(TsFileResource res) {
    return res.getTsFile().exists()
        && !res.isDeleted()
        && (!res.isClosed() || res.stillLives(timeLowerBound));
  }

  /**
   * Close all cached readers and writers and empty every cache.
   *
   * <p>Every close is attempted even if some fail, and the caches are always cleared, so a single
   * faulty file cannot leak the remaining open readers/writers. The first close failure is
   * rethrown at the end, with any later failures attached as suppressed exceptions.
   *
   * @throws IOException if closing any cached reader or writer failed
   */
  public void clear() throws IOException {
    IOException closeException = null;
    for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
      try {
        sequenceReader.close();
      } catch (IOException e) {
        closeException = chain(closeException, e);
      }
    }
    for (RestorableTsFileIOWriter writer : fileWriterCache.values()) {
      try {
        writer.close();
      } catch (IOException e) {
        closeException = chain(closeException, e);
      }
    }
    fileReaderCache.clear();
    fileWriterCache.clear();
    modificationCache.clear();
    measurementSchemaMap.clear();
    chunkWriterCache.clear();
    if (closeException != null) {
      throw closeException;
    }
  }

  /** Keeps the first exception as primary and attaches later ones as suppressed. */
  private static IOException chain(IOException first, IOException next) {
    if (first == null) {
      return next;
    }
    first.addSuppressed(next);
    return first;
  }

  /** Returns the cached schema of the given timeseries, or null if it is unknown. */
  public IMeasurementSchema getSchema(PartialPath path) {
    return measurementSchemaMap.get(path);
  }

  /**
   * Construct a new or get an existing RestorableTsFileIOWriter of a merge temp file for a SeqFile.
   * The path of the merge temp file will be the seqFile's + ".merge".
   *
   * @return A RestorableTsFileIOWriter of a merge temp file for a SeqFile.
   */
  public RestorableTsFileIOWriter getMergeFileWriter(TsFileResource resource) throws IOException {
    RestorableTsFileIOWriter writer = fileWriterCache.get(resource);
    if (writer == null) {
      writer =
          new RestorableTsFileIOWriter(
              FSFactoryProducer.getFSFactory().getFile(resource.getTsFilePath() + MERGE_SUFFIX));
      fileWriterCache.put(resource, writer);
    }
    return writer;
  }

  /**
   * Query ChunkMetadata of a timeseries from the given TsFile (seq or unseq). The ChunkMetadata is
   * not cached since it is usually huge.
   *
   * @param path name of the time series
   */
  public List<ChunkMetadata> queryChunkMetadata(PartialPath path, TsFileResource seqFile)
      throws IOException {
    TsFileSequenceReader sequenceReader = getFileReader(seqFile);
    return sequenceReader.getChunkMetadataList(path);
  }

  /**
   * Construct a new or get an existing TsFileSequenceReader of a TsFile.
   *
   * @return a TsFileSequenceReader
   */
  public TsFileSequenceReader getFileReader(TsFileResource tsFileResource) throws IOException {
    TsFileSequenceReader reader = fileReaderCache.get(tsFileResource);
    if (reader == null) {
      reader = new TsFileSequenceReader(tsFileResource.getTsFilePath(), true, cacheDeviceMeta);
      fileReaderCache.put(tsFileResource, reader);
    }
    return reader;
  }

  /**
   * Construct UnseqResourceMergeReaders for each timeseries over all unseqFiles. The readers are
   * not cached since the method is only called once for each timeseries.
   *
   * @param paths names of the timeseries
   * @return an array of UnseqResourceMergeReaders each corresponding to a timeseries in paths
   */
  public IPointReader[] getUnseqReaders(List<PartialPath> paths) throws IOException {
    List<Chunk>[] pathChunks = MergeUtils.collectUnseqChunks(paths, unseqFiles, this);
    IPointReader[] ret = new IPointReader[paths.size()];
    for (int i = 0; i < paths.size(); i++) {
      TSDataType dataType = getSchema(paths.get(i)).getType();
      ret[i] = new CachedUnseqResourceMergeReader(pathChunks[i], dataType);
    }
    return ret;
  }

  /**
   * Construct a new or get an existing ChunkWriter of a measurement. Different timeseries of the
   * same measurement and data type share the same instance.
   */
  public IChunkWriter getChunkWriter(IMeasurementSchema measurementSchema) {
    return chunkWriterCache.computeIfAbsent(measurementSchema, ChunkWriterImpl::new);
  }

  /**
   * Get the modifications of a timeseries in the ModificationFile of a TsFile. The file's full
   * modification list is read once and cached; the returned list is a fresh filtered copy.
   *
   * @param path name of the time series
   */
  public List<Modification> getModifications(TsFileResource tsFileResource, PartialPath path) {
    // copy from TsFileResource so queries are not affected
    List<Modification> modifications =
        modificationCache.computeIfAbsent(
            tsFileResource, resource -> new LinkedList<>(resource.getModFile().getModifications()));
    List<Modification> pathModifications = new ArrayList<>();
    for (Modification modification : modifications) {
      if (modification.getPath().matchFullPath(path)) {
        pathModifications.add(modification);
      }
    }
    return pathModifications;
  }

  /**
   * Remove and close the writer of the merge temp file of a SeqFile. The merge temp file is also
   * deleted.
   *
   * @param tsFileResource the SeqFile
   */
  public void removeFileAndWriter(TsFileResource tsFileResource) throws IOException {
    RestorableTsFileIOWriter newFileWriter = fileWriterCache.remove(tsFileResource);
    if (newFileWriter != null) {
      newFileWriter.close();
      // best-effort deletion; NOTE(review): the result is ignored, so a failed delete
      // silently leaves the temp file behind — consider logging at the call site
      newFileWriter.getFile().delete();
    }
  }

  /**
   * Remove and close the reader of the TsFile. The TsFile is NOT deleted.
   *
   * @param resource the SeqFile
   */
  public void removeFileReader(TsFileResource resource) throws IOException {
    TsFileSequenceReader sequenceReader = fileReaderCache.remove(resource);
    if (sequenceReader != null) {
      sequenceReader.close();
    }
  }

  public List<TsFileResource> getSeqFiles() {
    return seqFiles;
  }

  public void setSeqFiles(List<TsFileResource> seqFiles) {
    this.seqFiles = seqFiles;
  }

  public List<TsFileResource> getUnseqFiles() {
    return unseqFiles;
  }

  public void setUnseqFiles(List<TsFileResource> unseqFiles) {
    this.unseqFiles = unseqFiles;
  }

  /** Close and evict cached readers whose file is no longer in {@code seqFiles}. */
  public void removeOutdatedSeqReaders() throws IOException {
    Iterator<Entry<TsFileResource, TsFileSequenceReader>> entryIterator =
        fileReaderCache.entrySet().iterator();
    while (entryIterator.hasNext()) {
      Entry<TsFileResource, TsFileSequenceReader> entry = entryIterator.next();
      TsFileResource tsFile = entry.getKey();
      if (!seqFiles.contains(tsFile)) {
        TsFileSequenceReader reader = entry.getValue();
        reader.close();
        entryIterator.remove();
      }
    }
  }

  public void setCacheDeviceMeta(boolean cacheDeviceMeta) {
    this.cacheDeviceMeta = cacheDeviceMeta;
  }

  public void setMeasurementSchemaMap(Map<PartialPath, IMeasurementSchema> measurementSchemaMap) {
    this.measurementSchemaMap = measurementSchemaMap;
  }

  public void clearChunkWriterCache() {
    this.chunkWriterCache.clear();
  }

  /** Lower the cached start time of {@code device} in {@code tsFileResource} if needed. */
  public void updateStartTime(TsFileResource tsFileResource, String device, long startTime) {
    Map<String, Pair<Long, Long>> deviceStartEndTimePairMap =
        startEndTimeCache.computeIfAbsent(tsFileResource, k -> new HashMap<>());
    Pair<Long, Long> startEndTimePair =
        deviceStartEndTimePairMap.getOrDefault(device, new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
    deviceStartEndTimePairMap.put(
        device, new Pair<>(Math.min(startEndTimePair.left, startTime), startEndTimePair.right));
  }

  /** Raise the cached end time of {@code device} in {@code tsFileResource} if needed. */
  public void updateEndTime(TsFileResource tsFileResource, String device, long endTime) {
    Map<String, Pair<Long, Long>> deviceStartEndTimePairMap =
        startEndTimeCache.computeIfAbsent(tsFileResource, k -> new HashMap<>());
    Pair<Long, Long> startEndTimePair =
        deviceStartEndTimePairMap.getOrDefault(device, new Pair<>(Long.MAX_VALUE, Long.MIN_VALUE));
    deviceStartEndTimePairMap.put(
        device, new Pair<>(startEndTimePair.left, Math.max(startEndTimePair.right, endTime)));
  }

  /**
   * Returns the device -> Pair&lt;startTime, endTime&gt; map of the given file, or a fresh empty
   * map (not inserted into the cache) when nothing has been recorded for it.
   */
  public Map<String, Pair<Long, Long>> getStartEndTime(TsFileResource tsFileResource) {
    return startEndTimeCache.getOrDefault(tsFileResource, new HashMap<>());
  }
}