Ratis-2031. Add peer info to response of GroupInfoCommand CLI (#1047)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index db19831..003f202 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -364,6 +364,7 @@
b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy());
b.setRole(reply.getRoleInfoProto());
b.addAllCommitInfos(reply.getCommitInfos());
+ b.setLogInfo(reply.getLogInfoProto());
}
}
return b.build();
@@ -506,7 +507,8 @@
ProtoUtils.toRaftGroup(replyProto.getGroup()),
replyProto.getRole(),
replyProto.getIsRaftStorageHealthy(),
- replyProto.hasConf()? replyProto.getConf(): null);
+ replyProto.hasConf()? replyProto.getConf(): null,
+ replyProto.getLogInfo());
}
static Message toMessage(final ClientMessageEntryProto p) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
index 632fa65..bfac81a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
@@ -19,6 +19,7 @@
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogInfoProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import java.util.Collection;
@@ -33,25 +34,27 @@
private final RoleInfoProto roleInfoProto;
private final boolean isRaftStorageHealthy;
private final RaftConfigurationProto conf;
+ private final LogInfoProto logInfoProto;
public GroupInfoReply(RaftClientRequest request, Collection<CommitInfoProto> commitInfos,
RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
- RaftConfigurationProto conf) {
+ RaftConfigurationProto conf, LogInfoProto logInfoProto) {
this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
request.getCallId(), commitInfos,
- group, roleInfoProto, isRaftStorageHealthy, conf);
+ group, roleInfoProto, isRaftStorageHealthy, conf, logInfoProto);
}
@SuppressWarnings("parameternumber")
public GroupInfoReply(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
Collection<CommitInfoProto> commitInfos,
RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
- RaftConfigurationProto conf) {
+ RaftConfigurationProto conf, LogInfoProto logInfoProto) {
super(clientId, serverId, groupId, callId, true, null, null, 0L, commitInfos);
this.group = group;
this.roleInfoProto = roleInfoProto;
this.isRaftStorageHealthy = isRaftStorageHealthy;
this.conf = conf;
+ this.logInfoProto = logInfoProto;
}
public RaftGroup getGroup() {
@@ -69,4 +72,8 @@
public Optional<RaftConfigurationProto> getConf() {
return Optional.ofNullable(conf);
}
+
+ public LogInfoProto getLogInfoProto() {
+ return logInfoProto;
+ }
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 586ec1b..edc57ec 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -556,4 +556,14 @@
bool isRaftStorageHealthy = 4;
repeated CommitInfoProto commitInfos = 5;
RaftConfigurationProto conf = 6;
+ LogInfoProto logInfo = 7;
+}
+
+/** Add new LogInfoProto for RATIS-2030, allow GroupInfoCommand to show each server's last committed log,
+ last applied log, last snapshot log, last entry log.*/
+message LogInfoProto {
+ TermIndexProto lastSnapshot = 1;
+ TermIndexProto applied = 2;
+ TermIndexProto committed = 3;
+ TermIndexProto lastEntry = 4;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0885fb8..17a741e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -24,6 +24,7 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogInfoProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
@@ -642,7 +643,20 @@
final RaftConfigurationProto conf =
LogProtoUtils.toRaftConfigurationProtoBuilder(getRaftConf()).build();
return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(),
- dir.isHealthy(), conf);
+ dir.isHealthy(), conf, getLogInfo());
+ }
+
+ LogInfoProto getLogInfo(){
+ final RaftLog log = getRaftLog();
+ LogInfoProto.Builder logInfoBuilder = LogInfoProto.newBuilder()
+ .setApplied(getStateMachine().getLastAppliedTermIndex().toProto())
+ .setCommitted(log.getTermIndex(log.getLastCommittedIndex()).toProto())
+ .setLastEntry(log.getLastEntryTermIndex().toProto());
+ final SnapshotInfo snapshot = getStateMachine().getLatestSnapshot();
+ if (snapshot != null) {
+ logInfoBuilder.setLastSnapshot(snapshot.getTermIndex().toProto());
+ }
+ return logInfoBuilder.build();
}
RoleInfoProto getRoleInfoProto() {
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java
index d2c4e65..0125440 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/group/GroupInfoCommand.java
@@ -54,6 +54,7 @@
printf("leader info: %s(%s)%n%n", leader.getId().toStringUtf8(), leader.getAddress());
}
println(reply.getCommitInfos());
+ println(reply.getLogInfoProto());
return 0;
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java
index 83c05bf..6900d48 100644
--- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/GroupCommandIntegrationTest.java
@@ -20,6 +20,7 @@
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -90,4 +91,52 @@
String info = result.substring(0, hearder.length());
Assertions.assertEquals(hearder, info);
}
+
+ @Test
+ public void testGroupInfoCommandIncludesCorrectLogInfo() throws Exception {
+ // set number of server to 1 so that we can make sure which server returns the LogInfoProto
+ // since information of applied index, snapshot index, and last entry index are not shared between servers
+ runWithNewCluster(1, this::runTestGroupInfoCommandWithLogInfoVerification);
+ }
+
+ void runTestGroupInfoCommandWithLogInfoVerification(MiniRaftCluster cluster) throws Exception {
+ RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+
+ try (final RaftClient client = cluster.createClient(leader.getId())) {
+ for (int i = 0; i < RaftServerConfigKeys.Snapshot.creationGap(getProperties()); i++) {
+ RaftClientReply
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+ Assertions.assertTrue(reply.isSuccess());
+ }
+ }
+
+ leader.getStateMachine().takeSnapshot();
+
+ final String address = getClusterAddress(cluster);
+ final StringPrintStream out = new StringPrintStream();
+ RatisShell shell = new RatisShell(out.getPrintStream());
+ int ret = shell.run("group", "info", "-peers", address);
+ Assertions.assertEquals(0 , ret);
+ String result = out.toString().trim();
+ String hearder = String.format("group id: %s%sleader info: %s(%s)%s%s",
+ cluster.getGroupId().getUuid(), NEW_LINE, leader.getId(),
+ cluster.getLeader().getPeer().getAddress(), NEW_LINE, NEW_LINE);
+ String info = result.substring(0, hearder.length());
+ Assertions.assertEquals(hearder, info);
+ long currentTerm = leader.getInfo().getCurrentTerm();
+ String LogInfoProtoFormat = "%s {" + NEW_LINE + " term: " + currentTerm + NEW_LINE + " index: %s";
+ Assertions.assertTrue(result.contains(
+ String.format(LogInfoProtoFormat, "applied",
+ leader.getStateMachine().getLastAppliedTermIndex().getIndex())));
+ Assertions.assertTrue(result.contains(
+ String.format(LogInfoProtoFormat, "committed",
+ leader.getRaftLog().getLastCommittedIndex())));
+ Assertions.assertTrue(result.contains(
+ String.format(LogInfoProtoFormat, "lastSnapshot",
+ leader.getStateMachine().getLatestSnapshot().getIndex())));
+ Assertions.assertTrue(result.contains(
+ String.format(LogInfoProtoFormat, "lastEntry",
+ leader.getRaftLog().getLastCommittedIndex())));
+ }
+
}