blob: de1662f303b525727b0514569b5275201463eb66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.recover.file;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.TsFileUtilsForRecoverTest;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.IDeviceID;
import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TsFilePlanRedoerTest {
private static final String SG_NAME = "root.recover_sg";
private static final IDeviceID DEVICE1_NAME = new PlainDeviceID(SG_NAME.concat(".d1"));
private static final IDeviceID DEVICE2_NAME = new PlainDeviceID(SG_NAME.concat(".d2"));
private static final IDeviceID DEVICE3_NAME = new PlainDeviceID(SG_NAME.concat(".d3"));
private static final String FILE_NAME =
TsFileUtilsForRecoverTest.getTestTsFilePath(SG_NAME, 0, 0, 1);
private TsFileResource tsFileResource;
private CompressionType compressionType;
boolean prevIsAutoCreateSchemaEnabled;
boolean prevIsEnablePartialInsert;
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
// set recover config, avoid creating deleted time series when recovering wal
prevIsAutoCreateSchemaEnabled =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
// IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
prevIsEnablePartialInsert = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true);
compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
}
@After
public void tearDown() throws Exception {
if (tsFileResource != null) {
tsFileResource.close();
}
File modsFile = new File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX));
if (modsFile.exists()) {
modsFile.delete();
}
EnvironmentUtils.cleanEnv();
// reset config
// IoTDBDescriptor.getInstance()
// .getConfig()
// .setAutoCreateSchemaEnabled(prevIsAutoCreateSchemaEnabled);
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(prevIsEnablePartialInsert);
}
@Test
public void testRedoInsertRowPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertRowPlan
long time = 5;
TSDataType[] dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE};
Object[] columns = new Object[] {1f, 1.0d};
InsertRowNode insertRowNode =
new InsertRowNode(
new PlanNodeId("0"),
new PartialPath(DEVICE2_NAME),
false,
new String[] {"s1", "s2"},
dataTypes,
time,
columns,
false);
insertRowNode.setMeasurementSchemas(
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.FLOAT),
new MeasurementSchema("s2", TSDataType.DOUBLE),
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertRowNode);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d2.s1
MeasurementPath fullPath =
new MeasurementPath(
DEVICE2_NAME, "s1", new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
ReadOnlyMemChunk memChunk =
recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(1, timeValuePair.getValue().getFloat(), 0.0001);
++time;
}
assertEquals(6, time);
// check d2.s2
fullPath =
new MeasurementPath(
DEVICE2_NAME, "s2", new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE));
memChunk = recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
iterator = memChunk.getPointReader();
time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(1, timeValuePair.getValue().getDouble(), 0.0001);
++time;
}
assertEquals(6, time);
}
@Test
public void testRedoInsertAlignedRowPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
tsFileResource.updateEndTime(DEVICE3_NAME, 5);
// generate InsertRowPlan
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN, TSDataType.FLOAT, TSDataType.TEXT
};
Object[] columns =
new Object[] {1, 1L, true, 1.0f, new Binary("1", TSFileConfig.STRING_CHARSET)};
InsertRowNode insertRowNode1 =
new InsertRowNode(
new PlanNodeId("0"),
new PartialPath(DEVICE3_NAME),
true,
new String[] {"s1", "s2", "s3", "s4", "s5"},
dataTypes,
5,
columns,
false);
insertRowNode1.setMeasurementSchemas(
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT64),
new MeasurementSchema("s3", TSDataType.BOOLEAN),
new MeasurementSchema("s4", TSDataType.FLOAT),
new MeasurementSchema("s5", TSDataType.TEXT),
});
InsertRowNode insertRowNode2 =
new InsertRowNode(
new PlanNodeId("0"),
new PartialPath(DEVICE3_NAME),
true,
new String[] {"s1", "s2", "s3", "s4", "s5"},
dataTypes,
6,
columns,
false);
insertRowNode2.setMeasurementSchemas(
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT64),
new MeasurementSchema("s3", TSDataType.BOOLEAN),
new MeasurementSchema("s4", TSDataType.FLOAT),
new MeasurementSchema("s5", TSDataType.TEXT),
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertRowNode1);
planRedoer.redoInsert(insertRowNode2);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d3
AlignedPath fullPath =
new AlignedPath(
((PlainDeviceID) DEVICE3_NAME).toStringID(),
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
ReadOnlyMemChunk memChunk =
recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
int time = 6;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(1, timeValuePair.getValue().getVector()[0].getInt());
assertEquals(1L, timeValuePair.getValue().getVector()[1].getLong());
assertTrue(timeValuePair.getValue().getVector()[2].getBoolean());
assertEquals(1, timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
assertEquals(BytesUtils.valueOf("1"), timeValuePair.getValue().getVector()[4].getBinary());
++time;
}
assertEquals(7, time);
}
@Test
public void testRedoInsertTabletPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertTabletPlan
long[] times = {5, 6, 7, 8};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
Object[] columns = new Object[2];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = 100;
((long[]) columns[1])[r] = 10000;
}
BitMap[] bitMaps = new BitMap[dataTypes.size()];
for (int i = 0; i < dataTypes.size(); i++) {
if (bitMaps[i] == null) {
bitMaps[i] = new BitMap(times.length);
}
// mark value of time=8 as null
bitMaps[i].mark(3);
}
InsertTabletNode insertTabletNode =
new InsertTabletNode(
new PlanNodeId("0"),
new PartialPath(DEVICE1_NAME),
false,
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
null,
times,
bitMaps,
columns,
times.length);
insertTabletNode.setMeasurementSchemas(
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT64),
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d1.s1
MeasurementPath fullPath =
new MeasurementPath(
DEVICE1_NAME, "s1", new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE));
ReadOnlyMemChunk memChunk =
recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
int time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(100, timeValuePair.getValue().getInt());
++time;
}
assertEquals(8, time);
// check d1.s2
fullPath =
new MeasurementPath(
DEVICE1_NAME, "s2", new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
memChunk = recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
iterator = memChunk.getPointReader();
time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(10000, timeValuePair.getValue().getLong());
++time;
}
assertEquals(8, time);
}
@Test
public void testRedoInsertAlignedTabletPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
// generate InsertTabletPlan
long[] times = {6, 7, 8, 9};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
dataTypes.add(TSDataType.BOOLEAN.ordinal());
dataTypes.add(TSDataType.FLOAT.ordinal());
dataTypes.add(TSDataType.TEXT.ordinal());
Object[] columns = new Object[5];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
columns[2] = new boolean[times.length];
columns[3] = new float[times.length];
columns[4] = new Binary[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = (r + 1) * 100;
((long[]) columns[1])[r] = (r + 1) * 100;
((boolean[]) columns[2])[r] = true;
((float[]) columns[3])[r] = (r + 1) * 100;
((Binary[]) columns[4])[r] = BytesUtils.valueOf((r + 1) * 100 + "");
}
BitMap[] bitMaps = new BitMap[dataTypes.size()];
for (int i = 0; i < dataTypes.size(); i++) {
if (bitMaps[i] == null) {
bitMaps[i] = new BitMap(times.length);
}
// mark value of time=9 as null
bitMaps[i].mark(3);
}
InsertTabletNode insertTabletNode =
new InsertTabletNode(
new PlanNodeId("0"),
new PartialPath(DEVICE3_NAME),
true,
new String[] {"s1", "s2", "s3", "s4", "s5"},
new TSDataType[] {
TSDataType.INT32,
TSDataType.INT64,
TSDataType.BOOLEAN,
TSDataType.FLOAT,
TSDataType.TEXT
},
null,
times,
bitMaps,
columns,
times.length);
insertTabletNode.setMeasurementSchemas(
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT64),
new MeasurementSchema("s3", TSDataType.BOOLEAN),
new MeasurementSchema("s4", TSDataType.FLOAT),
new MeasurementSchema("s5", TSDataType.TEXT),
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d3
AlignedPath fullPath =
new AlignedPath(
((PlainDeviceID) DEVICE3_NAME).toStringID(),
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
ReadOnlyMemChunk memChunk =
recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
int time = 6;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals((time - 5) * 100, timeValuePair.getValue().getVector()[0].getInt());
assertEquals((time - 5) * 100L, timeValuePair.getValue().getVector()[1].getLong());
assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
assertEquals((time - 5) * 100, timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
assertEquals(
BytesUtils.valueOf((time - 5) * 100 + ""),
timeValuePair.getValue().getVector()[4].getBinary());
++time;
}
assertEquals(9, time);
}
@Test
public void testRedoOverLapPlanIntoSeqFile() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertTabletPlan, time=3 and time=4 are overlap
long[] times = {1, 2};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
Object[] columns = new Object[2];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = 100;
((long[]) columns[1])[r] = 10000;
}
InsertTabletNode insertTabletNode =
new InsertTabletNode(
new PlanNodeId("0"),
new PartialPath(DEVICE1_NAME),
false,
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT64),
},
times,
null,
columns,
times.length);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
assertTrue(recoveryMemTable.isEmpty());
}
@Test
public void testRedoOverLapPlanIntoUnseqFile() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertTabletPlan, time=3 and time=4 are overlap
long[] times = {1, 2};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
Object[] columns = new Object[2];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = 100;
((long[]) columns[1])[r] = 10000;
}
InsertTabletNode insertTabletNode =
new InsertTabletNode(
new PlanNodeId("0"),
new PartialPath(DEVICE1_NAME),
false,
new String[] {"s1", "s2"},
new TSDataType[] {TSDataType.INT32, TSDataType.INT64},
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT64),
},
times,
null,
columns,
times.length);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d1.s1
MeasurementPath fullPath =
new MeasurementPath(
DEVICE1_NAME, "s1", new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE));
ReadOnlyMemChunk memChunk =
recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
assertTrue(memChunk == null || memChunk.isEmpty());
// check d1.s2
fullPath =
new MeasurementPath(
DEVICE1_NAME, "s2", new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
memChunk = recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
assertTrue(memChunk == null || memChunk.isEmpty());
}
@Test
public void testRedoDeleteDataNode() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate DeleteDataNode
DeleteDataNode deleteDataNode =
new DeleteDataNode(
new PlanNodeId(""),
Collections.singletonList(new PartialPath(DEVICE1_NAME)),
Long.MIN_VALUE,
Long.MAX_VALUE);
// redo DeleteDataNode, vsg processor is used to test IdTable, don't test IdTable here
File modsFile = new File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX));
assertFalse(modsFile.exists());
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoDelete(deleteDataNode);
assertTrue(modsFile.exists());
}
@Test
public void testRedoAlignedInsertAfterDeleteTimeseries() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
// generate InsertTabletPlan
long[] times = {6, 7, 8, 9};
List<Integer> dataTypes =
Arrays.asList(
TSDataType.INT32.ordinal(),
TSDataType.INT64.ordinal(),
TSDataType.BOOLEAN.ordinal(),
TSDataType.FLOAT.ordinal(),
TSDataType.TEXT.ordinal());
Object[] columns =
new Object[] {
new int[times.length],
new long[times.length],
new boolean[times.length],
new float[times.length],
new Binary[times.length]
};
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = (r + 1) * 100;
((long[]) columns[1])[r] = (r + 1) * 100;
((boolean[]) columns[2])[r] = true;
((float[]) columns[3])[r] = (r + 1) * 100;
((Binary[]) columns[4])[r] = BytesUtils.valueOf((r + 1) * 100 + "");
}
BitMap[] bitMaps = new BitMap[dataTypes.size()];
for (int i = 0; i < dataTypes.size(); i++) {
if (bitMaps[i] == null) {
bitMaps[i] = new BitMap(times.length);
}
// mark value of time=9 as null
bitMaps[i].mark(3);
}
MeasurementSchema[] schemas =
new MeasurementSchema[] {
null,
new MeasurementSchema("s2", TSDataType.INT64),
new MeasurementSchema("s3", TSDataType.BOOLEAN),
new MeasurementSchema("s4", TSDataType.FLOAT),
null
};
InsertTabletNode insertTabletNode =
new InsertTabletNode(
new PlanNodeId(""),
new PartialPath(DEVICE3_NAME),
true,
new String[] {null, "s2", "s3", "s4", null},
new TSDataType[] {
TSDataType.INT32,
TSDataType.INT64,
TSDataType.BOOLEAN,
TSDataType.FLOAT,
TSDataType.TEXT
},
schemas,
times,
bitMaps,
columns,
times.length);
// redo InsertTabletPlan, data region is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
insertTabletNode.setMeasurementSchemas(schemas);
planRedoer.redoInsert(insertTabletNode);
// generate InsertRowPlan
int time = 9;
TSDataType[] dataTypes2 =
new TSDataType[] {
TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN, TSDataType.FLOAT, TSDataType.TEXT
};
Object[] columns2 =
new Object[] {400, 400L, true, 400.0f, new Binary("400", TSFileConfig.STRING_CHARSET)};
// redo InsertTabletPlan, data region is used to test IdTable, don't test IdTable here
InsertRowNode insertRowNode =
new InsertRowNode(
new PlanNodeId(""),
new PartialPath(DEVICE3_NAME),
true,
new String[] {null, "s2", "s3", "s4", null},
dataTypes2,
time,
columns2,
false);
insertRowNode.setMeasurementSchemas(schemas);
planRedoer.redoInsert(insertRowNode);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d3
AlignedPath fullPath =
new AlignedPath(
((PlainDeviceID) DEVICE3_NAME).toStringID(),
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
ReadOnlyMemChunk memChunk =
recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
time = 6;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(null, timeValuePair.getValue().getVector()[0]);
assertEquals((time - 5) * 100L, timeValuePair.getValue().getVector()[1].getLong());
assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
assertEquals((time - 5) * 100, timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
assertEquals(null, timeValuePair.getValue().getVector()[4]);
time++;
}
assertEquals(10, time);
}
private void generateCompleteFile(File tsFile) throws IOException, WriteProcessException {
try (TsFileWriter writer = new TsFileWriter(tsFile)) {
writer.registerTimeseries(
new Path(DEVICE1_NAME), new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE));
writer.registerTimeseries(
new Path(DEVICE1_NAME), new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
writer.registerTimeseries(
new Path(DEVICE2_NAME), new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.registerTimeseries(
new Path(DEVICE2_NAME), new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE));
writer.registerAlignedTimeseries(
new Path(DEVICE3_NAME),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
writer.write(
new TSRecord(1, DEVICE1_NAME)
.addTuple(new IntDataPoint("s1", 1))
.addTuple(new LongDataPoint("s2", 1)));
writer.write(
new TSRecord(2, DEVICE1_NAME)
.addTuple(new IntDataPoint("s1", 2))
.addTuple(new LongDataPoint("s2", 2)));
writer.write(
new TSRecord(3, DEVICE2_NAME)
.addTuple(new FloatDataPoint("s1", 3))
.addTuple(new DoubleDataPoint("s2", 3)));
writer.write(
new TSRecord(4, DEVICE2_NAME)
.addTuple(new FloatDataPoint("s1", 4))
.addTuple(new DoubleDataPoint("s2", 4)));
writer.writeAligned(
new TSRecord(5, DEVICE3_NAME)
.addTuple(new IntDataPoint("s1", 5))
.addTuple(new LongDataPoint("s2", 5))
.addTuple(new BooleanDataPoint("s3", true))
.addTuple(new FloatDataPoint("s4", 5))
.addTuple(new StringDataPoint("s5", BytesUtils.valueOf("5"))));
}
}
}