Create IoTDBClusterNodeRemoveIT.java
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeRemoveIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeRemoveIT.java new file mode 100644 index 0000000..9a4865b --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeRemoveIT.java
@@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.cluster; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; +import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.cluster.RegionStatus; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; +import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq; +import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.thrift.TException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +@RunWith(IoTDBTestRunner.class) +@Category({ClusterIT.class}) +public class IoTDBClusterNodeRemoveIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClusterNodeRemoveIT.class); + + private static final String sbinPath = EnvFactory.getEnv().getSbinPath(); + private static final String libPath = EnvFactory.getEnv().getLibPath(); + + private static final String testRegionGroupExtensionPolicy = "AUTO"; + private static final String testSchemaRegionConsensusProtocolClass = + ConsensusFactory.RATIS_CONSENSUS; + private static final String testDataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS; + private static final int testReplicationFactor = 3; + + private static final double testSchemaRegionPerDataNode = 2.0; + private static final double testDataRegionPerDataNode = 2.0; + + private static final String database = "root.db"; + private static final long testTimePartitionInterval = 604800000; + private static final int testMinSchemaRegionGroupNum = 2; + private static final int testMinDataRegionGroupNum = 2; + private static final int testDataNodeNum = 3; + + private static final int removeDataNodeId = 2; + private static final int correctScriptExitValue = 1; + + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setSchemaRegionConsensusProtocolClass(testSchemaRegionConsensusProtocolClass) + .setDataRegionConsensusProtocolClass(testDataRegionConsensusProtocolClass) + .setSchemaReplicationFactor(testReplicationFactor) + .setDataReplicationFactor(testReplicationFactor) + .setSchemaRegionPerDataNode(testSchemaRegionPerDataNode) + .setDataRegionPerDataNode(testDataRegionPerDataNode) + .setSchemaRegionGroupExtensionPolicy(testRegionGroupExtensionPolicy) + .setDataRegionGroupExtensionPolicy(testRegionGroupExtensionPolicy) + .setTimePartitionInterval(testTimePartitionInterval); + // Init 1C3D environment + EnvFactory.getEnv().initClusterEnvironment(1, testDataNodeNum); + } + + @After + public void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testClusterNodeRemove() + throws ClientManagerException, IOException, InterruptedException, TException, + IllegalPathException { + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + TSStatus status = + client.setDatabase( + new TDatabaseSchema(database) + .setMinSchemaRegionGroupNum(testMinSchemaRegionGroupNum) + .setMinDataRegionGroupNum(testMinDataRegionGroupNum)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + // Create 2 SchemaPartitions to extend 2 SchemaRegionGroups + for (int i = 0; i < testMinSchemaRegionGroupNum; i++) { + String device = database + ".d" + i + ".s"; + TSchemaPartitionReq schemaPartitionReq = new TSchemaPartitionReq(); + TSchemaPartitionTableResp schemaPartitionTableResp; + Map<String, Map<TSeriesPartitionSlot, TConsensusGroupId>> schemaPartitionTable; + ByteBuffer buffer = ConfigNodeTestUtils.generatePatternTreeBuffer(new String[] {device}); + schemaPartitionReq.setPathPatternTree(buffer); + schemaPartitionTableResp = client.getOrCreateSchemaPartitionTable(schemaPartitionReq); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + schemaPartitionTableResp.getStatus().getCode()); + Assert.assertEquals(1, schemaPartitionTableResp.getSchemaPartitionTableSize()); + schemaPartitionTable = schemaPartitionTableResp.getSchemaPartitionTable(); + Assert.assertEquals(1, schemaPartitionTable.get(database).size()); + } + + // Create 2 DataPartitions to extend 2 DataRegionGroups + for (int i = 0; i < testMinDataRegionGroupNum; i++) { + Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = + ConfigNodeTestUtils.constructPartitionSlotsMap( + database, i, i + 1, i, i + 1, testTimePartitionInterval); + TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); + TDataPartitionTableResp dataPartitionTableResp = null; + for (int retry = 0; retry < 5; retry++) { + // Build new Client since it's unstable in Win8 environment + try (SyncConfigNodeIServiceClient configNodeClient = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + dataPartitionTableResp = + configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq); + if (dataPartitionTableResp != null) { + break; + } + } catch (Exception e) { + // Retry sometimes in order to avoid request timeout + LOGGER.error(e.getMessage()); + TimeUnit.SECONDS.sleep(1); + } + } + Assert.assertNotNull(dataPartitionTableResp); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + dataPartitionTableResp.getStatus().getCode()); + Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); + ConfigNodeTestUtils.checkDataPartitionTable( + database, + i, + i + 1, + i, + i + 1, + testTimePartitionInterval, + dataPartitionTableResp.getDataPartitionTable()); + } + + // Register 1 DataNode + EnvFactory.getEnv().registerNewDataNode(true); + + // Remove 1 DataNode + String os = System.getProperty("os.name").toLowerCase(); + if (os.startsWith("windows")) { + removeOnWindows(); + } else { + removeOnUnix(); + } + + // The number of DataNodes should be 3 + boolean removeSuccess = false; + for (int retry = 0; retry < 10; retry++) { + TShowClusterResp showClusterResp = client.showCluster(); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), showClusterResp.getStatus().getCode()); + if (showClusterResp.getDataNodeListSize() == 3) { + removeSuccess = true; + break; + } + TimeUnit.SECONDS.sleep(1); + } + Assert.assertTrue(removeSuccess); + + // The Regions should be redistributed + Map<Integer, AtomicInteger> schemaRegionCounter = new HashMap<>(); + Map<Integer, AtomicInteger> dataRegionCounter = new HashMap<>(); + TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq()); + showRegionResp + .getRegionInfoList() + .forEach( + regionInfo -> { + Assert.assertEquals(RegionStatus.Running.getStatus(), regionInfo.getStatus()); + switch (regionInfo.getConsensusGroupId().getType()) { + case SchemaRegion: + schemaRegionCounter + .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0)) + .incrementAndGet(); + break; + case DataRegion: + dataRegionCounter + .computeIfAbsent(regionInfo.getDataNodeId(), empty -> new AtomicInteger(0)) + .incrementAndGet(); + break; + } + }); + schemaRegionCounter.forEach( + (dataNodeId, counter) -> Assert.assertEquals(testMinSchemaRegionGroupNum, counter.get())); + dataRegionCounter.forEach( + (dataNodeId, counter) -> Assert.assertEquals(testMinDataRegionGroupNum, counter.get())); + } + } + + private void removeOnWindows() throws IOException, InterruptedException { + ProcessBuilder builder = + new ProcessBuilder( + "cmd.exe", + "/c", + sbinPath + File.separator + "remove-datanode.bat", + removeDataNodeId + ""); + builder.environment().put("CLASSPATH", libPath); + monitorRemoveProcess(builder.start()); + } + + private void removeOnUnix() throws IOException, InterruptedException { + ProcessBuilder builder = + new ProcessBuilder( + "sh", sbinPath + File.separator + "remove-datanode.sh", removeDataNodeId + ""); + builder.environment().put("CLASSPATH", libPath); + monitorRemoveProcess(builder.start()); + } + + private void monitorRemoveProcess(Process p) throws IOException, InterruptedException { + BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream())); + String line; + while ((line = br.readLine()) != null) { + LOGGER.info(line); + } + br.close(); + p.destroy(); + + while (p.isAlive()) { + TimeUnit.MILLISECONDS.sleep(100); + } + assertEquals(correctScriptExitValue, p.exitValue()); + } +}