Revert "upgrade Zookeeper 3.4.5"
This reverts commit a78c6514a2db7df77957f39cdc3ff2bbb30446d4.
diff --git a/project.clj b/project.clj
index d35b551..232d098 100644
--- a/project.clj
+++ b/project.clj
@@ -8,8 +8,8 @@
[storm/libthrift7 "0.7.0"
:exclusions [org.slf4j/slf4j-api]]
[clj-time "0.4.1"]
- [com.netflix.curator/curator-framework "1.2.6"
- :exclusions [log4j/log4j org.slf4j/slf4j-log4j12]]
+ [com.netflix.curator/curator-framework "1.0.1"
+ :exclusions [log4j/log4j]]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
diff --git a/src/clj/backtype/storm/zookeeper.clj b/src/clj/backtype/storm/zookeeper.clj
index 8103492..76858a7 100644
--- a/src/clj/backtype/storm/zookeeper.clj
+++ b/src/clj/backtype/storm/zookeeper.clj
@@ -6,7 +6,7 @@
ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
Watcher$Event$EventType KeeperException$NodeExistsException])
(:import [org.apache.zookeeper.data Stat])
- (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory])
+ (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxn$Factory])
(:import [java.net InetSocketAddress BindException])
(:import [java.io File])
(:import [backtype.storm.utils Utils ZookeeperAuthInfo])
@@ -132,7 +132,7 @@
(let [localfile (File. localdir)
zk (ZooKeeperServer. localfile localfile 2000)
[retport factory] (loop [retport (if port port 2000)]
- (if-let [factory-tmp (try-cause (doto (NIOServerCnxnFactory.) (.configure (InetSocketAddress. retport)))
+ (if-let [factory-tmp (try-cause (NIOServerCnxn$Factory. (InetSocketAddress. retport))
(catch BindException e
(when (> (inc retport) (if port port 65535))
(throw (RuntimeException. "No port is available to launch an inprocess zookeeper.")))))]
diff --git a/src/jvm/backtype/storm/utils/Utils.java b/src/jvm/backtype/storm/utils/Utils.java
index ebec367..36d4d5c 100644
--- a/src/jvm/backtype/storm/utils/Utils.java
+++ b/src/jvm/backtype/storm/utils/Utils.java
@@ -293,16 +293,20 @@
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
String zkStr = StringUtils.join(serverPorts, ",") + root;
-
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .connectString(zkStr)
- .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
- .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
- .retryPolicy(new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- if(auth!=null && auth.scheme!=null) {
- builder = builder.authorization(auth.scheme, auth.payload);
+ try {
+
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
+ .connectString(zkStr)
+ .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
+ .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
+ .retryPolicy(new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+ if(auth!=null && auth.scheme!=null) {
+ builder = builder.authorization(auth.scheme, auth.payload);
+ }
+ return builder.build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return builder.build();
}
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port) {