RATIS-1967. Do not store CommitInfoProto in CommitInfoCache. (#988)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
index aa0bb05..04210d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
@@ -24,30 +24,45 @@
 import org.apache.ratis.util.ProtoUtils;
 
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /** Caching the commit information. */
 class CommitInfoCache {
-  private final ConcurrentMap<RaftPeerId, CommitInfoProto> map = new ConcurrentHashMap<>();
+  private final ConcurrentMap<RaftPeerId, Long> map = new ConcurrentHashMap<>();
 
-  CommitInfoProto get(RaftPeerId id) {
-    return map.get(id);
+  Optional<Long> get(RaftPeerId id) {
+    return Optional.ofNullable(map.get(id));
   }
 
   CommitInfoProto update(RaftPeer peer, long newCommitIndex) {
     Objects.requireNonNull(peer, "peer == null");
-    return map.compute(peer.getId(), (id, old) ->
-        old == null || newCommitIndex > old.getCommitIndex()? ProtoUtils.toCommitInfoProto(peer, newCommitIndex): old);
+    final long updated = update(peer.getId(), newCommitIndex);
+    return ProtoUtils.toCommitInfoProto(peer, updated);
   }
 
-  CommitInfoProto update(CommitInfoProto newInfo) {
-    return map.compute(RaftPeerId.valueOf(newInfo.getServer().getId()),
-        (id, old) -> old == null || newInfo.getCommitIndex() > old.getCommitIndex()? newInfo: old);
+  long update(RaftPeerId peerId, long newCommitIndex) {
+    Objects.requireNonNull(peerId, "peerId == null");
+    return map.compute(peerId, (id, oldCommitIndex) -> {
+      if (oldCommitIndex != null) {
+        // get around BX_UNBOXING_IMMEDIATELY_REBOXED
+        final long old = oldCommitIndex;
+        if (old >= newCommitIndex) {
+          return old;
+        }
+      }
+      return newCommitIndex;
+    });
+  }
+
+  void update(CommitInfoProto newInfo) {
+    final RaftPeerId id = RaftPeerId.valueOf(newInfo.getServer().getId());
+    update(id, newInfo.getCommitIndex());
   }
 
   @Override
   public String toString() {
-    return JavaUtils.getClassSimpleName(getClass()) + ":" + map.values();
+    return JavaUtils.getClassSimpleName(getClass()) + ":" + map;
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index ed51a65..0e020b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -17,6 +17,9 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -31,6 +34,7 @@
  * entries.
  */
 public class ConfigurationManager {
+  private final RaftPeerId id;
   private final RaftConfigurationImpl initialConf;
   private final NavigableMap<Long, RaftConfigurationImpl> configurations = new TreeMap<>();
   /**
@@ -38,10 +42,21 @@
    * the last entry of the map. Otherwise is initialConf.
    */
   private volatile RaftConfigurationImpl currentConf;
+  /** Cache the peer corresponding to {@link #id}. */
+  private volatile RaftPeer currentPeer;
 
-  ConfigurationManager(RaftConfigurationImpl initialConf) {
+  ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
+    this.id = id;
     this.initialConf = initialConf;
-    this.currentConf = initialConf;
+    setCurrentConf(initialConf);
+  }
+
+  private void setCurrentConf(RaftConfigurationImpl currentConf) {
+    this.currentConf = currentConf;
+    final RaftPeer peer = currentConf.getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
+    if (peer != null) {
+      this.currentPeer = peer;
+    }
   }
 
   synchronized void addConfiguration(RaftConfiguration conf) {
@@ -57,7 +72,7 @@
   private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf) {
     configurations.put(logIndex, conf);
     if (logIndex == configurations.lastEntry().getKey()) {
-      currentConf = conf;
+      setCurrentConf(conf);
     }
   }
 
@@ -65,21 +80,24 @@
     return currentConf;
   }
 
+  RaftPeer getCurrentPeer() {
+    return currentPeer;
+  }
+
   /**
    * Remove all the configurations whose log index is >= the given index.
+   *
    * @param index The given index. All the configurations whose log index is >=
    *              this value will be removed.
-   * @return The configuration with largest log index < the given index.
    */
-  synchronized RaftConfiguration removeConfigurations(long index) {
+  synchronized void removeConfigurations(long index) {
     // remove all configurations starting at the index
-    for(final Iterator<?> iter = configurations.tailMap(index).entrySet().iterator(); iter.hasNext();) {
-      iter.next();
-      iter.remove();
+    final SortedMap<Long, RaftConfigurationImpl> tail = configurations.tailMap(index);
+    if (tail.isEmpty()) {
+      return;
     }
-    currentConf = configurations.isEmpty() ? initialConf :
-        configurations.lastEntry().getValue();
-    return currentConf;
+    tail.clear();
+    setCurrentConf(configurations.isEmpty() ? initialConf : configurations.lastEntry().getValue());
   }
 
   synchronized int numOfConf() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5e6c812..ed5457b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -275,7 +275,7 @@
     this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
         getMemberId(), state::getLastLeaderElapsedTimeMs);
     this.raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
-        getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
+        getMemberId(), this::getCommitIndex, retryCache::getStatistics);
 
     this.startComplete = new AtomicBoolean(false);
     this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());
@@ -294,6 +294,10 @@
         id + "-client");
   }
 
+  private long getCommitIndex(RaftPeerId id) {
+    return commitInfoCache.get(id).orElse(0L);
+  }
+
   @Override
   public DivisionProperties properties() {
     return divisionProperties;
@@ -453,6 +457,12 @@
   }
 
   @Override
+  public RaftPeer getPeer() {
+    return Optional.ofNullable(getState().getCurrentPeer())
+        .orElseGet(() -> getRaftServer().getPeer());
+  }
+
+  @Override
   public DivisionInfo getInfo() {
     return info;
   }
@@ -622,7 +632,8 @@
   public Collection<CommitInfoProto> getCommitInfos() {
     final List<CommitInfoProto> infos = new ArrayList<>();
     // add the commit info of this server
-    infos.add(updateCommitInfoCache());
+    final long commitIndex = updateCommitInfoCache();
+    infos.add(ProtoUtils.toCommitInfoProto(getPeer(), commitIndex));
 
     // add the commit infos of other servers
     if (getInfo().isLeader()) {
@@ -633,9 +644,10 @@
       Stream.concat(
               raftConf.getAllPeers(RaftPeerRole.FOLLOWER).stream(),
               raftConf.getAllPeers(RaftPeerRole.LISTENER).stream())
-          .map(RaftPeer::getId)
-          .filter(id -> !id.equals(getId()))
-          .map(commitInfoCache::get)
+          .filter(peer -> !peer.getId().equals(getId()))
+          .map(peer -> commitInfoCache.get(peer.getId())
+              .map(index -> ProtoUtils.toCommitInfoProto(peer, index))
+              .orElse(null))
           .filter(Objects::nonNull)
           .forEach(infos::add);
     }
@@ -1534,8 +1546,8 @@
     }
   }
 
-  private CommitInfoProto updateCommitInfoCache() {
-    return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
+  private long updateCommitInfoCache() {
+    return commitInfoCache.update(getId(), state.getLog().getLastCommittedIndex());
   }
 
   ExecutorService getServerExecutor() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 07c0e6c..e21f63c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -109,7 +109,7 @@
     final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder()
         .setConf(followerPeers, listenerPeers)
         .build();
-    configurationManager = new ConfigurationManager(initialConf);
+    configurationManager = new ConfigurationManager(id, initialConf);
     LOG.info("{}: {}", getMemberId(), configurationManager);
 
     final String storageDirName = group.getGroupId().getUuid().toString();
@@ -196,6 +196,10 @@
     return configurationManager.getCurrent();
   }
 
+  RaftPeer getCurrentPeer() {
+    return configurationManager.getCurrentPeer();
+  }
+
   long getCurrentTerm() {
     return currentTerm.get();
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index 1714517..cdbce6e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -18,11 +18,11 @@
 
 package org.apache.ratis.server.metrics;
 
-import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.function.ToLongFunction;
 
 import org.apache.ratis.metrics.LongCounter;
 import org.apache.ratis.metrics.Timekeeper;
@@ -30,7 +30,6 @@
 import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.protocol.RaftClientRequest.Type;
 import org.apache.ratis.protocol.RaftGroupMemberId;
@@ -98,7 +97,7 @@
 
   /** Follower Id -> heartbeat elapsed */
   private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap = new ConcurrentHashMap<>();
-  private final Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache;
+  private final ToLongFunction<RaftPeerId> commitInfoCache;
 
   /** id -> metric */
   private static final Map<RaftGroupMemberId, RaftServerMetricsImpl> METRICS = new ConcurrentHashMap<>();
@@ -111,7 +110,7 @@
   }
 
   public static RaftServerMetricsImpl computeIfAbsentRaftServerMetrics(RaftGroupMemberId serverId,
-      Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+      ToLongFunction<RaftPeerId> commitInfoCache,
       Supplier<RetryCache.Statistics> retryCacheStatistics) {
     return METRICS.computeIfAbsent(serverId,
         key -> new RaftServerMetricsImpl(serverId, commitInfoCache, retryCacheStatistics));
@@ -122,7 +121,7 @@
   }
 
   public RaftServerMetricsImpl(RaftGroupMemberId serverId,
-      Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+      ToLongFunction<RaftPeerId> commitInfoCache,
       Supplier<RetryCache.Statistics> retryCacheStatistics) {
     super(createRegistry(serverId.toString()));
     this.commitInfoCache = commitInfoCache;
@@ -183,10 +182,8 @@
    * Register a commit index tracker for the peer in cluster.
    */
   private void addPeerCommitIndexGauge(RaftPeerId peerId) {
-    getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId), () -> () -> Optional.ofNullable(commitInfoCache.get())
-        .map(cache -> cache.apply(peerId))
-        .map(CommitInfoProto::getCommitIndex)
-        .orElse(0L));
+    getRegistry().gauge(getPeerCommitIndexGaugeKey(peerId),
+        () -> () -> commitInfoCache.applyAsLong(peerId));
   }
 
   @VisibleForTesting
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
index e5222d2..b25a50b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
@@ -52,10 +52,10 @@
       retryCache = new RetryCacheImpl(RaftServerConfigKeys.RetryCache.EXPIRY_TIME_DEFAULT, null);
 
       final RaftServerMetricsImpl raftServerMetrics = RaftServerMetricsImpl.computeIfAbsentRaftServerMetrics(
-          raftGroupMemberId, () -> null, retryCache::getStatistics);
+          raftGroupMemberId, id -> 0L, retryCache::getStatistics);
       ratisMetricRegistry = (RatisMetricRegistryImpl) raftServerMetrics.getRegistry();
     }
-    
+
     @After
     public void tearDown() {
         retryCache.close();
@@ -92,23 +92,23 @@
     }
 
     private static void checkHit(long count, double rate) {
-      Long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+      final long hitCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
           s.contains(RETRY_CACHE_HIT_COUNT_METRIC)).values().iterator().next().getValue();
-      assertEquals(hitCount.longValue(), count);
+      assertEquals(hitCount, count);
 
-      Double hitRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
+      final double hitRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
           s.contains(RETRY_CACHE_HIT_RATE_METRIC)).values().iterator().next().getValue();
-      assertEquals(hitRate.doubleValue(), rate, 0.0);
+      assertEquals(hitRate, rate, 0.0);
     }
 
     private static void checkMiss(long count, double rate) {
-      Long missCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+      final long missCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
           s.contains(RETRY_CACHE_MISS_COUNT_METRIC)).values().iterator().next().getValue();
-      assertEquals(missCount.longValue(), count);
+      assertEquals(missCount, count);
 
-      Double missRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
+      final double missRate = (Double) ratisMetricRegistry.getGauges((s, metric) ->
           s.contains(RETRY_CACHE_MISS_RATE_METRIC)).values().iterator().next().getValue();
-      assertEquals(missRate.doubleValue(), rate, 0.0);
+      assertEquals(missRate, rate, 0.0);
     }
 
     private static void checkEntryCount(long expected) {