[RatisConsensus] Retry Read Requests on StatusRuntimeException (#11679)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 2ae303e..386f0c2 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -81,6 +81,7 @@
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
@@ -376,7 +377,27 @@
         RatisMetricsManager.getInstance().startReadTimer(consensusGroupType)) {
       reply =
           Retriable.attempt(
-              () -> server.submitClientRequest(request),
+              () -> {
+                try {
+                  return server.submitClientRequest(request);
+                } catch (
+                    IOException
+                        ioe) { // IOE indicates some unexpected errors, say StatusRuntimeException
+                  if (ioe.getCause() instanceof StatusRuntimeException) {
+                    // StatusRuntimeException will be thrown if the leader is offline (gRPC cannot
+                    // connect to the peer)
+                    // We can still retry in case it's a temporary network partition.
+                    return RaftClientReply.newBuilder()
+                        .setException(
+                            new ReadIndexException(
+                                "internal GRPC connection error:", ioe.getCause()))
+                        .setSuccess(false)
+                        .build();
+                  } else {
+                    throw ioe;
+                  }
+                }
+              },
               readRetryPolicy,
               () -> readRequest,
               logger);