blob: a23416b890a239b009f4b3ffcd94ae94c0262f2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.node;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ConsensusReqReaderTest {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String identifier = String.valueOf(Integer.MAX_VALUE);
private static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
private static final String devicePath = "root.test_sg.test_d";
private WALMode prevMode;
private WALNode walNode;
@Before
public void setUp() throws Exception {
EnvironmentUtils.cleanDir(logDirectory);
prevMode = config.getWalMode();
config.setWalMode(WALMode.SYNC);
walNode = new WALNode(identifier, logDirectory);
}
@After
public void tearDown() throws Exception {
walNode.close();
config.setWalMode(prevMode);
EnvironmentUtils.cleanDir(logDirectory);
}
/**
* Generate wal files as below: <br>
* _0-0-1.wal: 1,-1 <br>
* _1-1-1.wal: 2,2,2 <br>
* _2-2-1.wal: 3,3 <br>
* _3-3-1.wal: 3,4 <br>
* _4-4-1.wal: 4 <br>
* _5-4-1.wal: 4,4,5 <br>
* _6-5-1.wal: 6 <br>
* 1 - InsertRowNode, 2 - InsertRowsOfOneDeviceNode, 3 - InsertRowsNode, 4 -
* InsertMultiTabletsNode, 5 - InsertTabletNode, 6 - InsertRowNode
*/
private void simulateFileScenario01() throws IllegalPathException {
InsertTabletNode insertTabletNode;
InsertRowNode insertRowNode;
// _0-0-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(1);
walNode.log(0, insertRowNode); // 1
insertTabletNode = getInsertTabletNode(devicePath, new long[] {2});
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // -1
walNode.rollWALFile();
// _1-1-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(2);
walNode.log(0, insertRowNode); // 2
walNode.log(0, insertRowNode); // 2
walNode.log(0, insertRowNode); // 2
walNode.rollWALFile();
// _2-2-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(3);
walNode.log(0, insertRowNode); // 3
walNode.log(0, insertRowNode); // 3
walNode.rollWALFile();
// _3-3-1.wal
insertRowNode.setDevicePath(new PartialPath(devicePath + "test"));
walNode.log(0, insertRowNode); // 3
insertTabletNode = getInsertTabletNode(devicePath, new long[] {4});
insertTabletNode.setSearchIndex(4);
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
walNode.rollWALFile();
// _4-4-1.wal
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
walNode.rollWALFile();
// _5-4-1.wal
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 4
insertTabletNode = getInsertTabletNode(devicePath, new long[] {5});
insertTabletNode.setSearchIndex(5);
walNode.log(0, insertTabletNode, 0, insertTabletNode.getRowCount()); // 5
walNode.rollWALFile();
// _6-5-1.wal
insertRowNode = getInsertRowNode(devicePath);
insertRowNode.setSearchIndex(6);
walNode.log(0, insertRowNode); // 6
}
@Test
public void scenario01TestGetReqIterator01() throws Exception {
simulateFileScenario01();
walNode.rollWALFile();
IndexedConsensusRequest request;
PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(1, ((InsertRowNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(3, ((InsertRowNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(4, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
}
@Test
public void scenario01TestGetReqIterator02() throws Exception {
simulateFileScenario01();
IndexedConsensusRequest request;
PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(4);
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(4, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
// wait for next
ExecutorService checkThread = Executors.newSingleThreadExecutor();
Future<Boolean> future =
checkThread.submit(
() -> {
iterator.waitForNextReady();
Assert.assertTrue(iterator.hasNext());
IndexedConsensusRequest req = iterator.next();
Assert.assertEquals(1, req.getRequests().size());
for (IConsensusRequest innerRequest : req.getRequests()) {
PlanNode node =
WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(node instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) node).getSearchIndex());
}
return true;
});
Thread.sleep(500);
walNode.rollWALFile();
InsertRowNode insertRowNode = getInsertRowNode(devicePath);
walNode.log(0, insertRowNode); // put -1 after 6
Assert.assertTrue(future.get());
checkThread.shutdown();
}
@Test
public void scenario01TestGetReqIterator03() throws Exception {
simulateFileScenario01();
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
Assert.assertFalse(iterator.hasNext());
// wait for next
ExecutorService checkThread = Executors.newSingleThreadExecutor();
Future<Boolean> future =
checkThread.submit(
() -> {
iterator.waitForNextReady();
IndexedConsensusRequest request;
PlanNode planNode;
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
}
iterator.waitForNextReady();
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(6, ((InsertRowNode) planNode).getSearchIndex());
}
return true;
});
Thread.sleep(500);
walNode.rollWALFile();
InsertRowNode insertRowNode = getInsertRowNode(devicePath);
walNode.log(0, insertRowNode); // put -1 after 6
walNode.rollWALFile();
insertRowNode = getInsertRowNode(devicePath);
walNode.log(0, insertRowNode); // put -1 after 6
Assert.assertTrue(future.get());
checkThread.shutdown();
}
@Test
public void scenario01TestGetReqIterator04() throws Exception {
simulateFileScenario01();
walNode.rollWALFile();
IndexedConsensusRequest request;
PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(1, ((InsertRowNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
}
iterator.skipTo(4);
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(4, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
}
@Test
public void scenario01TestGetReqIterator05() throws Exception {
simulateFileScenario01();
walNode.rollWALFile();
IndexedConsensusRequest request;
PlanNode planNode;
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
}
iterator.skipTo(2);
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(2, ((InsertRowNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertRowNode);
Assert.assertEquals(3, ((InsertRowNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(4, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(4, ((InsertTabletNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof InsertTabletNode);
Assert.assertEquals(5, ((InsertTabletNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
}
@Test
public void scenario01TestGetReqIterator06() throws Exception {
simulateFileScenario01();
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(5);
Assert.assertFalse(iterator.hasNext());
}
@Test
public void scenario01TestGetReqIterator07() throws Exception {
simulateFileScenario01();
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(6);
Assert.assertFalse(iterator.hasNext());
}
/**
* Generate wal files as below: <br>
* _0-0-0.wal: -1,-1 <br>
* _1-0-0.wal: -1 <br>
* _2-0-1.wal: -1,1 <br>
* _3-1-0.wal: -1 <br>
* 1 - DeleteDataNode
*/
private void simulateFileScenario02() throws IllegalPathException {
InsertRowNode insertRowNode = getInsertRowNode(devicePath);
// _0-0-0.wal
walNode.log(0, insertRowNode); // -1
walNode.log(0, insertRowNode); // -1
walNode.rollWALFile();
// _1-0-0.wal
walNode.log(0, insertRowNode); // -1
walNode.rollWALFile();
// _2-0-1.wal
walNode.log(0, insertRowNode); // -1
DeleteDataNode deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(1);
walNode.log(0, deleteDataNode); // 1
walNode.rollWALFile();
// _3-1-0.wal
walNode.log(0, insertRowNode); // -1
}
@Test
public void scenario02TestGetReqIterator01() throws Exception {
simulateFileScenario02();
walNode.rollWALFile();
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
IndexedConsensusRequest request;
PlanNode planNode;
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(1, ((DeleteDataNode) planNode).getSearchIndex());
}
}
/**
* Generate wal files as below: <br>
* _0-0-1.wal: broken <br>
* _1-0-1.wal: 1,-1 <br>
* _2-1-1.wal: 2,2,2 <br>
* _3-2-1.wal: 3,3 <br>
* _4-3-1.wal: broken <br>
* _5-3-1.wal: 3,5 <br>
* _6-5-1.wal: broken <br>
* _7-5-1.wal: broken <br>
* _8-5-1.wal: broken <br>
* _9-8-1.wal: 8 <br>
* 1,2,3,5,8 - DeleteDataNode
*/
private void simulateFileScenario03() throws IllegalPathException, IOException {
// _0-0-1.wal
walNode.rollWALFile();
new File(logDirectory, "_0-0-0.wal").delete();
new File(logDirectory, "_0-0-1.wal").createNewFile();
// _1-0-1.wal
DeleteDataNode deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(1);
walNode.log(0, deleteDataNode); // 1
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(-1);
walNode.log(0, deleteDataNode); // -1
walNode.rollWALFile();
// _2-1-1.wal
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(2);
walNode.log(0, deleteDataNode); // 2
walNode.log(0, deleteDataNode); // 2
walNode.log(0, deleteDataNode); // 2
walNode.rollWALFile();
// _3-2-1.wal
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(3);
walNode.log(0, deleteDataNode); // 3
walNode.log(0, deleteDataNode); // 3
walNode.rollWALFile();
// _4-3-1.wal
walNode.rollWALFile();
new File(logDirectory, "_4-3-0.wal").delete();
new File(logDirectory, "_4-3-1.wal").createNewFile();
// _5-3-1.wal
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(3);
walNode.log(0, deleteDataNode); // 3
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(5);
walNode.log(0, deleteDataNode); // 5
walNode.rollWALFile();
// _6-5-1.wal
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(6);
walNode.log(0, deleteDataNode); // 6
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(7);
walNode.log(0, deleteDataNode); // 7
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(8);
walNode.log(0, deleteDataNode); // 8
walNode.rollWALFile();
new File(logDirectory, "_6-5-1.wal").delete();
new File(logDirectory, "_6-5-1.wal").createNewFile();
// _7-5-1.wal
walNode.rollWALFile();
new File(logDirectory, "_7-5-0.wal").delete();
new File(logDirectory, "_7-5-1.wal").createNewFile();
// _8-5-1.wal
walNode.rollWALFile();
new File(logDirectory, "_8-5-0.wal").delete();
new File(logDirectory, "_8-5-1.wal").createNewFile();
// _9-8-1.wal
deleteDataNode = getDeleteDataNode(devicePath);
deleteDataNode.setSearchIndex(8);
walNode.log(0, deleteDataNode); // 8
}
@Test
public void scenario03TestGetReqIterator01() throws Exception {
simulateFileScenario03();
walNode.rollWALFile();
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
IndexedConsensusRequest request;
PlanNode planNode;
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(1, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(2, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(3, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(5, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
walNode.rollWALFile();
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
}
@Test
public void scenario03TestGetReqIterator02() throws Exception {
simulateFileScenario03();
walNode.rollWALFile();
ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(6);
IndexedConsensusRequest request;
PlanNode planNode;
Assert.assertFalse(iterator.hasNext());
walNode.rollWALFile();
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
iterator.skipTo(3);
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(3, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(3, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(5, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertTrue(iterator.hasNext());
request = iterator.next();
Assert.assertEquals(1, request.getRequests().size());
for (IConsensusRequest innerRequest : request.getRequests()) {
planNode = WALEntry.deserializeForConsensus(innerRequest.serializeToByteBuffer());
Assert.assertTrue(planNode instanceof DeleteDataNode);
Assert.assertEquals(8, ((DeleteDataNode) planNode).getSearchIndex());
}
Assert.assertFalse(iterator.hasNext());
}
public static InsertRowNode getInsertRowNode(String devicePath) throws IllegalPathException {
long time = 110L;
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.DOUBLE,
TSDataType.FLOAT,
TSDataType.INT64,
TSDataType.INT32,
TSDataType.BOOLEAN,
TSDataType.TEXT
};
Object[] columns = new Object[6];
columns[0] = 1.0;
columns[1] = 2.0f;
columns[2] = 10000L;
columns[3] = 100;
columns[4] = false;
columns[5] = new Binary("hh" + 0, TSFileConfig.STRING_CHARSET);
InsertRowNode insertRowNode =
new InsertRowNode(
new PlanNodeId(""),
new PartialPath(devicePath),
false,
new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
dataTypes,
time,
columns,
false);
insertRowNode.setMeasurementSchemas(
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.DOUBLE),
new MeasurementSchema("s2", TSDataType.FLOAT),
new MeasurementSchema("s3", TSDataType.INT64),
new MeasurementSchema("s4", TSDataType.INT32),
new MeasurementSchema("s5", TSDataType.BOOLEAN),
new MeasurementSchema("s6", TSDataType.TEXT)
});
return insertRowNode;
}
private InsertTabletNode getInsertTabletNode(String devicePath, long[] times)
throws IllegalPathException {
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.DOUBLE,
TSDataType.FLOAT,
TSDataType.INT64,
TSDataType.INT32,
TSDataType.BOOLEAN,
TSDataType.TEXT
};
Object[] columns = new Object[6];
columns[0] = new double[times.length];
columns[1] = new float[times.length];
columns[2] = new long[times.length];
columns[3] = new int[times.length];
columns[4] = new boolean[times.length];
columns[5] = new Binary[times.length];
for (int r = 0; r < times.length; r++) {
((double[]) columns[0])[r] = 1.0 + r;
((float[]) columns[1])[r] = 2 + r;
((long[]) columns[2])[r] = 10000 + r;
((int[]) columns[3])[r] = 100 + r;
((boolean[]) columns[4])[r] = (r % 2 == 0);
((Binary[]) columns[5])[r] = new Binary("hh" + r, TSFileConfig.STRING_CHARSET);
}
BitMap[] bitMaps = new BitMap[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
if (bitMaps[i] == null) {
bitMaps[i] = new BitMap(times.length);
}
bitMaps[i].mark(i % times.length);
}
return new InsertTabletNode(
new PlanNodeId(""),
new PartialPath(devicePath),
false,
new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
dataTypes,
new MeasurementSchema[] {
new MeasurementSchema("s1", TSDataType.DOUBLE),
new MeasurementSchema("s2", TSDataType.FLOAT),
new MeasurementSchema("s3", TSDataType.INT64),
new MeasurementSchema("s4", TSDataType.INT32),
new MeasurementSchema("s5", TSDataType.BOOLEAN),
new MeasurementSchema("s6", TSDataType.TEXT)
},
times,
bitMaps,
columns,
times.length);
}
private DeleteDataNode getDeleteDataNode(String devicePath) throws IllegalPathException {
return new DeleteDataNode(
new PlanNodeId(""),
Collections.singletonList(new PartialPath(devicePath)),
Long.MIN_VALUE,
Long.MAX_VALUE);
}
}