blob: 1d30d621346ba31697b6a2ede11f773af0c407d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.engine.memtable;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
public class PrimitiveMemTableTest {
double delta;
@Before
public void setUp() {
delta = Math.pow(0.1, TSFileDescriptor.getInstance().getConfig().getFloatPrecision());
}
@Test
public void memSeriesSortIteratorTest() throws IOException {
TSDataType dataType = TSDataType.INT32;
WritableMemChunk series =
new WritableMemChunk(
new MeasurementSchema("s1", dataType, TSEncoding.PLAIN), TVList.newList(dataType));
int count = 1000;
for (int i = 0; i < count; i++) {
series.write(i, i);
}
IPointReader it = series.getSortedTvListForQuery().getIterator();
int i = 0;
while (it.hasNextTimeValuePair()) {
Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());
i++;
}
Assert.assertEquals(count, i);
}
@Test
public void simpleTest() throws IOException, QueryProcessException, MetadataException {
IMemTable memTable = new PrimitiveMemTable();
int count = 10;
String deviceId = "d1";
String[] measurementId = new String[count];
for (int i = 0; i < measurementId.length; i++) {
measurementId[i] = "s" + i;
}
int dataSize = 10000;
for (int i = 0; i < dataSize; i++) {
memTable.write(
deviceId,
new MeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
dataSize - i - 1,
i + 10);
}
for (int i = 0; i < dataSize; i++) {
memTable.write(
deviceId,
new MeasurementSchema(measurementId[0], TSDataType.INT32, TSEncoding.PLAIN),
i,
i);
}
ReadOnlyMemChunk memChunk =
memTable.query(
deviceId,
measurementId[0],
new MeasurementSchema(
measurementId[0],
TSDataType.INT32,
TSEncoding.RLE,
CompressionType.UNCOMPRESSED,
Collections.emptyMap()),
Long.MIN_VALUE,
null);
IPointReader iterator = memChunk.getPointReader();
for (int i = 0; i < dataSize; i++) {
iterator.hasNextTimeValuePair();
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
Assert.assertEquals(i, timeValuePair.getTimestamp());
Assert.assertEquals(i, timeValuePair.getValue().getValue());
}
}
private void write(
IMemTable memTable,
String deviceId,
String sensorId,
TSDataType dataType,
TSEncoding encoding,
int size)
throws IOException, QueryProcessException, MetadataException {
TimeValuePair[] ret = genTimeValuePair(size, dataType);
for (TimeValuePair aRet : ret) {
memTable.write(
deviceId,
new MeasurementSchema(sensorId, dataType, encoding),
aRet.getTimestamp(),
aRet.getValue().getValue());
}
IPointReader tvPair =
memTable
.query(
deviceId,
sensorId,
new MeasurementSchema(
sensorId,
dataType,
encoding,
CompressionType.UNCOMPRESSED,
Collections.emptyMap()),
Long.MIN_VALUE,
null)
.getPointReader();
Arrays.sort(ret);
TimeValuePair last = null;
for (int i = 0; i < ret.length; i++) {
while (last != null && (i < ret.length && last.getTimestamp() == ret[i].getTimestamp())) {
i++;
}
if (i >= ret.length) {
break;
}
TimeValuePair pair = ret[i];
last = pair;
tvPair.hasNextTimeValuePair();
TimeValuePair next = tvPair.nextTimeValuePair();
Assert.assertEquals(pair.getTimestamp(), next.getTimestamp());
if (dataType == TSDataType.DOUBLE) {
Assert.assertEquals(
pair.getValue().getDouble(),
MathUtils.roundWithGivenPrecision(next.getValue().getDouble()),
delta);
} else if (dataType == TSDataType.FLOAT) {
float expected = pair.getValue().getFloat();
float actual = MathUtils.roundWithGivenPrecision(next.getValue().getFloat());
Assert.assertEquals(expected, actual, delta + Float.MIN_NORMAL);
} else {
Assert.assertEquals(pair.getValue(), next.getValue());
}
}
}
private void writeVector(IMemTable memTable)
throws IOException, QueryProcessException, MetadataException {
memTable.write(genInsertTablePlan(), 0, 100);
IPointReader tvPair =
memTable
.query(
"root.sg.device5",
"sensor1",
new VectorMeasurementSchema(
"$#$0",
new String[] {"sensor1"},
new TSDataType[] {TSDataType.INT64},
new TSEncoding[] {TSEncoding.GORILLA},
CompressionType.UNCOMPRESSED),
Long.MIN_VALUE,
null)
.getPointReader();
for (int i = 0; i < 100; i++) {
tvPair.hasNextTimeValuePair();
TimeValuePair next = tvPair.nextTimeValuePair();
Assert.assertEquals(i, next.getTimestamp());
Assert.assertEquals(i, next.getValue().getLong());
}
tvPair =
memTable
.query(
"root.sg.device5",
"$#$1",
new VectorMeasurementSchema(
"$#$0",
new String[] {"sensor0", "sensor1"},
new TSDataType[] {TSDataType.BOOLEAN, TSDataType.INT64},
new TSEncoding[] {TSEncoding.PLAIN, TSEncoding.GORILLA},
CompressionType.UNCOMPRESSED),
Long.MIN_VALUE,
null)
.getPointReader();
for (int i = 0; i < 100; i++) {
tvPair.hasNextTimeValuePair();
TimeValuePair next = tvPair.nextTimeValuePair();
Assert.assertEquals(i, next.getTimestamp());
Assert.assertEquals(i, next.getValue().getVector()[1].getLong());
}
}
@Test
public void testFloatType() throws IOException, QueryProcessException, MetadataException {
IMemTable memTable = new PrimitiveMemTable();
String deviceId = "d1";
int size = 100;
write(memTable, deviceId, "s1", TSDataType.FLOAT, TSEncoding.RLE, size);
}
@Test
public void testAllType() throws IOException, QueryProcessException, MetadataException {
IMemTable memTable = new PrimitiveMemTable();
int count = 10;
String deviceId = "d1";
String[] measurementId = new String[count];
for (int i = 0; i < measurementId.length; i++) {
measurementId[i] = "s" + i;
}
int index = 0;
int size = 10000;
write(memTable, deviceId, measurementId[index++], TSDataType.BOOLEAN, TSEncoding.RLE, size);
write(memTable, deviceId, measurementId[index++], TSDataType.INT32, TSEncoding.RLE, size);
write(memTable, deviceId, measurementId[index++], TSDataType.INT64, TSEncoding.RLE, size);
write(memTable, deviceId, measurementId[index++], TSDataType.FLOAT, TSEncoding.RLE, size);
write(memTable, deviceId, measurementId[index++], TSDataType.DOUBLE, TSEncoding.RLE, size);
write(memTable, deviceId, measurementId[index++], TSDataType.TEXT, TSEncoding.PLAIN, size);
writeVector(memTable);
}
private TimeValuePair[] genTimeValuePair(int size, TSDataType dataType) {
TimeValuePair[] ret = new TimeValuePair[size];
Random rand = new Random();
for (int i = 0; i < size; i++) {
switch (dataType) {
case BOOLEAN:
ret[i] = new TimeValuePair(rand.nextLong(), TsPrimitiveType.getByType(dataType, true));
break;
case INT32:
ret[i] =
new TimeValuePair(
rand.nextLong(), TsPrimitiveType.getByType(dataType, rand.nextInt()));
break;
case INT64:
ret[i] =
new TimeValuePair(
rand.nextLong(), TsPrimitiveType.getByType(dataType, rand.nextLong()));
break;
case FLOAT:
ret[i] =
new TimeValuePair(
rand.nextLong(), TsPrimitiveType.getByType(dataType, rand.nextFloat()));
break;
case DOUBLE:
ret[i] =
new TimeValuePair(
rand.nextLong(), TsPrimitiveType.getByType(dataType, rand.nextDouble()));
break;
case TEXT:
ret[i] =
new TimeValuePair(
rand.nextLong(),
TsPrimitiveType.getByType(dataType, new Binary("a" + rand.nextDouble())));
break;
default:
throw new UnSupportedDataTypeException("Unsupported data type:" + dataType);
}
}
return ret;
}
private InsertTabletPlan genInsertTablePlan() throws IllegalPathException {
String[] measurements = new String[2];
measurements[0] = "sensor0";
measurements[1] = "sensor1";
List<Integer> dataTypesList = new ArrayList<>();
TSDataType[] dataTypes = new TSDataType[2];
dataTypesList.add(TSDataType.BOOLEAN.ordinal());
dataTypesList.add(TSDataType.INT64.ordinal());
dataTypes[0] = TSDataType.BOOLEAN;
dataTypes[1] = TSDataType.INT64;
TSEncoding[] encodings = new TSEncoding[2];
encodings[0] = TSEncoding.PLAIN;
encodings[1] = TSEncoding.GORILLA;
String deviceId = "root.sg.device5";
IMeasurementMNode[] mNodes = new IMeasurementMNode[2];
IMeasurementSchema schema =
new VectorMeasurementSchema("$#$0", measurements, dataTypes, encodings);
mNodes[0] = new MeasurementMNode(null, "sensor0", schema, null);
mNodes[1] = new MeasurementMNode(null, "sensor1", schema, null);
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(
new PartialPath(deviceId), new String[] {"(sensor0,sensor1)"}, dataTypesList);
long[] times = new long[100];
Object[] columns = new Object[2];
columns[0] = new boolean[100];
columns[1] = new long[100];
for (long r = 0; r < 100; r++) {
times[(int) r] = r;
((boolean[]) columns[0])[(int) r] = false;
((long[]) columns[1])[(int) r] = r;
}
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
insertTabletPlan.setMeasurementMNodes(mNodes);
insertTabletPlan.setStart(0);
insertTabletPlan.setEnd(100);
insertTabletPlan.setAligned(true);
return insertTabletPlan;
}
}