blob: eb183a44db7f65eef9fa14c608cf3eedab81de49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileReader;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class TsFileWriteApiTest {
/** TsFile written by the tests; removed in {@code setUp()} and {@code end()}. */
private final File f = FSFactoryProducer.getFSFactory().getFile("TsFileWriteTest.tsfile");

/** Device id under which every series in this class is registered and written. */
private final String deviceId = "root.sg.d1";

/** Schemas filled in by {@code registerAlignedTimeseries}. */
private final List<MeasurementSchema> alignedMeasurementSchemas = new ArrayList<>();

/** Schemas filled in by {@code registerTimeseries} or directly by individual tests. */
private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();

/** Group size captured when the test instance is created; restored in {@code end()}. */
private final int oldChunkGroupSize =
    TSFileDescriptor.getInstance().getConfig().getGroupSizeInByte();

/** Max points per page captured when the test instance is created; restored in {@code end()}. */
private final int oldMaxNumOfPointsInPage =
    TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
/** Removes a leftover test file, failing fast when it exists but cannot be deleted. */
@Before
public void setUp() {
  if (!f.exists()) {
    return;
  }
  boolean removed = f.delete();
  if (!removed) {
    throw new RuntimeException("can not delete " + f.getAbsolutePath());
  }
}
/** Deletes the test file (best effort) and restores the configuration values saved earlier. */
@After
public void end() {
  if (f.exists()) {
    // The result is intentionally ignored: cleanup is best effort.
    f.delete();
  }
  TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  config.setMaxNumberOfPointsInPage(oldMaxNumOfPointsInPage);
  config.setGroupSizeInByte(oldChunkGroupSize);
}
/**
 * Overrides the global TsFile configuration for the current test.
 *
 * @param chunkGroupSize value passed to {@code setGroupSizeInByte}
 * @param pageSize value passed to {@code setMaxNumberOfPointsInPage}
 */
private void setEnv(int chunkGroupSize, int pageSize) {
  TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  config.setGroupSizeInByte(chunkGroupSize);
  config.setMaxNumberOfPointsInPage(pageSize);
}
/**
 * Appends four INT64 schemas (s1-s3 PLAIN, s4 RLE) to {@code alignedMeasurementSchemas} and
 * registers the whole list as aligned timeseries of {@code deviceId}.
 *
 * @param tsFileWriter writer on which the aligned series are registered
 * @throws WriteProcessException propagated from the writer's registration call
 */
public void registerAlignedTimeseries(TsFileWriter tsFileWriter) throws WriteProcessException {
  for (String plainId : new String[] {"s1", "s2", "s3"}) {
    alignedMeasurementSchemas.add(
        new MeasurementSchema(plainId, TSDataType.INT64, TSEncoding.PLAIN));
  }
  alignedMeasurementSchemas.add(new MeasurementSchema("s4", TSDataType.INT64, TSEncoding.RLE));

  Path devicePath = new Path(deviceId);
  tsFileWriter.registerAlignedTimeseries(devicePath, alignedMeasurementSchemas);
}
/**
 * Appends three INT64/PLAIN schemas (s1-s3) to {@code measurementSchemas} and registers the
 * whole list as non-aligned timeseries of {@code deviceId}.
 *
 * @param tsFileWriter writer on which the series are registered
 */
public void registerTimeseries(TsFileWriter tsFileWriter) {
  for (String id : new String[] {"s1", "s2", "s3"}) {
    measurementSchemas.add(new MeasurementSchema(id, TSDataType.INT64, TSEncoding.PLAIN));
  }

  Path devicePath = new Path(deviceId);
  tsFileWriter.registerTimeseries(devicePath, measurementSchemas);
}
/**
 * Writes three batches of non-aligned data through the TsRecord helper, each batch using a
 * different subset of the registered measurements. The test only checks that no exception
 * escapes.
 */
@Test
public void writeWithTsRecord() throws IOException, WriteProcessException {
  setEnv(100 * 1024 * 1024, 10 * 1024);
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerTimeseries(writer);
    final MeasurementSchema first = measurementSchemas.get(0);
    final MeasurementSchema second = measurementSchemas.get(1);
    final MeasurementSchema third = measurementSchemas.get(2);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: first and second measurement
    batch.add(first);
    batch.add(second);
    TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 10000, 0, 0, false);

    // example 2: third and first measurement
    batch.clear();
    batch.add(third);
    batch.add(first);
    TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 10000, 10000, 100, false);

    // example 3 (late data): third measurement only
    batch.clear();
    batch.add(third);
    TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 10, 20000, 200000, false);
  }
}
/**
 * Writes three batches of aligned data through the TsRecord helper, each batch using a
 * different subset of the registered aligned measurements. The test only checks that no
 * exception escapes.
 */
@Test
public void writeAlignedWithTsRecord() throws IOException, WriteProcessException {
  setEnv(100 * 1024 * 1024, 10 * 1024);
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerAlignedTimeseries(writer);
    final MeasurementSchema first = alignedMeasurementSchemas.get(0);
    final MeasurementSchema second = alignedMeasurementSchemas.get(1);
    final MeasurementSchema third = alignedMeasurementSchemas.get(2);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: first and second measurement
    batch.add(first);
    batch.add(second);
    TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 8, 0, 0, true);

    // example 2: third and first measurement
    batch.clear();
    batch.add(third);
    batch.add(first);
    TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 20, 1000, 500, true);

    // example 3 (late data): third measurement only
    batch.clear();
    batch.add(third);
    TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 20, 300000, 50, true);
  }
}
/**
 * Writes three batches of non-aligned data through the Tablet helper, each batch using a
 * different subset of the registered measurements. The test only checks that no exception
 * escapes.
 */
@Test
public void writeWithTablet() throws IOException, WriteProcessException {
  setEnv(100 * 1024 * 1024, 10 * 1024);
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerTimeseries(writer);
    final MeasurementSchema first = measurementSchemas.get(0);
    final MeasurementSchema second = measurementSchemas.get(1);
    final MeasurementSchema third = measurementSchemas.get(2);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: first and second measurement
    batch.add(first);
    batch.add(second);
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 1000, 0, 0, false);

    // example 2: third and second measurement
    batch.clear();
    batch.add(third);
    batch.add(second);
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 1000, 2000, 0, false);

    // example 3 (late data): second measurement only
    batch.clear();
    batch.add(second);
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 1000, 3111, 0, false);
  }
}
/**
 * Writes three batches of aligned data through the Tablet helper, each batch using a different
 * subset of the registered aligned measurements. The test only checks that no exception
 * escapes.
 */
@Test
public void writeAlignedWithTablet() throws IOException, WriteProcessException {
  setEnv(100 * 1024 * 1024, 10 * 1024);
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerAlignedTimeseries(writer);
    final MeasurementSchema first = alignedMeasurementSchemas.get(0);
    final MeasurementSchema second = alignedMeasurementSchemas.get(1);
    final MeasurementSchema third = alignedMeasurementSchemas.get(2);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: first measurement only
    batch.add(first);
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 10, 0, 0, true);

    // example 2: first and second measurement
    batch.clear();
    batch.add(first);
    batch.add(second);
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 200000, 10, 0, true);

    // example 3: third measurement only
    batch.clear();
    batch.add(third);
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 10, 210000, 0, true);
  }
}
/**
 * Writes the first two aligned measurements with a very small group size, then writes two
 * measurements that were not part of the first batch. If the writer rejects the second batch,
 * the exception message must name measurement s3.
 *
 * <p>NOTE(review): the test also passes when no exception is thrown at all; confirm whether a
 * failure is still expected here and, if so, add an explicit {@code Assert.fail}.
 */
@Test
public void writeNewAlignedMeasurementAfterFlushChunkGroup1() {
  setEnv(100, 30);
  final String expectedMessage =
      "TsFile has flushed chunk group and should not add new measurement s3 in device root.sg.d1";
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerAlignedTimeseries(writer);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: first and second measurement
    batch.add(alignedMeasurementSchemas.get(0));
    batch.add(alignedMeasurementSchemas.get(1));
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 100000, 0, 0, true);

    // example 2: third and fourth measurement, neither written before
    batch.clear();
    batch.add(alignedMeasurementSchemas.get(2));
    batch.add(alignedMeasurementSchemas.get(3));
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 20, 1000000, 0, true);
  } catch (IOException | WriteProcessException e) {
    Assert.assertEquals(expectedMessage, e.getMessage());
  }
}
/**
 * Writes the first two aligned measurements with a very small group size, then writes one
 * already-written measurement together with one new measurement. If the writer rejects the
 * second batch, the exception message must name measurement s4.
 *
 * <p>NOTE(review): the test also passes when no exception is thrown at all; confirm whether a
 * failure is still expected here and, if so, add an explicit {@code Assert.fail}.
 */
@Test
public void writeNewAlignedMeasurementAfterFlushChunkGroup2() {
  setEnv(100, 30);
  final String expectedMessage =
      "TsFile has flushed chunk group and should not add new measurement s4 in device root.sg.d1";
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerAlignedTimeseries(writer);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: first and second measurement
    batch.add(alignedMeasurementSchemas.get(0));
    batch.add(alignedMeasurementSchemas.get(1));
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 100000, 0, 0, true);

    // example 2: first measurement again plus the not-yet-written fourth one
    batch.clear();
    batch.add(alignedMeasurementSchemas.get(0));
    batch.add(alignedMeasurementSchemas.get(3));
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 20, 1000000, 0, true);
  } catch (IOException | WriteProcessException e) {
    Assert.assertEquals(expectedMessage, e.getMessage());
  }
}
/**
 * Writes an initial aligned batch, then checks that two further aligned writes (one via Tablet,
 * one via TsRecord) are rejected with a {@link WriteProcessException} carrying the expected
 * out-of-order message.
 */
@Test
public void writeOutOfOrderAlignedData() throws IOException, WriteProcessException {
  setEnv(100, 30);
  final String expectedMessage =
      "Not allowed to write out-of-order data in timeseries root.sg.d1., time should later than 999";
  final String failureHint =
      "Expected to throw writeProcessException due to write out-of-order data.";
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerAlignedTimeseries(writer);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: initial batch with the first and second measurement
    batch.add(alignedMeasurementSchemas.get(0));
    batch.add(alignedMeasurementSchemas.get(1));
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 1000, 0, 0, true);

    // example 2: tablet write of the first measurement must be rejected
    batch.clear();
    batch.add(alignedMeasurementSchemas.get(0));
    try {
      TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 20, 100, 0, true);
      Assert.fail(failureHint);
    } catch (WriteProcessException rejected) {
      Assert.assertEquals(expectedMessage, rejected.getMessage());
    }

    // example 3: record write of the second measurement must be rejected
    batch.clear();
    batch.add(alignedMeasurementSchemas.get(1));
    try {
      TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 20, 100, 0, true);
      Assert.fail(failureHint);
    } catch (WriteProcessException rejected) {
      Assert.assertEquals(expectedMessage, rejected.getMessage());
    }
  }
}
/**
 * Writes an initial non-aligned batch, then checks that two further writes (one via Tablet, one
 * via TsRecord) are rejected with a {@link WriteProcessException} whose message names the
 * affected series.
 */
@Test
public void writeOutOfOrderData() throws IOException, WriteProcessException {
  setEnv(100, 30);
  final String failureHint =
      "Expected to throw writeProcessException due to write out-of-order data.";
  try (TsFileWriter writer = new TsFileWriter(f)) {
    registerTimeseries(writer);
    final List<MeasurementSchema> batch = new ArrayList<>();

    // example 1: initial batch with the first and second measurement
    batch.add(measurementSchemas.get(0));
    batch.add(measurementSchemas.get(1));
    TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 1000, 0, 0, false);

    // example 2: tablet write of the first measurement must be rejected
    batch.clear();
    batch.add(measurementSchemas.get(0));
    try {
      TsFileGeneratorUtils.writeWithTablet(writer, deviceId, batch, 20, 100, 0, false);
      Assert.fail(failureHint);
    } catch (WriteProcessException rejected) {
      Assert.assertEquals(
          "Not allowed to write out-of-order data in timeseries root.sg.d1.s1, time should later than 999",
          rejected.getMessage());
    }

    // example 3: record write of the second measurement must be rejected
    batch.clear();
    batch.add(measurementSchemas.get(1));
    try {
      TsFileGeneratorUtils.writeWithTsRecord(writer, deviceId, batch, 20, 100, 0, false);
      Assert.fail(failureHint);
    } catch (WriteProcessException rejected) {
      Assert.assertEquals(
          "Not allowed to write out-of-order data in timeseries root.sg.d1.s2, time should later than 999",
          rejected.getMessage());
    }
  }
}
/**
 * Fills a tablet with 10000 rows of TEXT values for three non-aligned series, marking the second
 * series as null (via its bitmap) for every row index above 1000, and writes the tablet each
 * time it becomes full. Any throwable fails the test.
 */
@Test
public void writeNonAlignedWithTabletWithNullValue() {
  setEnv(100, 30);
  try (TsFileWriter writer = new TsFileWriter(f)) {
    for (String id : new String[] {"s1", "s2", "s3"}) {
      measurementSchemas.add(new MeasurementSchema(id, TSDataType.TEXT, TSEncoding.PLAIN));
    }
    // register nonAligned timeseries
    writer.registerTimeseries(new Path(deviceId), measurementSchemas);

    Tablet tablet = new Tablet(deviceId, measurementSchemas);
    long[] times = tablet.timestamps;
    Object[] columns = tablet.values;
    tablet.initBitMaps();
    final int sensorCount = measurementSchemas.size();
    long nextTime = 0;

    for (long r = 0; r < 10000; r++) {
      final int row = tablet.rowSize++;
      times[row] = nextTime;
      nextTime++;
      for (int col = 0; col < sensorCount; col++) {
        if (col == 1 && r > 1000) {
          // null value: only the bitmap is marked, no Binary is stored
          tablet.bitMaps[col].mark((int) r % tablet.getMaxRowNumber());
        } else {
          ((Binary[]) columns[col])[row] =
              new Binary("testString.........", TSFileConfig.STRING_CHARSET);
        }
      }
      // flush a full tablet
      if (tablet.rowSize == tablet.getMaxRowNumber()) {
        writer.write(tablet);
        tablet.reset();
      }
    }

    // flush the remaining rows, if any
    if (tablet.rowSize != 0) {
      writer.write(tablet);
      tablet.reset();
    }
  } catch (Throwable e) {
    e.printStackTrace();
    Assert.fail("Meet errors in test: " + e.getMessage());
  }
}
/**
 * Fills a tablet with 10000 rows of TEXT values for three aligned series, marking the second
 * series as null (via its bitmap) for every row index above 1000, and writes the tablet with
 * {@code writeAligned} each time it becomes full. Any throwable fails the test.
 */
@Test
public void writeAlignedWithTabletWithNullValue() {
  setEnv(100, 30);
  try (TsFileWriter writer = new TsFileWriter(f)) {
    for (String id : new String[] {"s1", "s2", "s3"}) {
      measurementSchemas.add(new MeasurementSchema(id, TSDataType.TEXT, TSEncoding.PLAIN));
    }
    // register aligned timeseries
    writer.registerAlignedTimeseries(new Path(deviceId), measurementSchemas);

    Tablet tablet = new Tablet(deviceId, measurementSchemas);
    long[] times = tablet.timestamps;
    Object[] columns = tablet.values;
    tablet.initBitMaps();
    final int sensorCount = measurementSchemas.size();
    long nextTime = 0;

    for (long r = 0; r < 10000; r++) {
      final int row = tablet.rowSize++;
      times[row] = nextTime;
      nextTime++;
      for (int col = 0; col < sensorCount; col++) {
        if (col == 1 && r > 1000) {
          // null value: only the bitmap is marked, no Binary is stored
          tablet.bitMaps[col].mark((int) r % tablet.getMaxRowNumber());
        } else {
          ((Binary[]) columns[col])[row] =
              new Binary("testString.........", TSFileConfig.STRING_CHARSET);
        }
      }
      // flush a full tablet
      if (tablet.rowSize == tablet.getMaxRowNumber()) {
        writer.writeAligned(tablet);
        tablet.reset();
      }
    }

    // flush the remaining rows, if any
    if (tablet.rowSize != 0) {
      writer.writeAligned(tablet);
      tablet.reset();
    }
  } catch (Throwable e) {
    e.printStackTrace();
    Assert.fail("Meet errors in test: " + e.getMessage());
  }
}
/**
 * Write an empty page and then write a nonEmpty page.
 *
 * <p>Three aligned batches are written with a page limit of 30 points; the third measurement is
 * absent from the first batch. The file is then read back and the row count of each of the
 * first three measurements is checked (60, 60 and 90).
 *
 * <p>The reader is opened in a try-with-resources block so that the file handle is released
 * before the file is deleted in {@code end()}.
 */
@Test
public void writeAlignedTimeseriesWithEmptyPage() throws IOException, WriteProcessException {
  TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
  try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
    registerAlignedTimeseries(tsFileWriter);
    List<MeasurementSchema> writeMeasurementSchemas = new ArrayList<>();
    // example 1
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(0));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(1));
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 0, 0, true);
    // example 2
    writeMeasurementSchemas.clear();
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(2));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(1));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(0));
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 1000, 500, true);
    // example 3 : late data
    writeMeasurementSchemas.clear();
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(2));
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 60, 300000, 50, true);
  }

  // read the file back; closing the reader releases the underlying file handle
  try (TsFileReader tsFileReader =
      new TsFileReader(new TsFileSequenceReader(f.getAbsolutePath()))) {
    for (int i = 0; i < 3; i++) {
      QueryExpression queryExpression =
          QueryExpression.create(
              Collections.singletonList(
                  new Path(deviceId, alignedMeasurementSchemas.get(i).getMeasurementId(), true)),
              null);
      QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
      int cnt = 0;
      while (queryDataSet.hasNext()) {
        cnt++;
        queryDataSet.next();
      }
      // the first two measurements appear in two batches of 30, the third in 30 + 60 rows
      int expected = i < 2 ? 60 : 90;
      Assert.assertEquals(expected, cnt);
    }
  }
}
/**
 * Write a nonEmpty page and then write an empty page.
 *
 * <p>All four aligned measurements are written in the first batch; only the first two are
 * written in the second batch. The file is then read back and the row count of each of the
 * first three measurements is checked (60, 60 and 30).
 *
 * <p>The reader is opened in a try-with-resources block so that the file handle is released
 * before the file is deleted in {@code end()}.
 */
@Test
public void writeAlignedTimeseriesWithEmptyPage2() throws IOException, WriteProcessException {
  TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
  try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
    registerAlignedTimeseries(tsFileWriter);
    List<MeasurementSchema> writeMeasurementSchemas = new ArrayList<>();
    // example 1
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(3));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(2));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(1));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(0));
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 0, 0, true);
    // example 2
    writeMeasurementSchemas.clear();
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(0));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(1));
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 1000, 500, true);
  }

  // read the file back; closing the reader releases the underlying file handle
  try (TsFileReader tsFileReader =
      new TsFileReader(new TsFileSequenceReader(f.getAbsolutePath()))) {
    for (int i = 0; i < 3; i++) {
      QueryExpression queryExpression =
          QueryExpression.create(
              Collections.singletonList(
                  new Path(deviceId, alignedMeasurementSchemas.get(i).getMeasurementId(), true)),
              null);
      QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
      int cnt = 0;
      while (queryDataSet.hasNext()) {
        cnt++;
        queryDataSet.next();
      }
      // the first two measurements appear in both batches, the third only in the first
      int expected = i < 2 ? 60 : 30;
      Assert.assertEquals(expected, cnt);
    }
  }
}
/**
 * Write a nonEmpty page and then an empty page, driving {@link AlignedChunkWriterImpl} directly.
 *
 * <p>The first page holds 30 points for all four value columns. For the second 30 timestamps,
 * only the value columns at index 0 and 1 receive data, while the columns at index 2 and 3 get
 * an explicit empty page. The file is then read back and the row count of each of the first
 * three measurements is checked (60, 60 and 30).
 *
 * <p>The reader is opened in a try-with-resources block so that the file handle is released
 * before the file is deleted in {@code end()}.
 */
@Test
public void writeAlignedTimeseriesWithEmptyPage3() throws IOException, WriteProcessException {
  try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
    registerAlignedTimeseries(tsFileWriter);
    List<IMeasurementSchema> writeMeasurementSchemas = new ArrayList<>();
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(0));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(1));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(2));
    writeMeasurementSchemas.add(alignedMeasurementSchemas.get(3));

    TsFileIOWriter tsFileIOWriter = tsFileWriter.getIOWriter();
    tsFileIOWriter.startChunkGroup(new PlainDeviceID(deviceId));
    AlignedChunkWriterImpl alignedChunkWriter =
        new AlignedChunkWriterImpl(writeMeasurementSchemas);

    // write one nonEmpty page covering all four value columns
    for (long time = 0; time < 30; time++) {
      for (int i = 0; i < 4; i++) {
        alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false);
      }
      alignedChunkWriter.write(time);
    }
    alignedChunkWriter.sealCurrentPage();

    // second page: data for value columns 0 and 1, an empty page for value columns 2 and 3
    for (long time = 30; time < 60; time++) {
      for (int i = 0; i < 2; i++) {
        alignedChunkWriter.getValueChunkWriterByIndex(i).write(time, time, false);
      }
    }
    for (int i = 2; i < 4; i++) {
      alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer();
    }
    for (long time = 30; time < 60; time++) {
      alignedChunkWriter.write(time);
    }
    alignedChunkWriter.writeToFileWriter(tsFileIOWriter);
    tsFileIOWriter.endChunkGroup();
  }

  // read the file back; closing the reader releases the underlying file handle
  try (TsFileReader tsFileReader =
      new TsFileReader(new TsFileSequenceReader(f.getAbsolutePath()))) {
    for (int i = 0; i < 3; i++) {
      QueryExpression queryExpression =
          QueryExpression.create(
              Collections.singletonList(
                  new Path(deviceId, alignedMeasurementSchemas.get(i).getMeasurementId(), true)),
              null);
      QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
      int cnt = 0;
      while (queryDataSet.hasNext()) {
        cnt++;
        queryDataSet.next();
      }
      // columns 0 and 1 have data in both pages, column 2 only in the first page
      int expected = i < 2 ? 60 : 30;
      Assert.assertEquals(expected, cnt);
    }
  }
}
/**
 * Writes 120 points of one series with a page limit of 30 points, then copies the pages into a
 * second file ("test.tsfile") by pushing each page header and compressed body straight into a
 * {@link ChunkWriterImpl}, flushing after every second page. The copy is read back and must
 * contain 120 rows.
 *
 * <p>The verification reader is closed via try-with-resources, and the temporary copy is deleted
 * in a {@code finally} block so it is removed whether the test passes or fails.
 */
@Test
public void writeTsFileByFlushingPageDirectly() throws IOException, WriteProcessException {
  TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(30);
  // create a tsfile with four pages in one timeseries
  try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
    registerTimeseries(tsFileWriter);
    List<MeasurementSchema> writeMeasurementSchemas = new ArrayList<>();
    writeMeasurementSchemas.add(measurementSchemas.get(0));
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 0, 0, false);
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 30, 30, false);
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 60, 60, false);
    TsFileGeneratorUtils.writeWithTsRecord(
        tsFileWriter, deviceId, writeMeasurementSchemas, 30, 90, 90, false);
  }

  ChunkWriterImpl chunkWriter = new ChunkWriterImpl(measurementSchemas.get(0));
  // rewrite a new tsfile by flushing page directly
  File file = FSFactoryProducer.getFSFactory().getFile("test.tsfile");
  try (TsFileSequenceReader reader = new TsFileSequenceReader(f.getAbsolutePath());
      TsFileIOWriter tsFileIOWriter = new TsFileIOWriter(file)) {
    tsFileIOWriter.startChunkGroup(new PlainDeviceID(deviceId));
    for (List<ChunkMetadata> chunkMetadatas :
        reader.readChunkMetadataInDevice(new PlainDeviceID(deviceId)).values()) {
      for (ChunkMetadata chunkMetadata : chunkMetadatas) {
        Chunk chunk = reader.readMemChunk(chunkMetadata);
        ByteBuffer chunkDataBuffer = chunk.getData();
        ChunkHeader chunkHeader = chunk.getHeader();
        int pageNum = 0;
        while (chunkDataBuffer.remaining() > 0) {
          // deserialize a PageHeader from chunkDataBuffer; a single-page chunk is deserialized
          // with the chunk statistic, any other chunk with the chunk's data type
          PageHeader pageHeader;
          if (((byte) (chunkHeader.getChunkType() & 0x3F))
              == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
            pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic());
          } else {
            pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
          }
          // read compressed page data
          int compressedPageBodyLength = pageHeader.getCompressedSize();
          byte[] compressedPageBody = new byte[compressedPageBodyLength];
          chunkDataBuffer.get(compressedPageBody);
          chunkWriter.writePageHeaderAndDataIntoBuff(
              ByteBuffer.wrap(compressedPageBody), pageHeader);
          // flush the buffered pages to the file after every second page
          if (++pageNum % 2 == 0) {
            chunkWriter.writeToFileWriter(tsFileIOWriter);
          }
        }
      }
    }
    tsFileIOWriter.endChunkGroup();
    tsFileIOWriter.endFile();

    // read the rewritten file; closing the reader releases its file handle before deletion
    try (TsFileReader tsFileReader =
        new TsFileReader(new TsFileSequenceReader(file.getAbsolutePath()))) {
      QueryExpression queryExpression =
          QueryExpression.create(
              Collections.singletonList(
                  new Path(deviceId, measurementSchemas.get(0).getMeasurementId(), true)),
              null);
      QueryDataSet queryDataSet = tsFileReader.query(queryExpression);
      int cnt = 0;
      while (queryDataSet.hasNext()) {
        cnt++;
        queryDataSet.next();
      }
      Assert.assertEquals(120, cnt);
    }
  } finally {
    // remove the temporary copy on success as well as on failure (best effort)
    if (file.exists()) {
      file.delete();
    }
  }
}
}