blob: 1347bd3988841275c1b48b4e2f2a54e188915a78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.wal.recover.file;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.wal.utils.TsFileUtilsForRecoverTest;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.BooleanDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.StringDataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TsFilePlanRedoerTest {
private static final String SG_NAME = "root.recover_sg";
private static final String DEVICE1_NAME = SG_NAME.concat(".d1");
private static final String DEVICE2_NAME = SG_NAME.concat(".d2");
private static final String DEVICE3_NAME = SG_NAME.concat(".d3");
private static final String FILE_NAME =
TsFileUtilsForRecoverTest.getTestTsFilePath(SG_NAME, 0, 0, 1);
private TsFileResource tsFileResource;
private CompressionType compressionType;
boolean prevIsAutoCreateSchemaEnabled;
boolean prevIsEnablePartialInsert;
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
// set recover config, avoid creating deleted time series when recovering wal
prevIsAutoCreateSchemaEnabled =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
prevIsEnablePartialInsert = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(true);
compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
IoTDB.schemaProcessor.setStorageGroup(new PartialPath(SG_NAME));
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(DEVICE1_NAME.concat(".s1")),
TSDataType.INT32,
TSEncoding.RLE,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(DEVICE1_NAME.concat(".s2")),
TSDataType.INT64,
TSEncoding.RLE,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(DEVICE2_NAME.concat(".s1")),
TSDataType.FLOAT,
TSEncoding.RLE,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
IoTDB.schemaProcessor.createTimeseries(
new PartialPath(DEVICE2_NAME.concat(".s2")),
TSDataType.DOUBLE,
TSEncoding.RLE,
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
IoTDB.schemaProcessor.createAlignedTimeSeries(
new PartialPath(DEVICE3_NAME),
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
Arrays.asList(
TSDataType.INT32,
TSDataType.INT64,
TSDataType.BOOLEAN,
TSDataType.FLOAT,
TSDataType.TEXT),
Arrays.asList(
TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLE, TSEncoding.RLE, TSEncoding.PLAIN),
Arrays.asList(
compressionType, compressionType, compressionType, compressionType, compressionType));
}
@After
public void tearDown() throws Exception {
if (tsFileResource != null) {
tsFileResource.close();
}
File modsFile = new File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX));
if (modsFile.exists()) {
modsFile.delete();
}
EnvironmentUtils.cleanEnv();
// reset config
IoTDBDescriptor.getInstance()
.getConfig()
.setAutoCreateSchemaEnabled(prevIsAutoCreateSchemaEnabled);
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(prevIsEnablePartialInsert);
}
@Test
public void testRedoInsertRowPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertRowPlan
long time = 5;
TSDataType[] dataTypes = new TSDataType[] {TSDataType.FLOAT, TSDataType.DOUBLE};
String[] columns = new String[] {1 + "", 1.0 + ""};
InsertRowPlan insertRowPlan =
new InsertRowPlan(
new PartialPath(DEVICE2_NAME), time, new String[] {"s1", "s2"}, dataTypes, columns);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
planRedoer.redoInsert(insertRowPlan);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d2.s1
MeasurementPath fullPath =
new MeasurementPath(
DEVICE2_NAME, "s1", new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(1, timeValuePair.getValue().getFloat(), 0.0001);
++time;
}
assertEquals(6, time);
// check d2.s2
fullPath =
new MeasurementPath(
DEVICE2_NAME, "s2", new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE));
memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
iterator = memChunk.getPointReader();
time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(1, timeValuePair.getValue().getDouble(), 0.0001);
++time;
}
assertEquals(6, time);
}
@Test
public void testRedoInsertAlignedRowPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
// generate InsertRowPlan
long time = 6;
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN, TSDataType.FLOAT, TSDataType.TEXT
};
String[] columns = new String[] {1 + "", 1 + "", true + "", 1.0 + "", "1"};
InsertRowPlan insertRowPlan =
new InsertRowPlan(
new PartialPath(DEVICE3_NAME),
time,
new String[] {"s1", "s2", "s3", "s4", "s5"},
dataTypes,
columns,
true);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
planRedoer.redoInsert(insertRowPlan);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d3
AlignedPath fullPath =
new AlignedPath(
DEVICE3_NAME,
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
time = 6;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(1, timeValuePair.getValue().getVector()[0].getInt());
assertEquals(1L, timeValuePair.getValue().getVector()[1].getLong());
assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
assertEquals(1, timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
assertEquals(Binary.valueOf("1"), timeValuePair.getValue().getVector()[4].getBinary());
++time;
}
assertEquals(7, time);
}
@Test
public void testRedoInsertTabletPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertTabletPlan
long[] times = {5, 6, 7, 8};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
Object[] columns = new Object[2];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = 100;
((long[]) columns[1])[r] = 10000;
}
BitMap[] bitMaps = new BitMap[dataTypes.size()];
for (int i = 0; i < dataTypes.size(); i++) {
if (bitMaps[i] == null) {
bitMaps[i] = new BitMap(times.length);
}
// mark value of time=8 as null
bitMaps[i].mark(3);
}
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(new PartialPath(DEVICE1_NAME), new String[] {"s1", "s2"}, dataTypes);
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
insertTabletPlan.setBitMaps(bitMaps);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
planRedoer.redoInsert(insertTabletPlan);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d1.s1
MeasurementPath fullPath =
new MeasurementPath(
DEVICE1_NAME, "s1", new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE));
ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
int time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(100, timeValuePair.getValue().getInt());
++time;
}
assertEquals(8, time);
// check d1.s2
fullPath =
new MeasurementPath(
DEVICE1_NAME, "s2", new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
iterator = memChunk.getPointReader();
time = 5;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(10000, timeValuePair.getValue().getLong());
++time;
}
assertEquals(8, time);
}
@Test
public void testRedoInsertAlignedTabletPlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
// generate InsertTabletPlan
long[] times = {6, 7, 8, 9};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
dataTypes.add(TSDataType.BOOLEAN.ordinal());
dataTypes.add(TSDataType.FLOAT.ordinal());
dataTypes.add(TSDataType.TEXT.ordinal());
Object[] columns = new Object[5];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
columns[2] = new boolean[times.length];
columns[3] = new float[times.length];
columns[4] = new Binary[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = (r + 1) * 100;
((long[]) columns[1])[r] = (r + 1) * 100;
((boolean[]) columns[2])[r] = true;
((float[]) columns[3])[r] = (r + 1) * 100;
((Binary[]) columns[4])[r] = Binary.valueOf((r + 1) * 100 + "");
}
BitMap[] bitMaps = new BitMap[dataTypes.size()];
for (int i = 0; i < dataTypes.size(); i++) {
if (bitMaps[i] == null) {
bitMaps[i] = new BitMap(times.length);
}
// mark value of time=9 as null
bitMaps[i].mark(3);
}
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(
new PartialPath(DEVICE3_NAME),
new String[] {"s1", "s2", "s3", "s4", "s5"},
dataTypes,
true);
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
insertTabletPlan.setBitMaps(bitMaps);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
planRedoer.redoInsert(insertTabletPlan);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d3
AlignedPath fullPath =
new AlignedPath(
DEVICE3_NAME,
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
int time = 6;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals((time - 5) * 100, timeValuePair.getValue().getVector()[0].getInt());
assertEquals((time - 5) * 100L, timeValuePair.getValue().getVector()[1].getLong());
assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
assertEquals((time - 5) * 100, timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
assertEquals(
Binary.valueOf((time - 5) * 100 + ""),
timeValuePair.getValue().getVector()[4].getBinary());
++time;
}
assertEquals(9, time);
}
@Test
public void testRedoOverLapPlanIntoSeqFile() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertTabletPlan, time=3 and time=4 are overlap
long[] times = {1, 2};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
Object[] columns = new Object[2];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = 100;
((long[]) columns[1])[r] = 10000;
}
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(new PartialPath(DEVICE1_NAME), new String[] {"s1", "s2"}, dataTypes);
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
planRedoer.redoInsert(insertTabletPlan);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
assertTrue(recoveryMemTable.isEmpty());
}
@Test
public void testRedoOverLapPlanIntoUnseqFile() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate InsertTabletPlan, time=3 and time=4 are overlap
long[] times = {1, 2};
List<Integer> dataTypes = new ArrayList<>();
dataTypes.add(TSDataType.INT32.ordinal());
dataTypes.add(TSDataType.INT64.ordinal());
Object[] columns = new Object[2];
columns[0] = new int[times.length];
columns[1] = new long[times.length];
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = 100;
((long[]) columns[1])[r] = 10000;
}
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(new PartialPath(DEVICE1_NAME), new String[] {"s1", "s2"}, dataTypes);
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false, null);
planRedoer.redoInsert(insertTabletPlan);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d1.s1
MeasurementPath fullPath =
new MeasurementPath(
DEVICE1_NAME, "s1", new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE));
ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
int time = 1;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(100, timeValuePair.getValue().getInt());
++time;
}
assertEquals(3, time);
// check d1.s2
fullPath =
new MeasurementPath(
DEVICE1_NAME, "s2", new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
iterator = memChunk.getPointReader();
time = 1;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(10000, timeValuePair.getValue().getLong());
++time;
}
assertEquals(3, time);
}
@Test
public void testRedoDeletePlan() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate DeletePlan
DeletePlan deletePlan =
new DeletePlan(Long.MIN_VALUE, Long.MAX_VALUE, new PartialPath(DEVICE1_NAME));
// redo DeletePlan, vsg processor is used to test IdTable, don't test IdTable here
File modsFile = new File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX));
assertFalse(modsFile.exists());
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false, null);
planRedoer.redoDelete(deletePlan);
assertTrue(modsFile.exists());
}
@Test
public void testRedoDeleteDataNode() throws Exception {
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE1_NAME, 1);
tsFileResource.updateEndTime(DEVICE1_NAME, 2);
tsFileResource.updateStartTime(DEVICE2_NAME, 3);
tsFileResource.updateEndTime(DEVICE2_NAME, 4);
// generate DeletePlan
DeleteDataNode deleteDataNode =
new DeleteDataNode(
new PlanNodeId(""),
Collections.singletonList(new PartialPath(DEVICE1_NAME)),
Long.MIN_VALUE,
Long.MAX_VALUE);
// redo DeletePlan, vsg processor is used to test IdTable, don't test IdTable here
File modsFile = new File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX));
assertFalse(modsFile.exists());
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false, null);
planRedoer.redoDelete(deleteDataNode);
assertTrue(modsFile.exists());
}
@Test
public void testRedoAlignedInsertAfterDeleteTimeseries() throws Exception {
// some timeseries have been deleted
IoTDB.schemaProcessor.deleteTimeseries(new PartialPath(DEVICE3_NAME.concat(".s1")));
IoTDB.schemaProcessor.deleteTimeseries(new PartialPath(DEVICE3_NAME.concat(".s5")));
// generate .tsfile and update resource in memory
File file = new File(FILE_NAME);
generateCompleteFile(file);
tsFileResource = new TsFileResource(file);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
tsFileResource.updateStartTime(DEVICE3_NAME, 5);
// generate InsertTabletPlan
long[] times = {6, 7, 8, 9};
List<Integer> dataTypes =
Arrays.asList(
TSDataType.INT32.ordinal(),
TSDataType.INT64.ordinal(),
TSDataType.BOOLEAN.ordinal(),
TSDataType.FLOAT.ordinal(),
TSDataType.TEXT.ordinal());
Object[] columns =
new Object[] {
new int[times.length],
new long[times.length],
new boolean[times.length],
new float[times.length],
new Binary[times.length]
};
for (int r = 0; r < times.length; r++) {
((int[]) columns[0])[r] = (r + 1) * 100;
((long[]) columns[1])[r] = (r + 1) * 100;
((boolean[]) columns[2])[r] = true;
((float[]) columns[3])[r] = (r + 1) * 100;
((Binary[]) columns[4])[r] = Binary.valueOf((r + 1) * 100 + "");
}
BitMap[] bitMaps = new BitMap[dataTypes.size()];
for (int i = 0; i < dataTypes.size(); i++) {
if (bitMaps[i] == null) {
bitMaps[i] = new BitMap(times.length);
}
// mark value of time=9 as null
bitMaps[i].mark(3);
}
InsertTabletPlan insertTabletPlan =
new InsertTabletPlan(
new PartialPath(DEVICE3_NAME),
new String[] {"s1", "s2", "s3", "s4", "s5"},
dataTypes,
true);
insertTabletPlan.setTimes(times);
insertTabletPlan.setColumns(columns);
insertTabletPlan.setRowCount(times.length);
insertTabletPlan.setBitMaps(bitMaps);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true, null);
planRedoer.redoInsert(insertTabletPlan);
// generate InsertRowPlan
int time = 9;
TSDataType[] dataTypes2 =
new TSDataType[] {
TSDataType.INT32, TSDataType.INT64, TSDataType.BOOLEAN, TSDataType.FLOAT, TSDataType.TEXT
};
String[] columns2 = new String[] {400 + "", 400 + "", true + "", 400.0 + "", "400"};
InsertRowPlan insertRowPlan =
new InsertRowPlan(
new PartialPath(DEVICE3_NAME),
time,
new String[] {"s1", "s2", "s3", "s4", "s5"},
dataTypes2,
columns2,
true);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't test IdTable here
planRedoer.redoInsert(insertRowPlan);
// check data in memTable
IMemTable recoveryMemTable = planRedoer.getRecoveryMemTable();
// check d3
AlignedPath fullPath =
new AlignedPath(
DEVICE3_NAME,
Arrays.asList("s1", "s2", "s3", "s4", "s5"),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
ReadOnlyMemChunk memChunk = recoveryMemTable.query(fullPath, Long.MIN_VALUE, null);
IPointReader iterator = memChunk.getPointReader();
time = 6;
while (iterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = iterator.nextTimeValuePair();
assertEquals(time, timeValuePair.getTimestamp());
assertEquals(null, timeValuePair.getValue().getVector()[0]);
assertEquals((time - 5) * 100L, timeValuePair.getValue().getVector()[1].getLong());
assertEquals(true, timeValuePair.getValue().getVector()[2].getBoolean());
assertEquals((time - 5) * 100, timeValuePair.getValue().getVector()[3].getFloat(), 0.00001);
assertEquals(null, timeValuePair.getValue().getVector()[4]);
time++;
}
assertEquals(10, time);
}
private void generateCompleteFile(File tsFile) throws IOException, WriteProcessException {
try (TsFileWriter writer = new TsFileWriter(tsFile)) {
writer.registerTimeseries(
new Path(DEVICE1_NAME), new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE));
writer.registerTimeseries(
new Path(DEVICE1_NAME), new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
writer.registerTimeseries(
new Path(DEVICE2_NAME), new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.RLE));
writer.registerTimeseries(
new Path(DEVICE2_NAME), new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE));
writer.registerAlignedTimeseries(
new Path(DEVICE3_NAME),
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32, TSEncoding.RLE),
new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE),
new MeasurementSchema("s3", TSDataType.BOOLEAN, TSEncoding.RLE),
new MeasurementSchema("s4", TSDataType.FLOAT, TSEncoding.RLE),
new MeasurementSchema("s5", TSDataType.TEXT, TSEncoding.PLAIN)));
writer.write(
new TSRecord(1, DEVICE1_NAME)
.addTuple(new IntDataPoint("s1", 1))
.addTuple(new LongDataPoint("s2", 1)));
writer.write(
new TSRecord(2, DEVICE1_NAME)
.addTuple(new IntDataPoint("s1", 2))
.addTuple(new LongDataPoint("s2", 2)));
writer.write(
new TSRecord(3, DEVICE2_NAME)
.addTuple(new FloatDataPoint("s1", 3))
.addTuple(new DoubleDataPoint("s2", 3)));
writer.write(
new TSRecord(4, DEVICE2_NAME)
.addTuple(new FloatDataPoint("s1", 4))
.addTuple(new DoubleDataPoint("s2", 4)));
writer.writeAligned(
new TSRecord(5, DEVICE3_NAME)
.addTuple(new IntDataPoint("s1", 5))
.addTuple(new LongDataPoint("s2", 5))
.addTuple(new BooleanDataPoint("s3", true))
.addTuple(new FloatDataPoint("s4", 5))
.addTuple(new StringDataPoint("s5", Binary.valueOf("5"))));
}
}
}