| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.tsfile.write.writer; |
| |
| import org.apache.iotdb.tsfile.exception.NotCompatibleTsFileException; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; |
| import org.apache.iotdb.tsfile.file.metadata.IDeviceID; |
| import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; |
| import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; |
| import org.apache.iotdb.tsfile.read.TsFileCheckStatus; |
| import org.apache.iotdb.tsfile.read.TsFileSequenceReader; |
| import org.apache.iotdb.tsfile.read.common.Path; |
| import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.channels.FileChannel; |
| import java.nio.file.Paths; |
| import java.nio.file.StandardOpenOption; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| /** |
| * This writer is for opening and recover a TsFile |
| * |
| * <p>(1) If the TsFile is closed normally, hasCrashed()=false and canWrite()=false |
| * |
| * <p>(2) Otherwise, the writer generates metadata for already flushed Chunks and truncate crashed |
| * data. The hasCrashed()=true and canWrite()=true |
| * |
| * <p>Notice!!! If you want to query this file through the generated metadata, remember to call the |
| * makeMetadataVisible() |
| */ |
| public class RestorableTsFileIOWriter extends TsFileIOWriter { |
| |
| private static final Logger logger = LoggerFactory.getLogger("FileMonitor"); |
| private long truncatedSize = -1; |
| private Map<Path, IMeasurementSchema> knownSchemas = new HashMap<>(); |
| |
| private int lastFlushedChunkGroupIndex = 0; |
| |
| private boolean crashed; |
| |
| private long minPlanIndex = Long.MAX_VALUE; |
| private long maxPlanIndex = Long.MIN_VALUE; |
| |
| /** all chunk group metadata which have been serialized on disk. */ |
| private final Map<IDeviceID, Map<String, List<ChunkMetadata>>> metadatasForQuery = |
| new HashMap<>(); |
| |
| /** |
| * @param file a given tsfile path you want to (continue to) write |
| * @throws IOException if write failed, or the file is broken but autoRepair==false. |
| */ |
| public RestorableTsFileIOWriter(File file) throws IOException { |
| this(file, true); |
| } |
| |
| /** |
| * @param file a given tsfile path you want to (continue to) write |
| * @throws IOException if write failed, or the file is broken but autoRepair==false. |
| */ |
| public RestorableTsFileIOWriter(File file, long maxMetadataSize) throws IOException { |
| this(file, true); |
| this.maxMetadataSize = maxMetadataSize; |
| this.chunkMetadataTempFile = new File(file.getAbsolutePath() + CHUNK_METADATA_TEMP_FILE_SUFFIX); |
| this.checkMetadataSizeAndMayFlush(); |
| } |
| |
| public RestorableTsFileIOWriter(File file, boolean truncate) throws IOException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("{} is opened.", file.getName()); |
| } |
| this.file = file; |
| this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), true); |
| |
| // file doesn't exist |
| if (file.length() == 0) { |
| startFile(); |
| crashed = true; |
| canWrite = true; |
| return; |
| } |
| |
| if (file.exists()) { |
| try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) { |
| |
| truncatedSize = reader.selfCheck(knownSchemas, chunkGroupMetadataList, true); |
| minPlanIndex = reader.getMinPlanIndex(); |
| maxPlanIndex = reader.getMaxPlanIndex(); |
| if (truncatedSize == TsFileCheckStatus.COMPLETE_FILE) { |
| crashed = false; |
| canWrite = false; |
| out.close(); |
| } else if (truncatedSize == TsFileCheckStatus.INCOMPATIBLE_FILE) { |
| out.close(); |
| throw new NotCompatibleTsFileException( |
| String.format("%s is not in TsFile format.", file.getAbsolutePath())); |
| } else { |
| crashed = true; |
| canWrite = true; |
| // remove broken data |
| if (truncate) { |
| out.truncate(truncatedSize); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Given a TsFile, generate a writable RestorableTsFileIOWriter. That is, for a complete TsFile, |
| * the function erases all FileMetadata and supports writing new data; For a incomplete TsFile, |
| * the function supports writing new data directly. However, it is more efficient using the |
| * construction function of RestorableTsFileIOWriter, if the tsfile is incomplete. |
| * |
| * @param file a TsFile |
| * @return a writable RestorableTsFileIOWriter |
| */ |
| public static RestorableTsFileIOWriter getWriterForAppendingDataOnCompletedTsFile(File file) |
| throws IOException { |
| long position = file.length(); |
| |
| try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath(), false)) { |
| // this tsfile is complete |
| if (reader.isComplete()) { |
| reader.loadMetadataSize(); |
| position = reader.getFileMetadataPos(); |
| } |
| } |
| |
| if (position != file.length()) { |
| // if the file is complete, we will remove all file metadatas |
| try (FileChannel channel = |
| FileChannel.open(Paths.get(file.getAbsolutePath()), StandardOpenOption.WRITE)) { |
| channel.truncate(position - 1); // remove the last marker. |
| } |
| } |
| return new RestorableTsFileIOWriter(file); |
| } |
| |
| long getTruncatedSize() { |
| return truncatedSize; |
| } |
| |
| public Map<Path, IMeasurementSchema> getKnownSchema() { |
| return knownSchemas; |
| } |
| |
| /** |
| * For query. |
| * |
| * <p>get chunks' metadata from memory. |
| * |
| * @param deviceId the device id |
| * @param measurementId the measurement id |
| * @param dataType the value type |
| * @return chunks' metadata |
| */ |
| public List<ChunkMetadata> getVisibleMetadataList( |
| IDeviceID deviceId, String measurementId, TSDataType dataType) { |
| List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); |
| if (metadatasForQuery.containsKey(deviceId) |
| && metadatasForQuery.get(deviceId).containsKey(measurementId)) { |
| for (IChunkMetadata chunkMetaData : metadatasForQuery.get(deviceId).get(measurementId)) { |
| // filter: if a device'measurement is defined as float type, and data has been persistent. |
| // Then someone deletes the timeseries and recreate it with Int type. We have to ignore |
| // all the stale data. |
| if (dataType == null || dataType.equals(chunkMetaData.getDataType())) { |
| chunkMetadataList.add((ChunkMetadata) chunkMetaData); |
| } |
| } |
| } |
| return chunkMetadataList; |
| } |
| |
| public Map<IDeviceID, Map<String, List<ChunkMetadata>>> getMetadatasForQuery() { |
| return metadatasForQuery; |
| } |
| |
| /** |
| * add all appendChunkMetadatas into memory. After calling this method, other classes can read |
| * these metadata. |
| */ |
| public void makeMetadataVisible() { |
| List<ChunkGroupMetadata> newlyFlushedMetadataList = getAppendedRowMetadata(); |
| if (!newlyFlushedMetadataList.isEmpty()) { |
| for (ChunkGroupMetadata chunkGroupMetadata : newlyFlushedMetadataList) { |
| List<ChunkMetadata> rowMetaDataList = chunkGroupMetadata.getChunkMetadataList(); |
| |
| IDeviceID device = chunkGroupMetadata.getDevice(); |
| for (ChunkMetadata chunkMetaData : rowMetaDataList) { |
| String measurementId = chunkMetaData.getMeasurementUid(); |
| if (!metadatasForQuery.containsKey(device)) { |
| metadatasForQuery.put(device, new HashMap<>()); |
| } |
| if (!metadatasForQuery.get(device).containsKey(measurementId)) { |
| metadatasForQuery.get(device).put(measurementId, new ArrayList<>()); |
| } |
| metadatasForQuery.get(device).get(measurementId).add(chunkMetaData); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Whether this TsFile is crashed. |
| * |
| * @return false when this TsFile is complete |
| */ |
| public boolean hasCrashed() { |
| return crashed; |
| } |
| |
| /** |
| * get all the chunk's metadata which are appended after the last calling of this method, or after |
| * the class instance is initialized if this is the first time to call the method. |
| * |
| * @return a list of Device ChunkMetadataList Pair |
| */ |
| private List<ChunkGroupMetadata> getAppendedRowMetadata() { |
| List<ChunkGroupMetadata> append = new ArrayList<>(); |
| if (lastFlushedChunkGroupIndex < chunkGroupMetadataList.size()) { |
| append.addAll( |
| chunkGroupMetadataList.subList( |
| lastFlushedChunkGroupIndex, chunkGroupMetadataList.size())); |
| lastFlushedChunkGroupIndex = chunkGroupMetadataList.size(); |
| } |
| return append; |
| } |
| |
| public void addSchema(Path path, IMeasurementSchema schema) { |
| knownSchemas.put(path, schema); |
| } |
| |
| @Override |
| public long getMinPlanIndex() { |
| return minPlanIndex; |
| } |
| |
| @Override |
| public long getMaxPlanIndex() { |
| return maxPlanIndex; |
| } |
| } |