blob: 7d6f22405c03424115aa880354adc1bd9ba76029 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.constant.TestConstant;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.file.metadata.utils.TestHelper;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.MeasurementGroup;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Verifies that {@link TsFileIOWriter} produces a structurally valid TsFile: head/tail magic
 * strings, version number, chunk-group and chunk headers for both ordinary and aligned ("vector")
 * layouts, the plan-index record, uniqueness of timeseries metadata, and the file metadata index.
 */
public class TsFileIOWriterTest {

  private static final String FILE_PATH =
      TestConstant.BASE_OUTPUT_PATH.concat("TsFileIOWriterTest.tsfile");
  private static final IDeviceID DEVICE_1 = new PlainDeviceID("device1");
  private static final IDeviceID DEVICE_2 = new PlainDeviceID("device2");
  private static final String SENSOR_1 = "sensor1";
  // Number of chunk groups written per device in before().
  private static final int CHUNK_GROUP_NUM = 2;

  /**
   * Writes the fixture file: CHUNK_GROUP_NUM ordinary chunk groups for DEVICE_1, CHUNK_GROUP_NUM
   * vector chunk groups for DEVICE_2, then the plan-index range, then the file tail.
   *
   * @throws IOException if the fixture file cannot be written
   */
  @Before
  public void before() throws IOException {
    TsFileIOWriter writer = new TsFileIOWriter(new File(FILE_PATH));

    // file schema
    MeasurementSchema measurementSchema = TestHelper.createSimpleMeasurementSchema(SENSOR_1);
    VectorMeasurementSchema vectorMeasurementSchema =
        new VectorMeasurementSchema(
            "", new String[] {"s1", "s2"}, new TSDataType[] {TSDataType.INT64, TSDataType.INT64});

    // NOTE(review): this Schema is populated but never passed to the writer; it only exercises
    // the registration API. Confirm whether it is still needed.
    List<MeasurementSchema> schemas = new ArrayList<>();
    schemas.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
    schemas.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
    MeasurementGroup group = new MeasurementGroup(true, schemas);
    Schema schema = new Schema();
    schema.registerTimeseries(new Path(DEVICE_1), measurementSchema);
    schema.registerMeasurementGroup(new Path(DEVICE_2), group);

    writeChunkGroup(writer, measurementSchema);
    writeVectorChunkGroup(writer, vectorMeasurementSchema);

    // Record a plan-index range so endFileTest can assert it is round-tripped.
    writer.setMinPlanIndex(100);
    writer.setMaxPlanIndex(10000);
    writer.writePlanIndices();

    // end file
    writer.endFile();
  }

  /** Best-effort removal of the fixture file; a failed delete must not fail the test run. */
  @After
  public void after() {
    File file = new File(FILE_PATH);
    if (file.exists()) {
      // Return value deliberately ignored: cleanup is best-effort.
      file.delete();
    }
  }

  /**
   * Reads the fixture file back sequentially and asserts every on-disk structure the writer
   * emitted, in order.
   *
   * @throws IOException if the fixture file cannot be read
   */
  @Test
  public void endFileTest() throws IOException {
    // try-with-resources: the original version leaked the reader, which could also keep the
    // file handle open and break the delete in after() on some platforms.
    try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
      // magic string at head and tail, version number after the head magic
      Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic());
      Assert.assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber());
      Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic());

      // Skip the head magic string and the one-byte version number.
      reader.position(TSFileConfig.MAGIC_STRING.getBytes(StandardCharsets.UTF_8).length + 1);
      ChunkHeader header;
      ChunkGroupHeader chunkGroupHeader;

      // DEVICE_1: each chunk group holds a single ordinary one-page chunk.
      for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
        // chunk group header
        Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
        chunkGroupHeader = reader.readChunkGroupHeader();
        Assert.assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID());
        // ordinary chunk header
        Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker());
        header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
        Assert.assertEquals(SENSOR_1, header.getMeasurementID());
      }

      // DEVICE_2: each chunk group holds a time chunk followed by the value chunks s1 and s2.
      for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
        // chunk group header
        Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
        chunkGroupHeader = reader.readChunkGroupHeader();
        Assert.assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID());
        // vector chunk header (time) — the time column carries an empty measurement id
        Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, reader.readMarker());
        header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
        Assert.assertEquals("", header.getMeasurementID());
        // vector chunk headers (values)
        Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker());
        header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
        Assert.assertEquals("s1", header.getMeasurementID());
        Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, reader.readMarker());
        header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
        Assert.assertEquals("s2", header.getMeasurementID());
      }

      // plan-index range written by writePlanIndices() in before()
      Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
      reader.readPlanIndex();
      Assert.assertEquals(100, reader.getMinPlanIndex());
      Assert.assertEquals(10000, reader.getMaxPlanIndex());
      Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker());

      // Every (device, measurement) path must occur exactly once in the timeseries metadata.
      Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
          reader.getAllTimeseriesMetadata(false);
      Set<String> pathSet = new HashSet<>();
      for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
          deviceTimeseriesMetadataMap.entrySet()) {
        for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
          String seriesPath =
              ((PlainDeviceID) entry.getKey()).toStringID()
                  + "."
                  + timeseriesMetadata.getMeasurementId();
          // Set.add returns false on a duplicate — replaces the contains-then-add pattern.
          Assert.assertTrue("duplicate timeseries metadata: " + seriesPath, pathSet.add(seriesPath));
        }
      }

      // FileMetaData: the metadata index has one child per device.
      TsFileMetadata metaData = reader.readFileMetadata();
      Assert.assertEquals(2, metaData.getMetadataIndex().getChildren().size());
    }
  }

  /**
   * Writes CHUNK_GROUP_NUM chunk groups for DEVICE_1, each containing one empty (zero-size)
   * single-page chunk for the given measurement.
   */
  private void writeChunkGroup(TsFileIOWriter writer, MeasurementSchema measurementSchema)
      throws IOException {
    for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
      // chunk group
      writer.startChunkGroup(DEVICE_1);
      // ordinary chunk, chunk statistics (one dummy point so stats are non-empty)
      Statistics statistics = Statistics.getStatsByType(measurementSchema.getType());
      statistics.updateStats(0L, 0L);
      writer.startFlushChunk(
          measurementSchema.getMeasurementId(),
          measurementSchema.getCompressor(),
          measurementSchema.getType(),
          measurementSchema.getEncodingType(),
          statistics,
          0,
          0,
          0);
      writer.endCurrentChunk();
      writer.endChunkGroup();
    }
  }

  /**
   * Writes CHUNK_GROUP_NUM aligned chunk groups for DEVICE_2: per group, one time chunk (masked
   * with TIME_COLUMN_MASK) followed by one value chunk per sub-measurement (VALUE_COLUMN_MASK).
   */
  private void writeVectorChunkGroup(
      TsFileIOWriter writer, VectorMeasurementSchema vectorMeasurementSchema) throws IOException {
    for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
      // chunk group
      writer.startChunkGroup(DEVICE_2);
      // vector chunk (time)
      writer.startFlushChunk(
          vectorMeasurementSchema.getMeasurementId(),
          vectorMeasurementSchema.getCompressor(),
          vectorMeasurementSchema.getType(),
          vectorMeasurementSchema.getTimeTSEncoding(),
          Statistics.getStatsByType(vectorMeasurementSchema.getType()),
          0,
          0,
          TsFileConstant.TIME_COLUMN_MASK);
      writer.endCurrentChunk();
      // vector chunks (values), one per sub-measurement
      for (int j = 0; j < vectorMeasurementSchema.getSubMeasurementsCount(); j++) {
        Statistics subStatistics =
            Statistics.getStatsByType(
                vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j));
        subStatistics.updateStats(0L, 0L);
        writer.startFlushChunk(
            vectorMeasurementSchema.getSubMeasurementsList().get(j),
            vectorMeasurementSchema.getCompressor(),
            vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j),
            vectorMeasurementSchema.getSubMeasurementsTSEncodingList().get(j),
            subStatistics,
            0,
            0,
            TsFileConstant.VALUE_COLUMN_MASK);
        writer.endCurrentChunk();
      }
      writer.endChunkGroup();
    }
  }
}