HADOOP-18363. Fix bug preventing hadoop-metrics2 from emitting metrics to > 1 Ganglia servers (#4627)

* HADOOP-18363. Fix bug preventing hadoop-metrics2 from emitting metrics to > 1 Ganglia servers
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
index d3d794f..620f2f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.net.*;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -78,6 +79,10 @@
   private int offset;
   private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
 
+  public List<? extends SocketAddress> getMetricsServers() {
+    return metricsServers;
+  }
+
   /**
    * Used for visiting Metrics
    */
@@ -133,8 +138,11 @@
     }
 
     // load the gannglia servers from properties
-    metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
-        DEFAULT_PORT);
+    List<String> serversFromConf =
+        conf.getList(String.class, SERVERS_PROPERTY, new ArrayList<String>());
+    metricsServers =
+        Servers.parse(serversFromConf.size() > 0 ? String.join(",", serversFromConf) : null,
+            DEFAULT_PORT);
     multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
             DEFAULT_MULTICAST_ENABLED);
     multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java
index 30e8961..59ba188 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,7 @@
 
 import org.apache.commons.configuration2.SubsetConfiguration;
 import org.apache.hadoop.metrics2.impl.ConfigBuilder;
+
 import org.junit.Test;
 
 import java.net.DatagramSocket;
@@ -30,52 +31,61 @@
 import static org.junit.Assert.assertTrue;
 
 public class TestGangliaSink {
-    @Test
-    public void testShouldCreateDatagramSocketByDefault() throws Exception {
-        SubsetConfiguration conf = new ConfigBuilder()
-                .subset("test.sink.ganglia");
+  @Test
+  public void testShouldCreateDatagramSocketByDefault() throws Exception {
+    SubsetConfiguration conf = new ConfigBuilder().subset("test.sink.ganglia");
 
-        GangliaSink30 gangliaSink = new GangliaSink30();
-        gangliaSink.init(conf);
-        DatagramSocket socket = gangliaSink.getDatagramSocket();
-        assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket);
-    }
+    GangliaSink30 gangliaSink = new GangliaSink30();
+    gangliaSink.init(conf);
+    DatagramSocket socket = gangliaSink.getDatagramSocket();
+    assertFalse("Did not create DatagramSocket",
+        socket == null || socket instanceof MulticastSocket);
+  }
 
-    @Test
-    public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception {
-        SubsetConfiguration conf = new ConfigBuilder()
-                .add("test.sink.ganglia.multicast", false)
-                .subset("test.sink.ganglia");
-        GangliaSink30 gangliaSink = new GangliaSink30();
-        gangliaSink.init(conf);
-        DatagramSocket socket = gangliaSink.getDatagramSocket();
-        assertFalse("Did not create DatagramSocket", socket == null || socket instanceof MulticastSocket);
-    }
+  @Test
+  public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws Exception {
+    SubsetConfiguration conf =
+        new ConfigBuilder().add("test.sink.ganglia.multicast", false).subset("test.sink.ganglia");
+    GangliaSink30 gangliaSink = new GangliaSink30();
+    gangliaSink.init(conf);
+    DatagramSocket socket = gangliaSink.getDatagramSocket();
+    assertFalse("Did not create DatagramSocket",
+        socket == null || socket instanceof MulticastSocket);
+  }
 
-    @Test
-    public void testShouldCreateMulticastSocket() throws Exception {
-        SubsetConfiguration conf = new ConfigBuilder()
-                .add("test.sink.ganglia.multicast", true)
-                .subset("test.sink.ganglia");
-        GangliaSink30 gangliaSink = new GangliaSink30();
-        gangliaSink.init(conf);
-        DatagramSocket socket = gangliaSink.getDatagramSocket();
-        assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket);
-        int ttl = ((MulticastSocket) socket).getTimeToLive();
-        assertEquals("Did not set default TTL", 1, ttl);
-    }
+  @Test
+  public void testShouldCreateMulticastSocket() throws Exception {
+    SubsetConfiguration conf =
+        new ConfigBuilder().add("test.sink.ganglia.multicast", true).subset("test.sink.ganglia");
+    GangliaSink30 gangliaSink = new GangliaSink30();
+    gangliaSink.init(conf);
+    DatagramSocket socket = gangliaSink.getDatagramSocket();
+    assertTrue("Did not create MulticastSocket",
+        socket != null && socket instanceof MulticastSocket);
+    int ttl = ((MulticastSocket) socket).getTimeToLive();
+    assertEquals("Did not set default TTL", 1, ttl);
+  }
 
-    @Test
-    public void testShouldSetMulticastSocketTtl() throws Exception {
-        SubsetConfiguration conf = new ConfigBuilder()
-                .add("test.sink.ganglia.multicast", true)
-                .add("test.sink.ganglia.multicast.ttl", 3)
-                .subset("test.sink.ganglia");
-        GangliaSink30 gangliaSink = new GangliaSink30();
-        gangliaSink.init(conf);
-        DatagramSocket socket = gangliaSink.getDatagramSocket();
-        assertTrue("Did not create MulticastSocket", socket != null && socket instanceof MulticastSocket);
-        int ttl = ((MulticastSocket) socket).getTimeToLive();
-        assertEquals("Did not set TTL", 3, ttl);
-    }
+  @Test
+  public void testShouldSetMulticastSocketTtl() throws Exception {
+    SubsetConfiguration conf = new ConfigBuilder().add("test.sink.ganglia.multicast", true)
+        .add("test.sink.ganglia.multicast.ttl", 3).subset("test.sink.ganglia");
+    GangliaSink30 gangliaSink = new GangliaSink30();
+    gangliaSink.init(conf);
+    DatagramSocket socket = gangliaSink.getDatagramSocket();
+    assertTrue("Did not create MulticastSocket",
+        socket != null && socket instanceof MulticastSocket);
+    int ttl = ((MulticastSocket) socket).getTimeToLive();
+    assertEquals("Did not set TTL", 3, ttl);
+  }
+
+  @Test
+  public void testMultipleMetricsServers() {
+    SubsetConfiguration conf =
+        new ConfigBuilder().add("test.sink.ganglia.servers", "server1,server2")
+            .subset("test.sink.ganglia");
+    GangliaSink30 gangliaSink = new GangliaSink30();
+    gangliaSink.init(conf);
+    assertEquals(2, gangliaSink.getMetricsServers().size());
+  }
 }