[IOTDB-4419] Maintain RegionStatus through cluster heartbeat (#7345)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index 890b448..2ed85ef 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -21,7 +21,6 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
@@ -36,12 +35,12 @@
// Update DataNodeHeartbeatCache when success
private final TDataNodeLocation dataNodeLocation;
private final DataNodeHeartbeatCache dataNodeHeartbeatCache;
- private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
+ private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
public DataNodeHeartbeatHandler(
TDataNodeLocation dataNodeLocation,
DataNodeHeartbeatCache dataNodeHeartbeatCache,
- Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap) {
+ Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap) {
this.dataNodeLocation = dataNodeLocation;
this.dataNodeHeartbeatCache = dataNodeHeartbeatCache;
this.regionGroupCacheMap = regionGroupCacheMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ec119bf..33fb5ed 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -21,7 +21,6 @@
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
@@ -29,7 +28,6 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -80,14 +78,12 @@
import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -102,7 +98,6 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +112,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
@@ -792,24 +786,7 @@
public RegionInfoListResp showRegion(GetRegionInfoListPlan getRegionInfoListPlan) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- RegionInfoListResp regionInfoListResp =
- (RegionInfoListResp) partitionManager.getRegionInfoList(getRegionInfoListPlan);
- regionInfoListResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- Map<TConsensusGroupId, Integer> allLeadership =
- getPartitionManager().getAllLeadership();
- if (!allLeadership.isEmpty()) {
- String regionType =
- regionInfo.getDataNodeId()
- == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
- ? RegionRoleType.Leader.toString()
- : RegionRoleType.Follower.toString();
- regionInfo.setRoleType(regionType);
- }
- });
- return regionInfoListResp;
+ return (RegionInfoListResp) partitionManager.getRegionInfoList(getRegionInfoListPlan);
} else {
RegionInfoListResp regionResp = new RegionInfoListResp();
regionResp.setStatus(status);
@@ -822,48 +799,8 @@
TSStatus status = confirmLeader();
TShowDataNodesResp resp = new TShowDataNodesResp();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- List<TDataNodeInfo> registeredDataNodesInfoList = nodeManager.getRegisteredDataNodeInfoList();
-
- // Map<DataNodeId, DataRegionNum>
- Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
- // Map<DataNodeId, SchemaRegionNum>
- Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
-
- List<TRegionInfo> regionInfoList =
- ((RegionInfoListResp) partitionManager.getRegionInfoList(new GetRegionInfoListPlan()))
- .getRegionInfoList();
- if (CollectionUtils.isNotEmpty(regionInfoList)) {
-
- regionInfoList.forEach(
- (regionInfo) -> {
- int dataNodeId = regionInfo.getDataNodeId();
- int regionTypeValue = regionInfo.getConsensusGroupId().getType().getValue();
- int dataRegionNum =
- regionTypeValue == TConsensusGroupType.DataRegion.getValue() ? 1 : 0;
- int schemaRegionNum =
- regionTypeValue == TConsensusGroupType.SchemaRegion.getValue() ? 1 : 0;
- dataRegionNumMap
- .computeIfAbsent(dataNodeId, key -> new AtomicInteger())
- .addAndGet(dataRegionNum);
- schemaRegionNumMap
- .computeIfAbsent(dataNodeId, key -> new AtomicInteger())
- .addAndGet(schemaRegionNum);
- });
-
- registeredDataNodesInfoList.forEach(
- (dataNodesInfo -> {
- if (dataRegionNumMap.containsKey(dataNodesInfo.getDataNodeId())) {
- dataNodesInfo.setDataRegionNum(
- dataRegionNumMap.get(dataNodesInfo.getDataNodeId()).get());
- }
- if (schemaRegionNumMap.containsKey(dataNodesInfo.getDataNodeId())) {
- dataNodesInfo.setSchemaRegionNum(
- schemaRegionNumMap.get(dataNodesInfo.getDataNodeId()).get());
- }
- }));
- }
-
- return resp.setDataNodesInfoList(registeredDataNodesInfoList).setStatus(StatusUtils.OK);
+ return resp.setDataNodesInfoList(nodeManager.getRegisteredDataNodeInfoList())
+ .setStatus(StatusUtils.OK);
} else {
return resp.setStatus(status);
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 218bbc2..89197da 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -23,6 +23,7 @@
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
@@ -70,6 +71,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -275,6 +277,43 @@
dataNodeInfoList.add(info);
});
}
+
+ // Map<DataNodeId, DataRegionNum>
+ Map<Integer, AtomicInteger> dataRegionNumMap = new HashMap<>();
+ // Map<DataNodeId, SchemaRegionNum>
+ Map<Integer, AtomicInteger> schemaRegionNumMap = new HashMap<>();
+ List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
+ regionReplicaSets.forEach(
+ regionReplicaSet ->
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+ switch (regionReplicaSet.getRegionId().getType()) {
+ case SchemaRegion:
+ schemaRegionNumMap
+ .computeIfAbsent(
+ dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
+ .getAndIncrement();
+ break;
+ case DataRegion:
+ default:
+ dataRegionNumMap
+ .computeIfAbsent(
+ dataNodeLocation.getDataNodeId(), key -> new AtomicInteger())
+ .getAndIncrement();
+ }
+ }));
+ AtomicInteger zero = new AtomicInteger(0);
+ dataNodeInfoList.forEach(
+ (dataNodesInfo -> {
+ dataNodesInfo.setSchemaRegionNum(
+ schemaRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
+ dataNodesInfo.setDataRegionNum(
+ dataRegionNumMap.getOrDefault(dataNodesInfo.getDataNodeId(), zero).get());
+ }));
+
+ dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
return dataNodeInfoList;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index cffdc97..1d54772 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -25,6 +25,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.partition.DataPartitionTable;
@@ -45,12 +46,13 @@
import org.apache.iotdb.confignode.consensus.request.write.PreDeleteStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
+import org.apache.iotdb.confignode.consensus.response.RegionInfoListResp;
import org.apache.iotdb.confignode.consensus.response.SchemaNodeManagementResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.confignode.manager.load.LoadManager;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.persistence.metric.PartitionInfoMetrics;
import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -93,7 +95,7 @@
private Future<?> currentRegionCleanerFuture;
// Map<RegionId, RegionGroupCache>
- private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
+ private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap;
public PartitionManager(IManager configManager, PartitionInfo partitionInfo) {
this.configManager = configManager;
@@ -460,7 +462,42 @@
}
+  /**
+   * Get the RegionInfoList selected by the given plan. Each entry's status is filled in from
+   * the RegionGroupCache and its role type is derived from getAllLeadership().
+   *
+   * @param req The GetRegionInfoListPlan
+   * @return A RegionInfoListResp whose entries carry status and role type
+   */
public DataSet getRegionInfoList(GetRegionInfoListPlan req) {
- return getConsensusManager().read(req).getDataset();
+    // Get static result
+    RegionInfoListResp regionInfoListResp =
+        (RegionInfoListResp) getConsensusManager().read(req).getDataset();
+    Map<TConsensusGroupId, Integer> allLeadership = getAllLeadership();
+
+    // Get cached result
+    regionInfoListResp
+        .getRegionInfoList()
+        .forEach(
+            regionInfo -> {
+              // The cache map may not contain this RegionGroup yet; report its replicas as
+              // Unknown instead of failing the whole request with a NullPointerException
+              RegionGroupCache regionGroupCache =
+                  regionGroupCacheMap.get(regionInfo.getConsensusGroupId());
+              regionInfo.setStatus(
+                  (regionGroupCache == null
+                          ? org.apache.iotdb.commons.cluster.RegionStatus.Unknown
+                          : regionGroupCache.getRegionStatus(regionInfo.getDataNodeId()))
+                      .getStatus());
+
+              String regionType =
+                  regionInfo.getDataNodeId()
+                          == allLeadership.getOrDefault(regionInfo.getConsensusGroupId(), -1)
+                      ? RegionRoleType.Leader.toString()
+                      : RegionRoleType.Follower.toString();
+              regionInfo.setRoleType(regionType);
+            });
+
+    return regionInfoListResp;
}
/**
@@ -540,7 +566,7 @@
}
}
- public Map<TConsensusGroupId, IRegionGroupCache> getRegionGroupCacheMap() {
+ public Map<TConsensusGroupId, RegionGroupCache> getRegionGroupCacheMap() {
return regionGroupCacheMap;
}
@@ -549,12 +575,16 @@
}
/**
- * Get the leadership of each RegionGroup If a node is in unknown or removing status, this node
- * can't be leader
+ * Get the leadership of each RegionGroup.
*
- * @return Map<RegionGroupId, leader location>
+ * @return Map<RegionGroupId, DataNodeId where the leader is located>
+ * <p>Due to heartbeat latency, some RegionGroups expected in the result map might be absent
+ * and some leaderIds might be -1 (leader not yet known)
*/
public Map<TConsensusGroupId, Integer> getAllLeadership() {
+
+ // TODO: Will be optimized by IOTDB-4341
+
Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>();
if (ConfigNodeDescriptor.getInstance()
.getConf()
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 63ab1e5..072e575 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -196,7 +196,7 @@
.values()
.forEach(
regionGroupCache -> {
- boolean updateResult = regionGroupCache.updateLoadStatistic();
+ boolean updateResult = regionGroupCache.updateRegionStatistics();
switch (regionGroupCache.getConsensusGroupId().getType()) {
// Check if some RegionGroups change their leader
case SchemaRegion:
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java
index 6bb0cb2..7e70717 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/BaseNodeCache.java
@@ -26,10 +26,10 @@
public abstract class BaseNodeCache {
/** When the response time of heartbeat is more than 20s, the node is considered as down */
- static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
+ public static final int HEARTBEAT_TIMEOUT_TIME = 20_000;
/** Max heartbeat cache samples store size */
- static final int MAXIMUM_WINDOW_SIZE = 100;
+ public static final int MAXIMUM_WINDOW_SIZE = 100;
/** SlidingWindow stores the heartbeat sample data */
final LinkedList<NodeHeartbeatSample> slidingWindow = new LinkedList<>();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
deleted file mode 100644
index 19968a7..0000000
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.manager.load.heartbeat;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-
-public interface IRegionGroupCache {
-
- /**
- * Cache the newest HeartbeatSample
- *
- * @param newHeartbeatSample The newest HeartbeatSample
- */
- void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample);
-
- /**
- * Remove the specific sample cache if exists
- *
- * @param dataNodeId DataNodeId
- */
- void removeCacheIfExists(Integer dataNodeId);
-
- /**
- * Invoking periodically to update RegionGroups' load statistics
- *
- * @return true if some load statistic changed
- */
- boolean updateLoadStatistic();
-
- /**
- * Get RegionGroup's latest leader
- *
- * @return The DataNodeId of the latest leader
- */
- int getLeaderDataNodeId();
-
- /**
- * Get RegionGroup's ConsensusGroupId
- *
- * @return TConsensusGroupId
- */
- TConsensusGroupId getConsensusGroupId();
-}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionCache.java
new file mode 100644
index 0000000..aea08a7
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionCache.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.heartbeat;
+
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache.HEARTBEAT_TIMEOUT_TIME;
+import static org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache.MAXIMUM_WINDOW_SIZE;
+
+/**
+ * Caches the latest heartbeat samples of a single Region replica and derives that replica's
+ * {@link RegionStatus} and leadership flag from them.
+ */
+public class RegionCache {
+
+  // Accepted samples in ascending send-timestamp order, capped at MAXIMUM_WINDOW_SIZE
+  private final List<RegionHeartbeatSample> slidingWindow;
+
+  // Indicates the version of the statistics
+  private volatile long versionTimestamp;
+  private volatile RegionStatus status;
+  private volatile boolean isLeader;
+
+  public RegionCache() {
+    this.slidingWindow = Collections.synchronizedList(new LinkedList<>());
+
+    this.versionTimestamp = 0;
+    this.status = RegionStatus.Unknown;
+    this.isLeader = false;
+  }
+
+  /**
+   * Cache the newest HeartbeatSample
+   *
+   * @param newHeartbeatSample The newest HeartbeatSample
+   */
+  public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample) {
+    synchronized (slidingWindow) {
+      // Only sequential HeartbeatSamples are accepted.
+      // And un-sequential HeartbeatSamples will be discarded.
+      if (slidingWindow.isEmpty()
+          || getLastSample().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+        slidingWindow.add(newHeartbeatSample);
+      }
+
+      if (slidingWindow.size() > MAXIMUM_WINDOW_SIZE) {
+        slidingWindow.remove(0);
+      }
+    }
+  }
+
+  /**
+   * Refresh the leadership flag and the RegionStatus from the latest cached sample.
+   *
+   * <p>The status is Running while the newest accepted sample was sent no more than
+   * HEARTBEAT_TIMEOUT_TIME ago, and Unknown otherwise (including when no sample is cached).
+   */
+  public void updateStatistics() {
+    synchronized (slidingWindow) {
+      // The window can be empty if no sample has been cached yet; reading the last sample
+      // would then throw an IndexOutOfBoundsException
+      if (!slidingWindow.isEmpty()) {
+        RegionHeartbeatSample lastSample = getLastSample();
+        if (lastSample.getSendTimestamp() > versionTimestamp) {
+          versionTimestamp = lastSample.getSendTimestamp();
+          isLeader = lastSample.isLeader();
+        }
+      }
+
+      status =
+          System.currentTimeMillis() - versionTimestamp > HEARTBEAT_TIMEOUT_TIME
+              ? RegionStatus.Unknown
+              : RegionStatus.Running;
+    }
+  }
+
+  /** @return The RegionStatus computed by the latest updateStatistics() call */
+  public RegionStatus getStatus() {
+    return status;
+  }
+
+  /** @return Pair<Last update time, is leader> */
+  public Pair<Long, Boolean> isLeader() {
+    return new Pair<>(versionTimestamp, isLeader);
+  }
+
+  // Caller must hold the slidingWindow monitor and ensure the window is not empty
+  private RegionHeartbeatSample getLastSample() {
+    return slidingWindow.get(slidingWindow.size() - 1);
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index 8cfde19..d770d21 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -19,103 +19,107 @@
package org.apache.iotdb.confignode.manager.load.heartbeat;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-public class RegionGroupCache implements IRegionGroupCache {
+public class RegionGroupCache {
private static final Logger LOGGER = LoggerFactory.getLogger(RegionGroupCache.class);
- // TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache
-
- private static final int maximumWindowSize = 100;
-
private final TConsensusGroupId consensusGroupId;
- // Map<DataNodeId(where a RegionReplica resides), LinkedList<RegionHeartbeatSample>>
- private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
+  // Map<DataNodeId(where a RegionReplica resides), RegionCache>
+  private final Map<Integer, RegionCache> regionCacheMap;
- // Indicates the version of the statistics
- private final AtomicLong versionTimestamp;
// The DataNode where the leader resides
- private final AtomicInteger leaderDataNodeId;
+  private volatile int leaderDataNodeId;
public RegionGroupCache(TConsensusGroupId consensusGroupId) {
this.consensusGroupId = consensusGroupId;
-
- this.slidingWindow = new ConcurrentHashMap<>();
-
- this.versionTimestamp = new AtomicLong(0);
- this.leaderDataNodeId = new AtomicInteger(-1);
+    this.regionCacheMap = new ConcurrentHashMap<>();
+    this.leaderDataNodeId = -1;
}
- @Override
+  /**
+   * Cache the newest HeartbeatSample
+   *
+   * @param newHeartbeatSample The newest HeartbeatSample
+   */
public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample) {
- slidingWindow.putIfAbsent(newHeartbeatSample.getBelongedDataNodeId(), new LinkedList<>());
- synchronized (slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId())) {
- LinkedList<RegionHeartbeatSample> samples =
- slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId());
-
- // Only sequential HeartbeatSamples are accepted.
- // And un-sequential HeartbeatSamples will be discarded.
- if (samples.size() == 0
- || samples.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
- samples.add(newHeartbeatSample);
- }
-
- if (samples.size() > maximumWindowSize) {
- samples.removeFirst();
- }
- }
+    regionCacheMap
+        .computeIfAbsent(newHeartbeatSample.getBelongedDataNodeId(), empty -> new RegionCache())
+        .cacheHeartbeatSample(newHeartbeatSample);
}
- @Override
- public boolean updateLoadStatistic() {
+  /**
+   * Update RegionReplicas' statistics, including:
+   *
+   * <p>1. RegionStatus
+   *
+   * <p>2. Leadership
+   *
+   * @return True if the leader changed, false otherwise
+   */
+  public boolean updateRegionStatistics() {
long updateVersion = Long.MIN_VALUE;
- int updateLeaderDataNodeId = -1;
- int originLeaderDataNodeId = leaderDataNodeId.get();
+    int originLeaderDataNodeId = leaderDataNodeId;
+    int latestLeaderDataNodeId = originLeaderDataNodeId;
- synchronized (slidingWindow) {
- for (LinkedList<RegionHeartbeatSample> samples : slidingWindow.values()) {
- if (samples.size() > 0) {
- RegionHeartbeatSample lastSample = samples.getLast();
- if (lastSample.getSendTimestamp() > updateVersion && lastSample.isLeader()) {
- updateVersion = lastSample.getSendTimestamp();
- updateLeaderDataNodeId = lastSample.getBelongedDataNodeId();
- }
- }
+    for (Map.Entry<Integer, RegionCache> cacheEntry : regionCacheMap.entrySet()) {
+      cacheEntry.getValue().updateStatistics();
+      Pair<Long, Boolean> isLeader = cacheEntry.getValue().isLeader();
+      if (isLeader.getLeft() > updateVersion && isLeader.getRight()) {
+        updateVersion = isLeader.getLeft();
+        latestLeaderDataNodeId = cacheEntry.getKey();
}
}
- if (updateVersion > versionTimestamp.get()) {
- // Only update when the leadership information is latest
- versionTimestamp.set(updateVersion);
- leaderDataNodeId.set(updateLeaderDataNodeId);
- }
-
- return originLeaderDataNodeId != leaderDataNodeId.get();
+    // Publish the result once, so concurrent readers never observe an intermediate candidate
+    leaderDataNodeId = latestLeaderDataNodeId;
+    return originLeaderDataNodeId != latestLeaderDataNodeId;
}
- @Override
- public void removeCacheIfExists(Integer dataNodeId) {
- synchronized (slidingWindow) {
- slidingWindow.remove(dataNodeId);
- }
+  /**
+   * Remove the specific sample cache if exists
+   *
+   * @param dataNodeId DataNodeId
+   */
+  public void removeCacheIfExists(int dataNodeId) {
+    regionCacheMap.remove(dataNodeId);
}
- @Override
+  /**
+   * Get RegionGroup's latest leader
+   *
+   * @return The DataNodeId of the latest leader, -1 if no leader has been recorded yet
+   */
public int getLeaderDataNodeId() {
- return leaderDataNodeId.get();
+    return leaderDataNodeId;
}
- @Override
+  /**
+   * Get the cached status of the RegionReplica that resides on the specified DataNode
+   *
+   * @param dataNodeId DataNodeId
+   * @return The replica's RegionStatus, Unknown if no cache exists for that DataNode
+   */
+  public RegionStatus getRegionStatus(int dataNodeId) {
+    // Single lookup: a containsKey()/get() pair could race with removeCacheIfExists()
+    RegionCache regionCache = regionCacheMap.get(dataNodeId);
+    return regionCache == null ? RegionStatus.Unknown : regionCache.getStatus();
+  }
+
+  /**
+   * Get RegionGroup's ConsensusGroupId
+   *
+   * @return TConsensusGroupId
+   */
public TConsensusGroupId getConsensusGroupId() {
return consensusGroupId;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 7cd539b..84abd26 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -71,7 +71,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -459,7 +458,15 @@
regionInfoList.addAll(storageGroupPartitionTable.getRegionInfoList(regionsInfoPlan));
});
regionInfoList.sort(
- Comparator.comparingInt(regionId -> regionId.getConsensusGroupId().getId()));
+        (o1, o2) -> {
+          // Order by RegionGroupId first, then by DataNodeId. Integer.compare is used because
+          // a subtraction-based comparator can overflow and yield a wrong sign
+          int byGroupId =
+              Integer.compare(o1.getConsensusGroupId().getId(), o2.getConsensusGroupId().getId());
+          return byGroupId != 0
+              ? byGroupId
+              : Integer.compare(o1.getDataNodeId(), o2.getDataNodeId());
+        });
regionResp.setRegionInfoList(regionInfoList);
regionResp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
return regionResp;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index f527ea1..e45c7dd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -24,7 +24,6 @@
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.confignode.consensus.request.read.GetRegionInfoListPlan;
@@ -300,9 +299,9 @@
regionGroupMap.forEach(
(consensusGroupId, regionGroup) -> {
- if (showRegionReq == null || showRegionReq.getConsensusGroupType() == null) {
- regionInfoList.addAll(buildRegionInfoList(regionGroup));
- } else if (showRegionReq.getConsensusGroupType().equals(regionGroup.getId().getType())) {
+ if (showRegionReq == null
+ || showRegionReq.getConsensusGroupType() == null
+ || showRegionReq.getConsensusGroupType().equals(regionGroup.getId().getType())) {
regionInfoList.addAll(buildRegionInfoList(regionGroup));
}
});
@@ -327,8 +326,6 @@
regionInfo.setDataNodeId(dataNodeLocation.getDataNodeId());
regionInfo.setClientRpcIp(dataNodeLocation.getClientRpcEndPoint().getIp());
regionInfo.setClientRpcPort(dataNodeLocation.getClientRpcEndPoint().getPort());
- // TODO: Maintain Region status
- regionInfo.setStatus(RegionStatus.Up.getStatus());
regionInfoList.add(regionInfo);
});
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index de50103..97de8d4 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -26,7 +26,6 @@
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
@@ -96,7 +95,7 @@
List<TRegionReplicaSet> regionReplicaSets = Arrays.asList(regionReplicaSet1, regionReplicaSet2);
// Build regionGroupCacheMap
- Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap = new HashMap<>();
+ Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap = new HashMap<>();
regionGroupCacheMap.put(groupId1, new RegionGroupCache(groupId1));
regionGroupCacheMap.put(groupId2, new RegionGroupCache(groupId2));
@@ -124,7 +123,7 @@
Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
regionGroupCacheMap
.values()
- .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+ .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateRegionStatistics()));
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
@@ -168,7 +167,7 @@
// Get leaderMap
leaderMap.clear();
- regionGroupCacheMap.values().forEach(IRegionGroupCache::updateLoadStatistic);
+ regionGroupCacheMap.values().forEach(RegionGroupCache::updateRegionStatistics);
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
@@ -207,7 +206,7 @@
leaderMap.clear();
regionGroupCacheMap
.values()
- .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+ .forEach(regionGroupCache -> Assert.assertTrue(regionGroupCache.updateRegionStatistics()));
regionGroupCacheMap.forEach(
(groupId, regionGroupCache) ->
leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
index ca1282b..6d60ec6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.commons.cluster;
-/** Node status for showing regions */
+/** Region status for showing regions */
public enum RegionStatus {
- // Node running properly
- Up("Up");
+ /** Region running properly */
+ Running("Running"),
+
+ /** Region connection failure */
+ Unknown("Unknown");
private final String status;