CASSANDRASC-78: Fix token-ranges endpoint to handle gossip-info responses without 'status'

Patch by Arjun Ashok; Reviewed by Dinesh Joshi, Francisco Guerrero, Yifan Cai for CASSANDRASC-78
diff --git a/CHANGES.txt b/CHANGES.txt
index 62b2f77..c87d1ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Fix token-ranges endpoint to handle gossip-info responses without 'status' (CASSANDRASC-78)
  * Upgrade vertx to version 4.4.6 to bring hot reloading and traffic shaping options (CASSANDRASC-77)
  * Fix unable to stream secondary index files (CASSANDRASC-74)
  * Updates token-ranges endpoint to return additional instance metadata (CASSANDRASC-73)
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
index 24162f1..bef8b39 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProvider.java
@@ -285,7 +285,9 @@
                 {
                     LOGGER.debug("Found gossipInfoEntry={}", gossipInfoEntry);
                     String hostStatus = gossipInfoEntry.status();
-                    if (hostStatus != null && hostStatus.startsWith("BOOT_REPLACE,"))
+                    String hostStatusWithPort = gossipInfoEntry.statusWithPort();
+                    if ((hostStatus != null && hostStatus.startsWith("BOOT_REPLACE,")) ||
+                        (hostStatusWithPort != null && hostStatusWithPort.startsWith("BOOT_REPLACE,")))
                     {
                         return NodeState.REPLACING.displayName();
                     }
diff --git a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
index dee9935..3f3d6c5 100644
--- a/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
+++ b/adapters/base/src/test/java/org/apache/cassandra/sidecar/adapters/base/TokenRangeReplicaProviderTest.java
@@ -442,6 +442,59 @@
 
     }
 
+    @Test
+    public void tokenRangeAfterNodeJoinsGossipVariant() throws UnknownHostException
+    {
+        Map<List<String>, List<String>> rangeToEndpointWithPortMap = ImmutableMap.of(
+        Arrays.asList("6148914691236517204", "-9223372036854775808"),
+        Arrays.asList("127.0.0.1:7000", "127.0.0.2:7000", "127.0.0.3:7000"),
+        Arrays.asList("3074457345618258602", "6148914691236517204"),
+        Arrays.asList("127.0.0.4:7000", "127.0.0.1:7000", "127.0.0.2:7000"),
+        Arrays.asList("-3074457345618258603", "3074457345618258602"),
+        Arrays.asList("127.0.0.3:7000", "127.0.0.4:7000", "127.0.0.1:7000"),
+        Arrays.asList("-9223372036854775808", "-3074457345618258603"),
+        Arrays.asList("127.0.0.2:7000", "127.0.0.3:7000", "127.0.0.4:7000")
+        );
+        Map<List<String>, List<String>> pendingRangeToEndpointWithPortMap = Collections.emptyMap();
+
+        when(storageOperations.getLiveNodesWithPort())
+        .thenReturn(Arrays.asList("127.0.0.1:7000", "127.0.0.2:7000", "127.0.0.3:7000", "127.0.0.4:7000"));
+        when(storageOperations.getUnreachableNodesWithPort()).thenReturn(Collections.emptyList());
+
+        when(storageOperations.getRangeToEndpointWithPortMap(TEST_KEYSPACE)).thenReturn(rangeToEndpointWithPortMap);
+        when(storageOperations.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE))
+        .thenReturn(pendingRangeToEndpointWithPortMap);
+        when(endpointOperations.getDatacenter(anyString())).thenReturn(TEST_DC1);
+        when(storageOperations.getLeavingNodesWithPort()).thenReturn(Arrays.asList("127.0.0.1:7000", "128.0.0.1:7000"));
+        when(storageOperations.getJoiningNodesWithPort()).thenReturn(Collections.singletonList("127.0.0.4:7000"));
+        when(clusterMembershipOperations.getAllEndpointStatesWithPort()).thenReturn(generateSampleGossip("LEAVING",
+                                                                                                         "NORMAL",
+                                                                                                         "NORMAL",
+                                                                                                         "BOOT_REPLACE",
+                                                                                                         "NORMAL",
+                                                                                                         "NORMAL",
+                                                                                                         true));
+
+        TokenRangeReplicasResponse result = instance.tokenRangeReplicas(TEST_KEYSPACE, Partitioner.Murmur3);
+        assertThat(result).isNotNull();
+        assertThat(result.readReplicas()).hasSize(4);
+        assertThat(result.writeReplicas()).hasSize(4);
+        assertThat(validateRangeExists(result.readReplicas(), "6148914691236517204",
+                                       Long.toString(Long.MAX_VALUE))).isTrue();
+        assertThat(validateRangeExists(result.writeReplicas(), "6148914691236517204",
+                                       Long.toString(Long.MAX_VALUE))).isTrue();
+
+        assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.1", 7000)
+                   .state()).isEqualTo("Leaving");
+        assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.2", 7000)
+                   .state()).isEqualTo("Normal");
+        assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.3", 7000)
+                   .state()).isEqualTo("Normal");
+        assertThat(filterReplicaMetadata(result.replicaMetadata(), "127.0.0.4", 7000)
+                   .state()).isEqualTo("Replacing");
+
+    }
+
     private boolean validateRangeExists(List<TokenRangeReplicasResponse.ReplicaInfo> ranges, String start, String end)
     {
         return ranges.stream().anyMatch(r -> (r.start().equals(start) && r.end().equals(end)));
@@ -454,32 +507,54 @@
                                         String dc2Node1Status,
                                         String dc2Node2Status)
     {
+        return generateSampleGossip(dc1Node1Status,
+                                    dc1Node2Status,
+                                    dc1Node3Status,
+                                    dc1Node4Status,
+                                    dc2Node1Status,
+                                    dc2Node2Status,
+                                    false);
+    }
+
+    private String generateSampleGossip(String dc1Node1Status,
+                                        String dc1Node2Status,
+                                        String dc1Node3Status,
+                                        String dc1Node4Status,
+                                        String dc2Node1Status,
+                                        String dc2Node2Status,
+                                        boolean excludeStatus)
+    {
         return String.format("/127.0.0.1:7000%n" +
-                             "  STATUS:16:%s,9223372036854775805%n" +
+                             getStatus(excludeStatus, "  STATUS:16:%s,9223372036854775805%n", dc1Node1Status) +
                              "  HOST_ID:21:00000000-0000-4000-8000-000000000003%n" +
                              "  STATUS_WITH_PORT:17:%s,9223372036854775805%n" +
                              "/127.0.0.2:7000%n" +
-                             "  STATUS:9:%s,3074457345618258601%n" +
+                             getStatus(excludeStatus, "  STATUS:9:%s,3074457345618258601%n", dc1Node2Status) +
                              "  HOST_ID:14:00000000-0000-4000-8000-000000000002%n" +
                              "  STATUS_WITH_PORT:10:%s,3074457345618258601%n" +
                              "/127.0.0.3:7000%n" +
-                             "  STATUS:2:%s,-3074457345618258603%n" +
+                             getStatus(excludeStatus, "  STATUS:2:%s,-3074457345618258603%n", dc1Node3Status) +
                              "  HOST_ID:7:00000000-0000-4000-8000-000000000001%n" +
                              "  STATUS_WITH_PORT:3:%s,-3074457345618258603%n" +
                              "/127.0.0.4:7000%n" +
-                             "  STATUS:2:%s,-3074457345618258603%n" +
+                             getStatus(excludeStatus, "  STATUS:2:%s,-3074457345618258603%n", dc1Node4Status) +
                              "  HOST_ID:7:00000000-0000-4000-8000-000000000004%n" +
                              "  STATUS_WITH_PORT:3:%s,-3074457345618258603%n" +
                              "/128.0.0.1:7000%n" +
-                             "  STATUS:2:%s,-3074457345618258603%n" +
+                             getStatus(excludeStatus, "  STATUS:2:%s,-3074457345618258603%n", dc2Node1Status) +
                              "  HOST_ID:7:00000000-0000-4000-8000-000000000001%n" +
                              "  STATUS_WITH_PORT:3:%s,-3074457345618258603%n" +
                              "/128.0.0.2:7000%n" +
-                             "  STATUS:2:%s,-3074457345618258603%n" +
+                             getStatus(excludeStatus, "  STATUS:2:%s,-3074457345618258603%n", dc2Node2Status) +
                              "  HOST_ID:7:00000000-0000-4000-8000-000000000002%n" +
                              "  STATUS_WITH_PORT:3:%s,-3074457345618258603%n",
-                             dc1Node1Status, dc1Node1Status, dc1Node2Status, dc1Node2Status,
-                             dc1Node3Status, dc1Node3Status, dc1Node4Status, dc1Node4Status,
-                             dc2Node1Status, dc2Node1Status, dc2Node2Status, dc1Node2Status);
+                             dc1Node1Status, dc1Node2Status,
+                             dc1Node3Status, dc1Node4Status,
+                             dc2Node1Status, dc1Node2Status);
+    }
+
+    private String getStatus(boolean exclude, String status, String value)
+    {
+        return exclude ? "" : String.format(status, value);
     }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java
index a5b3576..14e3a17 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java
@@ -127,7 +127,7 @@
                 List<Integer> nodeNums = newNodes.stream().map(i -> i.config().num()).collect(Collectors.toList());
                 validateNodeStates(mappingResponse,
                                    dcReplication,
-                                   nodeNumber -> nodeNums.contains(nodeNumber) ? "Joining" : "Normal");
+                                   nodeNumber -> nodeNums.contains(nodeNumber) ? "Replacing" : "Normal");
 
                 int nodeCount = annotation.nodesPerDc() * annotation.numDcs();
                 validateTokenRanges(mappingResponse, generateExpectedRanges(nodeCount));