// blob: 9f9e5ccfc0f70872efbe3a4795ae4a881013e4a8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write;
import org.apache.iotdb.tsfile.exception.encoding.TsFileEncodingException;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.QueryExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import static org.junit.Assert.*;
public class TsFileWriterTest {
TsFileWriter writer = null;
long fileName = System.nanoTime();
boolean closed = false;
@Before
public void setUp() {
try {
writer = new TsFileWriter(new File("target/tsfileWriter-" + fileName));
addMeasurement();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@After
public void tearDown() {
if (!closed) {
closeFile();
}
try {
Files.deleteIfExists(new File("target/tsfileWriter-" + fileName).toPath());
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private void addMeasurement() {
try {
// String measurementId, TSDataType type, TSEncoding encoding,
// CompressionType compressionType
writer.registerTimeseries(
new Path("d1", "s1"),
new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY));
} catch (WriteProcessException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
// String measurementId, TSDataType type, TSEncoding encoding,
// CompressionType compressionType
writer.registerTimeseries(
new Path("d1", "s1"),
new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY));
} catch (WriteProcessException e) {
Assert.assertEquals("given timeseries has exists! d1.s1", e.getMessage());
}
try {
// String measurementId, TSDataType type, TSEncoding encoding,
// CompressionType compressionType
writer.registerTimeseries(
new Path("d1", "s2"),
new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY));
} catch (WriteProcessException e) {
e.printStackTrace();
fail(e.getMessage());
}
try {
for (int i = 2; i < 3; i++) {
writer.registerTimeseries(
new Path("d" + i, "s1"),
new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY));
}
} catch (WriteProcessException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void addMeasurementTest() {
closeFile();
readNothing();
}
private void writeTSRecord() throws IOException, WriteProcessException {
// normal
TSRecord record = new TSRecord(10000, "d1");
record.addTuple(new FloatDataPoint("s1", 5.0f));
record.addTuple(new IntDataPoint("s2", 5));
writer.write(record);
// not existed time series
record = new TSRecord(10001, "d1");
record.addTuple(new FloatDataPoint("s3", 5.0f));
try {
writer.write(record);
} catch (WriteProcessException e) {
assertTrue(e instanceof NoMeasurementException);
}
}
@Test
public void writeTSRecordTest() throws IOException, WriteProcessException {
writeTSRecord();
closeFile();
readOneRow();
}
@Test
public void writeIncorrectTSRecord0() throws IOException, WriteProcessException {
// incorrect data type
TSRecord record = new TSRecord(10002, "d2");
record.addTuple(new IntDataPoint("s1", 5));
try {
writer.write(record);
} catch (TsFileEncodingException e) {
// do nothing
}
closeFile();
readNothing();
}
@Test
public void writeIncorrectTSRecords() throws IOException, WriteProcessException {
// incorrect data type
for (int i = 2; i < 3; i++) {
TSRecord record = new TSRecord(10000 + i, "d" + i);
record.addTuple(new IntDataPoint("s1", 5));
try {
writer.write(record);
} catch (TsFileEncodingException e) {
// do nothing
}
}
closeFile();
readNothing();
}
@Test
public void writeIncorrectTSRecord() throws IOException, WriteProcessException {
writeTSRecord();
// incorrect data type
TSRecord record = new TSRecord(10002, "d2");
record.addTuple(new IntDataPoint("s1", 5));
try {
writer.write(record);
} catch (TsFileEncodingException e) {
// do nothing
}
closeFile();
readOneRow();
}
@Test
public void writeTablet() throws IOException, WriteProcessException {
Tablet tablet =
new Tablet(
"d1",
Arrays.asList(
new MeasurementSchema(
"s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY),
new MeasurementSchema(
"s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY)));
tablet.timestamps[0] = 10000;
((float[]) tablet.values[0])[0] = 5.0f;
((int[]) tablet.values[1])[0] = 5;
tablet.rowSize = 1;
writer.write(tablet);
closeFile();
readOneRow();
}
@Test
public void writeTablet2() throws IOException, WriteProcessException {
Tablet tablet =
new Tablet(
"d1",
Arrays.asList(
new MeasurementSchema(
"s1", TSDataType.FLOAT, TSEncoding.RLE, CompressionType.SNAPPY),
new MeasurementSchema(
"s2", TSDataType.INT32, TSEncoding.RLE, CompressionType.SNAPPY)));
tablet.timestamps[0] = 10000;
((float[]) tablet.values[0])[0] = 5.0f;
tablet.rowSize = 1;
writer.write(tablet);
closeFile();
// in this case, the value of s2 = 0 at time 10000.
readOneRow(0);
}
@Test
public void getIOWriter() {
// The interface is just for test
writer.getIOWriter();
closeFile();
readNothing();
}
@Test
public void flushForTest() throws IOException {
// The interface is just for test
writer.flushAllChunkGroups();
closeFile();
readNothing();
}
@Test
public void flushForTestWithVersion() throws IOException {
// The interface is just for test
writer.flushAllChunkGroups();
closeFile();
readNothing();
}
private void closeFile() {
try {
closed = true;
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void readNothing() {
// using TsFileReader for test
try {
ReadOnlyTsFile readOnlyTsFile =
new ReadOnlyTsFile(new TsFileSequenceReader("target/tsfileWriter-" + fileName));
QueryDataSet dataSet =
readOnlyTsFile.query(
QueryExpression.create()
.addSelectedPath(new Path("d1", "s1"))
.addSelectedPath(new Path("d1", "s2")));
assertFalse(dataSet.hasNext());
readOnlyTsFile.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private void readOneRow() {
readOneRow(5);
}
private void readOneRow(int s2Value) {
try {
ReadOnlyTsFile readOnlyTsFile =
new ReadOnlyTsFile(new TsFileSequenceReader("target/tsfileWriter-" + fileName));
QueryDataSet dataSet =
readOnlyTsFile.query(
QueryExpression.create()
.addSelectedPath(new Path("d1", "s1"))
.addSelectedPath(new Path("d1", "s2"))
.addSelectedPath(new Path("d1", "s3")));
while (dataSet.hasNext()) {
RowRecord result = dataSet.next();
assertEquals(2, result.getFields().size());
assertEquals(10000, result.getTimestamp());
assertEquals(5.0f, result.getFields().get(0).getFloatV(), 0.00001);
assertEquals(s2Value, result.getFields().get(1).getIntV());
}
readOnlyTsFile.close();
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}