[IOTDB-4378] Negative feedback Region extension policy (#7400)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index 7f017c6..ded73de 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -20,6 +20,7 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.partition.RegionGroupCache;
@@ -68,7 +69,9 @@
heartbeatResp.getHeartbeatTimestamp(),
receiveTime,
dataNodeLocation.getDataNodeId(),
- isLeader)));
+ isLeader,
+ // Region will inherit DataNode's status
+ RegionStatus.parse(heartbeatResp.getStatus()))));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java
new file mode 100644
index 0000000..a9687a8
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/exception/NotAvailableRegionGroupException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.exception;
+
+public class NotAvailableRegionGroupException extends ConfigNodeException {
+
+ public NotAvailableRegionGroupException() {
+ super(
+ "There are no available RegionGroups currently, please check the status of cluster DataNodes");
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index e950b9a..257a68d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -346,6 +346,23 @@
}
/**
+ * Only leader use this interface
+ *
+ * @param storageGroup StorageGroupName
+ * @param consensusGroupType SchemaRegion for SchemaReplicationFactor and DataRegion for
+ * DataReplicationFactor
+ * @return SchemaReplicationFactor or DataReplicationFactor
+ * @throws StorageGroupNotExistsException When the specific StorageGroup doesn't exist
+ */
+ public int getReplicationFactor(String storageGroup, TConsensusGroupType consensusGroupType)
+ throws StorageGroupNotExistsException {
+ TStorageGroupSchema storageGroupSchema = getStorageGroupSchemaByName(storageGroup);
+ return consensusGroupType == TConsensusGroupType.SchemaRegion
+ ? storageGroupSchema.getSchemaReplicationFactor()
+ : storageGroupSchema.getDataReplicationFactor();
+ }
+
+ /**
* Only leader use this interface.
*
* @param rawPathList List<StorageGroupName>
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 5e3d112..63be337 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -706,7 +706,7 @@
}
if (req.getDiskSpaceWarningThreshold()
!= CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()) {
- return errorStatus.setMessage(errorPrefix + "disk_full_threshold" + errorSuffix);
+ return errorStatus.setMessage(errorPrefix + "disk_space_warning_threshold" + errorSuffix);
}
return null;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 1ef7b97..7f01dcd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -34,6 +34,7 @@
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
@@ -118,7 +119,8 @@
* @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
*/
public Map<String, SchemaPartitionTable> allocateSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) {
+ Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+ throws NotAvailableRegionGroupException {
return partitionBalancer.allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
}
@@ -130,7 +132,8 @@
*/
public Map<String, DataPartitionTable> allocateDataPartition(
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap) {
+ unassignedDataPartitionSlotsMap)
+ throws NotAvailableRegionGroupException {
return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 2c71f84..f51b0db 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.balancer.partition.GreedyPartitionAllocator;
import org.apache.iotdb.confignode.manager.load.balancer.partition.IPartitionAllocator;
@@ -48,7 +49,8 @@
* @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
*/
public Map<String, SchemaPartitionTable> allocateSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) {
+ Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+ throws NotAvailableRegionGroupException {
return genPartitionAllocator().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
}
@@ -60,7 +62,8 @@
*/
public Map<String, DataPartitionTable> allocateDataPartition(
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap) {
+ unassignedDataPartitionSlotsMap)
+ throws NotAvailableRegionGroupException {
return genPartitionAllocator().allocateDataPartition(unassignedDataPartitionSlotsMap);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 010b2ac..43dcf46 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -35,7 +35,6 @@
import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionAllocator;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
-import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import java.util.List;
import java.util.Map;
@@ -55,7 +54,7 @@
}
/**
- * Generate a Regions allocation plan(CreateRegionsPlan)
+ * Generate a Regions' allocation plan (CreateRegionGroupsPlan)
*
* @param allotmentMap Map<StorageGroupName, Region allotment>
* @param consensusGroupType TConsensusGroupType of the new Regions
@@ -66,11 +65,27 @@
public CreateRegionGroupsPlan genRegionsAllocationPlan(
Map<String, Integer> allotmentMap, TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException, StorageGroupNotExistsException {
- CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
- IRegionAllocator regionAllocator = genRegionAllocator();
+ // The new Regions will occupy online DataNodes first
List<TDataNodeConfiguration> onlineDataNodes =
getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running);
+ // Some new Regions will have to occupy unknown DataNodes
+ // if the number of online DataNodes is insufficient
+ List<TDataNodeConfiguration> availableDataNodes =
+ getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown);
+
+ // Make sure the number of available DataNodes is enough for allocating new Regions
+ for (String storageGroup : allotmentMap.keySet()) {
+ int replicationFactor =
+ getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
+ if (availableDataNodes.size() < replicationFactor) {
+ throw new NotEnoughDataNodeException();
+ }
+ }
+
+ CreateRegionGroupsPlan createRegionGroupsPlan = new CreateRegionGroupsPlan();
+ IRegionAllocator regionAllocator = genRegionAllocator();
+ // Only considering the specified ConsensusGroupType when doing allocation
List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllReplicaSets();
allocatedRegions.removeIf(
allocateRegion -> allocateRegion.getRegionId().getType() != consensusGroupType);
@@ -78,25 +93,16 @@
for (Map.Entry<String, Integer> entry : allotmentMap.entrySet()) {
String storageGroup = entry.getKey();
int allotment = entry.getValue();
-
- // Get schema
- TStorageGroupSchema storageGroupSchema =
- getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
int replicationFactor =
- consensusGroupType == TConsensusGroupType.SchemaRegion
- ? storageGroupSchema.getSchemaReplicationFactor()
- : storageGroupSchema.getDataReplicationFactor();
-
- // Check validity
- if (onlineDataNodes.size() < replicationFactor) {
- throw new NotEnoughDataNodeException();
- }
+ getClusterSchemaManager().getReplicationFactor(storageGroup, consensusGroupType);
+ List<TDataNodeConfiguration> targetDataNodes =
+ onlineDataNodes.size() >= replicationFactor ? onlineDataNodes : availableDataNodes;
for (int i = 0; i < allotment; i++) {
// Generate allocation plan
TRegionReplicaSet newRegion =
regionAllocator.allocateRegion(
- onlineDataNodes,
+ targetDataNodes,
allocatedRegions,
replicationFactor,
new TConsensusGroupId(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
index 0800c97..73b5263 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/GreedyPartitionAllocator.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.commons.partition.SeriesPartitionTable;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -50,27 +51,30 @@
@Override
public Map<String, SchemaPartitionTable> allocateSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap) {
+ Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+ throws NotAvailableRegionGroupException {
Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>();
- unassignedSchemaPartitionSlotsMap.forEach(
- (storageGroup, unassignedPartitionSlots) -> {
- // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
- List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
- getPartitionManager()
- .getSortedRegionSlotsCounter(storageGroup, TConsensusGroupType.SchemaRegion);
+ for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
+ unassignedSchemaPartitionSlotsMap.entrySet()) {
+ final String storageGroup = slotsMapEntry.getKey();
+ final List<TSeriesPartitionSlot> unassignedPartitionSlots = slotsMapEntry.getValue();
- // Enumerate SeriesPartitionSlot
- Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap =
- new ConcurrentHashMap<>();
- for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) {
- // Greedy allocation
- schemaPartitionMap.put(seriesPartitionSlot, regionSlotsCounter.get(0).getRight());
- // Bubble sort
- bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
- }
- result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
- });
+ // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+ List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+ getPartitionManager()
+ .getSortedRegionGroupSlotsCounter(storageGroup, TConsensusGroupType.SchemaRegion);
+
+ // Enumerate SeriesPartitionSlot
+ Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new ConcurrentHashMap<>();
+ for (TSeriesPartitionSlot seriesPartitionSlot : unassignedPartitionSlots) {
+ // Greedy allocation
+ schemaPartitionMap.put(seriesPartitionSlot, regionSlotsCounter.get(0).getRight());
+ // Bubble sort
+ bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
+ }
+ result.put(storageGroup, new SchemaPartitionTable(schemaPartitionMap));
+ }
return result;
}
@@ -78,74 +82,79 @@
@Override
public Map<String, DataPartitionTable> allocateDataPartition(
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap) {
+ unassignedDataPartitionSlotsMap)
+ throws NotAvailableRegionGroupException {
Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
- unassignedDataPartitionSlotsMap.forEach(
- (storageGroup, unassignedPartitionSlotsMap) -> {
- // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
- List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
- getPartitionManager()
- .getSortedRegionSlotsCounter(storageGroup, TConsensusGroupType.DataRegion);
+ for (Map.Entry<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> slotsMapEntry :
+ unassignedDataPartitionSlotsMap.entrySet()) {
+ final String storageGroup = slotsMapEntry.getKey();
+ final Map<TSeriesPartitionSlot, List<TTimePartitionSlot>> unassignedPartitionSlotsMap =
+ slotsMapEntry.getValue();
- DataPartitionTable dataPartitionTable = new DataPartitionTable();
+ // List<Pair<allocatedSlotsNum, TConsensusGroupId>>
+ List<Pair<Long, TConsensusGroupId>> regionSlotsCounter =
+ getPartitionManager()
+ .getSortedRegionGroupSlotsCounter(storageGroup, TConsensusGroupType.DataRegion);
- // Enumerate SeriesPartitionSlot
- for (Map.Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesPartitionEntry :
- unassignedPartitionSlotsMap.entrySet()) {
- SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
+ DataPartitionTable dataPartitionTable = new DataPartitionTable();
- // Enumerate TimePartitionSlot in ascending order
- List<TTimePartitionSlot> timePartitionSlots = seriesPartitionEntry.getValue();
- timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
- for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
+ // Enumerate SeriesPartitionSlot
+ for (Map.Entry<TSeriesPartitionSlot, List<TTimePartitionSlot>> seriesPartitionEntry :
+ unassignedPartitionSlotsMap.entrySet()) {
+ SeriesPartitionTable seriesPartitionTable = new SeriesPartitionTable();
- /* Check if the current DataPartition has predecessor firstly, and inherit it if exists */
+ // Enumerate TimePartitionSlot in ascending order
+ List<TTimePartitionSlot> timePartitionSlots = seriesPartitionEntry.getValue();
+ timePartitionSlots.sort(Comparator.comparingLong(TTimePartitionSlot::getStartTime));
+ for (TTimePartitionSlot timePartitionSlot : timePartitionSlots) {
- // Check if the current Partition's predecessor is allocated
- // in the same batch of Partition creation
- TConsensusGroupId predecessor =
- seriesPartitionTable.getPrecededDataPartition(
- timePartitionSlot, TIME_PARTITION_INTERVAL);
- if (predecessor != null) {
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(predecessor));
- bubbleSort(predecessor, regionSlotsCounter);
- continue;
- }
+ /* First check whether the current DataPartition has a predecessor, and inherit it if it exists */
- // Check if the current Partition's predecessor was allocated
- // in the former Partition creation
- predecessor =
- getPartitionManager()
- .getPrecededDataPartition(
- storageGroup,
- seriesPartitionEntry.getKey(),
- timePartitionSlot,
- TIME_PARTITION_INTERVAL);
- if (predecessor != null) {
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(timePartitionSlot, Collections.singletonList(predecessor));
- bubbleSort(predecessor, regionSlotsCounter);
- continue;
- }
-
- /* Greedy allocation */
- seriesPartitionTable
- .getSeriesPartitionMap()
- .put(
- timePartitionSlot,
- Collections.singletonList(regionSlotsCounter.get(0).getRight()));
- bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
- }
- dataPartitionTable
- .getDataPartitionMap()
- .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
+ // Check if the current Partition's predecessor is allocated
+ // in the same batch of Partition creation
+ TConsensusGroupId predecessor =
+ seriesPartitionTable.getPrecededDataPartition(
+ timePartitionSlot, TIME_PARTITION_INTERVAL);
+ if (predecessor != null) {
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(timePartitionSlot, Collections.singletonList(predecessor));
+ bubbleSort(predecessor, regionSlotsCounter);
+ continue;
}
- result.put(storageGroup, dataPartitionTable);
- });
+
+ // Check if the current Partition's predecessor was allocated
+ // in the former Partition creation
+ predecessor =
+ getPartitionManager()
+ .getPrecededDataPartition(
+ storageGroup,
+ seriesPartitionEntry.getKey(),
+ timePartitionSlot,
+ TIME_PARTITION_INTERVAL);
+ if (predecessor != null) {
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(timePartitionSlot, Collections.singletonList(predecessor));
+ bubbleSort(predecessor, regionSlotsCounter);
+ continue;
+ }
+
+ /* Greedy allocation */
+ seriesPartitionTable
+ .getSeriesPartitionMap()
+ .put(
+ timePartitionSlot,
+ Collections.singletonList(regionSlotsCounter.get(0).getRight()));
+ bubbleSort(regionSlotsCounter.get(0).getRight(), regionSlotsCounter);
+ }
+ dataPartitionTable
+ .getDataPartitionMap()
+ .put(seriesPartitionEntry.getKey(), seriesPartitionTable);
+ }
+ result.put(storageGroup, dataPartitionTable);
+ }
return result;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
index 93d0a40..f957c0b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/partition/IPartitionAllocator.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import java.util.List;
import java.util.Map;
@@ -39,7 +40,8 @@
* @return Map<StorageGroupName, SchemaPartitionTable>, the allocating result
*/
Map<String, SchemaPartitionTable> allocateSchemaPartition(
- Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap);
+ Map<String, List<TSeriesPartitionSlot>> unassignedSchemaPartitionSlotsMap)
+ throws NotAvailableRegionGroupException;
/**
* Allocate DataPartitions
@@ -49,5 +51,6 @@
*/
Map<String, DataPartitionTable> allocateDataPartition(
Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>>
- unassignedDataPartitionSlotsMap);
+ unassignedDataPartitionSlotsMap)
+ throws NotAvailableRegionGroupException;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
index 990a08c..2259717 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/CopySetRegionAllocator.java
@@ -50,14 +50,14 @@
@Override
public TRegionReplicaSet allocateRegion(
- List<TDataNodeConfiguration> onlineDataNodes,
+ List<TDataNodeConfiguration> targetDataNodes,
List<TRegionReplicaSet> allocatedRegions,
int replicationFactor,
TConsensusGroupId consensusGroupId) {
TRegionReplicaSet result = null;
// Build weightList for weighted random
- buildWeightList(onlineDataNodes, allocatedRegions);
+ buildWeightList(targetDataNodes, allocatedRegions);
boolean accepted = false;
while (true) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
index bbdab4c..97ac37f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionAllocator.java
@@ -37,12 +37,12 @@
@Override
public TRegionReplicaSet allocateRegion(
- List<TDataNodeConfiguration> onlineDataNodes,
+ List<TDataNodeConfiguration> targetDataNodes,
List<TRegionReplicaSet> allocatedRegions,
int replicationFactor,
TConsensusGroupId consensusGroupId) {
// Build weightList order by number of regions allocated asc
- List<TDataNodeLocation> weightList = buildWeightList(onlineDataNodes, allocatedRegions);
+ List<TDataNodeLocation> weightList = buildWeightList(targetDataNodes, allocatedRegions);
return new TRegionReplicaSet(
consensusGroupId,
weightList.stream().limit(replicationFactor).collect(Collectors.toList()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
index aa397c9..315ef74 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionAllocator.java
@@ -34,14 +34,14 @@
* Calculating the next optimal TRegionReplicaSet based on the current online DataNodes and
* allocated Regions
*
- * @param onlineDataNodes DataNodes that currently communicable
+ * @param targetDataNodes DataNodes that can be used for allocation
* @param allocatedRegions Allocated Regions
* @param replicationFactor Replication factor of TRegionReplicaSet
* @param consensusGroupId TConsensusGroupId of result TRegionReplicaSet
* @return The optimal TRegionReplicaSet derived by the specific algorithm
*/
TRegionReplicaSet allocateRegion(
- List<TDataNodeConfiguration> onlineDataNodes,
+ List<TDataNodeConfiguration> targetDataNodes,
List<TRegionReplicaSet> allocatedRegions,
int replicationFactor,
TConsensusGroupId consensusGroupId);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 23c16a4..2f2f658 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -517,7 +517,7 @@
/** loop body of the heartbeat thread */
private void heartbeatLoopBody() {
- // the consensusManager of configManager may not be fully initialized at this time
+ // The consensusManager of configManager may not be fully initialized at this time
Optional.ofNullable(getConsensusManager())
.ifPresent(
consensusManager -> {
@@ -638,13 +638,14 @@
* @param status The specific NodeStatus
* @return Filtered ConfigNodes with the specific NodeStatus
*/
- public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus status) {
+ public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... status) {
return getRegisteredConfigNodes().stream()
.filter(
registeredConfigNode -> {
int configNodeId = registeredConfigNode.getConfigNodeId();
return nodeCacheMap.containsKey(configNodeId)
- && status.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
+ && Arrays.stream(status)
+ .anyMatch(s -> s.equals(nodeCacheMap.get(configNodeId).getNodeStatus()));
})
.collect(Collectors.toList());
}
@@ -659,10 +660,10 @@
return getRegisteredDataNodes().stream()
.filter(
registeredDataNode -> {
- int id = registeredDataNode.getLocation().getDataNodeId();
- return nodeCacheMap.containsKey(id)
+ int dataNodeId = registeredDataNode.getLocation().getDataNodeId();
+ return nodeCacheMap.containsKey(dataNodeId)
&& Arrays.stream(status)
- .anyMatch(s -> s.equals(nodeCacheMap.get(id).getNodeStatus()));
+ .anyMatch(s -> s.equals(nodeCacheMap.get(dataNodeId).getNodeStatus()));
})
.collect(Collectors.toList());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5a81ffd..0c04b34 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.cluster.RegionRoleType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -51,6 +52,7 @@
import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
+import org.apache.iotdb.confignode.exception.NotAvailableRegionGroupException;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
@@ -76,6 +78,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -84,6 +89,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/** The PartitionManager Manages cluster PartitionTable read and write requests. */
public class PartitionManager {
@@ -201,8 +207,18 @@
} else {
// Allocate SchemaPartitions only if
// the current ConfigNode still holds its leadership
- Map<String, SchemaPartitionTable> assignedSchemaPartition =
- getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+ Map<String, SchemaPartitionTable> assignedSchemaPartition;
+ try {
+ assignedSchemaPartition =
+ getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
+ } catch (NotAvailableRegionGroupException e) {
+ LOGGER.error(e.getMessage());
+ resp.setStatus(
+ new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
+ .setMessage(e.getMessage()));
+ return resp;
+ }
+
// Cache allocating result
CreateSchemaPartitionPlan createPlan = new CreateSchemaPartitionPlan();
createPlan.setAssignedSchemaPartition(assignedSchemaPartition);
@@ -268,8 +284,18 @@
} else {
// Allocate DataPartitions only if
// the current ConfigNode still holds its leadership
- Map<String, DataPartitionTable> assignedDataPartition =
- getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+ Map<String, DataPartitionTable> assignedDataPartition;
+ try {
+ assignedDataPartition =
+ getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
+ } catch (NotAvailableRegionGroupException e) {
+ LOGGER.error(e.getMessage());
+ resp.setStatus(
+ new TSStatus(TSStatusCode.NOT_AVAILABLE_REGION_GROUP.getStatusCode())
+ .setMessage(e.getMessage()));
+ return resp;
+ }
+
// Cache allocating result
CreateDataPartitionPlan createPlan = new CreateDataPartitionPlan();
createPlan.setAssignedDataPartition(assignedDataPartition);
@@ -302,18 +328,20 @@
Map<String, Integer> allotmentMap = new ConcurrentHashMap<>();
for (Map.Entry<String, Integer> entry : unassignedPartitionSlotsCountMap.entrySet()) {
- float allocatedRegionCount =
- partitionInfo.getRegionCount(entry.getKey(), consensusGroupType);
+ final String storageGroup = entry.getKey();
+ final int unassignedPartitionSlotsCount = entry.getValue();
+
+ float allocatedRegionCount = partitionInfo.getRegionCount(storageGroup, consensusGroupType);
// The slotCount equals to the sum of assigned slot count and unassigned slot count
float slotCount =
- (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(entry.getKey())
- + entry.getValue();
+ (float) partitionInfo.getAssignedSeriesPartitionSlotsCount(storageGroup)
+ + unassignedPartitionSlotsCount;
float maxRegionCount =
- getClusterSchemaManager().getMaxRegionGroupCount(entry.getKey(), consensusGroupType);
+ getClusterSchemaManager().getMaxRegionGroupCount(storageGroup, consensusGroupType);
float maxSlotCount =
ConfigNodeDescriptor.getInstance().getConf().getSeriesPartitionSlotNum();
- /* Region extension is required in the following two cases */
+ /* Region extension is required in the following cases */
// 1. There are no Region has been created for the current StorageGroup
if (allocatedRegionCount == 0) {
// The delta is equal to the smallest integer solution that satisfies the inequality:
@@ -322,7 +350,7 @@
Math.min(
(int) maxRegionCount,
Math.max(1, (int) Math.ceil(slotCount * maxRegionCount / maxSlotCount)));
- allotmentMap.put(entry.getKey(), delta);
+ allotmentMap.put(storageGroup, delta);
continue;
}
@@ -340,7 +368,15 @@
(int)
Math.ceil(
slotCount * maxRegionCount / maxSlotCount - allocatedRegionCount)));
- allotmentMap.put(entry.getKey(), delta);
+ allotmentMap.put(storageGroup, delta);
+ continue;
+ }
+
+ // 3. All RegionGroups in the specified StorageGroup are disabled currently
+ if (allocatedRegionCount
+ == filterRegionGroupThroughStatus(storageGroup, RegionGroupStatus.Disabled).size()
+ && allocatedRegionCount < maxRegionCount) {
+ allotmentMap.put(storageGroup, 1);
}
}
@@ -400,7 +436,7 @@
/**
* Only leader use this interface
*
- * @return All Regions' RegionReplicaSet
+ * @return Deep copy of all Regions' RegionReplicaSet
*/
public List<TRegionReplicaSet> getAllReplicaSets() {
return partitionInfo.getAllReplicaSets();
@@ -436,10 +472,31 @@
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
* @return The specific StorageGroup's Regions that sorted by the number of allocated slots
+ * @throws NotAvailableRegionGroupException When all RegionGroups within the specified
+ * StorageGroup are unavailable currently
*/
- public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
- String storageGroup, TConsensusGroupType type) {
- return partitionInfo.getSortedRegionSlotsCounter(storageGroup, type);
+ public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
+ String storageGroup, TConsensusGroupType type) throws NotAvailableRegionGroupException {
+ // Collect static data
+ List<Pair<Long, TConsensusGroupId>> regionGroupSlotsCounter =
+ partitionInfo.getRegionGroupSlotsCounter(storageGroup, type);
+
+ // Filter out RegionGroups that have Disabled status
+ List<Pair<Long, TConsensusGroupId>> result = new ArrayList<>();
+ for (Pair<Long, TConsensusGroupId> slotsCounter : regionGroupSlotsCounter) {
+ // Use Running or Available RegionGroups
+ RegionGroupStatus status = getRegionGroupStatus(slotsCounter.getRight());
+ if (RegionGroupStatus.Running.equals(status) || RegionGroupStatus.Available.equals(status)) {
+ result.add(slotsCounter);
+ }
+ }
+
+ if (result.isEmpty()) {
+ throw new NotAvailableRegionGroupException();
+ }
+
+ result.sort(Comparator.comparingLong(Pair::getLeft));
+ return result;
}
/**
@@ -498,9 +555,7 @@
.forEach(
regionInfo -> {
regionInfo.setStatus(
- regionGroupCacheMap
- .get(regionInfo.getConsensusGroupId())
- .getRegionStatus(regionInfo.getDataNodeId())
+ getRegionStatus(regionInfo.getConsensusGroupId(), regionInfo.getDataNodeId())
.getStatus());
String regionType =
@@ -709,6 +764,62 @@
return result;
}
+ /**
+ * Filter the RegionGroups in the specified StorageGroup through the RegionGroupStatus
+ *
+ * @param storageGroup The specified StorageGroup
+ * @param status The specified RegionGroupStatus
+ * @return Filtered RegionGroups with the specific RegionGroupStatus
+ */
+ public List<TRegionReplicaSet> filterRegionGroupThroughStatus(
+ String storageGroup, RegionGroupStatus... status) {
+ return getAllReplicaSets(storageGroup).stream()
+ .filter(
+ regionReplicaSet -> {
+ TConsensusGroupId regionGroupId = regionReplicaSet.getRegionId();
+ return regionGroupCacheMap.containsKey(regionGroupId)
+ && Arrays.stream(status)
+ .anyMatch(
+ s ->
+ s.equals(
+ regionGroupCacheMap.get(regionGroupId).getRegionGroupStatus()));
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Safely get RegionStatus
+ *
+ * @param consensusGroupId Specified RegionGroupId
+ * @param dataNodeId Specified RegionReplicaId
+ * @return Corresponding RegionStatus if cache exists, Unknown otherwise
+ */
+ public RegionStatus getRegionStatus(TConsensusGroupId consensusGroupId, int dataNodeId) {
+ return regionGroupCacheMap.containsKey(consensusGroupId)
+ ? regionGroupCacheMap.get(consensusGroupId).getRegionStatus(dataNodeId)
+ : RegionStatus.Unknown;
+ }
+
+ /**
+ * Safely get RegionGroupStatus
+ *
+ * @param consensusGroupId Specified RegionGroupId
+ * @return Corresponding RegionGroupStatus if cache exists, Disabled otherwise
+ */
+ public RegionGroupStatus getRegionGroupStatus(TConsensusGroupId consensusGroupId) {
+ return regionGroupCacheMap.containsKey(consensusGroupId)
+ ? regionGroupCacheMap.get(consensusGroupId).getRegionGroupStatus()
+ : RegionGroupStatus.Disabled;
+ }
+
+ public void cacheHeartbeatSample(
+ TConsensusGroupId regionGroupId, RegionHeartbeatSample regionHeartbeatSample) {
+ regionGroupCacheMap
+ .computeIfAbsent(regionGroupId, empty -> new RegionGroupCache(regionGroupId))
+ .cacheHeartbeatSample(regionHeartbeatSample);
+ regionGroupCacheMap.get(regionGroupId).updateRegionStatistics();
+ }
+
public ScheduledExecutorService getRegionMaintainer() {
return regionMaintainer;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
index 8f99211..c537b33 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionCache.java
@@ -66,12 +66,13 @@
if (lastSample.getSendTimestamp() > versionTimestamp) {
versionTimestamp = lastSample.getSendTimestamp();
isLeader = lastSample.isLeader();
+ status = lastSample.getStatus();
}
+ }
- status =
- System.currentTimeMillis() - versionTimestamp > HEARTBEAT_TIMEOUT_TIME
- ? RegionStatus.Unknown
- : RegionStatus.Running;
+ // TODO: Optimize judge logic
+ if (System.currentTimeMillis() - versionTimestamp > HEARTBEAT_TIMEOUT_TIME) {
+ status = RegionStatus.Unknown;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
index 4afef5f..c8140fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCache.java
@@ -85,12 +85,37 @@
return leaderDataNodeId;
}
+ /**
+ * Get the specified Region's status
+ *
+ * @param dataNodeId Where the Region resides
+ * @return Region's latest status if received heartbeat recently, Unknown otherwise
+ */
public RegionStatus getRegionStatus(int dataNodeId) {
return regionCacheMap.containsKey(dataNodeId)
? regionCacheMap.get(dataNodeId).getStatus()
: RegionStatus.Unknown;
}
+ public RegionGroupStatus getRegionGroupStatus() {
+ int unknownCount = 0;
+ for (RegionCache regionCache : regionCacheMap.values()) {
+ if (RegionStatus.ReadOnly.equals(regionCache.getStatus())
+ || RegionStatus.Removing.equals(regionCache.getStatus())) {
+ return RegionGroupStatus.Disabled;
+ }
+ unknownCount += RegionStatus.Unknown.equals(regionCache.getStatus()) ? 1 : 0;
+ }
+
+ if (unknownCount == 0) {
+ return RegionGroupStatus.Running;
+ } else {
+ return unknownCount <= ((regionCacheMap.size() - 1) / 2)
+ ? RegionGroupStatus.Available
+ : RegionGroupStatus.Disabled;
+ }
+ }
+
public TConsensusGroupId getConsensusGroupId() {
return consensusGroupId;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
new file mode 100644
index 0000000..73b8f55
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionGroupStatus.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.partition;
+
+public enum RegionGroupStatus {
+
+ /** All Regions in RegionGroup are in the Running status */
+ Running("Running"),
+
+ /**
+ * All Regions in RegionGroup are in the Running or Unknown status, and the number of Regions in
+ * the Unknown status is less than half
+ */
+ Available("Available"),
+
+ /**
+ * The following cases will lead to Disabled RegionGroup:
+ *
+ * <p>1. There is a Region in ReadOnly or Removing status
+ *
+ * <p>2. At least half of the Regions are in Unknown status
+ */
+ Disabled("Disabled");
+
+ private final String status;
+
+ RegionGroupStatus(String status) {
+ this.status = status;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public static RegionGroupStatus parse(String status) {
+ for (RegionGroupStatus regionGroupStatus : RegionGroupStatus.values()) {
+ if (regionGroupStatus.status.equals(status)) {
+ return regionGroupStatus;
+ }
+ }
+ throw new RuntimeException(String.format("RegionGroupStatus %s doesn't exist.", status));
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
index cff2fa7..f8685f2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/RegionHeartbeatSample.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.confignode.manager.partition;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+
public class RegionHeartbeatSample {
// Unit: ms
@@ -26,16 +28,22 @@
private final int belongedDataNodeId;
private final boolean isLeader;
+ private final RegionStatus status;
// TODO: Add load sample
public RegionHeartbeatSample(
- long sendTimestamp, long receiveTimestamp, int belongedDataNodeId, boolean isLeader) {
+ long sendTimestamp,
+ long receiveTimestamp,
+ int belongedDataNodeId,
+ boolean isLeader,
+ RegionStatus status) {
this.sendTimestamp = sendTimestamp;
this.receiveTimestamp = receiveTimestamp;
this.belongedDataNodeId = belongedDataNodeId;
this.isLeader = isLeader;
+ this.status = status;
}
public long getSendTimestamp() {
@@ -53,4 +61,8 @@
public boolean isLeader() {
return isLeader;
}
+
+ public RegionStatus getStatus() {
+ return status;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 0432dae..c94a903 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -286,6 +286,8 @@
// Remove empty Map
schemaPartition.remove(storageGroup);
}
+ } else {
+ isAllPartitionsExist.set(false);
}
});
}
@@ -323,6 +325,8 @@
// Remove empty Map
dataPartition.remove(storageGroup);
}
+ } else {
+ isAllPartitionsExist.set(false);
}
});
@@ -550,7 +554,7 @@
/**
* Only leader use this interface.
*
- * @return All Regions' RegionReplicaSet
+ * @return Deep copy of all Regions' RegionReplicaSet
*/
public List<TRegionReplicaSet> getAllReplicaSets() {
List<TRegionReplicaSet> result = new ArrayList<>();
@@ -615,11 +619,12 @@
*
* @param storageGroup StorageGroupName
* @param type SchemaRegion or DataRegion
- * @return The specific StorageGroup's Regions that sorted by the number of allocated slots
+ * @return Slot count and id of each of the StorageGroup's RegionGroups of the given type,
+ * neither filtered by status nor sorted
*/
- public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
+ public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(
String storageGroup, TConsensusGroupType type) {
- return storageGroupPartitionTables.get(storageGroup).getSortedRegionGroupSlotsCounter(type);
+ return storageGroupPartitionTables.get(storageGroup).getRegionGroupSlotsCounter(type);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index e45c7dd..5ba8345 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -41,7 +41,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -102,7 +101,7 @@
replicaSets.forEach(replicaSet -> regionGroupMap.remove(replicaSet.getRegionId()));
}
- /** @return All Regions' RegionReplicaSet within one StorageGroup */
+ /** @return Deep copy of all Regions' RegionReplicaSet within one StorageGroup */
public List<TRegionReplicaSet> getAllReplicaSets() {
List<TRegionReplicaSet> result = new ArrayList<>();
@@ -276,20 +275,22 @@
* Only leader use this interface.
*
* @param type SchemaRegion or DataRegion
- * @return RegionGroups' indexes that sorted by the number of allocated slots
+ * @return RegionGroups' slot count and index
*/
- public List<Pair<Long, TConsensusGroupId>> getSortedRegionGroupSlotsCounter(
- TConsensusGroupType type) {
+ public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(TConsensusGroupType type) {
List<Pair<Long, TConsensusGroupId>> result = new Vector<>();
regionGroupMap.forEach(
(consensusGroupId, regionGroup) -> {
- if (consensusGroupId.getType().equals(type)) {
- result.add(new Pair<>(regionGroup.getSeriesSlotCount(), consensusGroupId));
+ if (type.equals(consensusGroupId.getType())) {
+ long slotCount =
+ type.equals(TConsensusGroupType.SchemaRegion)
+ ? regionGroup.getSeriesSlotCount()
+ : regionGroup.getTimeSlotCount();
+ result.add(new Pair<>(slotCount, consensusGroupId));
}
});
- result.sort(Comparator.comparingLong(Pair::getLeft));
return result;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index dfee4dd..82dffe0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -25,6 +25,7 @@
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
@@ -43,6 +44,7 @@
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
+import org.apache.iotdb.confignode.manager.partition.RegionHeartbeatSample;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
@@ -343,6 +345,19 @@
getLoadManager().broadcastLatestRegionRouteMap();
}
+ public void buildRegionGroupCache(
+ TConsensusGroupId regionGroupId, Map<Integer, RegionStatus> regionStatusMap) {
+ long currentTime = System.currentTimeMillis();
+ regionStatusMap.forEach(
+ (dataNodeId, regionStatus) ->
+ getPartitionManager()
+ .cacheHeartbeatSample(
+ regionGroupId,
+ new RegionHeartbeatSample(
+ currentTime, currentTime, dataNodeId, false, regionStatus)));
+ getPartitionManager().getRegionGroupCacheMap().get(regionGroupId).updateRegionStatistics();
+ }
+
public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) {
return getPartitionManager().getAllReplicaSets(storageGroup);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
index 4329b5f..4e7bf3a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateRegionGroupsProcedure.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
@@ -41,6 +42,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
public class CreateRegionGroupsProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, CreateRegionGroupsState> {
@@ -139,6 +141,43 @@
env.persistAndBroadcastRegionGroup(persistPlan);
env.getConfigManager().getConsensusManager().write(offerPlan);
+ setNextState(CreateRegionGroupsState.BUILD_REGION_GROUP_CACHE);
+ break;
+ case BUILD_REGION_GROUP_CACHE:
+ // Build RegionGroupCache immediately so the successfully built RegionGroups are available
+ createRegionGroupsPlan
+ .getRegionGroupMap()
+ .forEach(
+ (storageGroup, regionReplicaSets) ->
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ Map<Integer, RegionStatus> statusMap = new ConcurrentHashMap<>();
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ statusMap.put(
+ dataNodeLocation.getDataNodeId(), RegionStatus.Running));
+
+ if (!failedRegionReplicaSets.containsKey(
+ regionReplicaSet.getRegionId())) {
+ env.buildRegionGroupCache(regionReplicaSet.getRegionId(), statusMap);
+ } else {
+ TRegionReplicaSet failedRegionReplicas =
+ failedRegionReplicaSets.get(regionReplicaSet.getRegionId());
+ if (failedRegionReplicas.getDataNodeLocationsSize()
+ <= (regionReplicaSet.getDataNodeLocationsSize() - 1) / 2) {
+ failedRegionReplicas
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation ->
+ statusMap.replace(
+ dataNodeLocation.getDataNodeId(),
+ RegionStatus.Unknown));
+ env.buildRegionGroupCache(regionReplicaSet.getRegionId(), statusMap);
+ }
+ }
+ }));
setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH);
break;
case CREATE_REGION_GROUPS_FINISH:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
index 87a1a84..a332f0c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionGroupsState.java
@@ -27,5 +27,6 @@
// RegionReplicas created successfully on the same RegionGroup
// 3. Delete redundant RegionReplicas in contrast to case 2.
SHUNT_REGION_REPLICAS,
+ BUILD_REGION_GROUP_CACHE,
CREATE_REGION_GROUPS_FINISH
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 82bb08b..ef040a1 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -24,6 +24,7 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeHeartbeatSample;
@@ -102,22 +103,22 @@
/* Simulate ratis consensus protocol(only one leader) */
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(10, 10, 0, false));
+ .cacheHeartbeatSample(new RegionHeartbeatSample(10, 10, 0, false, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(11, 11, 1, true));
+ .cacheHeartbeatSample(new RegionHeartbeatSample(11, 11, 1, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(12, 12, 2, false));
+ .cacheHeartbeatSample(new RegionHeartbeatSample(12, 12, 2, false, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(13, 13, 3, false));
+ .cacheHeartbeatSample(new RegionHeartbeatSample(13, 13, 3, false, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(14, 14, 4, true));
+ .cacheHeartbeatSample(new RegionHeartbeatSample(14, 14, 4, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(15, 15, 5, false));
+ .cacheHeartbeatSample(new RegionHeartbeatSample(15, 15, 5, false, RegionStatus.Running));
// Get leaderMap
Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
@@ -148,22 +149,28 @@
for (int i = 2; i <= 1000; i++) {
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10, i * 10, 0, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(i * 10, i * 10, 0, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 1, i * 10 + 1, 1, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(i * 10 + 1, i * 10 + 1, 1, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 2, i * 10 + 2, 2, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(i * 10 + 2, i * 10 + 2, 2, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 3, i * 10 + 3, 3, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(i * 10 + 3, i * 10 + 3, 3, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 4, i * 10 + 4, 4, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(i * 10 + 4, i * 10 + 4, 4, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 5, i * 10 + 5, 5, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(i * 10 + 5, i * 10 + 5, 5, true, RegionStatus.Running));
// Get leaderMap
leaderMap.clear();
@@ -191,16 +198,20 @@
/* Simulate multiLeader consensus protocol with a DataNode fails down */
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(10030, 10030, 0, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(10030, 10030, 0, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId1)
- .cacheHeartbeatSample(new RegionHeartbeatSample(10031, 10031, 1, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(10031, 10031, 1, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(10033, 10033, 3, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(10033, 10033, 3, true, RegionStatus.Running));
regionGroupCacheMap
.get(groupId2)
- .cacheHeartbeatSample(new RegionHeartbeatSample(10034, 10034, 4, true));
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(10034, 10034, 4, true, RegionStatus.Running));
// Get leaderMap
leaderMap.clear();
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
new file mode 100644
index 0000000..6f30202
--- /dev/null
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/partition/RegionGroupCacheTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.partition;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RegionGroupCacheTest {
+
+ @Test
+ public void getLeaderDataNodeIdTest() {
+ final int leaderId = 1;
+ long currentTime = System.currentTimeMillis();
+ RegionGroupCache regionGroupCache =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+ for (int i = 0; i < 3; i++) {
+ regionGroupCache.cacheHeartbeatSample(
+ new RegionHeartbeatSample(
+ currentTime, currentTime, i, i == leaderId, RegionStatus.Running));
+ }
+ Assert.assertTrue(regionGroupCache.updateRegionStatistics());
+ Assert.assertEquals(leaderId, regionGroupCache.getLeaderDataNodeId());
+ }
+
+ @Test
+ public void getRegionStatusTest() {
+ long currentTime = System.currentTimeMillis();
+ RegionGroupCache regionGroupCache =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1));
+ regionGroupCache.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+ regionGroupCache.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Unknown));
+ regionGroupCache.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Removing));
+ regionGroupCache.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 3, false, RegionStatus.ReadOnly));
+ regionGroupCache.updateRegionStatistics();
+
+ Assert.assertEquals(RegionStatus.Running, regionGroupCache.getRegionStatus(0));
+ Assert.assertEquals(RegionStatus.Unknown, regionGroupCache.getRegionStatus(1));
+ Assert.assertEquals(RegionStatus.Removing, regionGroupCache.getRegionStatus(2));
+ Assert.assertEquals(RegionStatus.ReadOnly, regionGroupCache.getRegionStatus(3));
+ }
+
+ @Test
+ public void getRegionGroupStatusTest() {
+ long currentTime = System.currentTimeMillis();
+ RegionGroupCache runningRegionGroup =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+ runningRegionGroup.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+ runningRegionGroup.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Running));
+ runningRegionGroup.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Running));
+ runningRegionGroup.updateRegionStatistics();
+ Assert.assertEquals(RegionGroupStatus.Running, runningRegionGroup.getRegionGroupStatus());
+
+ RegionGroupCache availableRegionGroup =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1));
+ availableRegionGroup.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+ availableRegionGroup.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Unknown));
+ availableRegionGroup.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Running));
+ availableRegionGroup.updateRegionStatistics();
+ Assert.assertEquals(RegionGroupStatus.Available, availableRegionGroup.getRegionGroupStatus());
+
+ RegionGroupCache disabledRegionGroup0 =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 2));
+ disabledRegionGroup0.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+ disabledRegionGroup0.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.ReadOnly));
+ disabledRegionGroup0.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Running));
+ disabledRegionGroup0.updateRegionStatistics();
+ Assert.assertEquals(RegionGroupStatus.Disabled, disabledRegionGroup0.getRegionGroupStatus());
+
+ RegionGroupCache disabledRegionGroup1 =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 3));
+ disabledRegionGroup1.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+ disabledRegionGroup1.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Unknown));
+ disabledRegionGroup1.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Unknown));
+ disabledRegionGroup1.updateRegionStatistics();
+ Assert.assertEquals(RegionGroupStatus.Disabled, disabledRegionGroup1.getRegionGroupStatus());
+
+ RegionGroupCache disabledRegionGroup2 =
+ new RegionGroupCache(new TConsensusGroupId(TConsensusGroupType.DataRegion, 4));
+ disabledRegionGroup2.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 0, false, RegionStatus.Running));
+ disabledRegionGroup2.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 1, false, RegionStatus.Running));
+ disabledRegionGroup2.cacheHeartbeatSample(
+ new RegionHeartbeatSample(currentTime, currentTime, 2, false, RegionStatus.Removing));
+ disabledRegionGroup2.updateRegionStatistics();
+ Assert.assertEquals(RegionGroupStatus.Disabled, disabledRegionGroup2.getRegionGroupStatus());
+ }
+}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 9f8a839..b68c004 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -388,4 +388,13 @@
}
throw new IOException("Failed to get config node connection");
}
+
+ @Override
+ public void restartDataNode(int index) {
+ dataNodeWrapperList.get(index).start();
+ }
+
+ public void shutdownDataNode(int index) {
+ dataNodeWrapperList.get(index).stop();
+ }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
index c35c3c4..1c6f257 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/MppConfig.java
@@ -228,6 +228,20 @@
}
@Override
+  public BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
+    final String factor = Integer.toString(schemaReplicationFactor);
+    confignodeProperties.setProperty("schema_replication_factor", factor);
+    return this;
+  }
+
+  @Override
+  public BaseConfig setDataReplicationFactor(int dataReplicationFactor) {
+    final String factor = Integer.toString(dataReplicationFactor);
+    confignodeProperties.setProperty("data_replication_factor", factor);
+    return this;
+  }
+
+ @Override
public BaseConfig setTimePartitionInterval(long timePartitionInterval) {
confignodeProperties.setProperty(
"time_partition_interval", String.valueOf(timePartitionInterval));
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index e086ce4..a85905b 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -152,4 +152,24 @@
session.open();
return session;
}
+
+  /**
+   * Starts the DataNode wrapper found at {@code index} of {@code getDataNodeWrapperList()}.
+   *
+   * <p>NOTE(review): confirm that a remote environment exposes a non-null wrapper list here.
+   */
+  @Override
+  public void restartDataNode(int index) {
+    getDataNodeWrapperList().get(index).start();
+  }
+
+  /**
+   * Stops the DataNode wrapper found at {@code index} of {@code getDataNodeWrapperList()}.
+   *
+   * <p>NOTE(review): confirm that a remote environment exposes a non-null wrapper list here.
+   */
+  @Override
+  public void shutdownDataNode(int index) {
+    getDataNodeWrapperList().get(index).stop();
+  }
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
index 6ecaad0..c1273b3 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseConfig.java
@@ -231,6 +231,26 @@
return "org.apache.iotdb.consensus.standalone.StandAloneConsensus";
}
+  /** Default implementation: ignores the given factor and returns this config unchanged. */
+  default BaseConfig setSchemaReplicationFactor(int schemaReplicationFactor) {
+    return this;
+  }
+
+  /** Default implementation: always reports a schema replication factor of 1. */
+  default int getSchemaReplicationFactor() {
+    return 1;
+  }
+
+  /** Default implementation: ignores the given factor and returns this config unchanged. */
+  default BaseConfig setDataReplicationFactor(int dataReplicationFactor) {
+    return this;
+  }
+
+  /** Default implementation: always reports a data replication factor of 1. */
+  default int getDataReplicationFactor() {
+    return 1;
+  }
+
default BaseConfig setTimePartitionInterval(long timePartitionInterval) {
return this;
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 35ff793..79550d2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -129,4 +129,10 @@
session.open();
return session;
}
+
+  /** Asks this environment to restart the DataNode at {@code index}. */
+  void restartDataNode(int index);
+
+  /** Asks this environment to shut down the DataNode at {@code index}. */
+  void shutdownDataNode(int index);
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
similarity index 61%
rename from integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
index 39907d1..fff68f9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBClusterPartitionTableTest.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBClusterPartitionIT.java
@@ -16,21 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it.confignode;
+package org.apache.iotdb.confignode;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
@@ -48,6 +53,8 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -55,24 +62,31 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBClusterPartitionTableTest {
+public class IoTDBClusterPartitionIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBClusterPartitionIT.class);
protected static String originalConfigNodeConsensusProtocolClass;
protected static String originalSchemaRegionConsensusProtocolClass;
protected static String originalDataRegionConsensusProtocolClass;
+ private static final String testConsensusProtocolClass =
+ "org.apache.iotdb.consensus.ratis.RatisConsensus";
+
+ protected static int originSchemaReplicationFactor;
+ protected static int originalDataReplicationFactor;
+ private static final int testReplicationFactor = 3;
protected static long originalTimePartitionInterval;
-
private static final long testTimePartitionInterval = 86400;
+
private static final String sg = "root.sg";
private static final int storageGroupNum = 5;
private static final int seriesPartitionSlotsNum = 10000;
- private static final int seriesPartitionBatchSize = 1000;
private static final int timePartitionSlotsNum = 10;
- private static final int timePartitionBatchSize = 10;
@Before
public void setUp() throws Exception {
@@ -82,14 +96,16 @@
ConfigFactory.getConfig().getSchemaRegionConsensusProtocolClass();
originalDataRegionConsensusProtocolClass =
ConfigFactory.getConfig().getDataRegionConsensusProtocolClass();
- originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
+ ConfigFactory.getConfig().setConfigNodeConsesusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setSchemaRegionConsensusProtocolClass(testConsensusProtocolClass);
+ ConfigFactory.getConfig().setDataRegionConsensusProtocolClass(testConsensusProtocolClass);
- ConfigFactory.getConfig()
- .setConfigNodeConsesusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
- ConfigFactory.getConfig()
- .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
- ConfigFactory.getConfig()
- .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus");
+ originSchemaReplicationFactor = ConfigFactory.getConfig().getSchemaReplicationFactor();
+ originalDataReplicationFactor = ConfigFactory.getConfig().getDataReplicationFactor();
+ ConfigFactory.getConfig().setSchemaReplicationFactor(testReplicationFactor);
+ ConfigFactory.getConfig().setDataReplicationFactor(testReplicationFactor);
+
+ originalTimePartitionInterval = ConfigFactory.getConfig().getTimePartitionInterval();
ConfigFactory.getConfig().setTimePartitionInterval(testTimePartitionInterval);
EnvFactory.getEnv().initBeforeClass();
@@ -243,14 +259,14 @@
Assert.assertTrue(dataPartitionTable.containsKey(storageGroup));
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TConsensusGroupId>>>
seriesPartitionTable = dataPartitionTable.get(storageGroup);
- Assert.assertEquals(seriesPartitionBatchSize, seriesPartitionTable.size());
+ Assert.assertEquals(seriesSlotEnd - seriesSlotStart, seriesPartitionTable.size());
for (int i = seriesSlotStart; i < seriesSlotEnd; i++) {
TSeriesPartitionSlot seriesPartitionSlot = new TSeriesPartitionSlot(i);
Assert.assertTrue(seriesPartitionTable.containsKey(seriesPartitionSlot));
Map<TTimePartitionSlot, List<TConsensusGroupId>> timePartitionTable =
seriesPartitionTable.get(seriesPartitionSlot);
- Assert.assertEquals(timePartitionBatchSize, timePartitionTable.size());
+ Assert.assertEquals(timeSlotEnd - timeSlotStart, timePartitionTable.size());
for (long j = timeSlotStart; j < timeSlotEnd; j++) {
TTimePartitionSlot timePartitionSlot =
@@ -269,6 +285,9 @@
@Test
public void testGetAndCreateDataPartition() throws TException, IOException {
+ final int seriesPartitionBatchSize = 100;
+ final int timePartitionBatchSize = 10;
+
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
TSStatus status;
@@ -349,4 +368,165 @@
});
}
}
+
+  /**
+   * Verifies that DataPartitions can still be created while one DataNode is shut down, and that
+   * every Region is reported as Running again once that DataNode has been restarted.
+   */
+  @Test
+  public void testPartitionDurable() throws IOException, TException, InterruptedException {
+    final int testDataNodeId = 0;
+    final int seriesPartitionBatchSize = 10;
+    final int timePartitionBatchSize = 10;
+
+    // Shutdown the first DataNode
+    EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
+
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+      final String sg0 = sg + 0;
+      final String sg1 = sg + 1;
+
+      // Set both StorageGroups, every result should be success
+      for (String storageGroup : new String[] {sg0, sg1}) {
+        TSStatus status =
+            client.setStorageGroup(new TSetStorageGroupReq(new TStorageGroupSchema(storageGroup)));
+        Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
+      }
+
+      // Test getOrCreateDataPartition, ConfigNode should create DataPartition and return
+      createAndCheckDataPartition(sg0, seriesPartitionBatchSize, timePartitionBatchSize);
+      assertRunningRegionsDoubleUnknown(client);
+
+      // Wait until the ConfigNode reports the stopped DataNode as Unknown
+      waitForUnknownDataNode(client, true);
+      TShowClusterResp showClusterResp = client.showCluster();
+      Assert.assertEquals(2, countDataNodes(showClusterResp, NodeStatus.Running));
+      Assert.assertEquals(1, countDataNodes(showClusterResp, NodeStatus.Unknown));
+
+      // ConfigNode should still create and return the DataPartition of the second StorageGroup
+      createAndCheckDataPartition(sg1, seriesPartitionBatchSize, timePartitionBatchSize);
+      assertRunningRegionsDoubleUnknown(client);
+
+      // Restart the DataNode and wait until no DataNode is reported as Unknown
+      EnvFactory.getEnv().restartDataNode(testDataNodeId);
+      waitForUnknownDataNode(client, false);
+
+      // All Regions should be Running after the testDataNode is restarted
+      boolean allRunning = true;
+      for (int retry = 0; retry < 30; retry++) {
+        allRunning = true;
+        TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+        for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+          if (!RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+            allRunning = false;
+            break;
+          }
+        }
+        if (allRunning) {
+          break;
+        }
+
+        TimeUnit.SECONDS.sleep(1);
+      }
+      Assert.assertTrue(allRunning);
+    }
+  }
+
+  /**
+   * Requests the DataPartition covering series slots [0, seriesPartitionBatchSize) and time slots
+   * [0, timePartitionBatchSize) of the given StorageGroup, retrying up to five times, and checks
+   * the returned table.
+   */
+  private void createAndCheckDataPartition(
+      String storageGroup, int seriesPartitionBatchSize, int timePartitionBatchSize)
+      throws IOException, TException, InterruptedException {
+    Map<String, Map<TSeriesPartitionSlot, List<TTimePartitionSlot>>> partitionSlotsMap =
+        constructPartitionSlotsMap(
+            storageGroup, 0, seriesPartitionBatchSize, 0, timePartitionBatchSize);
+    TDataPartitionReq dataPartitionReq = new TDataPartitionReq(partitionSlotsMap);
+    // Start from null on every call so a stale response can never satisfy the checks below
+    TDataPartitionTableResp dataPartitionTableResp = null;
+    for (int retry = 0; retry < 5; retry++) {
+      // Build new Client since it's unstable in Win8 environment
+      try (SyncConfigNodeIServiceClient configNodeClient =
+          (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getConfigNodeConnection()) {
+        dataPartitionTableResp = configNodeClient.getOrCreateDataPartitionTable(dataPartitionReq);
+        if (dataPartitionTableResp != null) {
+          break;
+        }
+      } catch (Exception e) {
+        // Retry after a pause in order to avoid request timeout; keep the stack trace in the log
+        LOGGER.error("Failed to get or create DataPartition, attempt {}", retry + 1, e);
+        TimeUnit.SECONDS.sleep(1);
+      }
+    }
+    Assert.assertNotNull(dataPartitionTableResp);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        dataPartitionTableResp.getStatus().getCode());
+    Assert.assertNotNull(dataPartitionTableResp.getDataPartitionTable());
+    checkDataPartitionMap(
+        storageGroup,
+        0,
+        seriesPartitionBatchSize,
+        0,
+        timePartitionBatchSize,
+        dataPartitionTableResp.getDataPartitionTable());
+  }
+
+  /**
+   * Asserts that showRegion succeeds and that there are exactly twice as many Running Regions as
+   * Unknown ones, which is the ratio expected while one DataNode is shut down.
+   */
+  private void assertRunningRegionsDoubleUnknown(SyncConfigNodeIServiceClient client)
+      throws TException {
+    TShowRegionResp showRegionResp = client.showRegion(new TShowRegionReq());
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), showRegionResp.getStatus().getCode());
+    int runningCnt = 0;
+    int unknownCnt = 0;
+    for (TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+      if (RegionStatus.Running.getStatus().equals(regionInfo.getStatus())) {
+        runningCnt += 1;
+      } else if (RegionStatus.Unknown.getStatus().equals(regionInfo.getStatus())) {
+        unknownCnt += 1;
+      }
+    }
+    Assert.assertEquals(unknownCnt * 2, runningCnt);
+  }
+
+  /**
+   * Polls showCluster once per second until an Unknown DataNode is present (or absent, when
+   * {@code expectUnknown} is false), and fails the test if that takes longer than two minutes so
+   * a DataNode that never changes status cannot hang the run.
+   */
+  private void waitForUnknownDataNode(SyncConfigNodeIServiceClient client, boolean expectUnknown)
+      throws TException, InterruptedException {
+    final int maxWaitSeconds = 120;
+    for (int waited = 0; waited < maxWaitSeconds; waited++) {
+      boolean containUnknown = countDataNodes(client.showCluster(), NodeStatus.Unknown) > 0;
+      if (containUnknown == expectUnknown) {
+        return;
+      }
+      TimeUnit.SECONDS.sleep(1);
+    }
+    Assert.fail(
+        String.format(
+            "Waited %d seconds but Unknown DataNode presence never became %s",
+            maxWaitSeconds, expectUnknown));
+  }
+
+  /** Counts the DataNodes in {@code showClusterResp} whose status equals {@code status}. */
+  private int countDataNodes(TShowClusterResp showClusterResp, NodeStatus status) {
+    int count = 0;
+    for (TDataNodeLocation dataNodeLocation : showClusterResp.getDataNodeList()) {
+      if (status
+          .getStatus()
+          .equals(showClusterResp.getNodeStatus().get(dataNodeLocation.getDataNodeId()))) {
+        count += 1;
+      }
+    }
+    return count;
+  }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
similarity index 99%
rename from integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java
rename to integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
index ebdc7f8..aba7da3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/confignode/IoTDBConfigNodeIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/IoTDBConfigNodeIT.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.it.confignode;
+package org.apache.iotdb.confignode;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
index 70f4cb2..4c45167 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/env/StandaloneEnv.java
@@ -172,4 +172,14 @@
session.open();
return session;
}
+
+ @Override
+ public void restartDataNode(int index) {
+ // Intentionally empty: this environment takes no action when asked to restart a DataNode
+ }
+
+ @Override
+ public void shutdownDataNode(int index) {
+ // Intentionally empty: this environment takes no action when asked to shut down a DataNode
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
index 6d60ec6..bc81946 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
@@ -25,7 +25,13 @@
Running("Running"),
/** Region connection failure */
- Unknown("Unknown");
+ Unknown("Unknown"),
+
+ /** Region is being removed */
+ Removing("Removing"),
+
+ /** Only query statements are permitted */
+ ReadOnly("ReadOnly");
private final String status;
@@ -36,4 +42,21 @@
public String getStatus() {
return status;
}
+
+  /**
+   * Returns the RegionStatus whose status string equals the given text.
+   *
+   * @param status the status string to look up, compared case-sensitively
+   * @return the matching RegionStatus
+   * @throws IllegalArgumentException if no RegionStatus has the given status string, which
+   *     includes the case where {@code status} is null
+   */
+  public static RegionStatus parse(String status) {
+    for (RegionStatus regionStatus : RegionStatus.values()) {
+      if (regionStatus.status.equals(status)) {
+        return regionStatus;
+      }
+    }
+    throw new IllegalArgumentException(String.format("RegionStatus %s doesn't exist.", status));
+  }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 5e1afe6..1bca6b5 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -164,7 +164,8 @@
DATANODE_STOP_ERROR(917),
REGION_LEADER_CHANGE_FAILED(918),
REMOVE_DATANODE_FAILED(919),
- OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920);
+ OVERLAP_WITH_EXISTING_DELETE_TIMESERIES_TASK(920),
+ NOT_AVAILABLE_REGION_GROUP(921);
private int statusCode;