/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write.writer;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexConstructor;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
* TsFileIOWriter is used to construct metadata and write data stored in memory to the output stream.
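*
* <p>A minimal usage sketch (hypothetical file name and device; chunk payloads are elided):
*
* <pre>{@code
* TsFileIOWriter writer = new TsFileIOWriter(new File("example.tsfile"));
* writer.startChunkGroup("root.sg.d1");
* // flush one or more chunks via startFlushChunk(...) + endCurrentChunk(), or writeChunk(...)
* writer.endChunkGroup();
* writer.endFile();
* }</pre>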
*/
public class TsFileIOWriter {
protected static final byte[] MAGIC_STRING_BYTES;
public static final byte VERSION_NUMBER_BYTE;
protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private static final Logger logger = LoggerFactory.getLogger(TsFileIOWriter.class);
private static final Logger resourceLogger = LoggerFactory.getLogger("FileMonitor");
static {
MAGIC_STRING_BYTES = BytesUtils.stringToBytes(TSFileConfig.MAGIC_STRING);
VERSION_NUMBER_BYTE = TSFileConfig.VERSION_NUMBER;
}
protected TsFileOutput out;
protected boolean canWrite = true;
protected File file;
// metadata of the chunk currently being flushed
private ChunkMetadata currentChunkMetadata;
// chunk metadata of the chunk group currently being flushed
protected List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
// all flushed ChunkGroups
protected List<ChunkGroupMetadata> chunkGroupMetadataList = new ArrayList<>();
private long markedPosition;
private String currentChunkGroupDeviceId;
// for upgrade tool
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap;
// the two longs mark the plan index range of operations in the current MemTable,
// and are serialized after MetaMarker.OPERATION_INDEX_RANGE to recover the file-level range
private long minPlanIndex;
private long maxPlanIndex;
/** Empty constructor. */
protected TsFileIOWriter() {}
/**
* For writing a new TsFile.
*
* @param file the file to which data is written
* @throws IOException if an I/O error occurs
*/
public TsFileIOWriter(File file) throws IOException {
this.out = FSFactoryProducer.getFileOutputFactory().getTsFileOutput(file.getPath(), false);
this.file = file;
if (resourceLogger.isDebugEnabled()) {
resourceLogger.debug("{} writer is opened.", file.getName());
}
startFile();
}
/**
* For writing a new TsFile.
*
* @param output the output to which data is written
* @throws IOException if an I/O error occurs
*/
public TsFileIOWriter(TsFileOutput output) throws IOException {
this.out = output;
startFile();
}
/** For tests only. */
public TsFileIOWriter(TsFileOutput output, boolean test) {
this.out = output;
}
/**
* Writes the given bytes to the output stream. This method is called when the total memory size
* exceeds the chunk group size threshold.
*
* @param bytes - data of several pages which have been packed
* @throws IOException if an I/O error occurs.
*/
public void writeBytesToStream(PublicBAOS bytes) throws IOException {
bytes.writeTo(out.wrapAsStream());
}
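/** Write the magic string and the version number that begin every TsFile. */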
protected void startFile() throws IOException {
out.write(MAGIC_STRING_BYTES);
out.write(VERSION_NUMBER_BYTE);
}
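/**
* Start a new chunk group by serializing a ChunkGroupHeader for the given device. All chunks
* flushed afterwards belong to this device until {@link #endChunkGroup()} is called.
*
* @param deviceId device of this chunk group
* @throws IOException if an I/O error occurs
*/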
public void startChunkGroup(String deviceId) throws IOException {
this.currentChunkGroupDeviceId = deviceId;
if (logger.isDebugEnabled()) {
logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
}
chunkMetadataList = new ArrayList<>();
ChunkGroupHeader chunkGroupHeader = new ChunkGroupHeader(currentChunkGroupDeviceId);
chunkGroupHeader.serializeTo(out.wrapAsStream());
}
/**
* End the current chunk group and record its metadata. If there is no data in the chunk group,
* nothing will be flushed.
*/
public void endChunkGroup() throws IOException {
if (currentChunkGroupDeviceId == null || chunkMetadataList.isEmpty()) {
return;
}
chunkGroupMetadataList.add(
new ChunkGroupMetadata(currentChunkGroupDeviceId, chunkMetadataList));
currentChunkGroupDeviceId = null;
chunkMetadataList = null;
out.flush();
}
/**
* start a {@linkplain ChunkMetadata ChunkMetaData}.
*
* @param measurementId - measurementId of this time series
* @param compressionCodecName - compression name of this time series
* @param tsDataType - data type
* @param encodingType - encoding type of this time series
* @param statistics - Chunk statistics
* @param dataSize - the serialized size of all pages
* @param numOfPages - the number of pages in this chunk
* @param mask - 0x80 for time chunk, 0x40 for value chunk, 0x00 for common chunk
* @throws IOException if I/O error occurs
*/
public void startFlushChunk(
String measurementId,
CompressionType compressionCodecName,
TSDataType tsDataType,
TSEncoding encodingType,
Statistics<?> statistics,
int dataSize,
int numOfPages,
int mask)
throws IOException {
currentChunkMetadata =
new ChunkMetadata(measurementId, tsDataType, out.getPosition(), statistics);
currentChunkMetadata.setMask((byte) mask);
ChunkHeader header =
new ChunkHeader(
measurementId,
dataSize,
tsDataType,
compressionCodecName,
encodingType,
numOfPages,
mask);
header.serializeTo(out.wrapAsStream());
}
/** Write a whole chunk from another file into this file, providing fast merge for IoTDB. */
public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws IOException {
ChunkHeader chunkHeader = chunk.getHeader();
currentChunkMetadata =
new ChunkMetadata(
chunkHeader.getMeasurementID(),
chunkHeader.getDataType(),
out.getPosition(),
chunkMetadata.getStatistics());
chunkHeader.serializeTo(out.wrapAsStream());
out.write(chunk.getData());
endCurrentChunk();
if (logger.isDebugEnabled()) {
logger.debug(
"end flushing a chunk:{}, totalvalue:{}",
chunkHeader.getMeasurementID(),
chunkMetadata.getNumOfPoints());
}
}
/** End the current chunk and add its metadata to the chunk metadata list. */
public void endCurrentChunk() {
chunkMetadataList.add(currentChunkMetadata);
currentChunkMetadata = null;
}
/**
* Write {@linkplain TsFileMetadata TSFileMetaData}, the bloom filter, the metadata size, and the
* tailing magic string to the output stream, then close it.
*
* @throws IOException if I/O error occurs
*/
public void endFile() throws IOException {
long metaOffset = out.getPosition();
// serialize the SEPARATOR of MetaData
ReadWriteIOUtils.write(MetaMarker.SEPARATOR, out.wrapAsStream());
// group ChunkMetadata by series
Map<Path, List<IChunkMetadata>> chunkMetadataListMap = new TreeMap<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
for (IChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
Path series = new Path(chunkGroupMetadata.getDevice(), chunkMetadata.getMeasurementUid());
chunkMetadataListMap.computeIfAbsent(series, k -> new ArrayList<>()).add(chunkMetadata);
}
}
MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap);
TsFileMetadata tsFileMetaData = new TsFileMetadata();
tsFileMetaData.setMetadataIndex(metadataIndex);
tsFileMetaData.setMetaOffset(metaOffset);
long footerIndex = out.getPosition();
if (logger.isDebugEnabled()) {
logger.debug("start to flush the footer,file pos:{}", footerIndex);
}
// write TsFileMetaData
int size = tsFileMetaData.serializeTo(out.wrapAsStream());
if (logger.isDebugEnabled()) {
logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition());
}
// write bloom filter
size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet());
if (logger.isDebugEnabled()) {
logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition());
}
// write TsFileMetaData size
ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata.
// write magic string
out.write(MAGIC_STRING_BYTES);
// close file
out.close();
if (resourceLogger.isDebugEnabled() && file != null) {
resourceLogger.debug("{} writer is closed.", file.getName());
}
canWrite = false;
}
/**
* Flush ChunkMetadataList and TimeseriesMetadata, and construct the metadata index.
*
* @return the root MetadataIndexNode of the metadata index in TsFileMetadata
*/
private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
throws IOException {
// convert the ChunkMetadataList map into deviceTimeseriesMetadataMap
deviceTimeseriesMetadataMap = new LinkedHashMap<>();
// create device -> TimeseriesMetaDataList Map
for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
Path path = entry.getKey();
String device = path.getDevice();
// create TimeseriesMetaData
PublicBAOS publicBAOS = new PublicBAOS();
TSDataType dataType = entry.getValue().get(entry.getValue().size() - 1).getDataType();
Statistics seriesStatistics = Statistics.getStatsByType(dataType);
int chunkMetadataListLength = 0;
boolean serializeStatistic = (entry.getValue().size() > 1);
// flush chunkMetadataList one by one
for (IChunkMetadata chunkMetadata : entry.getValue()) {
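// skip chunks whose data type differs from that of the last chunk of this series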
if (!chunkMetadata.getDataType().equals(dataType)) {
continue;
}
chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
}
TimeseriesMetadata timeseriesMetadata =
new TimeseriesMetadata(
(byte)
((serializeStatistic ? (byte) 1 : (byte) 0) | entry.getValue().get(0).getMask()),
chunkMetadataListLength,
path.getMeasurement(),
dataType,
seriesStatistics,
publicBAOS);
deviceTimeseriesMetadataMap
.computeIfAbsent(device, k -> new ArrayList<>())
.add(timeseriesMetadata);
}
// construct TsFileMetadata and return
return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
}
/**
* get the current length of the underlying output, i.e., the position at which the next byte is
* written.
*
* @return - current length of the output
* @throws IOException if I/O error occurs
*/
public long getPos() throws IOException {
return out.getPosition();
}
// device -> ChunkMetadataList
public Map<String, List<ChunkMetadata>> getDeviceChunkMetadataMap() {
Map<String, List<ChunkMetadata>> deviceChunkMetadataMap = new HashMap<>();
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
deviceChunkMetadataMap
.computeIfAbsent(chunkGroupMetadata.getDevice(), k -> new ArrayList<>())
.addAll(chunkGroupMetadata.getChunkMetadataList());
}
return deviceChunkMetadataMap;
}
public boolean canWrite() {
return canWrite;
}
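/** Remember the current output position so that {@link #reset()} can truncate back to it. */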
public void mark() throws IOException {
markedPosition = getPos();
}
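/** Truncate the output back to the position remembered by {@link #mark()}. */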
public void reset() throws IOException {
out.truncate(markedPosition);
}
/**
* close the outputStream or file channel without writing FileMetadata. This is only used for
* testing.
*/
public void close() throws IOException {
canWrite = false;
out.close();
}
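/** For tests only: write a bare SEPARATOR marker byte. */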
void writeSeparatorMaskForTest() throws IOException {
out.write(new byte[] {MetaMarker.SEPARATOR});
}
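/** For tests only: write a bare CHUNK_GROUP_HEADER marker byte. */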
void writeChunkGroupMarkerForTest() throws IOException {
out.write(new byte[] {MetaMarker.CHUNK_GROUP_HEADER});
}
public File getFile() {
return file;
}
public void setFile(File file) {
this.file = file;
}
/** Remove every ChunkMetadata whose start time is not in chunkStartTimes. */
public void filterChunks(Map<Path, List<Long>> chunkStartTimes) {
Map<Path, Integer> startTimeIdxes = new HashMap<>();
chunkStartTimes.forEach((p, t) -> startTimeIdxes.put(p, 0));
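// keep a chunk only if its start time matches the next expected start time recorded for its
// series; the per-series index advances on every match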
Iterator<ChunkGroupMetadata> chunkGroupMetaDataIterator = chunkGroupMetadataList.iterator();
while (chunkGroupMetaDataIterator.hasNext()) {
ChunkGroupMetadata chunkGroupMetaData = chunkGroupMetaDataIterator.next();
String deviceId = chunkGroupMetaData.getDevice();
int chunkNum = chunkGroupMetaData.getChunkMetadataList().size();
Iterator<ChunkMetadata> chunkMetaDataIterator =
chunkGroupMetaData.getChunkMetadataList().iterator();
while (chunkMetaDataIterator.hasNext()) {
IChunkMetadata chunkMetaData = chunkMetaDataIterator.next();
Path path = new Path(deviceId, chunkMetaData.getMeasurementUid());
int startTimeIdx = startTimeIdxes.get(path);
List<Long> pathChunkStartTimes = chunkStartTimes.get(path);
boolean chunkValid =
startTimeIdx < pathChunkStartTimes.size()
&& pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
if (!chunkValid) {
chunkMetaDataIterator.remove();
chunkNum--;
} else {
startTimeIdxes.put(path, startTimeIdx + 1);
}
}
if (chunkNum == 0) {
chunkGroupMetaDataIterator.remove();
}
}
}
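/**
* Write MetaMarker.OPERATION_INDEX_RANGE followed by minPlanIndex and maxPlanIndex, then flush,
* so that the file-level plan index range can be recovered.
*/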
public void writePlanIndices() throws IOException {
ReadWriteIOUtils.write(MetaMarker.OPERATION_INDEX_RANGE, out.wrapAsStream());
ReadWriteIOUtils.write(minPlanIndex, out.wrapAsStream());
ReadWriteIOUtils.write(maxPlanIndex, out.wrapAsStream());
out.flush();
}
/**
* This function is only for tests.
*
* @return TsFileOutput
*/
public TsFileOutput getIOWriterOut() {
return out;
}
/**
* This function is only for the upgrade tool.
*
* @return DeviceTimeseriesMetadataMap
*/
public Map<String, List<TimeseriesMetadata>> getDeviceTimeseriesMetadataMap() {
return deviceTimeseriesMetadataMap;
}
public long getMinPlanIndex() {
return minPlanIndex;
}
public void setMinPlanIndex(long minPlanIndex) {
this.minPlanIndex = minPlanIndex;
}
public long getMaxPlanIndex() {
return maxPlanIndex;
}
public void setMaxPlanIndex(long maxPlanIndex) {
this.maxPlanIndex = maxPlanIndex;
}
}