Ratis-2031. Add peer info to response of GroupInfoCommand CLI (#1047)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index db19831..003f202 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -364,6 +364,7 @@
         b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy());
         b.setRole(reply.getRoleInfoProto());
         b.addAllCommitInfos(reply.getCommitInfos());
+        b.setLogInfo(reply.getLogInfoProto());
       }
     }
     return b.build();
@@ -506,7 +507,8 @@
         ProtoUtils.toRaftGroup(replyProto.getGroup()),
         replyProto.getRole(),
         replyProto.getIsRaftStorageHealthy(),
-        replyProto.hasConf()? replyProto.getConf(): null);
+        replyProto.hasConf()? replyProto.getConf(): null,
+        replyProto.getLogInfo());
   }
 
   static Message toMessage(final ClientMessageEntryProto p) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
index 632fa65..bfac81a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
@@ -19,6 +19,7 @@
 
 import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogInfoProto;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
 
 import java.util.Collection;
@@ -33,25 +34,27 @@
   private final RoleInfoProto roleInfoProto;
   private final boolean isRaftStorageHealthy;
   private final RaftConfigurationProto conf;
+  private final LogInfoProto logInfoProto;
 
   public GroupInfoReply(RaftClientRequest request, Collection<CommitInfoProto> commitInfos,
       RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
-      RaftConfigurationProto conf) {
+      RaftConfigurationProto conf, LogInfoProto logInfoProto) {
     this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
         request.getCallId(), commitInfos,
-        group, roleInfoProto, isRaftStorageHealthy, conf);
+        group, roleInfoProto, isRaftStorageHealthy, conf, logInfoProto);
   }
 
   @SuppressWarnings("parameternumber")
   public GroupInfoReply(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
       Collection<CommitInfoProto> commitInfos,
       RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
-      RaftConfigurationProto conf) {
+      RaftConfigurationProto conf, LogInfoProto logInfoProto) {
     super(clientId, serverId, groupId, callId, true, null, null, 0L, commitInfos);
     this.group = group;
     this.roleInfoProto = roleInfoProto;
     this.isRaftStorageHealthy = isRaftStorageHealthy;
     this.conf = conf;
+    this.logInfoProto = logInfoProto;
   }
 
   public RaftGroup getGroup() {
@@ -69,4 +72,8 @@
   public Optional<RaftConfigurationProto> getConf() {
     return Optional.ofNullable(conf);
   }
+
+  public LogInfoProto getLogInfoProto() {
+    return logInfoProto;
+  }
 }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 586ec1b..edc57ec 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -556,4 +556,14 @@
   bool isRaftStorageHealthy = 4;
   repeated CommitInfoProto commitInfos = 5;
   RaftConfigurationProto conf = 6;
+  LogInfoProto logInfo = 7;
+}
+
+/** Add new LogInfoProto for RATIS-2030, allow GroupInfoCommand to show each server's last committed log,
+    last applied log, last snapshot log, last entry log.*/
+message LogInfoProto {
+  TermIndexProto lastSnapshot = 1;
+  TermIndexProto applied = 2;
+  TermIndexProto committed = 3;
+  TermIndexProto lastEntry = 4;
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0885fb8..17a741e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -24,6 +24,7 @@
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogInfoProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
@@ -642,7 +643,20 @@
     final RaftConfigurationProto conf =
         LogProtoUtils.toRaftConfigurationProtoBuilder(getRaftConf()).build();
     return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(),
-        dir.isHealthy(), conf);
+        dir.isHealthy(), conf, getLogInfo());
+  }
+
+  LogInfoProto getLogInfo(){
+    final RaftLog log = getRaftLog();
+    LogInfoProto.Builder logInfoBuilder = LogInfoProto.newBuilder()
+        .setApplied(getStateMachine().getLastAppliedTermIndex().toProto())
+        .setCommitted(log.getTermIndex(log.getLastCommittedIndex()).toProto())
+        .setLastEntry(log.getLastEntryTermIndex().toProto());
+    final SnapshotInfo snapshot = getStateMachine().getLatestSnapshot();
+    if (snapshot != null) {
+      logInfoBuilder.setLastSnapshot(snapshot.getTermIndex().toProto());
+    }
+    return logInfoBuilder.build();
   }
 
   RoleInfoProto getRoleInfoProto() {
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java
index d2c4e65..0125440 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java
@@ -54,6 +54,7 @@
       printf("leader info: %s(%s)%n%n", leader.getId().toStringUtf8(), leader.getAddress());
     }
     println(reply.getCommitInfos());
+    println(reply.getLogInfoProto());
     return 0;
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java
index 83c05bf..6900d48 100644
--- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java
@@ -20,6 +20,7 @@
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -90,4 +91,52 @@
     String info = result.substring(0, hearder.length());
     Assertions.assertEquals(hearder, info);
   }
+
+  @Test
+  public void testGroupInfoCommandIncludesCorrectLogInfo() throws Exception {
+    // set number of server to 1 so that we can make sure which server returns the LogInfoProto
+    // since information of applied index, snapshot index, and last entry index are not shared between servers
+    runWithNewCluster(1, this::runTestGroupInfoCommandWithLogInfoVerification);
+  }
+
+  void runTestGroupInfoCommandWithLogInfoVerification(MiniRaftCluster cluster) throws Exception {
+    RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+
+    try (final RaftClient client = cluster.createClient(leader.getId())) {
+      for (int i = 0; i < RaftServerConfigKeys.Snapshot.creationGap(getProperties()); i++) {
+        RaftClientReply
+            reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+        Assertions.assertTrue(reply.isSuccess());
+      }
+    }
+
+    leader.getStateMachine().takeSnapshot();
+
+    final String address = getClusterAddress(cluster);
+    final StringPrintStream out = new StringPrintStream();
+    RatisShell shell = new RatisShell(out.getPrintStream());
+    int ret = shell.run("group", "info", "-peers", address);
+    Assertions.assertEquals(0 , ret);
+    String result = out.toString().trim();
+    String hearder = String.format("group id: %s%sleader info: %s(%s)%s%s",
+        cluster.getGroupId().getUuid(), NEW_LINE, leader.getId(),
+        cluster.getLeader().getPeer().getAddress(), NEW_LINE, NEW_LINE);
+    String info = result.substring(0, hearder.length());
+    Assertions.assertEquals(hearder, info);
+    long currentTerm = leader.getInfo().getCurrentTerm();
+    String LogInfoProtoFormat = "%s {" + NEW_LINE + "  term: " + currentTerm + NEW_LINE + "  index: %s";
+    Assertions.assertTrue(result.contains(
+        String.format(LogInfoProtoFormat, "applied",
+            leader.getStateMachine().getLastAppliedTermIndex().getIndex())));
+    Assertions.assertTrue(result.contains(
+        String.format(LogInfoProtoFormat, "committed",
+            leader.getRaftLog().getLastCommittedIndex())));
+    Assertions.assertTrue(result.contains(
+        String.format(LogInfoProtoFormat, "lastSnapshot",
+            leader.getStateMachine().getLatestSnapshot().getIndex())));
+    Assertions.assertTrue(result.contains(
+        String.format(LogInfoProtoFormat, "lastEntry",
+            leader.getRaftLog().getLastCommittedIndex())));
+  }
+
 }