blob: 0a09c964873c98f242ade5d39f3a0ba6a479b23f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.node;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.CheckpointManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.checkpoint.MemTableInfo;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.MemTablePinException;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class WALEntryHandlerTest {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String identifier1 = String.valueOf(Integer.MAX_VALUE);
private static final String identifier2 = String.valueOf(Integer.MAX_VALUE - 1);
private static final String logDirectory1 =
TestConstant.BASE_OUTPUT_PATH.concat("wal-test" + identifier1);
private static final String logDirectory2 =
TestConstant.BASE_OUTPUT_PATH.concat("wal-test" + identifier2);
private static final String databasePath = "root.test_sg";
private static final String devicePath = databasePath + ".test_d";
private static final String dataRegionId = "1";
private WALMode prevMode;
private WALNode walNode1;
private WALNode walNode2;
@Before
public void setUp() throws Exception {
EnvironmentUtils.cleanDir(logDirectory1);
EnvironmentUtils.cleanDir(logDirectory2);
prevMode = config.getWalMode();
config.setWalMode(WALMode.SYNC);
walNode1 = new WALNode(identifier1, logDirectory1);
walNode2 = new WALNode(identifier2, logDirectory2);
}
@After
public void tearDown() throws Exception {
walNode1.close();
walNode2.close();
config.setWalMode(prevMode);
EnvironmentUtils.cleanDir(logDirectory1);
EnvironmentUtils.cleanDir(logDirectory2);
WALInsertNodeCache.getInstance(1).clear();
}
@Test(expected = MemTablePinException.class)
public void pinDeletedMemTable() throws Exception {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode1.onMemTableCreated(memTable, logDirectory1 + "/" + "fake.tsfile");
WALFlushListener flushListener =
walNode1.log(
memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
walNode1.onMemTableFlushed(memTable);
Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
// pin flushed memTable
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
}
@Test
public void pinMemTable() throws Exception {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode1.onMemTableCreated(memTable, logDirectory1 + "/" + "fake.tsfile");
InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode1.log(memTable.getMemTableId(), node1);
// pin memTable
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
// roll wal file
walNode1.rollWALFile();
InsertRowNode node2 = getInsertRowNode(devicePath, System.currentTimeMillis());
node2.setSearchIndex(2);
walNode1.log(memTable.getMemTableId(), node2);
walNode1.onMemTableFlushed(memTable);
walNode1.rollWALFile();
// find node1
ConsensusReqReader.ReqIterator itr = walNode1.getReqIterator(1);
assertTrue(itr.hasNext());
assertEquals(
node1,
WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
// try to delete flushed but pinned memTable
walNode1.deleteOutdatedFiles();
// try to find node1
itr = walNode1.getReqIterator(1);
assertTrue(itr.hasNext());
assertEquals(
node1,
WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
}
@Test(expected = MemTablePinException.class)
public void unpinDeletedMemTable() throws Exception {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode1.onMemTableCreated(memTable, logDirectory1 + "/" + "fake.tsfile");
WALFlushListener flushListener =
walNode1.log(
memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
walNode1.onMemTableFlushed(memTable);
// pin flushed memTable
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.unpinMemTable();
}
@Test
public void unpinFlushedMemTable() throws Exception {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode1.onMemTableCreated(memTable, logDirectory1 + "/" + "fake.tsfile");
WALFlushListener flushListener =
walNode1.log(
memTable.getMemTableId(), getInsertRowNode(devicePath, System.currentTimeMillis()));
WALEntryHandler handler = flushListener.getWalEntryHandler();
// pin twice
handler.pinMemTable();
handler.pinMemTable();
walNode1.onMemTableFlushed(memTable);
Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
// unpin 1
CheckpointManager checkpointManager = walNode1.getCheckpointManager();
handler.unpinMemTable();
MemTableInfo oldestMemTableInfo = checkpointManager.getOldestUnpinnedMemTableInfo();
assertNull(oldestMemTableInfo);
// unpin 2
handler.unpinMemTable();
assertNull(checkpointManager.getOldestUnpinnedMemTableInfo());
}
@Test
public void unpinMemTable() throws Exception {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode1.onMemTableCreated(memTable, logDirectory1 + "/" + "fake.tsfile");
InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode1.log(memTable.getMemTableId(), node1);
// pin memTable
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode1.onMemTableFlushed(memTable);
// roll wal file
walNode1.rollWALFile();
walNode1.rollWALFile();
// find node1
ConsensusReqReader.ReqIterator itr = walNode1.getReqIterator(1);
assertTrue(itr.hasNext());
assertEquals(
node1,
WALEntry.deserializeForConsensus(itr.next().getRequests().get(0).serializeToByteBuffer()));
// unpin flushed memTable
handler.unpinMemTable();
// try to delete flushed but pinned memTable
walNode1.deleteOutdatedFiles();
// try to find node1
itr = walNode1.getReqIterator(1);
assertFalse(itr.hasNext());
}
@Test
public void getUnFlushedValue() throws Exception {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode1.onMemTableCreated(memTable, logDirectory1 + "/" + "fake.tsfile");
InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode1.log(memTable.getMemTableId(), node1);
// pin memTable
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode1.onMemTableFlushed(memTable);
assertEquals(node1, handler.getInsertNode());
}
@Test
public void getFlushedValue() throws Exception {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode1.onMemTableCreated(memTable, logDirectory1 + "/" + "fake.tsfile");
InsertRowNode node1 = getInsertRowNode(devicePath, System.currentTimeMillis());
node1.setSearchIndex(1);
WALFlushListener flushListener = walNode1.log(memTable.getMemTableId(), node1);
// pin memTable
WALEntryHandler handler = flushListener.getWalEntryHandler();
handler.pinMemTable();
walNode1.onMemTableFlushed(memTable);
// wait until wal flushed
Awaitility.await().until(() -> walNode1.isAllWALEntriesConsumed());
assertEquals(node1, handler.getInsertNode());
}
@Test
public void testConcurrentGetValue() throws Exception {
int threadsNum = 10;
ExecutorService executorService = Executors.newFixedThreadPool(threadsNum);
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < threadsNum; ++i) {
WALNode walNode = i % 2 == 0 ? walNode1 : walNode2;
String logDirectory = i % 2 == 0 ? logDirectory1 : logDirectory2;
Callable<Void> writeTask =
() -> {
IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
walNode.onMemTableCreated(memTable, logDirectory + "/" + "fake.tsfile");
List<WALFlushListener> walFlushListeners = new ArrayList<>();
List<InsertRowNode> expectedInsertRowNodes = new ArrayList<>();
try {
for (int j = 0; j < 1_000; ++j) {
long memTableId = memTable.getMemTableId();
InsertRowNode node =
getInsertRowNode(devicePath + memTableId, System.currentTimeMillis());
expectedInsertRowNodes.add(node);
WALFlushListener walFlushListener = walNode.log(memTableId, node);
walFlushListeners.add(walFlushListener);
}
} catch (IllegalPathException e) {
fail();
}
// wait until wal flushed
Awaitility.await().until(walNode::isAllWALEntriesConsumed);
walFlushListeners.get(0).getWalEntryHandler().pinMemTable();
walNode.onMemTableFlushed(memTable);
for (int j = 0; j < expectedInsertRowNodes.size(); ++j) {
InsertRowNode expect = expectedInsertRowNodes.get(j);
InsertRowNode actual =
(InsertRowNode) walFlushListeners.get(j).getWalEntryHandler().getInsertNode();
assertEquals(expect, actual);
}
walFlushListeners.get(0).getWalEntryHandler().unpinMemTable();
return null;
};
Future<Void> future = executorService.submit(writeTask);
futures.add(future);
}
// wait until all write tasks are done
for (Future<Void> future : futures) {
future.get();
}
executorService.shutdown();
}
private InsertRowNode getInsertRowNode(String devicePath, long time) throws IllegalPathException {
TSDataType[] dataTypes =
new TSDataType[] {
TSDataType.DOUBLE,
TSDataType.FLOAT,
TSDataType.INT64,
TSDataType.INT32,
TSDataType.BOOLEAN,
TSDataType.TEXT
};
Object[] columns = new Object[6];
columns[0] = 1.0d;
columns[1] = 2f;
columns[2] = 10000L;
columns[3] = 100;
columns[4] = false;
columns[5] = new Binary("hh" + 0, TSFileConfig.STRING_CHARSET);
InsertRowNode node =
new InsertRowNode(
new PlanNodeId(""),
new PartialPath(devicePath),
false,
new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
dataTypes,
time,
columns,
false);
MeasurementSchema[] schemas = new MeasurementSchema[6];
for (int i = 0; i < 6; i++) {
schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
}
node.setMeasurementSchemas(schemas);
return node;
}
}