ATLAS-2786: Honor ZK server ensemble provided in configuration
Change-Id: I3153f258276b534e864fb53b8bcc1cd6af936b5b
diff --git a/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java
index 7cfd553..91fd4e1 100644
--- a/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java
+++ b/common/src/main/java/org/apache/atlas/ha/HAConfiguration.java
@@ -20,6 +20,7 @@
import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.List;
@@ -179,9 +180,11 @@
}
public static ZookeeperProperties getZookeeperProperties(Configuration configuration) {
- String zookeeperConnectString = configuration.getString("atlas.kafka." + ZOOKEEPER_PREFIX + "connect");
+ String[] zkServers;
if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) {
- zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT);
+ zkServers = configuration.getStringArray(HA_ZOOKEEPER_CONNECT);
+ } else {
+ zkServers = configuration.getStringArray("atlas.kafka." + ZOOKEEPER_PREFIX + "connect");
}
String zkRoot = configuration.getString(ATLAS_SERVER_HA_ZK_ROOT_KEY, ATLAS_SERVER_ZK_ROOT_DEFAULT);
@@ -195,7 +198,10 @@
String acl = configuration.getString(HA_ZOOKEEPER_ACL);
String auth = configuration.getString(HA_ZOOKEEPER_AUTH);
- return new ZookeeperProperties(zookeeperConnectString, zkRoot, retriesSleepTimeMillis, numRetries,
- sessionTimeout, acl, auth);
+
+ return new ZookeeperProperties(StringUtils.join(zkServers, ','),
+ zkRoot,
+ retriesSleepTimeMillis, numRetries,
+ sessionTimeout, acl, auth);
}
}