/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tsfile.write;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.exception.write.NoMeasurementException;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.MeasurementGroup;
import org.apache.tsfile.write.chunk.AlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.chunk.IChunkGroupWriter;
import org.apache.tsfile.write.chunk.NonAlignedChunkGroupWriterImpl;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.record.datapoint.DataPoint;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.schema.Schema;
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.apache.tsfile.write.writer.TsFileOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* TsFileWriter is the entrance of the writing process. It receives a record and sends it to the
* corresponding chunk group writer. It checks the memory size of all buffered data according to
* its strategy and flushes the data held in memory to the output stream. At the end of writing,
* the user should call {@code close()} to flush the remaining data and close the output stream.
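*
* <p>A minimal usage sketch (the file path, device, measurement name, and values are illustrative):
*
* <pre>{@code
* try (TsFileWriter writer = new TsFileWriter(new File("test.tsfile"))) {
*   writer.registerTimeseries(
*       new Path("root.sg.d1"), new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
*   TSRecord record = new TSRecord(1L, "root.sg.d1");
*   record.addTuple(new LongDataPoint("s1", 42L));
*   writer.write(record);
* }
* }</pre>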
*/
public class TsFileWriter implements AutoCloseable {
protected static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
private static final Logger LOG = LoggerFactory.getLogger(TsFileWriter.class);
/** schema of this TsFile. */
protected final Schema schema;
/** IO writer of this TsFile. */
private final TsFileIOWriter fileWriter;
private final int pageSize;
private long recordCount = 0;
// deviceId -> measurementIdList
private Map<IDeviceID, List<String>> flushedMeasurementsInDeviceMap = new HashMap<>();
// DeviceId -> LastTime
private Map<IDeviceID, Long> alignedDeviceLastTimeMap = new HashMap<>();
// TimeseriesId -> LastTime
private Map<IDeviceID, Map<String, Long>> nonAlignedTimeseriesLastTimeMap = new HashMap<>();
/**
* If true, this TsFile allows out-of-order (unsequential) data when writing; otherwise only
* sequential (time-ordered) data may be written.
*/
private boolean isUnseq = false;
private Map<IDeviceID, IChunkGroupWriter> groupWriters = new HashMap<>();
/** Initial threshold (in number of records) for triggering the memory-size check. */
private long recordCountForNextMemCheck = 100;
private long chunkGroupSizeThreshold;
/**
* init this TsFileWriter.
*
* @param file the File to be written by this TsFileWriter
*/
public TsFileWriter(File file) throws IOException {
this(new TsFileIOWriter(file), new Schema(), TSFileDescriptor.getInstance().getConfig());
}
/**
* init this TsFileWriter.
*
* @param fileWriter the io writer of this TsFile
*/
public TsFileWriter(TsFileIOWriter fileWriter) throws IOException {
this(fileWriter, new Schema(), TSFileDescriptor.getInstance().getConfig());
}
/**
* init this TsFileWriter.
*
* @param file the File to be written by this TsFileWriter
* @param schema the schema of this TsFile
*/
public TsFileWriter(File file, Schema schema) throws IOException {
this(new TsFileIOWriter(file), schema, TSFileDescriptor.getInstance().getConfig());
}
/**
* init this TsFileWriter.
*
* @param output the TsFileOutput of the file to be written by this TsFileWriter
* @param schema the schema of this TsFile
* @throws IOException exception in IO
*/
public TsFileWriter(TsFileOutput output, Schema schema) throws IOException {
this(new TsFileIOWriter(output), schema, TSFileDescriptor.getInstance().getConfig());
}
/**
* init this TsFileWriter.
*
* @param file the File to be written by this TsFileWriter
* @param schema the schema of this TsFile
* @param conf the configuration of this TsFile
*/
public TsFileWriter(File file, Schema schema, TSFileConfig conf) throws IOException {
this(new TsFileIOWriter(file), schema, conf);
}
/**
* init this TsFileWriter.
*
* @param fileWriter the io writer of this TsFile
* @param schema the schema of this TsFile
* @param conf the configuration of this TsFile
*/
protected TsFileWriter(TsFileIOWriter fileWriter, Schema schema, TSFileConfig conf)
throws IOException {
if (!fileWriter.canWrite()) {
throw new IOException(
"the given file Writer does not support writing any more. Maybe it is an complete TsFile");
}
this.fileWriter = fileWriter;
if (fileWriter instanceof RestorableTsFileIOWriter) {
Map<Path, IMeasurementSchema> schemaMap =
((RestorableTsFileIOWriter) fileWriter).getKnownSchema();
Map<Path, MeasurementGroup> measurementGroupMap = new HashMap<>();
for (Map.Entry<Path, IMeasurementSchema> entry : schemaMap.entrySet()) {
IMeasurementSchema measurementSchema = entry.getValue();
if (measurementSchema instanceof VectorMeasurementSchema) {
MeasurementGroup group =
measurementGroupMap.getOrDefault(
new Path(entry.getKey().getDevice()), new MeasurementGroup(true));
List<String> measurementList = measurementSchema.getSubMeasurementsList();
for (int i = 0; i < measurementList.size(); i++) {
group
.getMeasurementSchemaMap()
.put(
measurementList.get(i),
new MeasurementSchema(
measurementList.get(i),
measurementSchema.getSubMeasurementsTSDataTypeList().get(i),
measurementSchema.getSubMeasurementsTSEncodingList().get(i)));
}
measurementGroupMap.put(new Path(entry.getKey().getDevice()), group);
} else {
MeasurementGroup group =
measurementGroupMap.getOrDefault(
new Path(entry.getKey().getDevice()), new MeasurementGroup(false));
group
.getMeasurementSchemaMap()
.put(measurementSchema.getMeasurementId(), (MeasurementSchema) measurementSchema);
measurementGroupMap.put(new Path(entry.getKey().getDevice()), group);
}
}
this.schema = new Schema(measurementGroupMap);
} else {
this.schema = schema;
}
this.pageSize = conf.getPageSizeInByte();
this.chunkGroupSizeThreshold = conf.getGroupSizeInByte();
config.setTSFileStorageFs(conf.getTSFileStorageFs());
if (this.pageSize >= chunkGroupSizeThreshold) {
LOG.warn(
"TsFile's page size {} is greater than chunk group size {}, please enlarge the chunk group"
+ " size or decrease page size. ",
pageSize,
chunkGroupSizeThreshold);
}
}
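/**
* Register a schema template. All timeseries later created from the template share the aligned /
* nonAligned property given here.
*
* @param templateName name of the template
* @param template map from measurementId to the schema of that measurement
* @param isAligned whether timeseries created from this template are aligned
*/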
public void registerSchemaTemplate(
String templateName, Map<String, MeasurementSchema> template, boolean isAligned) {
schema.registerSchemaTemplate(templateName, new MeasurementGroup(isAligned, template));
}
/**
* Register all timeseries defined in the specified schema template under the specified device.
*
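* <p>A minimal sketch (template and device names are illustrative):
*
* <pre>{@code
* Map<String, MeasurementSchema> template = new HashMap<>();
* template.put("s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
* writer.registerSchemaTemplate("template1", template, false);
* writer.registerDevice("root.sg.d1", "template1");
* }</pre>
*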
* @param deviceId full path of the device
* @param templateName name of a template previously registered via {@code registerSchemaTemplate}
* @throws WriteProcessException if the template does not exist or the device is already registered
*/
public void registerDevice(String deviceId, String templateName) throws WriteProcessException {
if (!schema.getSchemaTemplates().containsKey(templateName)) {
throw new WriteProcessException("given template is not existed! " + templateName);
}
if (schema.getRegisteredTimeseriesMap().containsKey(new Path(deviceId))) {
throw new WriteProcessException(
"this device "
+ deviceId
+ " has been registered, you can only use registerDevice method to register empty device.");
}
schema.registerDevice(deviceId, templateName);
}
/**
* Register a single nonAligned timeseries.
*
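* <p>For example (device and measurement names are illustrative):
*
* <pre>{@code
* writer.registerTimeseries(
*     new Path("root.sg.d1"), new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
* }</pre>
*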
* @param devicePath full path of the device
* @param measurementSchema schema of the measurement to register under the device
* @throws WriteProcessException if the device is aligned or the timeseries is already registered
*/
public void registerTimeseries(Path devicePath, MeasurementSchema measurementSchema)
throws WriteProcessException {
MeasurementGroup measurementGroup;
if (schema.containsDevice(devicePath)) {
measurementGroup = schema.getSeriesSchema(devicePath);
if (measurementGroup.isAligned()) {
throw new WriteProcessException(
"given device " + devicePath + " has been registered for aligned timeseries.");
} else if (measurementGroup
.getMeasurementSchemaMap()
.containsKey(measurementSchema.getMeasurementId())) {
throw new WriteProcessException(
"given nonAligned timeseries "
+ (devicePath + "." + measurementSchema.getMeasurementId())
+ " has been registered.");
}
} else {
measurementGroup = new MeasurementGroup(false);
}
measurementGroup
.getMeasurementSchemaMap()
.put(measurementSchema.getMeasurementId(), measurementSchema);
schema.registerMeasurementGroup(devicePath, measurementGroup);
}
/**
* Register nonAligned timeseries in batch. Timeseries that fail to register are skipped and a
* warning is logged.
*
* @param devicePath full path of the device
* @param measurementSchemas schemas of the measurements to register under the device
*/
public void registerTimeseries(Path devicePath, List<MeasurementSchema> measurementSchemas) {
for (MeasurementSchema schema : measurementSchemas) {
try {
registerTimeseries(devicePath, schema);
} catch (WriteProcessException e) {
LOG.warn(e.getMessage());
}
}
}
/**
* Register aligned timeseries. Once a device has been registered for aligned timeseries, its
* measurement set cannot be expanded.
*
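* <p>For example (device and measurement names are illustrative):
*
* <pre>{@code
* List<MeasurementSchema> schemas = new ArrayList<>();
* schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
* schemas.add(new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.RLE));
* writer.registerAlignedTimeseries(new Path("root.sg.d2"), schemas);
* }</pre>
*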
* @param devicePath full path of the device
* @param measurementSchemas schemas of the aligned measurements under the device
* @throws WriteProcessException if the device has already been registered
*/
public void registerAlignedTimeseries(Path devicePath, List<MeasurementSchema> measurementSchemas)
throws WriteProcessException {
if (schema.containsDevice(devicePath)) {
if (schema.getSeriesSchema(devicePath).isAligned()) {
throw new WriteProcessException(
"given device "
+ devicePath
+ " has been registered for aligned timeseries and should not be expanded.");
} else {
throw new WriteProcessException(
"given device " + devicePath + " has been registered for nonAligned timeseries.");
}
}
MeasurementGroup measurementGroup = new MeasurementGroup(true);
measurementSchemas.forEach(
measurementSchema -> {
measurementGroup
.getMeasurementSchemaMap()
.put(measurementSchema.getMeasurementId(), measurementSchema);
});
schema.registerMeasurementGroup(devicePath, measurementGroup);
}
private boolean checkIsTimeseriesExist(TSRecord record, boolean isAligned)
throws WriteProcessException, IOException {
// initialize the ChunkGroupWriter of the device in this TSRecord
IChunkGroupWriter groupWriter =
tryToInitialGroupWriter(new PlainDeviceID(record.deviceId), isAligned);
// initialize all SeriesWriters of the measurements in this TSRecord
Path devicePath = new Path(record.deviceId);
List<MeasurementSchema> measurementSchemas;
if (schema.containsDevice(devicePath)) {
measurementSchemas =
checkIsAllMeasurementsInGroup(
record.dataPointList, schema.getSeriesSchema(devicePath), isAligned);
if (isAligned) {
for (IMeasurementSchema s : measurementSchemas) {
if (flushedMeasurementsInDeviceMap.containsKey(
new PlainDeviceID(devicePath.getFullPath()))
&& !flushedMeasurementsInDeviceMap
.get(new PlainDeviceID(devicePath.getFullPath()))
.contains(s.getMeasurementId())) {
throw new WriteProcessException(
"TsFile has flushed chunk group and should not add new measurement "
+ s.getMeasurementId()
+ " in device "
+ devicePath.getFullPath());
}
}
}
groupWriter.tryToAddSeriesWriter(measurementSchemas);
} else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) {
// use the default template without needing to register device
MeasurementGroup measurementGroup =
schema.getSchemaTemplates().entrySet().iterator().next().getValue();
measurementSchemas =
checkIsAllMeasurementsInGroup(record.dataPointList, measurementGroup, isAligned);
groupWriter.tryToAddSeriesWriter(measurementSchemas);
} else {
throw new NoMeasurementException("input devicePath is invalid: " + devicePath);
}
return true;
}
private void checkIsTimeseriesExist(Tablet tablet, boolean isAligned)
throws WriteProcessException, IOException {
IChunkGroupWriter groupWriter =
tryToInitialGroupWriter(new PlainDeviceID(tablet.deviceId), isAligned);
Path devicePath = new Path(tablet.deviceId);
List<MeasurementSchema> schemas = tablet.getSchemas();
if (schema.containsDevice(devicePath)) {
checkIsAllMeasurementsInGroup(schema.getSeriesSchema(devicePath), schemas, isAligned);
if (isAligned) {
for (IMeasurementSchema s : schemas) {
if (flushedMeasurementsInDeviceMap.containsKey(
new PlainDeviceID(devicePath.getFullPath()))
&& !flushedMeasurementsInDeviceMap
.get(new PlainDeviceID(devicePath.getFullPath()))
.contains(s.getMeasurementId())) {
throw new WriteProcessException(
"TsFile has flushed chunk group and should not add new measurement "
+ s.getMeasurementId()
+ " in device "
+ devicePath.getFullPath());
}
}
}
groupWriter.tryToAddSeriesWriter(schemas);
} else if (schema.getSchemaTemplates() != null && schema.getSchemaTemplates().size() == 1) {
MeasurementGroup measurementGroup =
schema.getSchemaTemplates().entrySet().iterator().next().getValue();
checkIsAllMeasurementsInGroup(measurementGroup, schemas, isAligned);
groupWriter.tryToAddSeriesWriter(schemas);
} else {
throw new NoMeasurementException("input devicePath is invalid: " + devicePath);
}
}
/**
* If the write is aligned, all measurementSchemas must be contained in the measurementGroup,
* otherwise an exception is thrown. If the write is nonAligned, measurementSchemas that are not
* contained in the measurementGroup are removed from the list.
*
* @param measurementGroup the registered measurement group of the device
* @param measurementSchemas the measurement schemas to be written
* @param isAligned whether the write is aligned
* @throws NoMeasurementException if the alignment does not match or an aligned measurement is not registered
*/
private void checkIsAllMeasurementsInGroup(
MeasurementGroup measurementGroup,
List<MeasurementSchema> measurementSchemas,
boolean isAligned)
throws NoMeasurementException {
if (isAligned && !measurementGroup.isAligned()) {
throw new NoMeasurementException("no aligned timeseries is registered in the group.");
} else if (!isAligned && measurementGroup.isAligned()) {
throw new NoMeasurementException("no nonAligned timeseries is registered in the group.");
}
Iterator<MeasurementSchema> iterator = measurementSchemas.iterator();
while (iterator.hasNext()) {
MeasurementSchema measurementSchema = iterator.next();
if (!measurementGroup
.getMeasurementSchemaMap()
.containsKey(measurementSchema.getMeasurementId())) {
if (isAligned) {
throw new NoMeasurementException(
"measurement "
+ measurementSchema.getMeasurementId()
+ " is not registered or in the default template");
} else {
// remove via the iterator to avoid ConcurrentModificationException
iterator.remove();
}
}
}
}
/**
* Check whether all measurements in the dataPoints list are in the measurementGroup.
*
* @param dataPoints data points of one TSRecord
* @param measurementGroup the registered measurement group of the device
* @param isAligned whether the write is aligned
* @return schemas of the data points that are registered in the group
* @throws NoMeasurementException if the alignment does not match or an aligned measurement is not registered
*/
private List<MeasurementSchema> checkIsAllMeasurementsInGroup(
List<DataPoint> dataPoints, MeasurementGroup measurementGroup, boolean isAligned)
throws NoMeasurementException {
if (isAligned && !measurementGroup.isAligned()) {
throw new NoMeasurementException("no aligned timeseries is registered in the group.");
} else if (!isAligned && measurementGroup.isAligned()) {
throw new NoMeasurementException("no nonAligned timeseries is registered in the group.");
}
List<MeasurementSchema> schemas = new ArrayList<>();
for (DataPoint dataPoint : dataPoints) {
if (!measurementGroup.getMeasurementSchemaMap().containsKey(dataPoint.getMeasurementId())) {
if (isAligned) {
throw new NoMeasurementException(
"aligned measurement "
+ dataPoint.getMeasurementId()
+ " is not registered or in the default template");
} else {
LOG.warn(
"Ignore nonAligned measurement {}, because it is not registered or in the default template",
dataPoint.getMeasurementId());
}
} else {
schemas.add(measurementGroup.getMeasurementSchemaMap().get(dataPoint.getMeasurementId()));
}
}
return schemas;
}
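/**
* Get the ChunkGroupWriter of the given device, creating one if it does not exist yet. For
* sequence files, the last flushed timestamps are restored into the new writer so that the time
* order of subsequent writes can still be checked.
*/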
private IChunkGroupWriter tryToInitialGroupWriter(IDeviceID deviceId, boolean isAligned) {
IChunkGroupWriter groupWriter;
if (!groupWriters.containsKey(deviceId)) {
if (isAligned) {
groupWriter = new AlignedChunkGroupWriterImpl(deviceId);
if (!isUnseq) { // Sequence File
((AlignedChunkGroupWriterImpl) groupWriter)
.setLastTime(alignedDeviceLastTimeMap.getOrDefault(deviceId, -1L));
}
} else {
groupWriter = new NonAlignedChunkGroupWriterImpl(deviceId);
if (!isUnseq) { // Sequence File
((NonAlignedChunkGroupWriterImpl) groupWriter)
.setLastTimeMap(
nonAlignedTimeseriesLastTimeMap.getOrDefault(deviceId, new HashMap<>()));
}
}
groupWriters.put(deviceId, groupWriter);
} else {
groupWriter = groupWriters.get(deviceId);
}
return groupWriter;
}
/**
* Write a TSRecord.
*
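* <p>For example (assuming the timeseries "root.sg.d1.s1" has been registered as INT64):
*
* <pre>{@code
* TSRecord record = new TSRecord(1L, "root.sg.d1");
* record.addTuple(new LongDataPoint("s1", 42L));
* writer.write(record);
* }</pre>
*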
* @param record - record representing one data line
* @return true - size of tsfile or metadata reaches the threshold; false - otherwise
* @throws IOException exception in IO
* @throws WriteProcessException exception in write process
*/
public boolean write(TSRecord record) throws IOException, WriteProcessException {
checkIsTimeseriesExist(record, false);
recordCount +=
groupWriters
.get(new PlainDeviceID(record.deviceId))
.write(record.time, record.dataPointList);
return checkMemorySizeAndMayFlushChunks();
}
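/**
* Write a TSRecord whose measurements have been registered as aligned timeseries.
*
* @param record - record representing one data line
* @return true - size of tsfile or metadata reaches the threshold; false - otherwise
* @throws IOException exception in IO
* @throws WriteProcessException exception in write process
*/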
public boolean writeAligned(TSRecord record) throws IOException, WriteProcessException {
checkIsTimeseriesExist(record, true);
recordCount +=
groupWriters
.get(new PlainDeviceID(record.deviceId))
.write(record.time, record.dataPointList);
return checkMemorySizeAndMayFlushChunks();
}
/**
* Write a tablet.
*
* @param tablet - multiple time series of one device that share a time column
* @return true - size of tsfile or metadata reaches the threshold; false - otherwise
* @throws IOException exception in IO
* @throws WriteProcessException exception in write process
*/
public boolean write(Tablet tablet) throws IOException, WriteProcessException {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, false);
// get corresponding ChunkGroupWriter and write this Tablet
recordCount += groupWriters.get(new PlainDeviceID(tablet.deviceId)).write(tablet);
return checkMemorySizeAndMayFlushChunks();
}
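/**
* Write a tablet whose measurements have been registered as aligned timeseries.
*
* @param tablet - multiple time series of one device that share a time column
* @return true - size of tsfile or metadata reaches the threshold; false - otherwise
* @throws IOException exception in IO
* @throws WriteProcessException exception in write process
*/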
public boolean writeAligned(Tablet tablet) throws IOException, WriteProcessException {
// make sure the ChunkGroupWriter for this Tablet exist
checkIsTimeseriesExist(tablet, true);
// get corresponding ChunkGroupWriter and write this Tablet
recordCount += groupWriters.get(new PlainDeviceID(tablet.deviceId)).write(tablet);
return checkMemorySizeAndMayFlushChunks();
}
/**
* Calculate the total memory size currently occupied by all ChunkGroupWriter instances.
*
* @return total memory size used
*/
private long calculateMemSizeForAllGroup() {
long memTotalSize = 0;
for (IChunkGroupWriter group : groupWriters.values()) {
memTotalSize += group.updateMaxGroupMemSize();
}
return memTotalSize;
}
/**
* Check the occupied memory size; if it exceeds the chunkGroupSize threshold, flush the buffered
* chunk groups to the output stream.
*
* @return true - size of tsfile or metadata reaches the threshold. false - otherwise
* @throws IOException exception in IO
*/
private boolean checkMemorySizeAndMayFlushChunks() throws IOException {
if (recordCount >= recordCountForNextMemCheck) {
long memSize = calculateMemSizeForAllGroup();
assert memSize > 0;
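// Extrapolate when memory will next reach the threshold, assuming usage grows roughly linearly
// with the record count: e.g. if 1,000 records currently occupy 64 MB and the threshold is
// 128 MB, the next check is scheduled around record 2,000.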
if (memSize > chunkGroupSizeThreshold) {
LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize);
recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
return flushAllChunkGroups();
} else {
recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
return false;
}
}
return false;
}
/**
* Flush the data in all series writers of all chunk group writers and their page writers to the
* output stream.
*
* @return true - size of tsfile or metadata reaches the threshold; false - otherwise. This
* implementation always returns false; overrides in IoTDB may return true.
* @throws IOException exception in IO
*/
public boolean flushAllChunkGroups() throws IOException {
if (recordCount > 0) {
for (Map.Entry<IDeviceID, IChunkGroupWriter> entry : groupWriters.entrySet()) {
IDeviceID deviceId = entry.getKey();
IChunkGroupWriter groupWriter = entry.getValue();
fileWriter.startChunkGroup(deviceId);
long pos = fileWriter.getPos();
long dataSize = groupWriter.flushToFileWriter(fileWriter);
if (fileWriter.getPos() - pos != dataSize) {
throw new IOException(
String.format(
"Flushed data size is inconsistent with computation! Estimated: %d, Actual: %d",
dataSize, fileWriter.getPos() - pos));
}
fileWriter.endChunkGroup();
if (groupWriter instanceof AlignedChunkGroupWriterImpl) {
// add flushed measurements
List<String> measurementList =
flushedMeasurementsInDeviceMap.computeIfAbsent(deviceId, p -> new ArrayList<>());
((AlignedChunkGroupWriterImpl) groupWriter)
.getMeasurements()
.forEach(
measurementId -> {
if (!measurementList.contains(measurementId)) {
measurementList.add(measurementId);
}
});
// add lastTime
if (!isUnseq) { // Sequence TsFile
this.alignedDeviceLastTimeMap.put(
deviceId, ((AlignedChunkGroupWriterImpl) groupWriter).getLastTime());
}
} else {
// add lastTime
if (!isUnseq) { // Sequence TsFile
this.nonAlignedTimeseriesLastTimeMap.put(
deviceId, ((NonAlignedChunkGroupWriterImpl) groupWriter).getLastTimeMap());
}
}
}
reset();
}
return false;
}
private void reset() {
groupWriters.clear();
recordCount = 0;
}
/**
* Call this method to write the data remaining in memory and close the output stream.
*
* @throws IOException exception in IO
*/
@Override
public void close() throws IOException {
LOG.info("start close file");
flushAllChunkGroups();
fileWriter.endFile();
}
/**
* This method is only for tests.
*
* @return TsFileIOWriter
*/
public TsFileIOWriter getIOWriter() {
return this.fileWriter;
}
}