/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.it.regionmigration;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.KillPoint.KillNode;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.itbase.exception.InconsistentDataException;
import org.apache.iotdb.metrics.utils.SystemType;

import org.apache.thrift.TException;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
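
/**
* Framework for region-migration reliability ITs. Subclasses set up a cluster, register kill
* points (log keywords that trigger an action such as killing a node), start a region migration,
* and assert the expected outcome and on-disk state.
*
* <p>A minimal usage sketch (illustrative only: {@code KILL_POINT_A} stands in for a real
* kill-point enum value defined elsewhere, and the test annotations a concrete subclass would
* carry are omitted):
*
* <pre>{@code
* public class MyMigrationIT extends IoTDBRegionMigrateReliabilityITFramework {
*   public void killOriginalDataNodeDuringMigration() throws Exception {
*     // 2 data replicas, 3 schema replicas, 1 ConfigNode, 3 DataNodes;
*     // kill the original DataNode when KILL_POINT_A appears in its log,
*     // and still expect the migration to succeed.
*     successTest(2, 3, 1, 3, noKillPoints(), buildSet(KILL_POINT_A), KillNode.ORIGINAL_DATANODE);
*   }
* }
* }</pre>
*/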
public class IoTDBRegionMigrateReliabilityITFramework {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBRegionMigrateReliabilityITFramework.class);
private static final String INSERTION1 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(100, 1, 2)";
private static final String INSERTION2 =
"INSERT INTO root.sg.d1(timestamp,speed,temperature) values(101, 3, 4)";
private static final String FLUSH_COMMAND = "flush";
private static final String SHOW_REGIONS = "show regions";
private static final String SHOW_DATANODES = "show datanodes";
private static final String COUNT_TIMESERIES = "select count(*) from root.sg.**";
private static final String REGION_MIGRATE_COMMAND_FORMAT = "migrate region %d from %d to %d";
private static final String CONFIGURATION_FILE_NAME = "configuration.dat";
ExecutorService executorService = IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
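/** Kill point action: forcibly stop the node that triggered the kill point; if it is a ConfigNode, restart it afterwards. */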
public static Consumer<KillPointContext> actionOfKillNode =
context -> {
context.getNodeWrapper().stopForcibly();
LOGGER.info("Node {} stopped.", context.getNodeWrapper().getId());
Assert.assertFalse(context.getNodeWrapper().isAlive());
if (context.getNodeWrapper() instanceof ConfigNodeWrapper) {
context.getNodeWrapper().start();
LOGGER.info("Node {} restarted.", context.getNodeWrapper().getId());
Assert.assertTrue(context.getNodeWrapper().isAlive());
}
};
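/** Kill point action: forcibly stop every node in the cluster, then restart them all. */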
public static Consumer<KillPointContext> actionOfRestartCluster =
context -> {
context.getEnv().getNodeWrapperList().parallelStream()
.forEach(AbstractNodeWrapper::stopForcibly);
LOGGER.info("Cluster has been stopped");
context.getEnv().getNodeWrapperList().parallelStream().forEach(AbstractNodeWrapper::start);
LOGGER.info("Cluster has been restarted");
};
@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
}
@After
public void tearDown() throws InterruptedException {
EnvFactory.getEnv().cleanClusterEnvironment();
}
public void successTest(
final int dataReplicationFactor,
final int schemaReplicationFactor,
final int configNodeNum,
final int dataNodeNum,
KeySetView<String, Boolean> killConfigNodeKeywords,
KeySetView<String, Boolean> killDataNodeKeywords,
KillNode killNode)
throws Exception {
generalTestWithAllOptions(
dataReplicationFactor,
schemaReplicationFactor,
configNodeNum,
dataNodeNum,
killConfigNodeKeywords,
killDataNodeKeywords,
actionOfKillNode,
true,
killNode);
}
public void failTest(
final int dataReplicationFactor,
final int schemaReplicationFactor,
final int configNodeNum,
final int dataNodeNum,
KeySetView<String, Boolean> killConfigNodeKeywords,
KeySetView<String, Boolean> killDataNodeKeywords,
KillNode killNode)
throws Exception {
generalTestWithAllOptions(
dataReplicationFactor,
schemaReplicationFactor,
configNodeNum,
dataNodeNum,
killConfigNodeKeywords,
killDataNodeKeywords,
actionOfKillNode,
false,
killNode);
}
public void killClusterTest(
KeySetView<String, Boolean> configNodeKeywords, boolean expectMigrateSuccess)
throws Exception {
generalTestWithAllOptions(
2,
3,
3,
3,
configNodeKeywords,
noKillPoints(),
actionOfRestartCluster,
expectMigrateSuccess,
KillNode.ALL_NODES);
}
// region general test
public void generalTestWithAllOptions(
final int dataReplicationFactor,
final int schemaReplicationFactor,
final int configNodeNum,
final int dataNodeNum,
KeySetView<String, Boolean> configNodeKeywords,
KeySetView<String, Boolean> dataNodeKeywords,
Consumer<KillPointContext> actionWhenDetectKeyWords,
final boolean expectMigrateSuccess,
KillNode killNode)
throws Exception {
// prepare env
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setDataReplicationFactor(dataReplicationFactor)
.setSchemaReplicationFactor(schemaReplicationFactor);
EnvFactory.getEnv().registerConfigNodeKillPoints(new ArrayList<>(configNodeKeywords));
EnvFactory.getEnv().registerDataNodeKillPoints(new ArrayList<>(dataNodeKeywords));
EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement();
SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
statement.execute(INSERTION1);
ResultSet result = statement.executeQuery(SHOW_REGIONS);
Map<Integer, Set<Integer>> regionMap = getRegionMap(result);
result = statement.executeQuery(SHOW_DATANODES);
Set<Integer> allDataNodeId = new HashSet<>();
while (result.next()) {
allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
}
final int selectedRegion = selectRegion(regionMap);
final int originalDataNode = selectOriginalDataNode(regionMap, selectedRegion);
final int destDataNode = selectDestDataNode(allDataNodeId, regionMap, selectedRegion);
checkRegionFileExist(originalDataNode);
checkPeersExist(regionMap.get(selectedRegion), originalDataNode, selectedRegion);
try {
awaitUntilFlush(statement, originalDataNode);
} catch (ConditionTimeoutException e) {
LOGGER.error("Flush timeout:", e);
Assert.fail();
}
// set kill points
if (killNode == KillNode.ORIGINAL_DATANODE) {
setDataNodeKillPoints(
Collections.singletonList(
EnvFactory.getEnv().dataNodeIdToWrapper(originalDataNode).get()),
dataNodeKeywords,
actionWhenDetectKeyWords);
} else if (killNode == KillNode.DESTINATION_DATANODE) {
setDataNodeKillPoints(
Collections.singletonList(EnvFactory.getEnv().dataNodeIdToWrapper(destDataNode).get()),
dataNodeKeywords,
actionWhenDetectKeyWords);
} else {
setConfigNodeKillPoints(configNodeKeywords, actionWhenDetectKeyWords);
setDataNodeKillPoints(
EnvFactory.getEnv().getDataNodeWrapperList(),
dataNodeKeywords,
actionWhenDetectKeyWords);
}
LOGGER.info("DataNode set before migration: {}", regionMap.get(selectedRegion));
LOGGER.info(
"originalDataNode: {}",
EnvFactory.getEnv().dataNodeIdToWrapper(originalDataNode).get().getNodePath());
LOGGER.info(
"destDataNode: {}",
EnvFactory.getEnv().dataNodeIdToWrapper(destDataNode).get().getNodePath());
// region migration start
statement.execute(buildRegionMigrateCommand(selectedRegion, originalDataNode, destDataNode));
boolean success = false;
try {
awaitUntilSuccess(client, selectedRegion, originalDataNode, destDataNode);
success = true;
} catch (ConditionTimeoutException e) {
if (expectMigrateSuccess) {
LOGGER.error("Region migrate failed", e);
Assert.fail();
}
}
if (!expectMigrateSuccess && success) {
LOGGER.error("Region migrate succeeded unexpectedly");
Assert.fail();
}
// make sure all kill points have been triggered
checkKillPointsAllTriggered(configNodeKeywords);
checkKillPointsAllTriggered(dataNodeKeywords);
// check the remaining files
if (success) {
checkRegionFileClearIfNodeAlive(originalDataNode);
checkRegionFileExistIfNodeAlive(destDataNode);
checkPeersClearIfNodeAlive(allDataNodeId, originalDataNode, selectedRegion);
checkClusterStillWritable();
} else {
checkRegionFileClearIfNodeAlive(destDataNode);
checkRegionFileExistIfNodeAlive(originalDataNode);
checkPeersClearIfNodeAlive(allDataNodeId, destDataNode, selectedRegion);
}
} catch (InconsistentDataException ignore) {
// deliberately ignored: node restarts during the test can make cross-node reads transiently inconsistent
}
LOGGER.info("test pass");
}
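/** Gracefully stop and restart each of the given DataNodes in parallel, waiting for each state change. */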
private void restartDataNodes(List<DataNodeWrapper> dataNodeWrappers) {
dataNodeWrappers.parallelStream()
.forEach(
nodeWrapper -> {
nodeWrapper.stop();
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.pollDelay(2, TimeUnit.SECONDS)
.until(() -> !nodeWrapper.isAlive());
LOGGER.info("Node {} stopped.", nodeWrapper.getId());
nodeWrapper.start();
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.pollDelay(2, TimeUnit.SECONDS)
.until(nodeWrapper::isAlive);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
LOGGER.info("Node {} restarted.", nodeWrapper.getId());
});
}
private void setConfigNodeKillPoints(
KeySetView<String, Boolean> killConfigNodeKeywords, Consumer<KillPointContext> action) {
EnvFactory.getEnv()
.getConfigNodeWrapperList()
.forEach(
configNodeWrapper ->
executorService.submit(
() ->
doActionWhenDetectKeywords(
configNodeWrapper, killConfigNodeKeywords, action)));
}
private void setDataNodeKillPoints(
List<DataNodeWrapper> dataNodeWrappers,
KeySetView<String, Boolean> killDataNodeKeywords,
Consumer<KillPointContext> action) {
dataNodeWrappers.forEach(
dataNodeWrapper ->
executorService.submit(
() -> doActionWhenDetectKeywords(dataNodeWrapper, killDataNodeKeywords, action)));
}
/**
* Monitor the node's log and perform the given action when a kill point keyword is detected.
*
* @param nodeWrapper the node whose log file is monitored
* @param keywords when one of these keywords is detected in the node's log, trigger the action;
*     each keyword is removed from the set once triggered
* @param action the action to perform on the node when a keyword is detected
*/
private static void doActionWhenDetectKeywords(
AbstractNodeWrapper nodeWrapper,
KeySetView<String, Boolean> keywords,
Consumer<KillPointContext> action) {
if (keywords.isEmpty()) {
return;
}
final String logFileName;
if (nodeWrapper instanceof ConfigNodeWrapper) {
logFileName = "log_confignode_all.log";
} else {
logFileName = "log_datanode_all.log";
}
SystemType type = SystemType.getSystemType();
ProcessBuilder builder;
if (type == SystemType.LINUX || type == SystemType.MAC) {
builder =
new ProcessBuilder(
"tail",
"-f",
nodeWrapper.getNodePath() + File.separator + "logs" + File.separator + logFileName);
} else if (type == SystemType.WINDOWS) {
builder =
new ProcessBuilder(
"powershell",
"-Command",
"Get-Content "
+ nodeWrapper.getNodePath()
+ File.separator
+ "logs"
+ File.separator
+ logFileName
+ " -Wait");
} else {
throw new UnsupportedOperationException("Unsupported system type " + type);
}
builder.redirectErrorStream(true);
try {
Process process = builder.start();
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
// if more than one keyword is triggered at the same time, the test code is likely wrong
Assert.assertTrue(
line,
keywords.stream().map(KillPoint::addKillPointPrefix).filter(line::contains).count()
<= 1);
String finalLine = line;
Optional<String> detectedKeyword =
keywords.stream()
.filter(keyword -> finalLine.contains(KillPoint.addKillPointPrefix(keyword)))
.findAny();
if (detectedKeyword.isPresent()) {
// each keyword triggers only once
keywords.remove(detectedKeyword.get());
action.accept(new KillPointContext(nodeWrapper, (AbstractEnv) EnvFactory.getEnv()));
LOGGER.info("Kill point triggered: {}", detectedKeyword.get());
}
if (keywords.isEmpty()) {
break;
}
}
} catch (AssertionError e) {
LOGGER.error("gg", e);
throw e;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
void checkKillPointsAllTriggered(KeySetView<String, Boolean> killPoints) {
if (!killPoints.isEmpty()) {
killPoints.forEach(killPoint -> LOGGER.error("Kill point {} not triggered", killPoint));
Assert.fail("Some kill points was not triggered");
}
}
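/** Build the migration SQL, e.g. {@code migrate region 1 from 2 to 3} (IDs here are illustrative). */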
private static String buildRegionMigrateCommand(int regionId, int from, int to) {
String result = String.format(REGION_MIGRATE_COMMAND_FORMAT, regionId, from, to);
LOGGER.info(result);
return result;
}
private static Map<Integer, Set<Integer>> getRegionMap(ResultSet showRegionsResult)
throws SQLException {
Map<Integer, Set<Integer>> regionMap = new HashMap<>();
while (showRegionsResult.next()) {
if (String.valueOf(TConsensusGroupType.DataRegion)
.equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
int regionId = showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
int dataNodeId = showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
regionMap.computeIfAbsent(regionId, id -> new HashSet<>()).add(dataNodeId);
}
}
return regionMap;
}
private static Map<Integer, Set<Integer>> getRegionMap(List<TRegionInfo> regionInfoList) {
Map<Integer, Set<Integer>> regionMap = new HashMap<>();
regionInfoList.forEach(
regionInfo -> {
int regionId = regionInfo.getConsensusGroupId().getId();
regionMap
.computeIfAbsent(regionId, regionId1 -> new HashSet<>())
.add(regionInfo.getDataNodeId());
});
return regionMap;
}
private static int selectRegion(Map<Integer, Set<Integer>> regionMap) {
return regionMap.keySet().stream().findAny().orElseThrow(() -> new RuntimeException("cannot find any data region"));
}
private static int selectOriginalDataNode(
Map<Integer, Set<Integer>> regionMap, int selectedRegion) {
return regionMap.get(selectedRegion).stream()
.findAny()
.orElseThrow(() -> new RuntimeException("cannot find original DataNode"));
}
private static int selectDestDataNode(
Set<Integer> dataNodeSet, Map<Integer, Set<Integer>> regionMap, int selectedRegion) {
return dataNodeSet.stream()
.filter(dataNodeId -> !regionMap.get(selectedRegion).contains(dataNodeId))
.findAny()
.orElseThrow(() -> new RuntimeException("cannot find dest DataNode"));
}
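/** Flush repeatedly until at least one TsFile appears under the DataNode's sequence or unsequence data directory. */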
private static void awaitUntilFlush(Statement statement, int originalDataNode) {
long startTime = System.currentTimeMillis();
File sequence = new File(buildDataPath(originalDataNode, true));
File unsequence = new File(buildDataPath(originalDataNode, false));
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.pollDelay(2, TimeUnit.SECONDS)
.until(
() -> {
statement.execute(FLUSH_COMMAND);
int fileNum = 0;
if (sequence.exists() && sequence.listFiles() != null) {
fileNum += Objects.requireNonNull(sequence.listFiles()).length;
}
if (unsequence.exists() && unsequence.listFiles() != null) {
fileNum += Objects.requireNonNull(unsequence.listFiles()).length;
}
return fileNum > 0;
});
LOGGER.info("DataNode {} has been flushed", originalDataNode);
LOGGER.info("Flush cost time: {}ms", System.currentTimeMillis() - startTime);
}
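/** Poll show-region results from the ConfigNode leader until the selected region's replica set no longer contains the original DataNode and contains the destination DataNode. */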
private static void awaitUntilSuccess(
SyncConfigNodeIServiceClient client,
int selectedRegion,
int originalDataNode,
int destDataNode) {
AtomicReference<Set<Integer>> lastTimeDataNodes = new AtomicReference<>();
AtomicReference<Exception> lastException = new AtomicReference<>();
AtomicReference<SyncConfigNodeIServiceClient> clientRef = new AtomicReference<>(client);
try {
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.pollDelay(2, TimeUnit.SECONDS)
.until(
() -> {
try {
TShowRegionResp resp = clientRef.get().showRegion(new TShowRegionReq());
Map<Integer, Set<Integer>> newRegionMap = getRegionMap(resp.getRegionInfoList());
Set<Integer> dataNodes = newRegionMap.get(selectedRegion);
lastTimeDataNodes.set(dataNodes);
return !dataNodes.contains(originalDataNode) && dataNodes.contains(destDataNode);
} catch (TException e) {
clientRef.set(
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection());
lastException.set(e);
return false;
} catch (Exception e) {
// other exceptions are tolerated; keep polling
lastException.set(e);
return false;
}
});
} catch (ConditionTimeoutException e) {
if (lastTimeDataNodes.get() == null) {
LOGGER.error(
"show regions may have failed; lastTimeDataNodes is null. Last exception:",
lastException.get());
throw e;
}
String actualSetStr = lastTimeDataNodes.get().toString();
lastTimeDataNodes.get().remove(originalDataNode);
lastTimeDataNodes.get().add(destDataNode);
String expectSetStr = lastTimeDataNodes.get().toString();
LOGGER.error("DataNode set {} is unexpected, expected {}", actualSetStr, expectSetStr);
if (lastException.get() == null) {
LOGGER.info("No exception during awaiting");
} else {
LOGGER.error("Last exception during awaiting:", lastException.get());
}
throw e;
}
LOGGER.info("DataNode set has been successfully changed to {}", lastTimeDataNodes.get());
}
private static void checkRegionFileExistIfNodeAlive(int dataNode) {
if (EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().isAlive()) {
checkRegionFileExist(dataNode);
}
}
private static void checkRegionFileExist(int dataNode) {
File originalRegionDir = new File(buildRegionDirPath(dataNode));
Assert.assertTrue(originalRegionDir.isDirectory());
Assert.assertNotEquals(0, Objects.requireNonNull(originalRegionDir.listFiles()).length);
}
private static void checkRegionFileClearIfNodeAlive(int dataNode) {
if (EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().isAlive()) {
checkRegionFileClear(dataNode);
}
}
/** Check that the DataNode's region files have been deleted. */
private static void checkRegionFileClear(int dataNode) {
File originalRegionDir = new File(buildRegionDirPath(dataNode));
Assert.assertTrue(originalRegionDir.isDirectory());
Assert.assertEquals(0, Objects.requireNonNull(originalRegionDir.listFiles()).length);
LOGGER.info("Original DataNode {} region file clear", dataNode);
}
private static void checkPeersExistIfNodeAlive(
Set<Integer> dataNodes, int originalDataNode, int regionId) {
dataNodes.forEach(
targetDataNode -> checkPeerExistIfNodeAlive(targetDataNode, originalDataNode, regionId));
}
private static void checkPeersExist(Set<Integer> dataNodes, int originalDataNode, int regionId) {
dataNodes.forEach(targetDataNode -> checkPeerExist(targetDataNode, originalDataNode, regionId));
}
private static void checkPeerExistIfNodeAlive(
int checkTargetDataNode, int originalDataNode, int regionId) {
if (EnvFactory.getEnv().dataNodeIdToWrapper(checkTargetDataNode).get().isAlive()) {
checkPeerExist(checkTargetDataNode, originalDataNode, regionId);
}
}
private static void checkPeerExist(int checkTargetDataNode, int originalDataNode, int regionId) {
File expectExistedFile =
new File(buildConfigurationDataFilePath(checkTargetDataNode, originalDataNode, regionId));
Assert.assertTrue(
"configuration file should exist, but it didn't: " + expectExistedFile.getPath(),
expectExistedFile.exists());
}
private static void checkPeersClearIfNodeAlive(
Set<Integer> dataNodes, int originalDataNode, int regionId) {
dataNodes.stream()
.filter(dataNode -> dataNode != originalDataNode)
.forEach(
targetDataNode ->
checkPeerClearIfNodeAlive(targetDataNode, originalDataNode, regionId));
}
private static void checkPeersClear(Set<Integer> dataNodes, int originalDataNode, int regionId) {
dataNodes.stream()
.filter(dataNode -> dataNode != originalDataNode)
.forEach(targetDataNode -> checkPeerClear(targetDataNode, originalDataNode, regionId));
LOGGER.info("Peer clear");
}
private static void checkPeerClearIfNodeAlive(
int checkTargetDataNode, int originalDataNode, int regionId) {
if (EnvFactory.getEnv().dataNodeIdToWrapper(checkTargetDataNode).get().isAlive()) {
checkPeerClear(checkTargetDataNode, originalDataNode, regionId);
}
}
private static void checkPeerClear(int checkTargetDataNode, int originalDataNode, int regionId) {
File expectDeletedFile =
new File(buildConfigurationDataFilePath(checkTargetDataNode, originalDataNode, regionId));
Assert.assertFalse(
"configuration file should be deleted, but it didn't: " + expectDeletedFile.getPath(),
expectDeletedFile.exists());
LOGGER.info("configuration file has been deleted: {}", expectDeletedFile.getPath());
}
private void checkClusterStillWritable() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute(INSERTION2);
ResultSet resultSet = statement.executeQuery(COUNT_TIMESERIES);
resultSet.next();
Assert.assertEquals(2, resultSet.getLong(1));
Assert.assertEquals(2, resultSet.getLong(2));
LOGGER.info("Region group is still writable");
} catch (SQLException e) {
LOGGER.error("Something wrong", e);
Assert.fail("Something wrong");
}
}
private static String buildRegionDirPath(int dataNode) {
String nodePath = EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().getNodePath();
return nodePath
+ File.separator
+ IoTDBConstant.DATA_FOLDER_NAME
+ File.separator
+ "datanode"
+ File.separator
+ IoTDBConstant.CONSENSUS_FOLDER_NAME
+ File.separator
+ IoTDBConstant.DATA_REGION_FOLDER_NAME;
}
private static String buildDataPath(int dataNode, boolean isSequence) {
String nodePath = EnvFactory.getEnv().dataNodeIdToWrapper(dataNode).get().getNodePath();
return nodePath
+ File.separator
+ IoTDBConstant.DATA_FOLDER_NAME
+ File.separator
+ "datanode"
+ File.separator
+ IoTDBConstant.DATA_FOLDER_NAME
+ File.separator
+ (isSequence ? IoTDBConstant.SEQUENCE_FOLDER_NAME : IoTDBConstant.UNSEQUENCE_FOLDER_NAME);
}
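/**
* Build the path of the configuration file that DataNode {@code localDataNodeId} keeps for peer
* {@code remoteDataNodeId} of the given region. The region directory is named
* {@code 1_<regionId>} ({@code 1} appears to be the consensus group type id of data regions);
* the file name itself is produced by {@link IoTConsensusServerImpl#generateConfigurationDatFileName}.
*/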
private static String buildConfigurationDataFilePath(
int localDataNodeId, int remoteDataNodeId, int regionId) {
String configurationDatDirName =
buildRegionDirPath(localDataNodeId) + File.separator + "1_" + regionId;
String expectDeletedFileName =
IoTConsensusServerImpl.generateConfigurationDatFileName(
remoteDataNodeId, CONFIGURATION_FILE_NAME);
return configurationDatDirName + File.separator + expectDeletedFileName;
}
protected static KeySetView<String, Boolean> noKillPoints() {
return ConcurrentHashMap.newKeySet();
}
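/** Build a concurrent keyword set from kill point enum values, via {@link KillPoint#enumToString}. */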
@SafeVarargs
protected static <T extends Enum<?>> KeySetView<String, Boolean> buildSet(T... keywords) {
KeySetView<String, Boolean> result = ConcurrentHashMap.newKeySet();
result.addAll(
Arrays.stream(keywords).map(KillPoint::enumToString).collect(Collectors.toList()));
return result;
}
}