[IOTDB-6111] Fix Region creation bugs (#10844) (#10858)
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java index dba7b81..df83e00 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionCreationIT.java
@@ -114,7 +114,10 @@ public void testPartitionAllocation() throws Exception { try (SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - // Status: Running, Running, Running, Region: [0], [0], [0] + // Current cluster: 1C3D + // Create 1 DataPartition to extend 1 DataRegionGroup + // DataNode status: Running, Running, Running + // Region distribution: [0], [0], [0] Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = ConfigNodeTestUtils.constructPartitionSlotsMap( sg, @@ -153,7 +156,10 @@ testTimePartitionInterval, dataPartitionTableResp.getDataPartitionTable()); - // Status: Running, Running, Removing, Region: [0], [0], [0] + // Current cluster: 1C3D + // Set 1 DataNode to Removing status + // DataNode status: Running, Running, Removing + // Region distribution: [0], [0], [0] TSetDataNodeStatusReq setDataNodeStatusReq = new TSetDataNodeStatusReq(); DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(2); setDataNodeStatusReq.setTargetDataNode( @@ -183,7 +189,11 @@ TimeUnit.SECONDS.sleep(1); } - // Status: Running, Running, Removing, Running, RegionGroup: [0, 1], [0, 1], [0], [1] + // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup + // The new DataRegions wouldn't be allocated to the Removing DataNode + // Current cluster: 1C4D + // DataNode status: Running, Running, Removing, Running + // Region distribution: [0, 1], [0, 1], [0], [1] EnvFactory.getEnv().registerNewDataNode(true); partitionSlotsMap = ConfigNodeTestUtils.constructPartitionSlotsMap( @@ -222,8 +232,10 @@ testTimePartitionInterval, dataPartitionTableResp.getDataPartitionTable()); - // Status: Running, Running, Removing, ReadOnly, RegionGroup: [0, 1], [0, 1], - // [0], [1] + // Current cluster: 1C4D + // Set 1 DataNode to ReadOnly status + // DataNode status: Running, Running, Removing, ReadOnly + // Region distribution: [0, 1], [0, 1], [0], [1] 
setDataNodeStatusReq = new TSetDataNodeStatusReq(); dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(3); setDataNodeStatusReq.setTargetDataNode( @@ -253,8 +265,11 @@ TimeUnit.SECONDS.sleep(1); } - // Status: Running, Running, Removing, ReadOnly, Running, RegionGroup: [0, 1, 2], [0, 1, 2], - // [0], [1], [2] + // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup + // The new DataRegions wouldn't be allocated to the Removing and ReadOnly DataNode + // Current cluster: 1C5D + // DataNode status: Running, Running, Removing, ReadOnly, Running + // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2] EnvFactory.getEnv().registerNewDataNode(true); partitionSlotsMap = ConfigNodeTestUtils.constructPartitionSlotsMap( @@ -293,8 +308,10 @@ testTimePartitionInterval, dataPartitionTableResp.getDataPartitionTable()); - // Status: Running, Running, Removing, ReadOnly, Unknown, RegionGroup:[0, 1, 2], [0, 1, 2], - // [0], [1], [2] + // Shutdown 1 DataNode + // Current cluster: 1C5D + // DataNode status: Running, Running, Removing, ReadOnly, Unknown + // Region distribution: [0, 1, 2], [0, 1, 2], [0], [1], [2] EnvFactory.getEnv().shutdownDataNode(4); // Wait for shutdown check while (true) { @@ -315,10 +332,13 @@ TimeUnit.SECONDS.sleep(1); } - // Status: Running, Running, Removing, ReadOnly, Unknown, Running, - // RegionGroup: [0, 1, 2, 3], [0, 1, 2, 3], [0], [1], [2], [3] + // Register 1 DataNode and Create 1 DataPartition to extend 1 DataRegionGroup + // The new DataRegions wouldn't be allocated to the Removing and ReadOnly DataNode + // But the new DataRegion can be allocated to the Unknown DataNode + // Current cluster: 1C6D + // DataNode status: Running, Running, Removing, ReadOnly, Unknown, Running + // Region distribution: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3] EnvFactory.getEnv().registerNewDataNode(false); - // Use thread sleep to replace verifying because the Unknown DataNode can not pass the // connection check TimeUnit.SECONDS.sleep(25); @@ 
-378,47 +398,60 @@ readOnlyCnt += 1; } } - Assert.assertEquals(9, runningCnt); + Assert.assertEquals(8, runningCnt); Assert.assertEquals(1, removingCnt); Assert.assertEquals(1, readOnlyCnt); - Assert.assertEquals(1, unknownCnt); + Assert.assertEquals(2, unknownCnt); - partitionSlotsMap = - ConfigNodeTestUtils.constructPartitionSlotsMap( - sg, - 4, - 4 + testSeriesPartitionBatchSize, - 4, - 4 + testTimePartitionBatchSize, - testTimePartitionInterval); - dataPartitionReq = new TDataPartitionReq(partitionSlotsMap); - for (int retry = 0; retry < 5; retry++) { - // Build new Client since it's unstable in Win8 environment - try (SyncConfigNodeIServiceClient configNodeClient = - (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { - dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq); - if (dataPartitionTableResp != null) { - break; - } - } catch (Exception e) { - // Retry sometimes in order to avoid request timeout - LOGGER.error(e.getMessage()); - TimeUnit.SECONDS.sleep(1); + // Restart 1 DataNode + // Current cluster: 1C6D + // DataNode status: Running, Running, Removing, ReadOnly, Running, Running + // Region distribution: [0, 1, 2, 3], [0, 1, 2], [0], [1], [2, 3], [3] + EnvFactory.getEnv().startDataNode(4); + // Wait for restart check + while (true) { + AtomicBoolean containUnknown = new AtomicBoolean(false); + TShowDataNodesResp showDataNodesResp = client.showDataNodes(); + showDataNodesResp + .getDataNodesInfoList() + .forEach( + dataNodeInfo -> { + if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) { + containUnknown.set(true); + } + }); + + if (!containUnknown.get()) { + break; } + TimeUnit.SECONDS.sleep(1); } - Assert.assertNotNull(dataPartitionTableResp); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - dataPartitionTableResp.getStatus().getCode()); - Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable()); - ConfigNodeTestUtils.checkDataPartitionTable( - 
sg, - 4, - 4 + testSeriesPartitionBatchSize, - 4, - 4 + testTimePartitionBatchSize, - testTimePartitionInterval, - dataPartitionTableResp.getDataPartitionTable()); + // Check Region count and status + for (int i = 0; i < 10; i++) { + runningCnt = 0; + unknownCnt = 0; + readOnlyCnt = 0; + removingCnt = 0; + showRegionResp = client.showRegion(new TShowRegionReq()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode()); + for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) { + if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) { + runningCnt += 1; + } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) { + unknownCnt += 1; + } else if (RegionStatus.Removing.getStatus().equals(regionInfo.getStatus())) { + removingCnt += 1; + } else if (RegionStatus.ReadOnly.getStatus().equals(regionInfo.getStatus())) { + readOnlyCnt += 1; + } + } + if (runningCnt == 10 && unknownCnt == 0 && readOnlyCnt == 1 && removingCnt == 1) { + return; + } + TimeUnit.SECONDS.sleep(1); + } + Assert.fail("Region status is not correct after 10s of recovery"); } } }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java index 31cb976..c9965fe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -74,18 +74,15 @@ Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType) throws NotEnoughDataNodeException, DatabaseNotExistsException { - // The new RegionGroups will occupy online DataNodes firstly - List<TDataNodeConfiguration> onlineDataNodes = - getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running); // Some new RegionGroups will have to occupy unknown DataNodes // if the number of online DataNodes is insufficient List<TDataNodeConfiguration> availableDataNodes = getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown); // Make sure the number of available DataNodes is enough for allocating new RegionGroups - for (String storageGroup : allotmentMap.keySet()) { + for (String database : allotmentMap.keySet()) { int replicationFactor = - getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType); + getClusterSchemaManager().getReplicationFactor(database, consensusGroupType); if (availableDataNodes.size() < replicationFactor) { throw new NotEnoughDataNodeException(); } @@ -97,18 +94,16 @@ getPartitionManager().getAllReplicaSets(consensusGroupType); for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) { - String storageGroup = entry.getKey(); + String database = entry.getKey(); int allotment = entry.getValue(); int replicationFactor = - getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType); - List<TDataNodeConfiguration> targetDataNodes = - onlineDataNodes.size() >= replicationFactor ? 
onlineDataNodes : availableDataNodes; + getClusterSchemaManager().getReplicationFactor(database, consensusGroupType); for (int i = 0; i < allotment; i++) { // Prepare input data Map<Integer, TDataNodeConfiguration> availableDataNodeMap = new ConcurrentHashMap<>(); Map<Integer, Double> freeDiskSpaceMap = new ConcurrentHashMap<>(); - targetDataNodes.forEach( + availableDataNodes.forEach( dataNodeConfiguration -> { int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId(); availableDataNodeMap.put(dataNodeId, dataNodeConfiguration); @@ -124,7 +119,7 @@ replicationFactor, new TConsensusGroupId( consensusGroupType, getPartitionManager().generateNextRegionGroupId())); - createRegionGroupsPlan.addRegionGroup(storageGroup, newRegionGroup); + createRegionGroupsPlan.addRegionGroup(database, newRegionGroup); // Mark the new RegionGroup as allocated allocatedRegionGroups.add(newRegionGroup);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java index a395c10..0bce99e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -59,7 +59,10 @@ // TODO: Optimize judge logic RegionStatus status; - if (System.currentTimeMillis() - lastSample.getSendTimestamp() > HEARTBEAT_TIMEOUT_TIME) { + if (RegionStatus.Removing.equals(lastSample.getStatus())) { + status = RegionStatus.Removing; + } else if (System.currentTimeMillis() - lastSample.getSendTimestamp() + > HEARTBEAT_TIMEOUT_TIME) { status = RegionStatus.Unknown; } else { status = lastSample.getStatus();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java index b2a6cb4..55c4e3b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; import org.apache.iotdb.db.exception.DataRegionException; @@ -121,6 +122,11 @@ SchemaRegionConsensusImpl.getInstance().createPeer(schemaRegionId, peers); if (consensusGenericResponse.isSuccess()) { tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } else if (consensusGenericResponse.getException() + instanceof ConsensusGroupAlreadyExistException) { + tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + tsStatus.setMessage( + String.format("SchemaRegion %d already exists.", schemaRegionId.getId())); } else { tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()); tsStatus.setMessage(consensusGenericResponse.getException().getMessage()); @@ -157,6 +163,10 @@ DataRegionConsensusImpl.getInstance().createPeer(dataRegionId, peers); if (consensusGenericResponse.isSuccess()) { tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } else if (consensusGenericResponse.getException() + instanceof ConsensusGroupAlreadyExistException) { + tsStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + tsStatus.setMessage(String.format("DataRegion %d already exists.", dataRegionId.getId())); } else { tsStatus = new TSStatus(TSStatusCode.CREATE_REGION_ERROR.getStatusCode()); tsStatus.setMessage(consensusGenericResponse.getException().getMessage());