blob: 562d5a0e193ffac5b25dc16153e4ad79dc2214be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write;
import org.apache.iotdb.tsfile.exception.encoding.TsFileEncodingException;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileReader;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorForTest;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Tests for {@link TsFileWriter}.
 *
 * <p>Each test starts from a fresh writer on {@link #fileName} with the timeseries registered by
 * {@link #registerTimeseries()}: non-aligned {@code d1.s1} (FLOAT), {@code d1.s2} and {@code
 * d1.s3} (INT32), plus aligned {@code d2.s1}, {@code d2.s2}, {@code d2.s3} (TEXT). After
 * exercising the writer, a test closes the file and reads it back through {@link TsFileReader} to
 * check what was persisted.
 */
public class TsFileWriterTest {
  /** Writer under test; created in {@link #setUp()}. */
  TsFileWriter writer = null;

  /** Path of the TsFile that every test writes and that {@link #tearDown()} deletes. */
  String fileName = TsFileGeneratorForTest.getTestTsFilePath("root.sg1", 0, 0, 1);

  /** Set once {@link #closeFile()} has run so that {@link #tearDown()} does not close twice. */
  boolean closed = false;

  /** Creates the parent directory if needed, opens the writer and registers the timeseries. */
  @Before
  public void setUp() {
    try {
      File f = new File(fileName);
      if (!f.getParentFile().exists()) {
        Assert.assertTrue(f.getParentFile().mkdirs());
      }
      writer = new TsFileWriter(f);
      registerTimeseries();
    } catch (IOException e) {
      e.printStackTrace();
      fail(e.getMessage());
    }
  }

  /** Closes the writer if the test did not, then deletes the test file. */
  @After
  public void tearDown() {
    try {
      if (!closed) {
        closeFile();
      }
    } finally {
      // Delete the file even when closing failed, so later tests start clean.
      try {
        Files.deleteIfExists(new File(fileName).toPath());
      } catch (IOException e) {
        e.printStackTrace();
        fail(e.getMessage());
      }
    }
  }

  /**
   * Registers the timeseries used by all tests and checks that conflicting registrations are
   * rejected with the expected messages.
   */
  private void registerTimeseries() {
    // register nonAligned timeseries "d1.s1","d1.s2","d1.s3"
    try {
      writer.registerTimeseries(
          new Path("d1"),
          new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY));
    } catch (WriteProcessException e) {
      e.printStackTrace();
      fail(e.getMessage());
    }

    // Registering the same non-aligned timeseries a second time must be rejected.
    try {
      writer.registerTimeseries(
          new Path("d1"),
          new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY));
      fail("registering d1.s1 twice should throw WriteProcessException");
    } catch (WriteProcessException e) {
      Assert.assertEquals("given nonAligned timeseries d1.s1 has been registered.", e.getMessage());
    }

    // A device that already holds non-aligned timeseries cannot be registered as aligned.
    try {
      List<MeasurementSchema> alignedOnD1 = new ArrayList<>();
      alignedOnD1.add(
          new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY));
      writer.registerAlignedTimeseries(new Path("d1"), alignedOnD1);
      fail("registering aligned timeseries on nonAligned device d1 should throw");
    } catch (WriteProcessException e) {
      Assert.assertEquals(
          "given device d1 has been registered for nonAligned timeseries.", e.getMessage());
    }

    List<MeasurementSchema> schemas = new ArrayList<>();
    schemas.add(
        new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY));
    schemas.add(
        new MeasurementSchema("s3", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY));
    writer.registerTimeseries(new Path("d1"), schemas);

    // Register aligned timeseries "d2.s1" , "d2.s2", "d2.s3"
    try {
      List<MeasurementSchema> measurementSchemas = new ArrayList<>();
      measurementSchemas.add(new MeasurementSchema("s1", TSDataType.TEXT, TSEncoding.PLAIN));
      measurementSchemas.add(new MeasurementSchema("s2", TSDataType.TEXT, TSEncoding.PLAIN));
      measurementSchemas.add(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
      writer.registerAlignedTimeseries(new Path("d2"), measurementSchemas);
    } catch (WriteProcessException e) {
      e.printStackTrace();
      fail(e.getMessage());
    }

    // An aligned device must not accept further aligned measurements.
    try {
      List<MeasurementSchema> measurementSchemas = new ArrayList<>();
      measurementSchemas.add(new MeasurementSchema("s4", TSDataType.TEXT, TSEncoding.PLAIN));
      writer.registerAlignedTimeseries(new Path("d2"), measurementSchemas);
      fail("expanding aligned device d2 should throw WriteProcessException");
    } catch (WriteProcessException e) {
      Assert.assertEquals(
          "given device d2 has been registered for aligned timeseries and should not be expanded.",
          e.getMessage());
    }

    // An aligned device must not accept non-aligned timeseries either.
    try {
      writer.registerTimeseries(
          new Path("d2"),
          new MeasurementSchema("s5", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY));
      fail("registering nonAligned timeseries on aligned device d2 should throw");
    } catch (WriteProcessException e) {
      Assert.assertEquals(
          "given device d2 has been registered for aligned timeseries.", e.getMessage());
    }
  }

  /** Registration alone must not produce any data rows. */
  @Test
  public void registerTimeseriesTest() {
    closeFile();
    readNothing();
  }

  /**
   * Writes one valid record for d1 at time 10000 (s1 = 5.0, s2 = 5), then attempts two invalid
   * writes at time 10001.
   */
  private void writeTSRecord() throws IOException, WriteProcessException {
    // normal
    TSRecord record = new TSRecord(10000, "d1");
    record.addTuple(new FloatDataPoint("s1", 5.0f));
    record.addTuple(new IntDataPoint("s2", 5));
    writer.write(record);

    // not existed time series: s4 was never registered on d1
    record = new TSRecord(10001, "d1");
    record.addTuple(new FloatDataPoint("s4", 5));
    try {
      writer.write(record);
      fail("writing unregistered measurement d1.s4 should throw NoMeasurementException");
    } catch (WriteProcessException e) {
      assertTrue(e instanceof NoMeasurementException);
    }

    // mismatched data type: a float point written to s3, which is registered as INT32
    record = new TSRecord(10001, "d1");
    record.addTuple(new FloatDataPoint("s3", 5));
    try {
      writer.write(record);
    } catch (TsFileEncodingException ignored) {
      // Tolerated on purpose: the read-back in the callers checks that nothing was stored for s3.
    }
  }

  /** Only the valid record written by {@link #writeTSRecord()} must be readable. */
  @Test
  public void writeTSRecordTest() throws IOException, WriteProcessException {
    writeTSRecord();
    closeFile();
    readOneRow();
  }

  /** A non-aligned write to the aligned device d2 must be rejected and leave d1 empty. */
  @Test
  public void writeIncorrectTSRecord0() throws IOException {
    // non-aligned write against a device registered for aligned timeseries
    TSRecord record = new TSRecord(10002, "d2");
    record.addTuple(new IntDataPoint("s1", 5));
    try {
      writer.write(record);
      fail("nonAligned write to aligned device d2 should throw WriteProcessException");
    } catch (WriteProcessException e) {
      Assert.assertEquals("no nonAligned timeseries is registered in the group.", e.getMessage());
    }
    closeFile();
    readNothing();
  }

  /** Aligned writes of an int point to the TEXT series d2.s1 must not create rows for d1. */
  @Test
  public void writeIncorrectTSRecords() throws IOException, WriteProcessException {
    // incorrect data type
    for (int i = 2; i < 3; i++) {
      TSRecord record = new TSRecord(10000 + i, "d" + i);
      record.addTuple(new IntDataPoint("s1", 5));
      try {
        writer.writeAligned(record);
      } catch (UnsupportedOperationException ignored) {
        // Tolerated on purpose: only the resulting file content is verified below.
      }
    }
    closeFile();
    readNothing();
  }

  /** An invalid aligned write after valid data must not disturb the valid d1 row. */
  @Test
  public void writeIncorrectTSRecord() throws IOException, WriteProcessException {
    writeTSRecord();
    // incorrect data type
    TSRecord record = new TSRecord(10002, "d2");
    record.addTuple(new IntDataPoint("s1", 5));
    try {
      writer.writeAligned(record);
    } catch (UnsupportedOperationException ignored) {
      // Tolerated on purpose: only the resulting file content is verified below.
    }
    closeFile();
    readOneRow();
  }

  /** Writes a one-row tablet with both s1 and s2 set and reads the row back. */
  @Test
  public void writeTablet() throws IOException, WriteProcessException {
    Tablet tablet =
        new Tablet(
            "d1",
            Arrays.asList(
                new MeasurementSchema(
                    "s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY),
                new MeasurementSchema(
                    "s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY)));
    tablet.timestamps[0] = 10000;
    ((float[]) tablet.values[0])[0] = 5.0f;
    ((int[]) tablet.values[1])[0] = 5;
    tablet.rowSize = 1;
    writer.write(tablet);
    closeFile();
    readOneRow();
  }

  /** Writes a one-row tablet that leaves s2 unset; s2 is then expected to read back as 0. */
  @Test
  public void writeTablet2() throws IOException, WriteProcessException {
    Tablet tablet =
        new Tablet(
            "d1",
            Arrays.asList(
                new MeasurementSchema(
                    "s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY),
                new MeasurementSchema(
                    "s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY)));
    tablet.timestamps[0] = 10000;
    ((float[]) tablet.values[0])[0] = 5.0f;
    tablet.rowSize = 1;
    writer.write(tablet);
    closeFile();
    // in this case, the value of s2 = 0 at time 10000.
    readOneRow(0);
  }

  /** Calling {@code getIOWriter()} must not write any data. */
  @Test
  public void getIOWriter() {
    // The interface is just for test
    writer.getIOWriter();
    closeFile();
    readNothing();
  }

  /** Flushing with nothing written must leave the file without data rows. */
  @Test
  public void flushForTest() throws IOException {
    // The interface is just for test
    writer.flushAllChunkGroups();
    closeFile();
    readNothing();
  }

  /**
   * Currently performs the same calls as {@link #flushForTest()}; kept as a separate test so the
   * test name stays available.
   */
  @Test
  public void flushForTestWithVersion() throws IOException {
    // The interface is just for test
    writer.flushAllChunkGroups();
    closeFile();
    readNothing();
  }

  /**
   * Closes the writer and marks the file as closed. A failure to close fails the running test
   * instead of being silently printed.
   */
  private void closeFile() {
    closed = true;
    if (writer == null) {
      // setUp failed before the writer was created; there is nothing to close.
      return;
    }
    try {
      writer.close();
    } catch (IOException e) {
      e.printStackTrace();
      fail(e.getMessage());
    }
  }

  /** Asserts that querying d1.s1 and d1.s2 from the written file yields no rows. */
  private void readNothing() {
    // using TsFileReader for test; try-with-resources closes it even if an assertion fails
    try (TsFileSequenceReader sequenceReader = new TsFileSequenceReader(fileName);
        TsFileReader tsFileReader = new TsFileReader(sequenceReader)) {
      QueryDataSet dataSet =
          tsFileReader.query(
              QueryExpression.create()
                  .addSelectedPath(new Path("d1", "s1", true))
                  .addSelectedPath(new Path("d1", "s2", true)));
      assertFalse(dataSet.hasNext());
    } catch (IOException e) {
      e.printStackTrace();
      fail(e.getMessage());
    }
  }

  /** Same as {@link #readOneRow(int)} with an expected s2 value of 5. */
  private void readOneRow() {
    readOneRow(5);
  }

  /**
   * Asserts that the written file holds exactly one row for d1, at time 10000, with two fields:
   * s1 = 5.0 and s2 = {@code s2Value}.
   *
   * @param s2Value the value expected for d1.s2
   */
  private void readOneRow(int s2Value) {
    try (TsFileSequenceReader sequenceReader = new TsFileSequenceReader(fileName);
        TsFileReader tsFileReader = new TsFileReader(sequenceReader)) {
      QueryDataSet dataSet =
          tsFileReader.query(
              QueryExpression.create()
                  .addSelectedPath(new Path("d1", "s1", true))
                  .addSelectedPath(new Path("d1", "s2", true))
                  .addSelectedPath(new Path("d1", "s3", true)));
      int rowCount = 0;
      while (dataSet.hasNext()) {
        RowRecord result = dataSet.next();
        // s3 is selected as well, yet only two fields are expected in the row.
        assertEquals(2, result.getFields().size());
        assertEquals(10000, result.getTimestamp());
        assertEquals(5.0f, result.getFields().get(0).getFloatV(), 0.00001);
        assertEquals(s2Value, result.getFields().get(1).getIntV());
        rowCount++;
      }
      // Without this check an empty result would pass every assertion above vacuously.
      assertEquals(1, rowCount);
    } catch (IOException e) {
      e.printStackTrace();
      fail(e.getMessage());
    }
  }
}