| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iotdb.db.engine.storagegroup; |
| |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.engine.modification.ModificationFile; |
| import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; |
| import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpgradeTsFileResourceCallBack; |
| import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex; |
| import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex; |
| import org.apache.iotdb.db.engine.storagegroup.timeindex.TimeIndexLevel; |
| import org.apache.iotdb.db.engine.upgrade.UpgradeTask; |
| import org.apache.iotdb.db.exception.PartitionViolationException; |
| import org.apache.iotdb.db.service.UpgradeSevice; |
| import org.apache.iotdb.db.utils.FilePathUtils; |
| import org.apache.iotdb.db.utils.TestOnly; |
| import org.apache.iotdb.tsfile.common.constant.TsFileConstant; |
| import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; |
| import org.apache.iotdb.tsfile.read.filter.basic.Filter; |
| import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.file.FileAlreadyExistsException; |
| import java.nio.file.Files; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Random; |
| import java.util.Set; |
| |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR; |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_INDEX; |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_MERGECNT_INDEX; |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_SEPARATOR; |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_TIME_INDEX; |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX; |
| import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SUFFIX_VERSION_INDEX; |
| import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; |
| |
| @SuppressWarnings("java:S1135") // ignore todos |
| public class TsFileResource { |
| |
| private static final Logger logger = LoggerFactory.getLogger(TsFileResource.class); |
| |
| private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); |
| |
| private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); |
| |
| // tsfile |
| private File file; |
| |
| public static final String RESOURCE_SUFFIX = ".resource"; |
| static final String TEMP_SUFFIX = ".temp"; |
| |
| /** version number */ |
| public static final byte VERSION_NUMBER = 1; |
| |
| public TsFileProcessor getProcessor() { |
| return processor; |
| } |
| |
| private TsFileProcessor processor; |
| |
| protected ITimeIndex timeIndex; |
| |
| /** time index type, fileTimeIndex = 0, deviceTimeIndex = 1 */ |
| private byte timeIndexType; |
| |
| private ModificationFile modFile; |
| |
| private volatile boolean closed = false; |
| private volatile boolean deleted = false; |
| private volatile boolean isMerging = false; |
| |
| private TsFileLock tsFileLock = new TsFileLock(); |
| |
| private Random random = new Random(); |
| |
| /** |
| * Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query |
| * process. |
| */ |
| private List<IChunkMetadata> chunkMetadataList; |
| |
| /** Mem chunk data. Only be set in a temporal TsFileResource in a query process. */ |
| private List<ReadOnlyMemChunk> readOnlyMemChunk; |
| |
| /** used for unsealed file to get TimeseriesMetadata */ |
| private ITimeSeriesMetadata timeSeriesMetadata; |
| |
| private FSFactory fsFactory = FSFactoryProducer.getFSFactory(); |
| |
| /** generated upgraded TsFile ResourceList used for upgrading v0.11.x/v2 -> 0.12/v3 */ |
| private List<TsFileResource> upgradedResources; |
| |
| /** |
| * load upgraded TsFile Resources to storage group processor used for upgrading v0.11.x/v2 -> |
| * 0.12/v3 |
| */ |
| private UpgradeTsFileResourceCallBack upgradeTsFileResourceCallBack; |
| |
| /** |
| * indicate if this tsfile resource belongs to a sequence tsfile or not used for upgrading |
| * v0.9.x/v1 -> 0.10/v2 |
| */ |
| private boolean isSeq; |
| |
| /** |
| * If it is not null, it indicates that the current tsfile resource is a snapshot of the |
| * originTsFileResource, and if so, when we want to used the lock, we should try to acquire the |
| * lock of originTsFileResource |
| */ |
| private TsFileResource originTsFileResource; |
| |
| /** Maximum index of plans executed within this TsFile. */ |
| protected long maxPlanIndex = Long.MIN_VALUE; |
| |
| /** Minimum index of plans executed within this TsFile. */ |
| protected long minPlanIndex = Long.MAX_VALUE; |
| |
| private long version = 0; |
| |
| public TsFileResource() {} |
| |
| public TsFileResource(TsFileResource other) throws IOException { |
| this.file = other.file; |
| this.processor = other.processor; |
| this.timeIndex = other.timeIndex; |
| this.timeIndexType = other.timeIndexType; |
| this.modFile = other.modFile; |
| this.closed = other.closed; |
| this.deleted = other.deleted; |
| this.isMerging = other.isMerging; |
| this.chunkMetadataList = other.chunkMetadataList; |
| this.readOnlyMemChunk = other.readOnlyMemChunk; |
| generateTimeSeriesMetadata(); |
| this.tsFileLock = other.tsFileLock; |
| this.fsFactory = other.fsFactory; |
| this.maxPlanIndex = other.maxPlanIndex; |
| this.minPlanIndex = other.minPlanIndex; |
| this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName()); |
| } |
| |
| /** for sealed TsFile, call setClosed to close TsFileResource */ |
| public TsFileResource(File file) { |
| this.file = file; |
| this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName()); |
| this.timeIndex = config.getTimeIndexLevel().getTimeIndex(); |
| this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal(); |
| } |
| |
| /** unsealed TsFile */ |
| public TsFileResource(File file, TsFileProcessor processor, int deviceNumInLastClosedTsFile) { |
| this.file = file; |
| this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName()); |
| this.timeIndex = config.getTimeIndexLevel().getTimeIndex(deviceNumInLastClosedTsFile); |
| this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal(); |
| this.processor = processor; |
| } |
| |
| /** unsealed TsFile */ |
| public TsFileResource( |
| List<ReadOnlyMemChunk> readOnlyMemChunk, |
| List<IChunkMetadata> chunkMetadataList, |
| TsFileResource originTsFileResource) |
| throws IOException { |
| this.file = originTsFileResource.file; |
| this.timeIndex = originTsFileResource.timeIndex; |
| this.timeIndexType = originTsFileResource.timeIndexType; |
| this.chunkMetadataList = chunkMetadataList; |
| this.readOnlyMemChunk = readOnlyMemChunk; |
| this.originTsFileResource = originTsFileResource; |
| this.version = originTsFileResource.version; |
| generateTimeSeriesMetadata(); |
| } |
| |
| @TestOnly |
| public TsFileResource( |
| File file, Map<String, Integer> deviceToIndex, long[] startTimes, long[] endTimes) { |
| this.file = file; |
| this.timeIndex = new DeviceTimeIndex(deviceToIndex, startTimes, endTimes); |
| this.timeIndexType = 1; |
| } |
| |
| @SuppressWarnings("squid:S3776") // high Cognitive Complexity |
| private void generateTimeSeriesMetadata() throws IOException { |
| TimeseriesMetadata timeTimeSeriesMetadata = new TimeseriesMetadata(); |
| timeTimeSeriesMetadata.setOffsetOfChunkMetaDataList(-1); |
| timeTimeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1); |
| |
| if (!(chunkMetadataList == null || chunkMetadataList.isEmpty())) { |
| timeTimeSeriesMetadata.setMeasurementId(chunkMetadataList.get(0).getMeasurementUid()); |
| TSDataType dataType = chunkMetadataList.get(0).getDataType(); |
| timeTimeSeriesMetadata.setTSDataType(dataType); |
| } else if (!(readOnlyMemChunk == null || readOnlyMemChunk.isEmpty())) { |
| timeTimeSeriesMetadata.setMeasurementId(readOnlyMemChunk.get(0).getMeasurementUid()); |
| TSDataType dataType = readOnlyMemChunk.get(0).getDataType(); |
| timeTimeSeriesMetadata.setTSDataType(dataType); |
| } |
| if (timeTimeSeriesMetadata.getTSDataType() != null) { |
| if (timeTimeSeriesMetadata.getTSDataType() == TSDataType.VECTOR) { |
| Statistics<?> timeStatistics = |
| Statistics.getStatsByType(timeTimeSeriesMetadata.getTSDataType()); |
| |
| List<TimeseriesMetadata> valueTimeSeriesMetadataList = new ArrayList<>(); |
| |
| if (!(chunkMetadataList == null || chunkMetadataList.isEmpty())) { |
| VectorChunkMetadata vectorChunkMetadata = (VectorChunkMetadata) chunkMetadataList.get(0); |
| for (IChunkMetadata valueChunkMetadata : |
| vectorChunkMetadata.getValueChunkMetadataList()) { |
| TimeseriesMetadata valueMetadata = new TimeseriesMetadata(); |
| valueMetadata.setOffsetOfChunkMetaDataList(-1); |
| valueMetadata.setDataSizeOfChunkMetaDataList(-1); |
| valueMetadata.setMeasurementId(valueChunkMetadata.getMeasurementUid()); |
| valueMetadata.setTSDataType(valueChunkMetadata.getDataType()); |
| valueTimeSeriesMetadataList.add(valueMetadata); |
| valueMetadata.setStatistics( |
| Statistics.getStatsByType(valueChunkMetadata.getDataType())); |
| } |
| } else if (!(readOnlyMemChunk == null || readOnlyMemChunk.isEmpty())) { |
| VectorChunkMetadata vectorChunkMetadata = |
| (VectorChunkMetadata) readOnlyMemChunk.get(0).getChunkMetaData(); |
| for (IChunkMetadata valueChunkMetadata : |
| vectorChunkMetadata.getValueChunkMetadataList()) { |
| TimeseriesMetadata valueMetadata = new TimeseriesMetadata(); |
| valueMetadata.setOffsetOfChunkMetaDataList(-1); |
| valueMetadata.setDataSizeOfChunkMetaDataList(-1); |
| valueMetadata.setMeasurementId(valueChunkMetadata.getMeasurementUid()); |
| valueMetadata.setTSDataType(valueChunkMetadata.getDataType()); |
| valueTimeSeriesMetadataList.add(valueMetadata); |
| valueMetadata.setStatistics( |
| Statistics.getStatsByType(valueChunkMetadata.getDataType())); |
| } |
| } |
| |
| for (IChunkMetadata chunkMetadata : chunkMetadataList) { |
| VectorChunkMetadata vectorChunkMetadata = (VectorChunkMetadata) chunkMetadata; |
| timeStatistics.mergeStatistics( |
| vectorChunkMetadata.getTimeChunkMetadata().getStatistics()); |
| for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { |
| valueTimeSeriesMetadataList |
| .get(i) |
| .getStatistics() |
| .mergeStatistics( |
| vectorChunkMetadata.getValueChunkMetadataList().get(i).getStatistics()); |
| } |
| } |
| |
| for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) { |
| if (!memChunk.isEmpty()) { |
| VectorChunkMetadata vectorChunkMetadata = |
| (VectorChunkMetadata) memChunk.getChunkMetaData(); |
| timeStatistics.mergeStatistics( |
| vectorChunkMetadata.getTimeChunkMetadata().getStatistics()); |
| for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { |
| valueTimeSeriesMetadataList |
| .get(i) |
| .getStatistics() |
| .mergeStatistics( |
| vectorChunkMetadata.getValueChunkMetadataList().get(i).getStatistics()); |
| } |
| } |
| } |
| timeTimeSeriesMetadata.setStatistics(timeStatistics); |
| timeSeriesMetadata = |
| new VectorTimeSeriesMetadata(timeTimeSeriesMetadata, valueTimeSeriesMetadataList); |
| } else { |
| Statistics<?> seriesStatistics = |
| Statistics.getStatsByType(timeTimeSeriesMetadata.getTSDataType()); |
| // flush chunkMetadataList one by one |
| for (IChunkMetadata chunkMetadata : chunkMetadataList) { |
| seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); |
| } |
| |
| for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) { |
| if (!memChunk.isEmpty()) { |
| seriesStatistics.mergeStatistics(memChunk.getChunkMetaData().getStatistics()); |
| } |
| } |
| timeTimeSeriesMetadata.setStatistics(seriesStatistics); |
| this.timeSeriesMetadata = timeTimeSeriesMetadata; |
| } |
| } else { |
| this.timeSeriesMetadata = null; |
| } |
| } |
| |
| public synchronized void serialize() throws IOException { |
| try (OutputStream outputStream = |
| fsFactory.getBufferedOutputStream(file + RESOURCE_SUFFIX + TEMP_SUFFIX)) { |
| ReadWriteIOUtils.write(VERSION_NUMBER, outputStream); |
| ReadWriteIOUtils.write(timeIndexType, outputStream); |
| timeIndex.serialize(outputStream); |
| |
| ReadWriteIOUtils.write(maxPlanIndex, outputStream); |
| ReadWriteIOUtils.write(minPlanIndex, outputStream); |
| |
| if (modFile != null && modFile.exists()) { |
| String modFileName = new File(modFile.getFilePath()).getName(); |
| ReadWriteIOUtils.write(modFileName, outputStream); |
| } |
| } |
| File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX); |
| File dest = fsFactory.getFile(file + RESOURCE_SUFFIX); |
| fsFactory.deleteIfExists(dest); |
| fsFactory.moveFile(src, dest); |
| } |
| |
| public void deserialize() throws IOException { |
| try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) { |
| readVersionNumber(inputStream); |
| timeIndexType = ReadWriteIOUtils.readBytes(inputStream, 1)[0]; |
| timeIndex = TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream); |
| maxPlanIndex = ReadWriteIOUtils.readLong(inputStream); |
| minPlanIndex = ReadWriteIOUtils.readLong(inputStream); |
| if (inputStream.available() > 0) { |
| String modFileName = ReadWriteIOUtils.readString(inputStream); |
| if (modFileName != null) { |
| File modF = new File(file.getParentFile(), modFileName); |
| modFile = new ModificationFile(modF.getPath()); |
| } |
| } |
| } |
| } |
| |
| public void deserializeFromOldFile() throws IOException { |
| try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) { |
| // deserialize old TsfileResource |
| int size = ReadWriteIOUtils.readInt(inputStream); |
| Map<String, Integer> deviceMap = new HashMap<>(); |
| long[] startTimesArray = new long[size]; |
| long[] endTimesArray = new long[size]; |
| for (int i = 0; i < size; i++) { |
| String path = ReadWriteIOUtils.readString(inputStream); |
| long time = ReadWriteIOUtils.readLong(inputStream); |
| deviceMap.put(path, i); |
| startTimesArray[i] = time; |
| } |
| size = ReadWriteIOUtils.readInt(inputStream); |
| for (int i = 0; i < size; i++) { |
| ReadWriteIOUtils.readString(inputStream); // String path |
| long time = ReadWriteIOUtils.readLong(inputStream); |
| endTimesArray[i] = time; |
| } |
| timeIndexType = (byte) 1; |
| timeIndex = new DeviceTimeIndex(deviceMap, startTimesArray, endTimesArray); |
| if (inputStream.available() > 0) { |
| int versionSize = ReadWriteIOUtils.readInt(inputStream); |
| for (int i = 0; i < versionSize; i++) { |
| // historicalVersions |
| ReadWriteIOUtils.readLong(inputStream); |
| } |
| } |
| if (inputStream.available() > 0) { |
| String modFileName = ReadWriteIOUtils.readString(inputStream); |
| if (modFileName != null) { |
| File modF = new File(file.getParentFile(), modFileName); |
| modFile = new ModificationFile(modF.getPath()); |
| } |
| } |
| } |
| } |
| |
| /** read version number, used for checking compatibility of TsFileResource in the future */ |
| private byte readVersionNumber(InputStream inputStream) throws IOException { |
| return ReadWriteIOUtils.readBytes(inputStream, 1)[0]; |
| } |
| |
| public void updateStartTime(String device, long time) { |
| timeIndex.updateStartTime(device, time); |
| } |
| |
| // used in merge, refresh all start time |
| public void putStartTime(String device, long time) { |
| timeIndex.putStartTime(device, time); |
| } |
| |
| public void updateEndTime(String device, long time) { |
| timeIndex.updateEndTime(device, time); |
| } |
| |
| // used in merge, refresh all end time |
| public void putEndTime(String device, long time) { |
| timeIndex.putEndTime(device, time); |
| } |
| |
| public boolean resourceFileExists() { |
| return fsFactory.getFile(file + RESOURCE_SUFFIX).exists(); |
| } |
| |
| public List<IChunkMetadata> getChunkMetadataList() { |
| return new ArrayList<>(chunkMetadataList); |
| } |
| |
| public List<ReadOnlyMemChunk> getReadOnlyMemChunk() { |
| return readOnlyMemChunk; |
| } |
| |
| public synchronized ModificationFile getModFile() { |
| if (modFile == null) { |
| modFile = new ModificationFile(file.getPath() + ModificationFile.FILE_SUFFIX); |
| } |
| return modFile; |
| } |
| |
| public void setFile(File file) { |
| this.file = file; |
| } |
| |
| public File getTsFile() { |
| return file; |
| } |
| |
| public String getTsFilePath() { |
| return file.getPath(); |
| } |
| |
| public long getTsFileSize() { |
| return file.length(); |
| } |
| |
| public long getStartTime(String deviceId) { |
| return timeIndex.getStartTime(deviceId); |
| } |
| |
| /** open file's end time is Long.MIN_VALUE */ |
| public long getEndTime(String deviceId) { |
| return timeIndex.getEndTime(deviceId); |
| } |
| |
| public Set<String> getDevices() { |
| return timeIndex.getDevices(); |
| } |
| |
| public boolean endTimeEmpty() { |
| return timeIndex.endTimeEmpty(); |
| } |
| |
| public boolean isClosed() { |
| return closed; |
| } |
| |
| public void close() throws IOException { |
| closed = true; |
| if (modFile != null) { |
| modFile.close(); |
| modFile = null; |
| } |
| processor = null; |
| chunkMetadataList = null; |
| timeIndex.close(); |
| } |
| |
| TsFileProcessor getUnsealedFileProcessor() { |
| return processor; |
| } |
| |
| public void writeLock() { |
| if (originTsFileResource == null) { |
| tsFileLock.writeLock(); |
| } else { |
| originTsFileResource.writeLock(); |
| } |
| } |
| |
| public void writeUnlock() { |
| if (originTsFileResource == null) { |
| tsFileLock.writeUnlock(); |
| } else { |
| originTsFileResource.writeUnlock(); |
| } |
| } |
| |
| /** |
| * If originTsFileResource is not null, we should acquire the read lock of originTsFileResource |
| * before construct the current TsFileResource |
| */ |
| public void readLock() { |
| if (originTsFileResource == null) { |
| tsFileLock.readLock(); |
| } else { |
| originTsFileResource.readLock(); |
| } |
| } |
| |
| public void readUnlock() { |
| if (originTsFileResource == null) { |
| tsFileLock.readUnlock(); |
| } else { |
| originTsFileResource.readUnlock(); |
| } |
| } |
| |
| public boolean tryWriteLock() { |
| return tsFileLock.tryWriteLock(); |
| } |
| |
| void doUpgrade() { |
| UpgradeSevice.getINSTANCE().submitUpgradeTask(new UpgradeTask(this)); |
| } |
| |
| public void removeModFile() throws IOException { |
| getModFile().remove(); |
| modFile = null; |
| } |
| |
| /** Remove the data file, its resource file, and its modification file physically. */ |
| public void remove() { |
| try { |
| fsFactory.deleteIfExists(file); |
| } catch (IOException e) { |
| logger.error("TsFile {} cannot be deleted: {}", file, e.getMessage()); |
| } |
| removeResourceFile(); |
| try { |
| fsFactory.deleteIfExists(fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX)); |
| } catch (IOException e) { |
| logger.error("ModificationFile {} cannot be deleted: {}", file, e.getMessage()); |
| } |
| } |
| |
| public void removeResourceFile() { |
| try { |
| fsFactory.deleteIfExists(fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX)); |
| } catch (IOException e) { |
| logger.error("TsFileResource {} cannot be deleted: {}", file, e.getMessage()); |
| } |
| } |
| |
| void moveTo(File targetDir) { |
| fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName())); |
| fsFactory.moveFile( |
| fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX), |
| fsFactory.getFile(targetDir, file.getName() + RESOURCE_SUFFIX)); |
| File originModFile = fsFactory.getFile(file.getPath() + ModificationFile.FILE_SUFFIX); |
| if (originModFile.exists()) { |
| fsFactory.moveFile( |
| originModFile, |
| fsFactory.getFile(targetDir, file.getName() + ModificationFile.FILE_SUFFIX)); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return file.toString(); |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| TsFileResource that = (TsFileResource) o; |
| return Objects.equals(file, that.file); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(file); |
| } |
| |
| public void setClosed(boolean closed) { |
| this.closed = closed; |
| } |
| |
| public boolean isDeleted() { |
| return deleted; |
| } |
| |
| public void setDeleted(boolean deleted) { |
| this.deleted = deleted; |
| } |
| |
| boolean isMerging() { |
| return isMerging; |
| } |
| |
| public void setMerging(boolean merging) { |
| isMerging = merging; |
| } |
| |
| /** check if any of the device lives over the given time bound */ |
| public boolean stillLives(long timeLowerBound) { |
| return timeIndex.stillLives(timeLowerBound); |
| } |
| |
| /** @return true if the device is contained in the TsFile and it lives beyond TTL */ |
| public boolean isSatisfied( |
| String deviceId, Filter timeFilter, boolean isSeq, long ttl, boolean debug) { |
| if (!getDevices().contains(deviceId)) { |
| if (debug) { |
| DEBUG_LOGGER.info( |
| "Path: {} file {} is not satisfied because of no device!", deviceId, file); |
| } |
| return false; |
| } |
| |
| long startTime = getStartTime(deviceId); |
| long endTime = closed || !isSeq ? getEndTime(deviceId) : Long.MAX_VALUE; |
| |
| if (!isAlive(endTime, ttl)) { |
| if (debug) { |
| DEBUG_LOGGER.info("Path: {} file {} is not satisfied because of ttl!", deviceId, file); |
| } |
| return false; |
| } |
| |
| if (timeFilter != null) { |
| boolean res = timeFilter.satisfyStartEndTime(startTime, endTime); |
| if (debug && !res) { |
| DEBUG_LOGGER.info( |
| "Path: {} file {} is not satisfied because of time filter!", deviceId, fsFactory); |
| } |
| return res; |
| } |
| return true; |
| } |
| |
| /** @return whether the given time falls in ttl */ |
| private boolean isAlive(long time, long dataTTL) { |
| return dataTTL == Long.MAX_VALUE || (System.currentTimeMillis() - time) <= dataTTL; |
| } |
| |
| public void setProcessor(TsFileProcessor processor) { |
| this.processor = processor; |
| } |
| |
| /** |
| * Get a timeseriesMetadata. |
| * |
| * @return TimeseriesMetadata or the first ValueTimeseriesMetadata in VectorTimeseriesMetadata |
| */ |
| public TimeseriesMetadata getTimeSeriesMetadata() { |
| if (timeSeriesMetadata == null) { |
| return null; |
| } |
| if (timeSeriesMetadata instanceof TimeseriesMetadata) { |
| return (TimeseriesMetadata) timeSeriesMetadata; |
| } else { |
| // it's ok for us to return the first value timeseries metadata, |
| // because the MemChunkMetadataLoader is not depend on the timeseries metadata |
| return ((VectorTimeSeriesMetadata) timeSeriesMetadata) |
| .getValueTimeseriesMetadataList() |
| .get(0); |
| } |
| } |
| |
| public void setUpgradedResources(List<TsFileResource> upgradedResources) { |
| this.upgradedResources = upgradedResources; |
| } |
| |
| public List<TsFileResource> getUpgradedResources() { |
| return upgradedResources; |
| } |
| |
| public void setSeq(boolean isSeq) { |
| this.isSeq = isSeq; |
| } |
| |
| public boolean isSeq() { |
| return isSeq; |
| } |
| |
| public void setUpgradeTsFileResourceCallBack( |
| UpgradeTsFileResourceCallBack upgradeTsFileResourceCallBack) { |
| this.upgradeTsFileResourceCallBack = upgradeTsFileResourceCallBack; |
| } |
| |
| public UpgradeTsFileResourceCallBack getUpgradeTsFileResourceCallBack() { |
| return upgradeTsFileResourceCallBack; |
| } |
| |
| /** make sure Either the deviceToIndex is not empty Or the path contains a partition folder */ |
| public long getTimePartition() { |
| return timeIndex.getTimePartition(file.getAbsolutePath()); |
| } |
| |
| /** |
| * Used when load new TsFiles not generated by the server Check and get the time partition |
| * |
| * @throws PartitionViolationException if the data of the file spans partitions or it is empty |
| */ |
| public long getTimePartitionWithCheck() throws PartitionViolationException { |
| return timeIndex.getTimePartitionWithCheck(file.toString()); |
| } |
| |
| /** Check whether the tsFile spans multiple time partitions. */ |
| public boolean isSpanMultiTimePartitions() { |
| return timeIndex.isSpanMultiTimePartitions(); |
| } |
| |
| /** |
| * Create a hardlink for the TsFile and modification file (if exists) The hardlink will have a |
| * suffix like ".{sysTime}_{randomLong}" |
| * |
| * @return a new TsFileResource with its file changed to the hardlink or null the hardlink cannot |
| * be created. |
| */ |
| public TsFileResource createHardlink() { |
| if (!file.exists()) { |
| return null; |
| } |
| |
| TsFileResource newResource; |
| try { |
| newResource = new TsFileResource(this); |
| } catch (IOException e) { |
| logger.error("Cannot create hardlink for {}", file, e); |
| return null; |
| } |
| |
| while (true) { |
| String hardlinkSuffix = |
| TsFileConstant.PATH_SEPARATOR + System.currentTimeMillis() + "_" + random.nextLong(); |
| File hardlink = new File(file.getAbsolutePath() + hardlinkSuffix); |
| |
| try { |
| Files.createLink(Paths.get(hardlink.getAbsolutePath()), Paths.get(file.getAbsolutePath())); |
| newResource.setFile(hardlink); |
| if (modFile != null && modFile.exists()) { |
| newResource.setModFile(modFile.createHardlink()); |
| } |
| break; |
| } catch (FileAlreadyExistsException e) { |
| // retry a different name if the file is already created |
| } catch (IOException e) { |
| logger.error("Cannot create hardlink for {}", file, e); |
| return null; |
| } |
| } |
| return newResource; |
| } |
| |
| public synchronized void setModFile(ModificationFile modFile) { |
| this.modFile = modFile; |
| } |
| |
| /** @return initial resource map size */ |
| public long calculateRamSize() { |
| return timeIndex.calculateRamSize(); |
| } |
| |
| /** |
| * Calculate the resource ram increment when insert data in TsFileProcessor |
| * |
| * @return ramIncrement |
| */ |
| public long estimateRamIncrement(String deviceToBeChecked) { |
| return timeIndex.estimateRamIncrement(deviceToBeChecked); |
| } |
| |
| public void delete() throws IOException { |
| if (file.exists()) { |
| Files.delete(file.toPath()); |
| Files.delete( |
| FSFactoryProducer.getFSFactory() |
| .getFile(file.toPath() + TsFileResource.RESOURCE_SUFFIX) |
| .toPath()); |
| } |
| } |
| |
| public long getMaxPlanIndex() { |
| return maxPlanIndex; |
| } |
| |
| public long getMinPlanIndex() { |
| return minPlanIndex; |
| } |
| |
| public void updatePlanIndexes(long planIndex) { |
| if (planIndex == Long.MIN_VALUE || planIndex == Long.MAX_VALUE) { |
| return; |
| } |
| maxPlanIndex = Math.max(maxPlanIndex, planIndex); |
| minPlanIndex = Math.min(minPlanIndex, planIndex); |
| if (closed) { |
| try { |
| serialize(); |
| } catch (IOException e) { |
| logger.error( |
| "Cannot serialize TsFileResource {} when updating plan index {}-{}", |
| this, |
| maxPlanIndex, |
| planIndex); |
| } |
| } |
| } |
| |
| /** For merge, the index range of the new file should be the union of all files' in this merge. */ |
| public void updatePlanIndexes(TsFileResource another) { |
| maxPlanIndex = Math.max(maxPlanIndex, another.maxPlanIndex); |
| minPlanIndex = Math.min(minPlanIndex, another.minPlanIndex); |
| } |
| |
| public boolean isPlanIndexOverlap(TsFileResource another) { |
| return another.maxPlanIndex >= this.minPlanIndex && another.minPlanIndex <= this.maxPlanIndex; |
| } |
| |
| public boolean isPlanRangeCovers(TsFileResource another) { |
| return this.minPlanIndex <= another.minPlanIndex && another.maxPlanIndex <= this.maxPlanIndex; |
| } |
| |
| public void setMaxPlanIndex(long maxPlanIndex) { |
| this.maxPlanIndex = maxPlanIndex; |
| } |
| |
| public void setMinPlanIndex(long minPlanIndex) { |
| this.minPlanIndex = minPlanIndex; |
| } |
| |
| public void setVersion(long version) { |
| this.version = version; |
| } |
| |
| public long getVersion() { |
| return version; |
| } |
| |
| public void setTimeIndex(ITimeIndex timeIndex) { |
| this.timeIndex = timeIndex; |
| } |
| |
| // change tsFile name |
| |
| public static String getNewTsFileName(long time, long version, int mergeCnt, int unSeqMergeCnt) { |
| return time |
| + FILE_NAME_SEPARATOR |
| + version |
| + FILE_NAME_SEPARATOR |
| + mergeCnt |
| + FILE_NAME_SEPARATOR |
| + unSeqMergeCnt |
| + TSFILE_SUFFIX; |
| } |
| |
| public static TsFileName getTsFileName(String FileName) { |
| String[] fileName = |
| FileName.split(FILE_NAME_SUFFIX_SEPARATOR)[FILE_NAME_SUFFIX_INDEX].split( |
| FILE_NAME_SEPARATOR); |
| TsFileName tsFileName = |
| new TsFileName( |
| Long.parseLong(fileName[FILE_NAME_SUFFIX_TIME_INDEX]), |
| Long.parseLong(fileName[FILE_NAME_SUFFIX_VERSION_INDEX]), |
| Integer.parseInt(fileName[FILE_NAME_SUFFIX_MERGECNT_INDEX]), |
| Integer.parseInt(fileName[FILE_NAME_SUFFIX_UNSEQMERGECNT_INDEX])); |
| return tsFileName; |
| } |
| |
| public static TsFileResource modifyTsFileNameUnseqMergCnt(TsFileResource tsFileResource) { |
| File tsFile = tsFileResource.getTsFile(); |
| String path = tsFile.getParent(); |
| TsFileName tsFileName = getTsFileName(tsFileResource.getTsFile().getName()); |
| tsFileName.setUnSeqMergeCnt(tsFileName.getUnSeqMergeCnt() + 1); |
| tsFileResource.setFile( |
| new File( |
| path, |
| tsFileName.time |
| + FILE_NAME_SEPARATOR |
| + tsFileName.version |
| + FILE_NAME_SEPARATOR |
| + tsFileName.mergeCnt |
| + FILE_NAME_SEPARATOR |
| + tsFileName.unSeqMergeCnt |
| + TSFILE_SUFFIX)); |
| return tsFileResource; |
| } |
| |
| public static File modifyTsFileNameUnseqMergCnt(File tsFile) { |
| String path = tsFile.getParent(); |
| TsFileName tsFileName = getTsFileName(tsFile.getName()); |
| tsFileName.setUnSeqMergeCnt(tsFileName.getUnSeqMergeCnt() + 1); |
| return new File( |
| path, |
| tsFileName.time |
| + FILE_NAME_SEPARATOR |
| + tsFileName.version |
| + FILE_NAME_SEPARATOR |
| + tsFileName.mergeCnt |
| + FILE_NAME_SEPARATOR |
| + tsFileName.unSeqMergeCnt |
| + TSFILE_SUFFIX); |
| } |
| |
| public static File modifyTsFileNameMergeCnt(File tsFile) { |
| String path = tsFile.getParent(); |
| TsFileName tsFileName = getTsFileName(tsFile.getName()); |
| tsFileName.setMergeCnt(tsFileName.getMergeCnt() + 1); |
| return new File( |
| path, |
| tsFileName.time |
| + FILE_NAME_SEPARATOR |
| + tsFileName.version |
| + FILE_NAME_SEPARATOR |
| + tsFileName.mergeCnt |
| + FILE_NAME_SEPARATOR |
| + tsFileName.unSeqMergeCnt |
| + TSFILE_SUFFIX); |
| } |
| |
| public static class TsFileName { |
| |
| private long time; |
| private long version; |
| private int mergeCnt; |
| private int unSeqMergeCnt; |
| |
| public TsFileName(long time, long version, int mergeCnt, int unSeqMergeCnt) { |
| this.time = time; |
| this.version = version; |
| this.mergeCnt = mergeCnt; |
| this.unSeqMergeCnt = unSeqMergeCnt; |
| } |
| |
| public long getTime() { |
| return time; |
| } |
| |
| public long getVersion() { |
| return version; |
| } |
| |
| public int getMergeCnt() { |
| return mergeCnt; |
| } |
| |
| public int getUnSeqMergeCnt() { |
| return unSeqMergeCnt; |
| } |
| |
| public void setTime(long time) { |
| this.time = time; |
| } |
| |
| public void setVersion(long version) { |
| this.version = version; |
| } |
| |
| public void setMergeCnt(int mergeCnt) { |
| this.mergeCnt = mergeCnt; |
| } |
| |
| public void setUnSeqMergeCnt(int unSeqMergeCnt) { |
| this.unSeqMergeCnt = unSeqMergeCnt; |
| } |
| } |
| } |