RATIS-1967. Do not store CommitInfoProto in CommitInfoCache. (#988)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
index aa0bb05..04210d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
@@ -24,30 +24,45 @@
import org.apache.ratis.util.ProtoUtils;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** Caching the commit information. */
class CommitInfoCache {
- private final ConcurrentMap<RaftPeerId, CommitInfoProto> map = new ConcurrentHashMap<>();
+ private final ConcurrentMap<RaftPeerId, Long> map = new ConcurrentHashMap<>();
- CommitInfoProto get(RaftPeerId id) {
- return map.get(id);
+ Optional<Long> get(RaftPeerId id) {
+ return Optional.ofNullable(map.get(id));
}
CommitInfoProto update(RaftPeer peer, long newCommitIndex) {
Objects.requireNonNull(peer, "peer == null");
- return map.compute(peer.getId(), (id, old) ->
- old == null || newCommitIndex > old.getCommitIndex()? ProtoUtils.toCommitInfoProto(peer, newCommitIndex): old);
+ final long updated = update(peer.getId(), newCommitIndex);
+ return ProtoUtils.toCommitInfoProto(peer, updated);
}
- CommitInfoProto update(CommitInfoProto newInfo) {
- return map.compute(RaftPeerId.valueOf(newInfo.getServer().getId()),
- (id, old) -> old == null || newInfo.getCommitIndex() > old.getCommitIndex()? newInfo: old);
+ long update(RaftPeerId peerId, long newCommitIndex) {
+ Objects.requireNonNull(peerId, "peerId == null");
+ return map.compute(peerId, (id, oldCommitIndex) -> {
+ if (oldCommitIndex != null) {
+ // get around BX_UNBOXING_IMMEDIATELY_REBOXED
+ final long old = oldCommitIndex;
+ if (old >= newCommitIndex) {
+ return old;
+ }
+ }
+ return newCommitIndex;
+ });
+ }
+
+ void update(CommitInfoProto newInfo) {
+ final RaftPeerId id = RaftPeerId.valueOf(newInfo.getServer().getId());
+ update(id, newInfo.getCommitIndex());
}
@Override
public String toString() {
- return JavaUtils.getClassSimpleName(getClass()) + ":" + map.values();
+ return JavaUtils.getClassSimpleName(getClass()) + ":" + map;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index ed51a65..0e020b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -17,6 +17,9 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -31,6 +34,7 @@
* entries.
*/
public class ConfigurationManager {
+ private final RaftPeerId id;
private final RaftConfigurationImpl initialConf;
private final NavigableMap<Long, RaftConfigurationImpl> configurations = new TreeMap<>();
/**
@@ -38,10 +42,21 @@
* the last entry of the map. Otherwise is initialConf.
*/
private volatile RaftConfigurationImpl currentConf;
+ /** Cache the peer corresponding to {@link #id}. */
+ private volatile RaftPeer currentPeer;
- ConfigurationManager(RaftConfigurationImpl initialConf) {
+ ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
+ this.id = id;
this.initialConf = initialConf;
- this.currentConf = initialConf;
+ setCurrentConf(initialConf);
+ }
+
+ private void setCurrentConf(RaftConfigurationImpl currentConf) {
+ this.currentConf = currentConf;
+ final RaftPeer peer = currentConf.getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
+ if (peer != null) {
+ this.currentPeer = peer;
+ }
}
synchronized void addConfiguration(RaftConfiguration conf) {
@@ -57,7 +72,7 @@
private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf) {
configurations.put(logIndex, conf);
if (logIndex == configurations.lastEntry().getKey()) {
- currentConf = conf;
+ setCurrentConf(conf);
}
}
@@ -65,21 +80,24 @@
return currentConf;
}
+ RaftPeer getCurrentPeer() {
+ return currentPeer;
+ }
+
/**
* Remove all the configurations whose log index is >= the given index.
+ *
* @param index The given index. All the configurations whose log index is >=
* this value will be removed.
- * @return The configuration with largest log index < the given index.
*/
- synchronized RaftConfiguration removeConfigurations(long index) {
+ synchronized void removeConfigurations(long index) {
// remove all configurations starting at the index
- for(final Iterator<?> iter = configurations.tailMap(index).entrySet().iterator(); iter.hasNext();) {
- iter.next();
- iter.remove();
+ final SortedMap<Long, RaftConfigurationImpl> tail = configurations.tailMap(index);
+ if (tail.isEmpty()) {
+ return;
}
- currentConf = configurations.isEmpty() ? initialConf :
- configurations.lastEntry().getValue();
- return currentConf;
+ tail.clear();
+ setCurrentConf(configurations.isEmpty() ? initialConf : configurations.lastEntry().getValue());
}
synchronized int numOfConf() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5e6c812..ed5457b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -275,7 +275,7 @@
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
- getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
+ getMemberId(), this::getCommitIndex, retryCache::getStatistics);
this.startComplete = new AtomicBoolean(false);
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
@@ -294,6 +294,10 @@
id + "-client");
}
+ private long getCommitIndex(RaftPeerId id) {
+ return commitInfoCache.get(id).orElse(0L);
+ }
+
@Override
public DivisionProperties properties() {
return divisionProperties;
@@ -453,6 +457,12 @@
}
@Override
+ public RaftPeer getPeer() {
+ return Optional.ofNullable(getState().getCurrentPeer())
+ .orElseGet(() -> getRaftServer().getPeer());
+ }
+
+ @Override
public DivisionInfo getInfo() {
return info;
}
@@ -622,7 +632,8 @@
public Collection<CommitInfoProto> getCommitInfos() {
final List<CommitInfoProto> infos = new ArrayList<>();
// add the commit info of this server
- infos.add(updateCommitInfoCache());
+ final long commitIndex = updateCommitInfoCache();
+ infos.add(ProtoUtils.toCommitInfoProto(getPeer(), commitIndex));
// add the commit infos of other servers
if (getInfo().isLeader()) {
@@ -633,9 +644,10 @@
Stream.concat(
raftConf.getAllPeers(RaftPeerRole.FOLLOWER).stream(),
raftConf.getAllPeers(RaftPeerRole.LISTENER).stream())
- .map(RaftPeer::getId)
- .filter(id -> !id.equals(getId()))
- .map(commitInfoCache::get)
+ .filter(peer -> !peer.getId().equals(getId()))
+ .map(peer -> commitInfoCache.get(peer.getId())
+ .map(index -> ProtoUtils.toCommitInfoProto(peer, index))
+ .orElse(null))
.filter(Objects::nonNull)
.forEach(infos::add);
}
@@ -1534,8 +1546,8 @@
}
}
- private CommitInfoProto updateCommitInfoCache() {
- return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
+ private long updateCommitInfoCache() {
+ return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex());
}
ExecutorService getServerExecutor() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 07c0e6c..e21f63c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -109,7 +109,7 @@
final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder()
.setConf(followerPeers, listenerPeers)
.build();
- configurationManager = new ConfigurationManager(initialConf);
+ configurationManager = new ConfigurationManager(id, initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);
final String storageDirName = group.getGroupId().getUuid().toString();
@@ -196,6 +196,10 @@
return configurationManager.getCurrent();
}
+ RaftPeer getCurrentPeer() {
+ return configurationManager.getCurrentPeer();
+ }
+
long getCurrentTerm() {
return currentTerm.get();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index 1714517..cdbce6e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -18,11 +18,11 @@
package org.apache.ratis.server.metrics;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.function.ToLongFunction;
import org.apache.ratis.metrics.LongCounter;
import org.apache.ratis.metrics.Timekeeper;
@@ -30,7 +30,6 @@
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.RaftClientRequest.Type;
import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -98,7 +97,7 @@
/** Follower Id -> heartbeat elapsed */
private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap = new ConcurrentHashMap<>();
- private final Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache;
+ private final ToLongFunction<RaftPeerId> commitInfoCache;
/** id -> metric */
private static final Map<RaftGroupMemberId, RaftServerMetricsImpl> METRICS = new ConcurrentHashMap<>();
@@ -111,7 +110,7 @@
}
public static RaftServerMetricsImpl computeIfAbsentRaftServerMetrics(RaftGroupMemberId serverId,
- Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+ ToLongFunction<RaftPeerId> commitInfoCache,
Supplier<RetryCache.Statistics> retryCacheStatistics) {
return METRICS.computeIfAbsent(serverId,
key -> new RaftServerMetricsImpl(serverId, commitInfoCache, retryCacheStatistics));
@@ -122,7 +121,7 @@
}
public RaftServerMetricsImpl(RaftGroupMemberId serverId,
- Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+ ToLongFunction<RaftPeerId> commitInfoCache,
Supplier<RetryCache.Statistics> retryCacheStatistics) {
super(createRegistry(serverId.toString()));
this.commitInfoCache = commitInfoCache;
@@ -183,10 +182,8 @@
* Register a commit index tracker for the peer in cluster.
*/
private void addPeerCommitIndexGauge(RaftPeerId peerId) {
- getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId), () -> () -> Optional.ofNullable(commitInfoCache.get())
- .map(cache -> cache.apply(peerId))
- .map(CommitInfoProto::getCommitIndex)
- .orElse(0L));
+ getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId),
+ () -> () -> commitInfoCache.applyAsLong(peerId));
}
@VisibleForTesting
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
index e5222d2..b25a50b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
@@ -52,10 +52,10 @@
retryCache = new RetryCacheImpl(RaftServerConfigKeys.RetryCache.EXPIRY_TIME_DEFAULT, null);
final RaftServerMetricsImpl raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
- raftGroupMemberId, () -> null, retryCache::getStatistics);
+ raftGroupMemberId, id -> 0L, retryCache::getStatistics);
ratisMetricRegistry = (RatisMetricRegistryImpl) raftServerMetrics.getRegistry();
}
-
+
@After
public void tearDown() {
retryCache.close();
@@ -92,23 +92,23 @@
}
private static void checkHit(long count, double rate) {
- Long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+ final long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_HIT_COUNT_METRIC)).values().iterator().next().getValue();
- assertEquals(hitCount.longValue(), count);
+ assertEquals(hitCount, count);
- Double hitRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
+ final double hitRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_HIT_RATE_METRIC)).values().iterator().next().getValue();
- assertEquals(hitRate.doubleValue(), rate, 0.0);
+ assertEquals(hitRate, rate, 0.0);
}
private static void checkMiss(long count, double rate) {
- Long missCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+ final long missCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_MISS_COUNT_METRIC)).values().iterator().next().getValue();
- assertEquals(missCount.longValue(), count);
+ assertEquals(missCount, count);
- Double missRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
+ final double missRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
s.contains(RETRY_CACHE_MISS_RATE_METRIC)).values().iterator().next().getValue();
- assertEquals(missRate.doubleValue(), rate, 0.0);
+ assertEquals(missRate, rate, 0.0);
}
private static void checkEntryCount(long expected) {