blob: 47aaea60dca0903709178399a2f9108d7afd81fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.tsfile.write;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.constant.TestConstant;
import org.apache.iotdb.tsfile.file.IMetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.DeviceMetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.MeasurementMetadataIndexEntry;
import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.MeasurementGroup;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.iotdb.tsfile.utils.FileGenerator.generateIndexString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** test for MetadataIndexConstructor */
public class MetadataIndexConstructorTest {
private static final Logger logger = LoggerFactory.getLogger(MetadataIndexConstructorTest.class);
private final TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
private static final String FILE_PATH =
TestConstant.BASE_OUTPUT_PATH.concat("MetadataIndexConstructorTest.tsfile");
private static final String measurementPrefix = "sensor_";
private static final String vectorPrefix = "vector_";
private int maxDegreeOfIndexNode;
@Before
public void before() {
maxDegreeOfIndexNode = conf.getMaxDegreeOfIndexNode();
conf.setMaxDegreeOfIndexNode(10);
}
@After
public void after() {
conf.setMaxDegreeOfIndexNode(maxDegreeOfIndexNode);
File file = new File(FILE_PATH);
if (file.exists()) {
file.delete();
}
}
/** Example 1: 5 entities with 5 measurements each */
@Test
public void singleIndexTest1() {
int deviceNum = 5;
int measurementNum = 5;
IDeviceID[] devices = new IDeviceID[deviceNum];
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
devices[i] = new PlainDeviceID("d" + i);
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
}
/** Example 2: 1 entity with 150 measurements */
@Test
public void singleIndexTest2() {
int deviceNum = 1;
int measurementNum = 150;
IDeviceID[] devices = new IDeviceID[deviceNum];
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
devices[i] = new PlainDeviceID("d" + i);
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
}
/** Example 3: 150 entities with 1 measurement each */
@Test
public void singleIndexTest3() {
int deviceNum = 150;
int measurementNum = 1;
IDeviceID[] devices = new IDeviceID[deviceNum];
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
devices[i] = new PlainDeviceID("d" + generateIndexString(i, deviceNum));
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
}
/** Example 4: 150 entities with 150 measurements each */
@Test
public void singleIndexTest4() {
int deviceNum = 150;
int measurementNum = 1;
IDeviceID[] devices = new IDeviceID[deviceNum];
int[][] vectorMeasurement = new int[deviceNum][];
String[][] singleMeasurement = new String[deviceNum][];
for (int i = 0; i < deviceNum; i++) {
devices[i] = new PlainDeviceID("d" + generateIndexString(i, deviceNum));
vectorMeasurement[i] = new int[0];
singleMeasurement[i] = new String[measurementNum];
for (int j = 0; j < measurementNum; j++) {
singleMeasurement[i][j] = measurementPrefix + generateIndexString(j, measurementNum);
}
}
test(devices, vectorMeasurement, singleMeasurement);
}
/** Example 5: 1 entities with 1 vector containing 9 measurements */
@Test
public void vectorIndexTest() {
IDeviceID[] devices = {new PlainDeviceID("d0")};
int[][] vectorMeasurement = {{9}};
test(devices, vectorMeasurement, null);
}
/**
* Example 6: 2 entities, measurements of entities are shown in the following table
*
* <p>d0.s0~s4 | d0.z0~z3 | d1.v0.(s0~s3)
*/
@Test
public void compositeIndexTest() {
IDeviceID[] devices = {new PlainDeviceID("d0"), new PlainDeviceID("d1")};
int[][] vectorMeasurement = {{}, {4}};
String[][] singleMeasurement = {
{"s0", "s1", "s2", "s3", "s4", "z0", "z1", "z2", "z3"},
{}
};
test(devices, vectorMeasurement, singleMeasurement);
}
/**
* start test
*
* @param devices name and number of device
* @param vectorMeasurement the number of device and the number of values to include in the tablet
* @param singleMeasurement non-vector measurement name, set null if no need
*/
private void test(IDeviceID[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
// 1. generate file
generateFile(devices, vectorMeasurement, singleMeasurement);
// 2. read metadata from file
List<IDeviceID> actualDevices = new ArrayList<>(); // contains all device by sequence
List<List<String>> actualMeasurements =
new ArrayList<>(); // contains all measurements group by device
readMetaDataDFS(actualDevices, actualMeasurements);
// 3. generate correct result
List<IDeviceID> correctDevices = new ArrayList<>(); // contains all device by sequence
List<List<String>> correctFirstMeasurements =
new ArrayList<>(); // contains first measurements of every leaf, group by device
List<String> correctPaths = new ArrayList<>(); // contains all paths by sequence
generateCorrectResult(
correctDevices,
correctFirstMeasurements,
correctPaths,
devices,
vectorMeasurement,
singleMeasurement);
// 4. compare correct result with TsFile's metadata
Arrays.sort(devices);
// 4.1 make sure device in order
assertEquals(correctDevices.size(), devices.length);
assertEquals(actualDevices.size(), correctDevices.size());
for (int i = 0; i < actualDevices.size(); i++) {
assertEquals(actualDevices.get(i), correctDevices.get(i));
}
// 4.2 make sure timeseries in order
try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
Iterator<Pair<IDeviceID, Boolean>> iterator = reader.getAllDevicesIteratorWithIsAligned();
while (iterator.hasNext()) {
for (IDeviceID correctDevice : correctDevices) {
assertEquals(correctDevice, iterator.next().left);
}
}
assertFalse(iterator.hasNext());
Map<IDeviceID, List<TimeseriesMetadata>> allTimeseriesMetadata =
reader.getAllTimeseriesMetadata(false);
for (int j = 0; j < actualDevices.size(); j++) {
for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
assertEquals(
allTimeseriesMetadata.get(actualDevices.get(j)).get(i).getMeasurementId(),
correctFirstMeasurements.get(j).get(i));
}
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
// 4.3 make sure split leaf correctly
for (int j = 0; j < actualDevices.size(); j++) {
for (int i = 0; i < actualMeasurements.get(j).size(); i++) {
assertEquals(
actualMeasurements.get(j).get(i),
correctFirstMeasurements.get(j).get(i * conf.getMaxDegreeOfIndexNode()));
}
}
try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
Iterator<List<Path>> iterator = reader.getPathsIterator();
int idx = 0;
while (iterator.hasNext()) {
for (Path actualPath : iterator.next()) {
assertEquals(actualPath.getFullPath(), correctPaths.get(idx));
idx++;
}
}
assertEquals(correctPaths.size(), idx);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/**
* read TsFile metadata, load actual message in devices and measurements
*
* @param devices load actual devices
* @param measurements load actual measurement(first of every leaf)
*/
private void readMetaDataDFS(List<IDeviceID> devices, List<List<String>> measurements) {
try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
TsFileMetadata tsFileMetaData = reader.readFileMetadata();
MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
deviceDFS(devices, measurements, reader, metadataIndexNode);
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/** DFS in device level load actual devices */
private void deviceDFS(
List<IDeviceID> devices,
List<List<String>> measurements,
TsFileSequenceReader reader,
MetadataIndexNode node) {
try {
assertTrue(
node.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)
|| node.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE));
for (int i = 0; i < node.getChildren().size(); i++) {
IMetadataIndexEntry metadataIndexEntry = node.getChildren().get(i);
long endOffset = node.getEndOffset();
if (i != node.getChildren().size() - 1) {
endOffset = node.getChildren().get(i + 1).getOffset();
}
boolean currentChildLevelIsDevice =
MetadataIndexNodeType.INTERNAL_DEVICE.equals(node.getNodeType());
MetadataIndexNode subNode =
reader.readMetadataIndexNode(
metadataIndexEntry.getOffset(), endOffset, currentChildLevelIsDevice);
if (node.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
devices.add(((DeviceMetadataIndexEntry) metadataIndexEntry).getDeviceID());
measurements.add(new ArrayList<>());
measurementDFS(devices.size() - 1, measurements, reader, subNode);
} else if (node.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE)) {
deviceDFS(devices, measurements, reader, subNode);
}
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/** DFS in measurement level load actual measurements */
private void measurementDFS(
int deviceIndex,
List<List<String>> measurements,
TsFileSequenceReader reader,
MetadataIndexNode node) {
try {
assertTrue(
node.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)
|| node.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT));
for (int i = 0; i < node.getChildren().size(); i++) {
IMetadataIndexEntry metadataIndexEntry = node.getChildren().get(i);
long endOffset = node.getEndOffset();
if (i != node.getChildren().size() - 1) {
endOffset = node.getChildren().get(i + 1).getOffset();
}
if (node.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
measurements
.get(deviceIndex)
.add(((MeasurementMetadataIndexEntry) metadataIndexEntry).getName());
} else if (node.getNodeType().equals(MetadataIndexNodeType.INTERNAL_MEASUREMENT)) {
MetadataIndexNode subNode =
reader.readMetadataIndexNode(metadataIndexEntry.getOffset(), endOffset, false);
measurementDFS(deviceIndex, measurements, reader, subNode);
}
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/**
* generate correct devices and measurements for test Note that if the metadata index tree is
* re-designed, you may need to modify this function as well.
*
* @param correctDevices output
* @param correctMeasurements output
* @param devices input
* @param vectorMeasurement input
* @param singleMeasurement input
*/
private void generateCorrectResult(
List<IDeviceID> correctDevices,
List<List<String>> correctMeasurements,
List<String> correctPaths,
IDeviceID[] devices,
int[][] vectorMeasurement,
String[][] singleMeasurement) {
for (int i = 0; i < devices.length; i++) {
IDeviceID device = devices[i];
correctDevices.add(device);
// generate measurement and sort
List<String> measurements = new ArrayList<>();
// single-variable measurement
if (singleMeasurement != null) {
for (String measurement : singleMeasurement[i]) {
measurements.add(measurement);
correctPaths.add(new Path(device, measurement, true).getFullPath());
}
}
// multi-variable measurement
for (int vectorIndex = 0; vectorIndex < vectorMeasurement[i].length; vectorIndex++) {
measurements.add("");
correctPaths.add(new Path(device, "", true).getFullPath());
int measurementNum = vectorMeasurement[i][vectorIndex];
for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
String measurementName =
measurementPrefix + generateIndexString(measurementIndex, measurementNum);
measurements.add(TsFileConstant.PATH_SEPARATOR + measurementName);
correctPaths.add(new Path(device, measurementName, true).getFullPath());
}
}
Collections.sort(measurements);
correctMeasurements.add(measurements);
}
Collections.sort(correctDevices);
}
/**
* @param devices name and number of device
* @param vectorMeasurement the number of device and the number of values to include in the tablet
* @param singleMeasurement non-vector measurement name, set null if no need
*/
private void generateFile(
IDeviceID[] devices, int[][] vectorMeasurement, String[][] singleMeasurement) {
File f = FSFactoryProducer.getFSFactory().getFile(FILE_PATH);
if (f.exists() && !f.delete()) {
fail("can not delete " + f.getAbsolutePath());
}
Schema schema = new Schema();
try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {
// write single-variable timeseries
if (singleMeasurement != null) {
for (int i = 0; i < singleMeasurement.length; i++) {
IDeviceID device = devices[i];
for (String measurement : singleMeasurement[i]) {
tsFileWriter.registerTimeseries(
new Path(device),
new MeasurementSchema(measurement, TSDataType.INT64, TSEncoding.RLE));
}
// the number of record rows
int rowNum = 10;
for (int row = 0; row < rowNum; row++) {
TSRecord tsRecord = new TSRecord(row, ((PlainDeviceID) device).toStringID());
for (String measurement : singleMeasurement[i]) {
DataPoint dPoint = new LongDataPoint(measurement, row);
tsRecord.addTuple(dPoint);
}
if (tsRecord.dataPointList.size() > 0) {
tsFileWriter.write(tsRecord);
}
}
}
}
// write multi-variable timeseries
for (int i = 0; i < devices.length; i++) {
IDeviceID device = devices[i];
logger.info("generating device {}...", device);
// the number of rows to include in the tablet
int rowNum = 10;
for (int vectorIndex = 0; vectorIndex < vectorMeasurement[i].length; vectorIndex++) {
String vectorName =
vectorPrefix + generateIndexString(vectorIndex, vectorMeasurement.length);
logger.info("generating vector {}...", vectorName);
int measurementNum = vectorMeasurement[i][vectorIndex];
List<MeasurementSchema> schemas = new ArrayList<>();
List<MeasurementSchema> tabletSchema = new ArrayList<>();
for (int measurementIndex = 0; measurementIndex < measurementNum; measurementIndex++) {
String measurementName =
measurementPrefix + generateIndexString(measurementIndex, measurementNum);
logger.info("generating vector measurement {}...", measurementName);
// add measurements into file schema (all with INT64 data type)
MeasurementSchema schema1 =
new MeasurementSchema(measurementName, TSDataType.INT64, TSEncoding.RLE);
schemas.add(schema1);
tabletSchema.add(schema1);
}
MeasurementGroup group = new MeasurementGroup(true, schemas);
schema.registerMeasurementGroup(new Path(device), group);
// add measurements into TSFileWriter
// construct the tablet
Tablet tablet = new Tablet(((PlainDeviceID) device).toStringID(), tabletSchema);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
long timestamp = 1;
long value = 1000000L;
for (int r = 0; r < rowNum; r++, value++) {
int row = tablet.rowSize++;
timestamps[row] = timestamp++;
for (int j = 0; j < measurementNum; j++) {
long[] sensor = (long[]) values[j];
sensor[row] = value;
}
// write Tablet to TsFile
if (tablet.rowSize == tablet.getMaxRowNumber()) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}
}
// write Tablet to TsFile
if (tablet.rowSize != 0) {
tsFileWriter.writeAligned(tablet);
tablet.reset();
}
}
}
} catch (Exception e) {
logger.error("meet error in TsFileWrite with tablet", e);
fail(e.getMessage());
}
}
}