Ratis-2040. Fix RaftPeerId generated by command of "raftMetaConf" to use real PeerId (#1060)

diff --git a/ratis-docs/src/site/markdown/cli.md b/ratis-docs/src/site/markdown/cli.md
index 60958fc..ab9f899 100644
--- a/ratis-docs/src/site/markdown/cli.md
+++ b/ratis-docs/src/site/markdown/cli.md
@@ -182,5 +182,5 @@
 ### local raftMetaConf
 Generate a new raft-meta.conf file based on original raft-meta.conf and new peers, which is used to move a raft node to a new node.
 ```
-$ ratis sh local raftMetaConf -peers <P0_HOST:P0_PORT,P1_HOST:P1_PORT,P2_HOST:P2_PORT> -path <PARENT_PATH_OF_RAFT_META_CONF>
+$ ratis sh local raftMetaConf -peers <[P0_ID|]P0_HOST:P0_PORT,[P1_ID|]P1_HOST:P1_PORT,[P2_ID|]P2_HOST:P2_PORT> -path <PARENT_PATH_OF_RAFT_META_CONF>
 ```
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/local/RaftMetaConfCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/local/RaftMetaConfCommand.java
index 231c643..9f0558c 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/local/RaftMetaConfCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/local/RaftMetaConfCommand.java
@@ -24,6 +24,7 @@
 import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.shell.cli.RaftUtils;
 import org.apache.ratis.shell.cli.sh.command.AbstractCommand;
 import org.apache.ratis.shell.cli.sh.command.Context;
@@ -32,11 +33,14 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Command for generate a new raft-meta.conf file based on original raft-meta.conf and new peers,
@@ -49,6 +53,7 @@
   private static final String RAFT_META_CONF = "raft-meta.conf";
   private static final String NEW_RAFT_META_CONF = "new-raft-meta.conf";
 
+  private static final String SEPARATOR = "\\|";
   /**
    * @param context command context
    */
@@ -69,11 +74,49 @@
       printf("peers or path can't be empty.");
       return -1;
     }
+    Set<String> addresses = new HashSet<>();
+    Set<String> ids = new HashSet<>();
     List<RaftPeerProto> raftPeerProtos = new ArrayList<>();
-    for (String address : peersStr.split(",")) {
-      String peerId = RaftUtils.getPeerId(parseInetSocketAddress(address)).toString();
+    for (String idWithAddress : peersStr.split(",")) {
+      String[] peerIdWithAddressArray = idWithAddress.split(SEPARATOR);
+
+      if (peerIdWithAddressArray.length < 1 || peerIdWithAddressArray.length > 2) {
+        String message =
+            "Failed to parse peer's ID and address for: %s, " +
+                "from option: -peers %s. \n" +
+                "Please make sure to provide list of peers" +
+                " in format <[P0_ID|]P0_HOST:P0_PORT,[P1_ID|]P1_HOST:P1_PORT,[P2_ID|]P2_HOST:P2_PORT>";
+        printf(message, idWithAddress, peersStr);
+        return -1;
+      }
+      InetSocketAddress inetSocketAddress = parseInetSocketAddress(
+          peerIdWithAddressArray[peerIdWithAddressArray.length - 1]);
+      String addressString = inetSocketAddress.toString();
+      if (addresses.contains(addressString)) {
+        printf("Found duplicated address: %s. Please make sure the address of peer have no duplicated value.",
+            addressString);
+        return -1;
+      }
+      addresses.add(addressString);
+
+      String peerId;
+      if (peerIdWithAddressArray.length == 2) {
+        // Peer ID is provided
+        peerId = RaftPeerId.getRaftPeerId(peerIdWithAddressArray[0]).toString();
+
+        if (ids.contains(peerId)) {
+          printf("Found duplicated ID: %s. Please make sure the ID of peer have no duplicated value.", peerId);
+          return -1;
+        }
+        ids.add(peerId);
+      } else {
+        // If peer ID is not provided, use host address as peerId value
+        peerId = RaftUtils.getPeerId(inetSocketAddress).toString();
+      }
+
       raftPeerProtos.add(RaftPeerProto.newBuilder()
-          .setId(ByteString.copyFrom(peerId.getBytes(StandardCharsets.UTF_8))).setAddress(address)
+          .setId(ByteString.copyFrom(peerId.getBytes(StandardCharsets.UTF_8)))
+          .setAddress(addressString)
           .setStartupRole(RaftPeerRole.FOLLOWER).build());
     }
     try (InputStream in = Files.newInputStream(Paths.get(path, RAFT_META_CONF));
@@ -93,7 +136,7 @@
   @Override
   public String getUsage() {
     return String.format("%s"
-            + " -%s <PEER0_HOST:PEER0_PORT,PEER1_HOST:PEER1_PORT,PEER2_HOST:PEER2_PORT>"
+            + " -%s <[P0_ID|]P0_HOST:P0_PORT,[P1_ID|]P1_HOST:P1_PORT,[P2_ID|]P2_HOST:P2_PORT>"
             + " -%s <PARENT_PATH_OF_RAFT_META_CONF>",
         getCommandName(), PEER_OPTION_NAME, PATH_OPTION_NAME);
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/LocalCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/LocalCommandIntegrationTest.java
new file mode 100644
index 0000000..4a07e37
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/LocalCommandIntegrationTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.shell.cli.sh;
+
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class LocalCommandIntegrationTest {
+
+  private static final String RAFT_META_CONF = "raft-meta.conf";
+  private static final String NEW_RAFT_META_CONF = "new-raft-meta.conf";
+  private static Pattern p = Pattern.compile("(?:\\w+\\|\\w+:\\d+,?)+");
+
+
+  @Test
+  public void testDuplicatedPeerAddresses() throws Exception {
+    String[] duplicatedAddressesList = {"peer1_ID1|host1:9872,peer2_ID|host2:9872,peer1_ID2|host1:9872",
+        "host1:9872,host2:9872,host1:9872"};
+
+    testDuplicatedPeers(duplicatedAddressesList, "address", "host1:9872");
+  }
+
+  @Test
+  public void testDuplicatedPeerIds() throws Exception {
+    String[] duplicatedIdsList = {"peer1_ID1|host1:9872,peer2_ID|host2:9872,peer1_ID1|host3:9872"};
+
+    testDuplicatedPeers(duplicatedIdsList, "ID", "peer1_ID1");
+  }
+
+  private void testDuplicatedPeers(String[] peersList, String expectedErrorMessagePart, String expectedDuplicatedValue) throws Exception {
+    for (String peersStr : peersList) {
+      StringPrintStream out = new StringPrintStream();
+      RatisShell shell = new RatisShell(out.getPrintStream());
+      int ret = shell.run("local", "raftMetaConf", "-peers", peersStr, "-path", "test");
+      Assertions.assertEquals(-1, ret);
+      String message = out.toString().trim();
+      Assertions.assertEquals(String.format("Found duplicated %s: %s. Please make sure the %s of peer have no duplicated value.",
+          expectedErrorMessagePart, expectedDuplicatedValue, expectedErrorMessagePart), message);
+    }
+  }
+
+  @Test
+  public void testRunMethod(@TempDir Path tempDir) throws Exception {
+    int index = 1;
+    generateRaftConf(tempDir.resolve(RAFT_META_CONF), index);
+
+     String[] testPeersListArray = {"peer1_ID|host1:9872,peer2_ID|host2:9872,peer3_ID|host3:9872",
+      "host1:9872,host2:9872,host3:9872"};
+
+    for (String peersListStr : testPeersListArray) {
+      generateRaftConf(tempDir, index);
+      StringPrintStream out = new StringPrintStream();
+      RatisShell shell = new RatisShell(out.getPrintStream());
+      int ret = shell.run("local", "raftMetaConf", "-peers", peersListStr, "-path", tempDir.toString());
+      Assertions.assertEquals(0, ret);
+
+      // read & verify the contents of the new-raft-meta.conf file
+      long indexFromNewConf;
+      List<RaftPeerProto> peers;
+      try (InputStream in = Files.newInputStream(tempDir.resolve(NEW_RAFT_META_CONF))) {
+        LogEntryProto logEntry = LogEntryProto.newBuilder().mergeFrom(in).build();
+        indexFromNewConf = logEntry.getIndex();
+        peers = logEntry.getConfigurationEntry().getPeersList();
+      }
+
+      Assertions.assertEquals(index + 1, indexFromNewConf);
+
+      String peersListStrFromNewMetaConf;
+      if (containsPeerId(peersListStr)) {
+        peersListStrFromNewMetaConf = peers.stream()
+            .map(peer -> peer.getId().toStringUtf8() + "|" + peer.getAddress())
+            .collect(Collectors.joining(","));
+      } else {
+        peersListStrFromNewMetaConf = peers.stream().map(RaftPeerProto::getAddress)
+            .collect(Collectors.joining(","));
+      }
+
+      Assertions.assertEquals(peersListStr, peersListStrFromNewMetaConf);
+    }
+  }
+
+
+  private void generateRaftConf(Path path, int index) throws IOException {
+    Map<String, String> map = new HashMap<>();
+    map.put("peer1_ID", "host1:9872");
+    map.put("peer2_ID", "host2:9872");
+    map.put("peer3_ID", "host3:9872");
+    map.put("peer4_ID", "host4:9872");
+    List<RaftPeerProto> raftPeerProtos = new ArrayList<>();
+    for (Map.Entry<String, String> en : map.entrySet()) {
+      raftPeerProtos.add(RaftPeerProto.newBuilder()
+          .setId(ByteString.copyFrom(en.getKey().getBytes(StandardCharsets.UTF_8))).setAddress(en.getValue())
+          .setStartupRole(RaftPeerRole.FOLLOWER).build());
+    }
+
+    LogEntryProto generateLogEntryProto = LogEntryProto.newBuilder()
+        .setConfigurationEntry(RaftConfigurationProto.newBuilder().addAllPeers(raftPeerProtos).build())
+        .setIndex(index).build();
+    try (OutputStream out = Files.newOutputStream(path)) {
+      generateLogEntryProto.writeTo(out);
+    }
+  }
+
+  private boolean containsPeerId(String str) {
+    return p.matcher(str).find();
+  }
+
+}