ZOOKEEPER-2139: Support multiple ZooKeeper client with different configurations in a single JVM (Arshad Mohammad via fpj)



git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1742002 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index edc19ea..764be96 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -411,6 +411,9 @@
 
   ZOOKEEPER-2240 Make the three-node minimum more explicit in 
   documentation and on website (Shawn Heisey and Arshad Mohammad via phunt)
+ 
+  ZOOKEEPER-2139: Support multiple ZooKeeper client with different configurations
+  in a single JVM (Arshad Mohammad via fpj)
 
 Release 3.5.0 - 8/4/2014
 
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxn.java b/src/java/main/org/apache/zookeeper/ClientCnxn.java
index f477c9c..12dd51c 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxn.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxn.java
@@ -63,6 +63,7 @@
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.ZooKeeper.WatchRegistration;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.common.Time;
@@ -98,9 +99,6 @@
 public class ClientCnxn {
     private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);
 
-    private static final String ZK_SASL_CLIENT_USERNAME =
-        "zookeeper.sasl.client.username";
-
     /* ZOOKEEPER-706: If a session has a large number of watches set then
      * attempting to re-establish those watches after a connection loss may
      * fail due to the SetWatches request exceeding the server's configured
@@ -111,22 +109,6 @@
      */
     private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;
 
-    /** This controls whether automatic watch resetting is enabled.
-     * Clients automatically reset watches during session reconnect, this
-     * option allows the client to turn off this behavior by setting
-     * the environment variable "zookeeper.disableAutoWatchReset" to "true" */
-    private static boolean disableAutoWatchReset;
-    static {
-        // this var should not be public, but otw there is no easy way
-        // to test
-        disableAutoWatchReset =
-            Boolean.getBoolean("zookeeper.disableAutoWatchReset");
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("zookeeper.disableAutoWatchReset is "
-                    + disableAutoWatchReset);
-        }
-    }
-
     static class AuthData {
         AuthData(String scheme, byte data[]) {
             this.scheme = scheme;
@@ -217,6 +199,8 @@
 
     public ZooKeeperSaslClient zooKeeperSaslClient;
 
+    private final ZKClientConfig clientConfig;
+
     public long getSessionId() {
         return sessionId;
     }
@@ -409,23 +393,9 @@
 
         sendThread = new SendThread(clientCnxnSocket);
         eventThread = new EventThread();
-
+        this.clientConfig=zooKeeper.getClientConfig();
     }
 
-    /**
-     * tests use this to check on reset of watches
-     * @return if the auto reset of watches are disabled
-     */
-    public static boolean getDisableAutoResetWatch() {
-        return disableAutoWatchReset;
-    }
-    /**
-     * tests use this to set the auto reset
-     * @param b the value to set disable watches to
-     */
-    public static void setDisableAutoResetWatch(boolean b) {
-        disableAutoWatchReset = b;
-    }
     public void start() {
         sendThread.start();
         eventThread.start();
@@ -818,9 +788,6 @@
         }
     }
     
-    public static final int packetLen = Integer.getInteger("jute.maxbuffer",
-            4096 * 1024);
-
     /**
      * This class services the outgoing request queue and generates the heart
      * beats. It also spawns the ReadThread.
@@ -989,7 +956,7 @@
             // Only send if there's a pending watch
             // TODO: here we have the only remaining use of zooKeeper in
             // this class. It's to be eliminated!
-            if (!disableAutoWatchReset) {
+            if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
                 List<String> dataWatches = zooKeeper.getDataWatches();
                 List<String> existWatches = zooKeeper.getExistWatches();
                 List<String> childWatches = zooKeeper.getChildWatches();
@@ -1107,13 +1074,12 @@
             String hostPort = addr.getHostString() + ":" + addr.getPort();
             MDC.put("myid", hostPort);
             setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
-            if (ZooKeeperSaslClient.isEnabled()) {
+            if (clientConfig.isSaslClientEnabled()) {
                 try {
-                    String principalUserName = System.getProperty(
-                            ZK_SASL_CLIENT_USERNAME, "zookeeper");
-                    zooKeeperSaslClient =
-                        new ZooKeeperSaslClient(
-                                principalUserName+"/"+addr.getHostString());
+                    if (zooKeeperSaslClient != null) {
+                        zooKeeperSaslClient.shutdown();
+                    }
+                    zooKeeperSaslClient = new ZooKeeperSaslClient(getServerPrincipal(addr), clientConfig);
                 } catch (LoginException e) {
                     // An authentication error occurred when the SASL client tried to initialize:
                     // for Kerberos this means that the client failed to authenticate with the KDC.
@@ -1132,6 +1098,13 @@
             clientCnxnSocket.connect(addr);
         }
 
+        private String getServerPrincipal(InetSocketAddress addr) {
+            String principalUserName = clientConfig.getProperty(ZKClientConfig.ZK_SASL_CLIENT_USERNAME,
+                    ZKClientConfig.ZK_SASL_CLIENT_USERNAME_DEFAULT);
+            String serverPrincipal = principalUserName + "/" + addr.getHostString();
+            return serverPrincipal;
+        }
+
         private void logStartConnect(InetSocketAddress addr) {
             String msg = "Opening socket connection to server " + addr;
             if (zooKeeperSaslClient != null) {
@@ -1428,7 +1401,7 @@
 
         public boolean tunnelAuthInProgress() {
             // 1. SASL client is disabled.
-            if (!ZooKeeperSaslClient.isEnabled()) {
+            if (!clientConfig.isSaslClientEnabled()) {
                 return false;
             }
 
@@ -1466,7 +1439,7 @@
 
         sendThread.close();
         eventThread.queueEventOfDeath();
-        if (null != zooKeeperSaslClient) {
+        if (zooKeeperSaslClient != null) {
             zooKeeperSaslClient.shutdown();
         }
     }
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
index 9b46756..4630829 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
@@ -27,6 +27,8 @@
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.proto.ConnectResponse;
 import org.apache.zookeeper.server.ByteBufferInputStream;
@@ -63,6 +65,8 @@
     protected long now;
     protected ClientCnxn.SendThread sendThread;
     protected LinkedBlockingDeque<Packet> outgoingQueue;
+    protected ZKClientConfig clientConfig;
+    private int packetLen = ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT;
 
     /**
      * The sessionId is only available here for Log and Exception messages.
@@ -112,7 +116,7 @@
 
     protected void readLength() throws IOException {
         int len = incomingBuffer.getInt();
-        if (len < 0 || len >= ClientCnxn.packetLen) {
+        if (len < 0 || len >= packetLen) {
             throw new IOException("Packet len" + len + " is out of range!");
         }
         incomingBuffer = ByteBuffer.allocate(len);
@@ -223,4 +227,15 @@
      * finally unblock it when finished.
      */
     abstract void sendPacket(Packet p) throws IOException;
+
+    protected void initProperties() {
+        packetLen = Integer.getInteger(
+                clientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER),
+                ZKClientConfig.CLIENT_MAX_PACKET_LENGTH_DEFAULT);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} is {}", ZKConfig.JUTE_MAXBUFFER, packetLen);
+        }
+    }
+
+
 }
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
index fdfd6f0..f17a819 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -34,6 +34,7 @@
 import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
 import org.apache.zookeeper.ClientCnxn.Packet;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +50,9 @@
 
     private SocketAddress remoteSocketAddress;
 
-    ClientCnxnSocketNIO() throws IOException {
-        super();
+    ClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException {
+        this.clientConfig = clientConfig;
+        initProperties();
     }
 
     @Override
diff --git a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
index 43af080..05f8a81 100644
--- a/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
+++ b/src/java/main/org/apache/zookeeper/ClientCnxnSocketNetty.java
@@ -20,6 +20,7 @@
 
 import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
 import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.common.X509Util;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -43,6 +44,7 @@
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -76,6 +78,11 @@
     AtomicBoolean needSasl = new AtomicBoolean();
     Semaphore waitSasl = new Semaphore(0);
 
+    ClientCnxnSocketNetty(ZKClientConfig clientConfig) {
+        this.clientConfig = clientConfig;
+        initProperties();
+    }
+
     /**
      * lifecycles diagram:
      * <p/>
@@ -344,7 +351,7 @@
         @Override
         public ChannelPipeline getPipeline() throws Exception {
             ChannelPipeline pipeline = Channels.pipeline();
-            if (Boolean.getBoolean(ZooKeeper.SECURE_CLIENT)) {
+            if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
                 initSSL(pipeline);
             }
             pipeline.addLast("handler", new ZKClientHandler());
@@ -355,7 +362,7 @@
         // Basically we only need to create it once.
         private synchronized void initSSL(ChannelPipeline pipeline) throws SSLContextException {
             if (sslContext == null || sslEngine == null) {
-                sslContext = X509Util.createSSLContext();
+                sslContext = X509Util.createSSLContext(clientConfig);
                 sslEngine = sslContext.createSSLEngine();
                 sslEngine.setUseClientMode(true);
             }
diff --git a/src/java/main/org/apache/zookeeper/Login.java b/src/java/main/org/apache/zookeeper/Login.java
index 19a59af..3ea666b 100644
--- a/src/java/main/org/apache/zookeeper/Login.java
+++ b/src/java/main/org/apache/zookeeper/Login.java
@@ -32,7 +32,9 @@
 import javax.security.auth.login.LoginException;
 import javax.security.auth.callback.CallbackHandler;
 
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ZKConfig;
+import org.apache.zookeeper.server.ZooKeeperSaslServer;
 import org.apache.zookeeper.common.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +47,7 @@
 import java.util.Set;
 
 public class Login {
+    private static final String KINIT_COMMAND_DEFAULT = "/usr/bin/kinit";
     private static final Logger LOG = LoggerFactory.getLogger(Login.class);
     public CallbackHandler callbackHandler;
 
@@ -77,21 +80,27 @@
 
     // Initialize 'lastLogin' to do a login at first time
     private long lastLogin = Time.currentElapsedTime() - MIN_TIME_BEFORE_RELOGIN;
+    private final ZKConfig zkConfig;
 
     /**
-     * LoginThread constructor. The constructor starts the thread used
-     * to periodically re-login to the Kerberos Ticket Granting Server.
+     * LoginThread constructor. The constructor starts the thread used to
+     * periodically re-login to the Kerberos Ticket Granting Server.
+     * 
      * @param loginContextName
-     *               name of section in JAAS file that will be use to login.
-     *               Passed as first param to javax.security.auth.login.LoginContext().
+     *            name of section in JAAS file that will be use to login. Passed
+     *            as first param to javax.security.auth.login.LoginContext().
      *
      * @param callbackHandler
-     *               Passed as second param to javax.security.auth.login.LoginContext().
+     *            Passed as second param to
+     *            javax.security.auth.login.LoginContext().
+     * @param zkConfig
+     *            client or server configurations
      * @throws javax.security.auth.login.LoginException
-     *               Thrown if authentication fails.
+     *             Thrown if authentication fails.
      */
-    public Login(final String loginContextName, CallbackHandler callbackHandler)
+    public Login(final String loginContextName, CallbackHandler callbackHandler, final ZKConfig zkConfig)
             throws LoginException {
+        this.zkConfig=zkConfig;
         this.callbackHandler = callbackHandler;
         login = login(loginContextName);
         this.loginContextName = loginContextName;
@@ -197,10 +206,7 @@
                         break;
                     }
                     if (isUsingTicketCache) {
-                        String cmd = "/usr/bin/kinit";
-                        if (System.getProperty("zookeeper.kinit") != null) {
-                            cmd = System.getProperty("zookeeper.kinit");
-                        }
+                        String cmd = zkConfig.getProperty(ZKConfig.KINIT_COMMAND, KINIT_COMMAND_DEFAULT);
                         String kinitArgs = "-R";
                         int retry = 1;
                         while (retry >= 0) {
@@ -289,8 +295,7 @@
             throw new LoginException("loginContext name (JAAS file section header) was null. " +
                     "Please check your java.security.login.auth.config (=" +
                     System.getProperty("java.security.login.auth.config") +
-                    ") and your " + ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY + "(=" + 
-                    System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") + ")");
+                    ") and your " + getLoginContextMessage());
         }
         LoginContext loginContext = new LoginContext(loginContextName,callbackHandler);
         loginContext.login();
@@ -298,6 +303,16 @@
         return loginContext;
     }
 
+    private String getLoginContextMessage() {
+        if (zkConfig instanceof ZKClientConfig) {
+            return ZKClientConfig.LOGIN_CONTEXT_NAME_KEY + "(=" + zkConfig.getProperty(
+                    ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, ZKClientConfig.LOGIN_CONTEXT_NAME_KEY_DEFAULT) + ")";
+        } else {
+            return ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + "(=" + System.getProperty(
+                    ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY, ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME) + ")";
+        }
+    }
+
     // c.f. org.apache.hadoop.security.UserGroupInformation.
     private long getRefreshTime(KerberosTicket tgt) {
         long start = tgt.getStartTime().getTime();
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeper.java b/src/java/main/org/apache/zookeeper/ZooKeeper.java
index 2dca385..f2ec3d7 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeper.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeper.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -44,6 +45,7 @@
 import org.apache.zookeeper.OpResult.ErrorResult;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.StaticHostProvider;
@@ -129,8 +131,19 @@
  */
 public class ZooKeeper {
 
+    /**
+     * @deprecated Use {@link ZKClientConfig#ZOOKEEPER_CLIENT_CNXN_SOCKET}
+     *             instead.
+     */
+    @Deprecated
     public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
     // Setting this to "true" will enable encrypted client-server communication.
+
+    /**
+     * @deprecated Use {@link ZKClientConfig#SECURE_CLIENT}
+     *             instead.
+     */
+    @Deprecated
     public static final String SECURE_CLIENT = "zookeeper.client.secure";
 
     protected final ClientCnxn cnxn;
@@ -202,6 +215,12 @@
 
     private final ZKWatchManager watchManager;
 
+    private final ZKClientConfig clientConfig;
+
+    public ZKClientConfig getClientConfig() {
+        return clientConfig;
+    }
+
     List<String> getDataWatches() {
         synchronized(watchManager.dataWatches) {
             List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
@@ -235,6 +254,11 @@
             new HashMap<String, Set<Watcher>>();
         private final Map<String, Set<Watcher>> childWatches =
             new HashMap<String, Set<Watcher>>();
+        private boolean disableAutoWatchReset;
+
+        ZKWatchManager(boolean disableAutoWatchReset) {
+            this.disableAutoWatchReset = disableAutoWatchReset;
+        }
 
         private volatile Watcher defaultWatcher;
 
@@ -430,9 +454,7 @@
             switch (type) {
             case None:
                 result.add(defaultWatcher);
-                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
-                        state != Watcher.Event.KeeperState.SyncConnected;
-
+                boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
                 synchronized(dataWatches) {
                     for(Set<Watcher> ws: dataWatches.values()) {
                         result.addAll(ws);
@@ -675,6 +697,56 @@
      * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
      * connection string. This will run the client commands while interpreting
      * all paths relative to this root (similar to the unix chroot command).
+     *
+     * @param connectString
+     *            comma separated host:port pairs, each corresponding to a zk
+     *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If
+     *            the optional chroot suffix is used the example would look
+     *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+     *            where the client would be rooted at "/app/a" and all paths
+     *            would be relative to this root - ie getting/setting/etc...
+     *            "/foo/bar" would result in operations being run on
+     *            "/app/a/foo/bar" (from the server perspective).
+     * @param sessionTimeout
+     *            session timeout in milliseconds
+     * @param watcher
+     *            a watcher object which will be notified of state changes, may
+     *            also be notified for node events
+     * @param conf
+     *            (added in 3.5.2) passing this conf object gives each client the flexibility of
+     *            configuring properties differently compared to other instances
+     * @throws IOException
+     *             in cases of network failure
+     * @throws IllegalArgumentException
+     *             if an invalid chroot path is specified
+     */
+    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+            ZKClientConfig conf) throws IOException {
+        this(connectString, sessionTimeout, watcher, false, conf);
+    }
+
+    /**
+     * To create a ZooKeeper client object, the application needs to pass a
+     * connection string containing a comma separated list of host:port pairs,
+     * each corresponding to a ZooKeeper server.
+     * <p>
+     * Session establishment is asynchronous. This constructor will initiate
+     * connection to the server and return immediately - potentially (usually)
+     * before the session is fully established. The watcher argument specifies
+     * the watcher that will be notified of any changes in state. This
+     * notification can come at any point before or after the constructor call
+     * has returned.
+     * <p>
+     * The instantiated ZooKeeper client object will pick an arbitrary server
+     * from the connectString and attempt to connect to it. If establishment of
+     * the connection fails, another server in the connect string will be tried
+     * (the order is non-deterministic, as we random shuffle the list), until a
+     * connection is established. The client will continue attempts until the
+     * session is explicitly closed.
+     * <p>
+     * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
+     * connection string. This will run the client commands while interpreting
+     * all paths relative to this root (similar to the unix chroot command).
      * <p>
      * For backward compatibility, there is another version
      * {@link #ZooKeeper(String, int, Watcher, boolean)} which uses
@@ -713,12 +785,82 @@
     public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
             boolean canBeReadOnly, HostProvider aHostProvider)
             throws IOException {
+        this(connectString, sessionTimeout, watcher, canBeReadOnly,
+                aHostProvider, null);
+    }
+
+
+    /**
+     * To create a ZooKeeper client object, the application needs to pass a
+     * connection string containing a comma separated list of host:port pairs,
+     * each corresponding to a ZooKeeper server.
+     * <p>
+     * Session establishment is asynchronous. This constructor will initiate
+     * connection to the server and return immediately - potentially (usually)
+     * before the session is fully established. The watcher argument specifies
+     * the watcher that will be notified of any changes in state. This
+     * notification can come at any point before or after the constructor call
+     * has returned.
+     * <p>
+     * The instantiated ZooKeeper client object will pick an arbitrary server
+     * from the connectString and attempt to connect to it. If establishment of
+     * the connection fails, another server in the connect string will be tried
+     * (the order is non-deterministic, as we random shuffle the list), until a
+     * connection is established. The client will continue attempts until the
+     * session is explicitly closed.
+     * <p>
+     * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
+     * connection string. This will run the client commands while interpreting
+     * all paths relative to this root (similar to the unix chroot command).
+     * <p>
+     * For backward compatibility, there is another version
+     * {@link #ZooKeeper(String, int, Watcher, boolean)} which uses default
+     * {@link StaticHostProvider}
+     *
+     * @param connectString
+     *            comma separated host:port pairs, each corresponding to a zk
+     *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If
+     *            the optional chroot suffix is used the example would look
+     *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+     *            where the client would be rooted at "/app/a" and all paths
+     *            would be relative to this root - ie getting/setting/etc...
+     *            "/foo/bar" would result in operations being run on
+     *            "/app/a/foo/bar" (from the server perspective).
+     * @param sessionTimeout
+     *            session timeout in milliseconds
+     * @param watcher
+     *            a watcher object which will be notified of state changes, may
+     *            also be notified for node events
+     * @param canBeReadOnly
+     *            (added in 3.4) whether the created client is allowed to go to
+     *            read-only mode in case of partitioning. Read-only mode
+     *            basically means that if the client can't find any majority
+     *            servers but there's partitioned server it could reach, it
+     *            connects to one in read-only mode, i.e. read requests are
+     *            allowed while write requests are not. It continues seeking for
+     *            majority in the background.
+     * @param aHostProvider
+     *            use this as HostProvider to enable custom behaviour.
+     * @param clientConfig
+     *            (added in 3.5.2) passing this conf object gives each client the flexibility of
+     *            configuring properties differently compared to other instances
+     * @throws IOException
+     *             in cases of network failure
+     * @throws IllegalArgumentException
+     *             if an invalid chroot path is specified
+     */
+    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+            boolean canBeReadOnly, HostProvider aHostProvider,
+            ZKClientConfig clientConfig) throws IOException {
         LOG.info("Initiating client connection, connectString=" + connectString
                 + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
 
+        if (clientConfig == null) {
+            clientConfig = new ZKClientConfig();
+        }
+        this.clientConfig = clientConfig;
         watchManager = defaultWatchManager();
         watchManager.defaultWatcher = watcher;
-
         ConnectStringParser connectStringParser = new ConnectStringParser(
                 connectString);
         hostProvider = aHostProvider;
@@ -804,6 +946,66 @@
      * the connection fails, another server in the connect string will be tried
      * (the order is non-deterministic, as we random shuffle the list), until a
      * connection is established. The client will continue attempts until the
+     * session is explicitly closed.
+     * <p>
+     * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
+     * connection string. This will run the client commands while interpreting
+     * all paths relative to this root (similar to the unix chroot command).
+     * <p>
+     *
+     * @param connectString
+     *            comma separated host:port pairs, each corresponding to a zk
+     *            server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If
+     *            the optional chroot suffix is used the example would look
+     *            like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+     *            where the client would be rooted at "/app/a" and all paths
+     *            would be relative to this root - ie getting/setting/etc...
+     *            "/foo/bar" would result in operations being run on
+     *            "/app/a/foo/bar" (from the server perspective).
+     * @param sessionTimeout
+     *            session timeout in milliseconds
+     * @param watcher
+     *            a watcher object which will be notified of state changes, may
+     *            also be notified for node events
+     * @param canBeReadOnly
+     *            (added in 3.4) whether the created client is allowed to go to
+     *            read-only mode in case of partitioning. Read-only mode
+     *            basically means that if the client can't find any majority
+     *            servers but there's partitioned server it could reach, it
+     *            connects to one in read-only mode, i.e. read requests are
+     *            allowed while write requests are not. It continues seeking for
+     *            majority in the background.
+     * @param conf
+     *            (added in 3.5.2) passing this conf object gives each client the flexibility of
+     *            configuring properties differently compared to other instances
+     * @throws IOException
+     *             in cases of network failure
+     * @throws IllegalArgumentException
+     *             if an invalid chroot path is specified
+     */
+    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+            boolean canBeReadOnly, ZKClientConfig conf) throws IOException {
+        this(connectString, sessionTimeout, watcher, canBeReadOnly,
+                createDefaultHostProvider(connectString), conf);
+    }
+
+    /**
+     * To create a ZooKeeper client object, the application needs to pass a
+     * connection string containing a comma separated list of host:port pairs,
+     * each corresponding to a ZooKeeper server.
+     * <p>
+     * Session establishment is asynchronous. This constructor will initiate
+     * connection to the server and return immediately - potentially (usually)
+     * before the session is fully established. The watcher argument specifies
+     * the watcher that will be notified of any changes in state. This
+     * notification can come at any point before or after the constructor call
+     * has returned.
+     * <p>
+     * The instantiated ZooKeeper client object will pick an arbitrary server
+     * from the connectString and attempt to connect to it. If establishment of
+     * the connection fails, another server in the connect string will be tried
+     * (the order is non-deterministic, as we random shuffle the list), until a
+     * connection is established. The client will continue attempts until the
      * session is explicitly closed (or the session is expired by the server).
      * <p>
      * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
@@ -920,6 +1122,7 @@
                 + " sessionPasswd="
                 + (sessionPasswd == null ? "<null>" : "<hidden>"));
 
+        this.clientConfig = new ZKClientConfig();
         watchManager = defaultWatchManager();
         watchManager.defaultWatcher = watcher;
        
@@ -1015,7 +1218,7 @@
 
     /* Useful for testing watch handling behavior */
     protected ZKWatchManager defaultWatchManager() {
-        return new ZKWatchManager();
+        return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
     }
 
     /**
@@ -2719,15 +2922,16 @@
         return cnxn.sendThread.getClientCnxnSocket().getLocalSocketAddress();
     }
 
-    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
-        String clientCnxnSocketName = System
-                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+    private ClientCnxnSocket getClientCnxnSocket() throws IOException {
+        String clientCnxnSocketName = getClientConfig().getProperty(
+                ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
         if (clientCnxnSocketName == null) {
             clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
         }
         try {
-            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
-                    .newInstance();
+            Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
+            ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
+            return clientCxnSocket;
         } catch (Exception e) {
             IOException ioe = new IOException("Couldn't instantiate "
                     + clientCnxnSocketName);
diff --git a/src/java/main/org/apache/zookeeper/ZooKeeperMain.java b/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
index d082edc..f722693 100644
--- a/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
+++ b/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
@@ -38,6 +38,7 @@
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
+
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -62,6 +63,7 @@
 import org.apache.zookeeper.cli.SetQuotaCommand;
 import org.apache.zookeeper.cli.StatCommand;
 import org.apache.zookeeper.cli.SyncCommand;
+import org.apache.zookeeper.client.ZKClientConfig;
 
 /**
  * The command line client to ZooKeeper.
@@ -273,7 +275,7 @@
         host = newHost;
         boolean readOnly = cl.getOption("readonly") != null;
         if (cl.getOption("secure") != null) {
-            System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
+            System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
             System.out.println("Secure connection is enabled");
         }
         zk = new ZooKeeper(host,
diff --git a/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
new file mode 100644
index 0000000..0eab9c5
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/client/ZKClientConfig.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+import java.io.File;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.ZKConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+
+/**
+ * Handles client specific properties
+ * @since 3.5.2
+ */
+public class ZKClientConfig extends ZKConfig {
+    public static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+    public static final String ZK_SASL_CLIENT_USERNAME_DEFAULT = "zookeeper";
+    @SuppressWarnings("deprecation")
+    public static final String LOGIN_CONTEXT_NAME_KEY = ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;;
+    public static final String LOGIN_CONTEXT_NAME_KEY_DEFAULT = "Client";
+    @SuppressWarnings("deprecation")
+    public static final String ENABLE_CLIENT_SASL_KEY = ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY;
+    @SuppressWarnings("deprecation")
+    public static final String ENABLE_CLIENT_SASL_DEFAULT = ZooKeeperSaslClient.ENABLE_CLIENT_SASL_DEFAULT;
+    public static final String ZOOKEEPER_SERVER_REALM = "zookeeper.server.realm";
+    /**
+     * This controls whether automatic watch resetting is enabled. Clients
+     * automatically reset watches during session reconnect, this option allows
+     * the client to turn off this behavior by setting the property
+     * "zookeeper.disableAutoWatchReset" to "true"
+     */
+    public static final String DISABLE_AUTO_WATCH_RESET = "zookeeper.disableAutoWatchReset";
+    @SuppressWarnings("deprecation")
+    public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+    /**
+     * Setting this to "true" will enable encrypted client-server communication.
+     */
+    @SuppressWarnings("deprecation")
+    public static final String SECURE_CLIENT = ZooKeeper.SECURE_CLIENT;
+    public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 4096 * 1024; /* 4 MB */
+
+    public ZKClientConfig() {
+        super();
+    }
+
+    public ZKClientConfig(File configFile) throws ConfigException {
+        super(configFile);
+    }
+
+    public ZKClientConfig(String configPath) throws ConfigException {
+        super(configPath);
+    }
+
+    @Override
+    protected void handleBackwardCompatibility() {
+        /**
+         * backward compatibility for properties which are common to both client
+         * and server
+         */
+        super.handleBackwardCompatibility();
+
+        /**
+         * backward compatibility for client specific properties
+         */
+        setProperty(ZK_SASL_CLIENT_USERNAME, System.getProperty(ZK_SASL_CLIENT_USERNAME));
+        setProperty(LOGIN_CONTEXT_NAME_KEY, System.getProperty(LOGIN_CONTEXT_NAME_KEY));
+        setProperty(ENABLE_CLIENT_SASL_KEY, System.getProperty(ENABLE_CLIENT_SASL_KEY));
+        setProperty(ZOOKEEPER_SERVER_REALM, System.getProperty(ZOOKEEPER_SERVER_REALM));
+        setProperty(DISABLE_AUTO_WATCH_RESET, System.getProperty(DISABLE_AUTO_WATCH_RESET));
+        setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET));
+        setProperty(SECURE_CLIENT, System.getProperty(SECURE_CLIENT));
+    }
+
+    /**
+     * Returns true if the SASL client is enabled. By default, the client is
+     * enabled but can be disabled by setting the system property
+     * <code>zookeeper.sasl.client</code> to <code>false</code>. See
+     * ZOOKEEPER-1657 for more information.
+     *
+     * @return true if the SASL client is enabled.
+     */
+    public boolean isSaslClientEnabled() {
+        return Boolean.valueOf(getProperty(ENABLE_CLIENT_SASL_KEY, ENABLE_CLIENT_SASL_DEFAULT));
+    }
+}
diff --git a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
index aebbfa4..449e7be 100644
--- a/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
+++ b/src/java/main/org/apache/zookeeper/client/ZooKeeperSaslClient.java
@@ -40,10 +40,10 @@
 
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.ClientCnxn;
-import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.Login;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.GetSASLRequest;
 import org.apache.zookeeper.proto.SetSASLResponse;
@@ -58,13 +58,28 @@
 
 /**
  * This class manages SASL authentication for the client. It
- * allows ClientCnxn to authenticate using SASL with a Zookeeper server.
+ * allows ClientCnxn to authenticate using SASL with a ZooKeeper server.
  */
 public class ZooKeeperSaslClient {
+    /**
+     * @deprecated Use {@link ZKClientConfig#LOGIN_CONTEXT_NAME_KEY}
+     *             instead.
+     */
+    @Deprecated
     public static final String LOGIN_CONTEXT_NAME_KEY = "zookeeper.sasl.clientconfig";
+    /**
+     * @deprecated Use {@link ZKClientConfig#ENABLE_CLIENT_SASL_KEY}
+     *             instead.
+     */
+    @Deprecated
     public static final String ENABLE_CLIENT_SASL_KEY = "zookeeper.sasl.client";
+    /**
+     * @deprecated Use {@link ZKClientConfig#ENABLE_CLIENT_SASL_DEFAULT}
+     *             instead.
+     */
+    @Deprecated
     public static final String ENABLE_CLIENT_SASL_DEFAULT = "true";
-    private static volatile boolean initializedLogin = false; 
+    private volatile boolean initializedLogin = false; 
 
     /**
      * Returns true if the SASL client is enabled. By default, the client
@@ -72,16 +87,21 @@
      * <code>zookeeper.sasl.client</code> to <code>false</code>. See
      * ZOOKEEPER-1657 for more information.
      *
-     * @return If the SASL client is enabled.
+     * @return true if the SASL client is enabled.
+     * @deprecated Use {@link ZKClientConfig#isSaslClientEnabled} instead
      */
+    @Deprecated
     public static boolean isEnabled() {
-        return Boolean.valueOf(System.getProperty(ENABLE_CLIENT_SASL_KEY, ENABLE_CLIENT_SASL_DEFAULT));
+        return Boolean.valueOf(System.getProperty(
+                ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
+                ZKClientConfig.ENABLE_CLIENT_SASL_DEFAULT));
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSaslClient.class);
-    private static Login login = null;
+    private Login login = null;
     private SaslClient saslClient;
     private boolean isSASLConfigured = true;
+    private final ZKClientConfig clientConfig;
 
     private byte[] saslToken = new byte[0];
 
@@ -105,19 +125,22 @@
         return null;
     }
 
-    public ZooKeeperSaslClient(final String serverPrincipal)
-            throws LoginException {
+    public ZooKeeperSaslClient(final String serverPrincipal, ZKClientConfig clientConfig) throws LoginException {
         /**
          * ZOOKEEPER-1373: allow system property to specify the JAAS
          * configuration section that the zookeeper client should use.
          * Default to "Client".
          */
-        String clientSection = System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client");
+        String clientSection = clientConfig.getProperty(
+                ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
+                ZKClientConfig.LOGIN_CONTEXT_NAME_KEY_DEFAULT);
+        this.clientConfig = clientConfig;
         // Note that 'Configuration' here refers to javax.security.auth.login.Configuration.
         AppConfigurationEntry entries[] = null;
         RuntimeException runtimeException = null;
         try {
-            entries = Configuration.getConfiguration().getAppConfigurationEntry(clientSection);
+            entries = Configuration.getConfiguration()
+                    .getAppConfigurationEntry(clientSection);
         } catch (SecurityException e) {
             // handle below: might be harmless if the user doesn't intend to use JAAS authentication.
             runtimeException = e;
@@ -134,15 +157,18 @@
             // Handle situation of clientSection's being null: it might simply because the client does not intend to 
             // use SASL, so not necessarily an error.
             saslState = SaslState.FAILED;
-            String explicitClientSection = System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY);
+            String explicitClientSection = clientConfig
+                    .getProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY);
             if (explicitClientSection != null) {
                 // If the user explicitly overrides the default Login Context, they probably expected SASL to
                 // succeed. But if we got here, SASL failed.
                 if (runtimeException != null) {
-                    throw new LoginException("Zookeeper client cannot authenticate using the " + explicitClientSection +
-                            " section of the supplied JAAS configuration: '" +
-                            System.getProperty(Environment.JAAS_CONF_KEY) + "' because of a " +
-                            "RuntimeException: " + runtimeException);
+                    throw new LoginException(
+                            "Zookeeper client cannot authenticate using the "
+                                    + explicitClientSection
+                                    + " section of the supplied JAAS configuration: '"
+                                    + clientConfig.getJaasConfKey() + "' because of a "
+                                    + "RuntimeException: " + runtimeException);
                 } else {
                     throw new LoginException("Client cannot SASL-authenticate because the specified JAAS configuration " +
                             "section '" + explicitClientSection + "' could not be found.");
@@ -159,19 +185,26 @@
                 this.configStatus = msg;
                 this.isSASLConfigured = false;
             }
-            if (System.getProperty(Environment.JAAS_CONF_KEY)  != null) {
-                // Again, the user explicitly set something SASL-related, so they probably expected SASL to succeed.
+            if (clientConfig.getJaasConfKey() != null) {
+                // Again, the user explicitly set something SASL-related, so
+                // they probably expected SASL to succeed.
                 if (runtimeException != null) {
-                    throw new LoginException("Zookeeper client cannot authenticate using the '" +
-                            System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") +
-                            "' section of the supplied JAAS configuration: '" +
-                            System.getProperty(Environment.JAAS_CONF_KEY) + "' because of a " +
-                            "RuntimeException: " + runtimeException);
+                    throw new LoginException(
+                            "Zookeeper client cannot authenticate using the '"
+                                    + clientConfig.getProperty(
+                                            ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
+                                            ZKClientConfig.LOGIN_CONTEXT_NAME_KEY_DEFAULT)
+                                    + "' section of the supplied JAAS configuration: '"
+                                    + clientConfig.getJaasConfKey() + "' because of a "
+                                    + "RuntimeException: " + runtimeException);
                 } else {
-                    throw new LoginException("No JAAS configuration section named '" +
-                            System.getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "Client") +
-                            "' was found in specified JAAS configuration file: '" +
-                            System.getProperty(Environment.JAAS_CONF_KEY) + "'.");
+                    throw new LoginException(
+                            "No JAAS configuration section named '"
+                                    + clientConfig.getProperty(
+                                            ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
+                                            ZKClientConfig.LOGIN_CONTEXT_NAME_KEY_DEFAULT)
+                                    + "' was found in specified JAAS configuration file: '"
+                                    + clientConfig.getJaasConfKey() + "'.");
                 }
             }
         }
@@ -215,18 +248,18 @@
         }
     }
 
-    private SaslClient createSaslClient(final String servicePrincipal,
-                                                     final String loginContext) throws LoginException {
+    private SaslClient createSaslClient(final String servicePrincipal, final String loginContext)
+            throws LoginException {
         try {
             if (!initializedLogin) {
-                synchronized (ZooKeeperSaslClient.class) {
+                synchronized (this) {
                     if (login == null) {
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("JAAS loginContext is: " + loginContext);
                         }
                         // note that the login object is static: it's shared amongst all zookeeper-related connections.
                         // in order to ensure the login is initialized only once, it must be synchronized the code snippet.
-                        login = new Login(loginContext, new ClientCallbackHandler(null));
+                        login = new Login(loginContext, new ClientCallbackHandler(null), clientConfig);
                         login.startThreadIfNeeded();
                         initializedLogin = true;
                     }
@@ -247,8 +280,7 @@
                 return saslClient;
             }
             else { // GSSAPI.
-            	boolean usingNativeJgss =
-            			Boolean.getBoolean("sun.security.jgss.native");
+                boolean usingNativeJgss = clientConfig.getBoolean(ZKConfig.JGSS_NATIVE);
             	if (usingNativeJgss) {
             		// http://docs.oracle.com/javase/6/docs/technotes/guides/security/jgss/jgss-features.html
             		// """
@@ -280,7 +312,9 @@
                 final KerberosName clientKerberosName = new KerberosName(clientPrincipal.getName());
                 // assume that server and client are in the same realm (by default; unless the system property
                 // "zookeeper.server.realm" is set).
-                String serverRealm = System.getProperty("zookeeper.server.realm",clientKerberosName.getRealm());
+                String serverRealm = clientConfig.getProperty(
+                        ZKClientConfig.ZOOKEEPER_SERVER_REALM,
+                        clientKerberosName.getRealm());
                 KerberosName serviceKerberosName = new KerberosName(servicePrincipal+"@"+serverRealm);
                 final String serviceName = serviceKerberosName.getServiceName();
                 final String serviceHostname = serviceKerberosName.getHostName();
@@ -548,12 +582,10 @@
         // variable or method in this class to determine whether the client is
         // configured to use SASL. (see also ZOOKEEPER-1455).
         try {
-  	    if ((System.getProperty(Environment.JAAS_CONF_KEY) != null) ||
-              ((javax.security.auth.login.Configuration.getConfiguration() != null) &&
-                  (javax.security.auth.login.Configuration.getConfiguration().
-                       getAppConfigurationEntry(System.
-                       getProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,"Client")) 
-                           != null))) {
+            if ((clientConfig.getJaasConfKey() != null)
+                    || ((Configuration.getConfiguration() != null) && (Configuration.getConfiguration()
+                            .getAppConfigurationEntry(clientConfig.getProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
+                                    ZKClientConfig.LOGIN_CONTEXT_NAME_KEY_DEFAULT)) != null))) {
                 // Client is configured to use a valid login Configuration, so
                 // authentication is either in progress, successful, or failed.
 
diff --git a/src/java/main/org/apache/zookeeper/common/X509Util.java b/src/java/main/org/apache/zookeeper/common/X509Util.java
index a0b3041..c48d694 100644
--- a/src/java/main/org/apache/zookeeper/common/X509Util.java
+++ b/src/java/main/org/apache/zookeeper/common/X509Util.java
@@ -43,18 +43,53 @@
 public class X509Util {
     private static final Logger LOG = LoggerFactory.getLogger(X509Util.class);
 
+    /**
+     * @deprecated Use {@link ZKConfig#SSL_KEYSTORE_LOCATION}
+     *             instead.
+     */
+    @Deprecated
     public static final String SSL_KEYSTORE_LOCATION = "zookeeper.ssl.keyStore.location";
+    /**
+     * @deprecated Use {@link ZKConfig#SSL_KEYSTORE_PASSWD}
+     *             instead.
+     */
+    @Deprecated
     public static final String SSL_KEYSTORE_PASSWD = "zookeeper.ssl.keyStore.password";
+    /**
+     * @deprecated Use {@link ZKConfig#SSL_TRUSTSTORE_LOCATION}
+     *             instead.
+     */
+    @Deprecated
     public static final String SSL_TRUSTSTORE_LOCATION = "zookeeper.ssl.trustStore.location";
+    /**
+     * @deprecated Use {@link ZKConfig#SSL_TRUSTSTORE_PASSWD}
+     *             instead.
+     */
+    @Deprecated
     public static final String SSL_TRUSTSTORE_PASSWD = "zookeeper.ssl.trustStore.password";
+    /**
+     * @deprecated Use {@link ZKConfig#SSL_AUTHPROVIDER}
+     *             instead.
+     */
+    @Deprecated
     public static final String SSL_AUTHPROVIDER = "zookeeper.ssl.authProvider";
 
     public static SSLContext createSSLContext() throws SSLContextException {
+        /**
+         * Since Configuration initializes the key store and trust store related
+         * configuration from system property. Reading property from
+         * configuration will be same reading from system property
+         */
+        ZKConfig config=new ZKConfig();
+        return createSSLContext(config);
+    }
+
+    public static SSLContext createSSLContext(ZKConfig config) throws SSLContextException {
         KeyManager[] keyManagers = null;
         TrustManager[] trustManagers = null;
 
-        String keyStoreLocationProp = System.getProperty(SSL_KEYSTORE_LOCATION);
-        String keyStorePasswordProp = System.getProperty(SSL_KEYSTORE_PASSWD);
+        String keyStoreLocationProp = config.getProperty(ZKConfig.SSL_KEYSTORE_LOCATION);
+        String keyStorePasswordProp = config.getProperty(ZKConfig.SSL_KEYSTORE_PASSWD);
 
         // There are legal states in some use cases for null KeyManager or TrustManager.
         // But if a user wanna specify one, location and password are required.
@@ -76,8 +111,8 @@
             }
         }
 
-        String trustStoreLocationProp = System.getProperty(SSL_TRUSTSTORE_LOCATION);
-        String trustStorePasswordProp = System.getProperty(SSL_TRUSTSTORE_PASSWD);
+        String trustStoreLocationProp = config.getProperty(ZKConfig.SSL_TRUSTSTORE_LOCATION);
+        String trustStorePasswordProp = config.getProperty(ZKConfig.SSL_TRUSTSTORE_PASSWD);
 
         if (trustStoreLocationProp == null && trustStorePasswordProp == null) {
             LOG.warn("keystore not specified for client connection");
diff --git a/src/java/main/org/apache/zookeeper/common/ZKConfig.java b/src/java/main/org/apache/zookeeper/common/ZKConfig.java
new file mode 100644
index 0000000..3b08904
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/common/ZKConfig.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.util.VerifyingFileFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is a base class for the configurations of both client and server.
+ * It supports reading client configuration from both system properties and
+ * configuration file. A user can override any system property by calling
+ * {@link #setProperty(String, String)}.
+ * @since 3.5.2
+ */
+public class ZKConfig {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZKConfig.class);
+    @SuppressWarnings("deprecation")
+    public static final String SSL_KEYSTORE_LOCATION = X509Util.SSL_KEYSTORE_LOCATION;
+    @SuppressWarnings("deprecation")
+    public static final String SSL_KEYSTORE_PASSWD = X509Util.SSL_KEYSTORE_PASSWD;
+    @SuppressWarnings("deprecation")
+    public static final String SSL_TRUSTSTORE_LOCATION = X509Util.SSL_TRUSTSTORE_LOCATION;
+    @SuppressWarnings("deprecation")
+    public static final String SSL_TRUSTSTORE_PASSWD = X509Util.SSL_TRUSTSTORE_PASSWD;
+    @SuppressWarnings("deprecation")
+    public static final String SSL_AUTHPROVIDER = X509Util.SSL_AUTHPROVIDER;
+    public static final String JUTE_MAXBUFFER = "jute.maxbuffer";
+    /**
+     * Path to a kinit binary: {@value}. Defaults to
+     * <code>"/usr/bin/kinit"</code>
+     */
+    public static final String KINIT_COMMAND = "zookeeper.kinit";
+    public static final String JGSS_NATIVE = "sun.security.jgss.native";
+
+    private final Map<String, String> properties = new HashMap<String, String>();
+
+    /**
+     * properties, which are common to both client and server, are initialized
+     * from system properties
+     */
+    public ZKConfig() {
+        init();
+    }
+
+    /**
+     * @param configPath
+     *            Configuration file path
+     * @throws ConfigException
+     *             if failed to load configuration properties
+     */
+
+    public ZKConfig(String configPath) throws ConfigException {
+        this(new File(configPath));
+    }
+
+    /**
+     *
+     * @param configFile
+     *            Configuration file
+     * @throws ConfigException
+     *             if failed to load configuration properties
+     */
+    public ZKConfig(File configFile) throws ConfigException {
+        this();
+        addConfiguration(configFile);
+    }
+
+    private void init() {
+        /**
+         * backward compatibility for all currently available client properties
+         */
+        handleBackwardCompatibility();
+    }
+
+    /**
+     * Now onwards client code will use properties from this class but older
+     * clients still be setting properties through system properties. So to make
+     * this change backward compatible we should set old system properties in
+     * this configuration.
+     */
+    protected void handleBackwardCompatibility() {
+        properties.put(SSL_KEYSTORE_LOCATION, System.getProperty(SSL_KEYSTORE_LOCATION));
+        properties.put(SSL_KEYSTORE_PASSWD, System.getProperty(SSL_KEYSTORE_PASSWD));
+        properties.put(SSL_TRUSTSTORE_LOCATION, System.getProperty(SSL_TRUSTSTORE_LOCATION));
+        properties.put(SSL_TRUSTSTORE_PASSWD, System.getProperty(SSL_TRUSTSTORE_PASSWD));
+        properties.put(SSL_AUTHPROVIDER, System.getProperty(SSL_AUTHPROVIDER));
+        properties.put(JUTE_MAXBUFFER, System.getProperty(JUTE_MAXBUFFER));
+        properties.put(KINIT_COMMAND, System.getProperty(KINIT_COMMAND));
+        properties.put(JGSS_NATIVE, System.getProperty(JGSS_NATIVE));
+    }
+
+    /**
+     * Get the property value
+     *
+     * @param key
+     * @return property value
+     */
+    public String getProperty(String key) {
+        return properties.get(key);
+    }
+
+    /**
+     * Get the property value, if it is null return default value
+     *
+     * @param key
+     *            property key
+     * @param defaultValue
+     * @return property value or default value
+     */
+    public String getProperty(String key, String defaultValue) {
+        String value = properties.get(key);
+        return (value == null) ? defaultValue : value;
+    }
+
+    /**
+     * Return the value of "java.security.auth.login.config" system property
+     *
+     * @return value
+     */
+    public String getJaasConfKey() {
+        return System.getProperty(Environment.JAAS_CONF_KEY);
+    }
+
+    /**
+     * Maps the specified <code>key</code> to the specified <code>value</code>.
+     * key can not be <code>null</code>. If key is already mapped then the old
+     * value of the <code>key</code> is replaced by the specified
+     * <code>value</code>.
+     * 
+     * @param key
+     * @param value
+     */
+    public void setProperty(String key, String value) {
+        if (null == key) {
+            throw new IllegalArgumentException("property key is null.");
+        }
+        String oldValue = properties.put(key, value);
+        if (LOG.isDebugEnabled()) {
+            if (null != oldValue && !oldValue.equals(value)) {
+                LOG.debug("key {}'s value {} is replaced with new value {}", key, oldValue, value);
+            }
+        }
+    }
+
+    /**
+     * Add a configuration resource. The properties form this configuration will
+     * overwrite corresponding already loaded property and system property
+     *
+     * @param configFile
+     *            Configuration file.
+     */
+    public void addConfiguration(File configFile) throws ConfigException {
+        LOG.info("Reading configuration from: {}", configFile.getAbsolutePath());
+        try {
+            configFile = (new VerifyingFileFactory.Builder(LOG).warnForRelativePath().failForNonExistingPath().build())
+                    .validate(configFile);
+            Properties cfg = new Properties();
+            FileInputStream in = new FileInputStream(configFile);
+            try {
+                cfg.load(in);
+            } finally {
+                in.close();
+            }
+            parseProperties(cfg);
+        } catch (IOException | IllegalArgumentException e) {
+            LOG.error("Error while configuration from: {}", configFile.getAbsolutePath(), e);
+            throw new ConfigException("Error while processing " + configFile.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Add a configuration resource. The properties form this configuration will
+     * overwrite corresponding already loaded property and system property
+     *
+     * @param configPath
+     *            Configuration file path.
+     */
+    public void addConfiguration(String configPath) throws ConfigException {
+        addConfiguration(new File(configPath));
+    }
+
+    private void parseProperties(Properties cfg) {
+        for (Entry<Object, Object> entry : cfg.entrySet()) {
+            String key = entry.getKey().toString().trim();
+            String value = entry.getValue().toString().trim();
+            setProperty(key, value);
+        }
+    }
+
+    /**
+     * Returns {@code true} if and only if the property named by the argument
+     * exists and is equal to the string {@code "true"}.
+     */
+    public boolean getBoolean(String key) {
+        return Boolean.parseBoolean(getProperty(key));
+    }
+
+}
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index 2b07d45..8bfd83f 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -39,6 +39,7 @@
 import javax.net.ssl.X509TrustManager;
 
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.common.X509Exception;
 import org.apache.zookeeper.common.X509Exception.SSLContextException;
 import org.apache.zookeeper.common.X509Util;
@@ -291,7 +292,7 @@
                     cnxn.setClientCertificateChain(session.getPeerCertificates());
 
                     String authProviderProp
-                            = System.getProperty(X509Util.SSL_AUTHPROVIDER, "x509");
+                            = System.getProperty(ZKConfig.SSL_AUTHPROVIDER, "x509");
 
                     X509AuthenticationProvider authProvider =
                             (X509AuthenticationProvider)
@@ -351,7 +352,7 @@
 
     private synchronized void initSSL(ChannelPipeline p)
             throws X509Exception, KeyManagementException, NoSuchAlgorithmException {
-        String authProviderProp = System.getProperty(X509Util.SSL_AUTHPROVIDER);
+        String authProviderProp = System.getProperty(ZKConfig.SSL_AUTHPROVIDER);
         SSLContext sslContext;
         if (authProviderProp == null) {
             sslContext = X509Util.createSSLContext();
@@ -359,7 +360,7 @@
             sslContext = SSLContext.getInstance("TLSv1");
             X509AuthenticationProvider authProvider =
                     (X509AuthenticationProvider)ProviderRegistry.getProvider(
-                            System.getProperty(X509Util.SSL_AUTHPROVIDER,
+                            System.getProperty(ZKConfig.SSL_AUTHPROVIDER,
                                     "x509"));
 
             if (authProvider == null)
diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
index 1235faa..fc6766c 100644
--- a/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
@@ -33,6 +33,7 @@
 
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.Login;
+import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.auth.SaslServerCallbackHandler;
 import org.slf4j.Logger;
@@ -234,7 +235,7 @@
         // jaas.conf entry available
         try {
             saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
-            login = new Login(serverSection, saslServerCallbackHandler);
+            login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig() );
             login.startThreadIfNeeded();
         } catch (LoginException e) {
             throw new IOException("Could not configure server because SASL configuration did not allow the "
diff --git a/src/java/main/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java b/src/java/main/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
index eeda754..902b307 100644
--- a/src/java/main/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
+++ b/src/java/main/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
@@ -26,6 +26,7 @@
 import javax.security.auth.x500.X500Principal;
 
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.common.X509Exception.KeyManagerException;
 import org.apache.zookeeper.common.X509Exception.TrustManagerException;
 import org.apache.zookeeper.common.X509Util;
@@ -65,9 +66,9 @@
      */
     public X509AuthenticationProvider() {
         String keyStoreLocationProp = System.getProperty(
-                X509Util.SSL_KEYSTORE_LOCATION);
+                ZKConfig.SSL_KEYSTORE_LOCATION);
         String keyStorePasswordProp = System.getProperty(
-                X509Util.SSL_KEYSTORE_PASSWD);
+                ZKConfig.SSL_KEYSTORE_PASSWD);
 
         X509KeyManager km = null;
         X509TrustManager tm = null;
@@ -79,9 +80,9 @@
         }
 
         String trustStoreLocationProp = System.getProperty(
-                X509Util.SSL_TRUSTSTORE_LOCATION);
+                ZKConfig.SSL_TRUSTSTORE_LOCATION);
         String trustStorePasswordProp = System.getProperty(
-                X509Util.SSL_TRUSTSTORE_PASSWD);
+                ZKConfig.SSL_TRUSTSTORE_PASSWD);
 
         try {
             tm = X509Util.createTrustManager(
diff --git a/src/java/main/org/apache/zookeeper/server/util/VerifyingFileFactory.java b/src/java/main/org/apache/zookeeper/server/util/VerifyingFileFactory.java
index 74c7408..0e93159 100644
--- a/src/java/main/org/apache/zookeeper/server/util/VerifyingFileFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/util/VerifyingFileFactory.java
@@ -37,6 +37,10 @@
 
     public File create(String path) {
         File file = new File(path);
+        return validate(file);
+    }
+
+    public File validate(File file) {
         if(warnForRelativePath) doWarnForRelativePath(file);
         if(failForNonExistingPath) doFailForNonExistingPath(file);
         return file;
diff --git a/src/java/test/org/apache/zookeeper/ClientReconnectTest.java b/src/java/test/org/apache/zookeeper/ClientReconnectTest.java
index 111c275..566b915 100644
--- a/src/java/test/org/apache/zookeeper/ClientReconnectTest.java
+++ b/src/java/test/org/apache/zookeeper/ClientReconnectTest.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.client.HostProvider;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,7 +39,7 @@
     
     class MockCnxn extends ClientCnxnSocketNIO {
         MockCnxn() throws IOException {
-            super();
+            super(new ZKClientConfig());
         }
 
         @Override
@@ -61,6 +62,7 @@
         InetSocketAddress inaddr = new InetSocketAddress("127.0.0.1", 1111);
         when(hostProvider.next(anyLong())).thenReturn(inaddr);
         ZooKeeper zk = mock(ZooKeeper.class);
+        when(zk.getClientConfig()).thenReturn(new ZKClientConfig());
         sc =  SocketChannel.open();
 
         ClientCnxnSocketNIO nioCnxn = new MockCnxn();
diff --git a/src/java/test/org/apache/zookeeper/RemoveWatchesTest.java b/src/java/test/org/apache/zookeeper/RemoveWatchesTest.java
index be4d7fa..931bb6f 100644
--- a/src/java/test/org/apache/zookeeper/RemoveWatchesTest.java
+++ b/src/java/test/org/apache/zookeeper/RemoveWatchesTest.java
@@ -36,6 +36,7 @@
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Assert;
@@ -1111,6 +1112,10 @@
      * before/after calling removeWatches */
     private class MyZooKeeper extends ZooKeeper {
         class MyWatchManager extends ZKWatchManager {
+            public MyWatchManager(boolean disableAutoWatchReset) {
+                super(disableAutoWatchReset);
+            }
+
             public int lastrc;
 
             /* Pretend that any watcher exists */
@@ -1136,7 +1141,7 @@
         private MyWatchManager myWatchManager;
 
         protected ZKWatchManager defaultWatchManager() {
-            myWatchManager = new MyWatchManager();
+            myWatchManager = new MyWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
             return myWatchManager;
         }
 
@@ -1150,7 +1155,7 @@
         private String eventPath;
         private CountDownLatch latch;
         private List<EventType> eventsAfterWatchRemoval = new ArrayList<EventType>();
-        public MyWatcher(String path, int count) {
+        MyWatcher(String path, int count) {
             this.path = path;
             latch = new CountDownLatch(count);
         }
diff --git a/src/java/test/org/apache/zookeeper/client/ZKClientConfigTest.java b/src/java/test/org/apache/zookeeper/client/ZKClientConfigTest.java
new file mode 100644
index 0000000..88ae1fc
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/client/ZKClientConfigTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.client;
+
+import static org.apache.zookeeper.client.ZKClientConfig.DISABLE_AUTO_WATCH_RESET;
+import static org.apache.zookeeper.client.ZKClientConfig.ENABLE_CLIENT_SASL_KEY;
+import static org.apache.zookeeper.client.ZKClientConfig.LOGIN_CONTEXT_NAME_KEY;
+import static org.apache.zookeeper.client.ZKClientConfig.SECURE_CLIENT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZK_SASL_CLIENT_USERNAME;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_SERVER_REALM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+public class ZKClientConfigTest {
+    private static final File testData = new File(System.getProperty("test.data.dir", "build/test/data"));
+    @Rule
+    public Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+
+    @BeforeClass
+    public static void init() {
+        if (!testData.exists()) {
+            testData.mkdirs();
+        }
+    }
+
+    @Test
+    public void testDefaultConfiguration() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(ZK_SASL_CLIENT_USERNAME, "zookeeper1");
+        properties.put(LOGIN_CONTEXT_NAME_KEY, "Client1");
+        properties.put(ENABLE_CLIENT_SASL_KEY, "true");
+        properties.put(ZOOKEEPER_SERVER_REALM, "zookeeper/hadoop.hadoop.com");
+        properties.put(DISABLE_AUTO_WATCH_RESET, "true");
+        properties.put(ZOOKEEPER_CLIENT_CNXN_SOCKET, "ClientCnxnSocketNetty");
+        properties.put(SECURE_CLIENT, "true");
+
+        for (Map.Entry<String, String> e : properties.entrySet()) {
+            System.setProperty(e.getKey(), e.getValue());
+        }
+        /**
+         * ZKClientConfig should get initialized with system properties
+         */
+        ZKClientConfig conf = new ZKClientConfig();
+        for (Map.Entry<String, String> e : properties.entrySet()) {
+            assertEquals(e.getValue(), conf.getProperty(e.getKey()));
+        }
+        /**
+         * clear properties
+         */
+        for (Map.Entry<String, String> e : properties.entrySet()) {
+            System.clearProperty(e.getKey());
+        }
+
+        conf = new ZKClientConfig();
+        /**
+         * test that all the properties are null
+         */
+        for (Map.Entry<String, String> e : properties.entrySet()) {
+            String result = conf.getProperty(e.getKey());
+            assertNull(result);
+        }
+    }
+
+    @Test
+    public void testSystemPropertyValue() {
+        String clientName = "zookeeper1";
+        System.setProperty(ZK_SASL_CLIENT_USERNAME, clientName);
+
+        ZKClientConfig conf = new ZKClientConfig();
+        assertEquals(conf.getProperty(ZK_SASL_CLIENT_USERNAME), clientName);
+
+        String newClientName = "zookeeper2";
+        conf.setProperty(ZK_SASL_CLIENT_USERNAME, newClientName);
+
+        assertEquals(conf.getProperty(ZK_SASL_CLIENT_USERNAME), newClientName);
+    }
+
+    @Test
+    public void testReadConfigurationFile() throws IOException, ConfigException {
+        File file = File.createTempFile("clientConfig", ".conf", testData);
+        file.deleteOnExit();
+        Properties clientConfProp = new Properties();
+        clientConfProp.setProperty(ENABLE_CLIENT_SASL_KEY, "true");
+        clientConfProp.setProperty(ZK_SASL_CLIENT_USERNAME, "ZK");
+        clientConfProp.setProperty(LOGIN_CONTEXT_NAME_KEY, "MyClient");
+        clientConfProp.setProperty(ZOOKEEPER_SERVER_REALM, "HADOOP.COM");
+        clientConfProp.setProperty("dummyProperty", "dummyValue");
+        OutputStream io = new FileOutputStream(file);
+        try {
+            clientConfProp.store(io, "Client Configurations");
+        } finally {
+            io.close();
+        }
+
+        ZKClientConfig conf = new ZKClientConfig();
+        conf.addConfiguration(file.getAbsolutePath());
+        assertEquals(conf.getProperty(ENABLE_CLIENT_SASL_KEY), "true");
+        assertEquals(conf.getProperty(ZK_SASL_CLIENT_USERNAME), "ZK");
+        assertEquals(conf.getProperty(LOGIN_CONTEXT_NAME_KEY), "MyClient");
+        assertEquals(conf.getProperty(ZOOKEEPER_SERVER_REALM), "HADOOP.COM");
+        assertEquals(conf.getProperty("dummyProperty"), "dummyValue");
+
+        // try to delete it now as we have done with the created file, why to
+        // wait for deleteOnExit() deletion
+        file.delete();
+
+    }
+
+    @Test
+    public void testSetConfiguration() {
+        ZKClientConfig conf = new ZKClientConfig();
+        String defaultValue = conf.getProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
+                ZKClientConfig.ENABLE_CLIENT_SASL_DEFAULT);
+        if (defaultValue.equals("true")) {
+            conf.setProperty(ENABLE_CLIENT_SASL_KEY, "false");
+        } else {
+            conf.setProperty(ENABLE_CLIENT_SASL_KEY, "true");
+        }
+        assertTrue(conf.getProperty(ENABLE_CLIENT_SASL_KEY) != defaultValue);
+    }
+
+}
diff --git a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
index aa84014..684d67a 100644
--- a/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
+++ b/src/java/test/org/apache/zookeeper/test/NettyNettySuiteBase.java
@@ -19,7 +19,7 @@
 package org.apache.zookeeper.test;
 
 import org.apache.zookeeper.ClientCnxnSocketNetty;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.server.NettyServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.junit.AfterClass;
@@ -36,7 +36,7 @@
     public static void setUp() {
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
                 NettyServerCnxnFactory.class.getName());
-        System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+        System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
                 ClientCnxnSocketNetty.class.getName());
         System.setProperty("zookeeper.admin.enableServer", "false");
     }
@@ -44,6 +44,6 @@
     @AfterClass
     public static void tearDown() {
         System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
-        System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
     }
 }
diff --git a/src/java/test/org/apache/zookeeper/test/SSLAuthTest.java b/src/java/test/org/apache/zookeeper/test/SSLAuthTest.java
index 0bba28e..7a4c02c 100644
--- a/src/java/test/org/apache/zookeeper/test/SSLAuthTest.java
+++ b/src/java/test/org/apache/zookeeper/test/SSLAuthTest.java
@@ -22,8 +22,8 @@
 
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.TestableZooKeeper;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.X509Util;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.junit.After;
 import org.junit.Assert;
@@ -35,13 +35,13 @@
     public void setUp() throws Exception {
         String testDataPath = System.getProperty("test.data.dir", "build/test/data");
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
-        System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
-        System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
-        System.setProperty(X509Util.SSL_AUTHPROVIDER, "x509");
-        System.setProperty(X509Util.SSL_KEYSTORE_LOCATION, testDataPath + "/ssl/testKeyStore.jks");
-        System.setProperty(X509Util.SSL_KEYSTORE_PASSWD, "testpass");
-        System.setProperty(X509Util.SSL_TRUSTSTORE_LOCATION, testDataPath + "/ssl/testTrustStore.jks");
-        System.setProperty(X509Util.SSL_TRUSTSTORE_PASSWD, "testpass");
+        System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+        System.setProperty(ZKConfig.SSL_AUTHPROVIDER, "x509");
+        System.setProperty(ZKConfig.SSL_KEYSTORE_LOCATION, testDataPath + "/ssl/testKeyStore.jks");
+        System.setProperty(ZKConfig.SSL_KEYSTORE_PASSWD, "testpass");
+        System.setProperty(ZKConfig.SSL_TRUSTSTORE_LOCATION, testDataPath + "/ssl/testTrustStore.jks");
+        System.setProperty(ZKConfig.SSL_TRUSTSTORE_PASSWD, "testpass");
         System.setProperty("javax.net.debug", "ssl");
 
         String host = "localhost";
@@ -57,13 +57,13 @@
     @After
     public void teardown() throws Exception {
         System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
-        System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
-        System.clearProperty(ZooKeeper.SECURE_CLIENT);
-        System.clearProperty(X509Util.SSL_AUTHPROVIDER);
-        System.clearProperty(X509Util.SSL_KEYSTORE_LOCATION);
-        System.clearProperty(X509Util.SSL_KEYSTORE_PASSWD);
-        System.clearProperty(X509Util.SSL_TRUSTSTORE_LOCATION);
-        System.clearProperty(X509Util.SSL_TRUSTSTORE_PASSWD);
+        System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        System.clearProperty(ZKClientConfig.SECURE_CLIENT);
+        System.clearProperty(ZKConfig.SSL_AUTHPROVIDER);
+        System.clearProperty(ZKConfig.SSL_KEYSTORE_LOCATION);
+        System.clearProperty(ZKConfig.SSL_KEYSTORE_PASSWD);
+        System.clearProperty(ZKConfig.SSL_TRUSTSTORE_LOCATION);
+        System.clearProperty(ZKConfig.SSL_TRUSTSTORE_PASSWD);
         System.clearProperty("javax.net.debug");
     }
 
@@ -72,8 +72,8 @@
         String testDataPath = System.getProperty("test.data.dir", "build/test/data");
 
         // Replace trusted keys with a valid key that is not trusted by the server
-        System.setProperty(X509Util.SSL_KEYSTORE_LOCATION, testDataPath + "/ssl/testUntrustedKeyStore.jks");
-        System.setProperty(X509Util.SSL_KEYSTORE_PASSWD, "testpass");
+        System.setProperty(ZKConfig.SSL_KEYSTORE_LOCATION, testDataPath + "/ssl/testUntrustedKeyStore.jks");
+        System.setProperty(ZKConfig.SSL_KEYSTORE_PASSWD, "testpass");
 
         CountdownWatcher watcher = new CountdownWatcher();
 
@@ -85,11 +85,11 @@
 
     @Test
     public void testMisconfiguration() throws Exception {
-        System.clearProperty(X509Util.SSL_AUTHPROVIDER);
-        System.clearProperty(X509Util.SSL_KEYSTORE_LOCATION);
-        System.clearProperty(X509Util.SSL_KEYSTORE_PASSWD);
-        System.clearProperty(X509Util.SSL_TRUSTSTORE_LOCATION);
-        System.clearProperty(X509Util.SSL_TRUSTSTORE_PASSWD);
+        System.clearProperty(ZKConfig.SSL_AUTHPROVIDER);
+        System.clearProperty(ZKConfig.SSL_KEYSTORE_LOCATION);
+        System.clearProperty(ZKConfig.SSL_KEYSTORE_PASSWD);
+        System.clearProperty(ZKConfig.SSL_TRUSTSTORE_LOCATION);
+        System.clearProperty(ZKConfig.SSL_TRUSTSTORE_PASSWD);
 
         CountdownWatcher watcher = new CountdownWatcher();
         new TestableZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
diff --git a/src/java/test/org/apache/zookeeper/test/SSLTest.java b/src/java/test/org/apache/zookeeper/test/SSLTest.java
index 1131751..a687f6f 100644
--- a/src/java/test/org/apache/zookeeper/test/SSLTest.java
+++ b/src/java/test/org/apache/zookeeper/test/SSLTest.java
@@ -30,7 +30,8 @@
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.common.X509Util;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ZKConfig;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
 import org.junit.After;
@@ -44,23 +45,23 @@
     public void setup() {
         String testDataPath = System.getProperty("test.data.dir", "build/test/data");
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
-        System.setProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
-        System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
-        System.setProperty(X509Util.SSL_KEYSTORE_LOCATION, testDataPath + "/ssl/testKeyStore.jks");
-        System.setProperty(X509Util.SSL_KEYSTORE_PASSWD, "testpass");
-        System.setProperty(X509Util.SSL_TRUSTSTORE_LOCATION, testDataPath + "/ssl/testTrustStore.jks");
-        System.setProperty(X509Util.SSL_TRUSTSTORE_PASSWD, "testpass");
+        System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+        System.setProperty(ZKConfig.SSL_KEYSTORE_LOCATION, testDataPath + "/ssl/testKeyStore.jks");
+        System.setProperty(ZKConfig.SSL_KEYSTORE_PASSWD, "testpass");
+        System.setProperty(ZKConfig.SSL_TRUSTSTORE_LOCATION, testDataPath + "/ssl/testTrustStore.jks");
+        System.setProperty(ZKConfig.SSL_TRUSTSTORE_PASSWD, "testpass");
     }
 
     @After
     public void teardown() throws Exception {
         System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
-        System.clearProperty(ZooKeeper.ZOOKEEPER_CLIENT_CNXN_SOCKET);
-        System.clearProperty(ZooKeeper.SECURE_CLIENT);
-        System.clearProperty(X509Util.SSL_KEYSTORE_LOCATION);
-        System.clearProperty(X509Util.SSL_KEYSTORE_PASSWD);
-        System.clearProperty(X509Util.SSL_TRUSTSTORE_LOCATION);
-        System.clearProperty(X509Util.SSL_TRUSTSTORE_PASSWD);
+        System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        System.clearProperty(ZKClientConfig.SECURE_CLIENT);
+        System.clearProperty(ZKConfig.SSL_KEYSTORE_LOCATION);
+        System.clearProperty(ZKConfig.SSL_KEYSTORE_PASSWD);
+        System.clearProperty(ZKConfig.SSL_TRUSTSTORE_LOCATION);
+        System.clearProperty(ZKConfig.SSL_TRUSTSTORE_PASSWD);
     }
 
     /**
diff --git a/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java b/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java
index 3c909ce..c007022 100644
--- a/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java
+++ b/src/java/test/org/apache/zookeeper/test/SaslAuthDesignatedClientTest.java
@@ -29,6 +29,7 @@
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -38,7 +39,8 @@
 public class SaslAuthDesignatedClientTest extends ClientBase {
     static {
         System.setProperty("zookeeper.authProvider.1","org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
-        System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "MyZookeeperClient");
+        System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
+                "MyZookeeperClient");
 
         try {
             File tmpDir = createTmpDir();
@@ -144,7 +146,7 @@
       Thread.sleep(100);
       
       // disable Client Sasl
-      System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, "false");
+        System.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
       
       try {
         zk = createClient();
@@ -157,7 +159,8 @@
         zk.close();
       } finally {
         // enable Client Sasl
-        System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, "true");
+            System.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
+                    "true");
       }
     }
 }
diff --git a/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java b/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java
index 5291141..df8c903 100644
--- a/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java
+++ b/src/java/test/org/apache/zookeeper/test/SaslAuthFailDesignatedClientTest.java
@@ -22,23 +22,20 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.TestableZooKeeper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class SaslAuthFailDesignatedClientTest extends ClientBase {
     static {
         System.setProperty("zookeeper.authProvider.1","org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
-        System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "MyZookeeperClient");
+        System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
+                "MyZookeeperClient");
 
         try {
             File tmpDir = createTmpDir();
@@ -70,19 +67,6 @@
         }
     }
 
-    private AtomicInteger authFailed = new AtomicInteger(0);
-
-    private class MyWatcher extends CountdownWatcher {
-        @Override
-        public synchronized void process(WatchedEvent event) {
-            if (event.getState() == KeeperState.AuthFailed) {
-                authFailed.incrementAndGet();
-            }
-            else {
-                super.process(event);
-            }
-        }
-    }
 
     @Test
     public void testAuth() throws Exception {
diff --git a/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java b/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java
index 98be0be..ee01387 100644
--- a/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java
+++ b/src/java/test/org/apache/zookeeper/test/SaslAuthMissingClientConfigTest.java
@@ -21,15 +21,12 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -38,7 +35,8 @@
         System.setProperty("zookeeper.authProvider.1","org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
         // This configuration section 'MyZookeeperClient', is missing from the JAAS configuration.
         // As a result, SASL authentication should fail, which is tested by this test (testAuth()).
-        System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, "MyZookeeperClient");
+        System.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY,
+                "MyZookeeperClient");
 
         try {
             File tmpDir = createTmpDir();
@@ -66,20 +64,6 @@
         }
     }
 
-    private AtomicInteger authFailed = new AtomicInteger(0);
-
-    private class MyWatcher extends CountdownWatcher {
-        @Override
-        public synchronized void process(WatchedEvent event) {
-            if (event.getState() == KeeperState.AuthFailed) {
-                authFailed.incrementAndGet();
-            }
-            else {
-                super.process(event);
-            }
-        }
-    }
-
     @Test
     public void testAuth() throws Exception {
         ZooKeeper zk = createClient();
diff --git a/src/java/test/org/apache/zookeeper/test/SaslClientTest.java b/src/java/test/org/apache/zookeeper/test/SaslClientTest.java
index 8213abc..95bf2f6 100644
--- a/src/java/test/org/apache/zookeeper/test/SaslClientTest.java
+++ b/src/java/test/org/apache/zookeeper/test/SaslClientTest.java
@@ -18,45 +18,52 @@
 
 package org.apache.zookeeper.test;
 
+
 import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
-
 public class SaslClientTest extends ZKTestCase {
 
     private String existingPropertyValue = null;
 
     @Before
     public void setUp() {
-        existingPropertyValue = System.getProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY);
+        existingPropertyValue = System
+                .getProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY);
     }
 
     @After
     public void tearDown() {
         // Restore the System property if it was set previously
         if (existingPropertyValue != null) {
-            System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, existingPropertyValue);
+            System.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
+                    existingPropertyValue);
         }
     }
 
     @Test
     public void testSaslClientDisabled() {
-        System.clearProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY);
-        Assert.assertTrue("SASL client disabled", ZooKeeperSaslClient.isEnabled());
+        System.clearProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY);
+        Assert.assertTrue("SASL client disabled",
+                new ZKClientConfig().isSaslClientEnabled());
 
         for (String value : Arrays.asList("true", "TRUE")) {
-            System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, value);
-            Assert.assertTrue("SASL client disabled", ZooKeeperSaslClient.isEnabled());
+            System.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
+                    value);
+            Assert.assertTrue("SASL client disabled",
+                    new ZKClientConfig().isSaslClientEnabled());
         }
 
         for (String value : Arrays.asList("false", "FALSE")) {
-            System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, value);
-            Assert.assertFalse("SASL client disabled", ZooKeeperSaslClient.isEnabled());
+            System.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY,
+                    value);
+            Assert.assertFalse("SASL client disabled",
+                    new ZKClientConfig().isSaslClientEnabled());
         }
     }
 }
diff --git a/src/java/test/org/apache/zookeeper/test/WatcherTest.java b/src/java/test/org/apache/zookeeper/test/WatcherTest.java
index 899099d..8f3c4fe 100644
--- a/src/java/test/org/apache/zookeeper/test/WatcherTest.java
+++ b/src/java/test/org/apache/zookeeper/test/WatcherTest.java
@@ -25,7 +25,6 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.TestableZooKeeper;
@@ -36,6 +35,7 @@
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Assert;
 import org.junit.Before;
@@ -73,7 +73,7 @@
         super.setUp();
         // Reset to default value since some test cases set this to true.
         // Needed for JDK7 since unit test can run is random order
-        ClientCnxn.setDisableAutoResetWatch(false);
+        System.setProperty(ZKClientConfig.DISABLE_AUTO_WATCH_RESET, "false");
     }
 
     /**
@@ -243,13 +243,16 @@
 
     @Test
     public void testWatcherAutoResetDisabledWithGlobal() throws Exception {
-        ClientCnxn.setDisableAutoResetWatch(true);
+        /**
+         * When ZooKeeper is created this property will get used.
+         */
+        System.setProperty(ZKClientConfig.DISABLE_AUTO_WATCH_RESET, "true");
         testWatcherAutoResetWithGlobal();
     }
 
     @Test
     public void testWatcherAutoResetDisabledWithLocal() throws Exception {
-        ClientCnxn.setDisableAutoResetWatch(true);
+        System.setProperty(ZKClientConfig.DISABLE_AUTO_WATCH_RESET, "true");
         testWatcherAutoResetWithLocal();
     }
 
@@ -278,7 +281,8 @@
         localWatcher.waitForDisconnected(500);
         startServer();
         globalWatcher.waitForConnected(3000);
-        if (!isGlobal && !ClientCnxn.getDisableAutoResetWatch()) {
+        boolean disableAutoWatchReset = zk.getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET);
+        if (!isGlobal && !disableAutoWatchReset) {
             localWatcher.waitForConnected(500);
         }
 
@@ -288,7 +292,7 @@
                 CreateMode.PERSISTENT);
 
         WatchedEvent e;
-        if (!ClientCnxn.getDisableAutoResetWatch()) {
+        if (!disableAutoWatchReset) {
             e = localWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
             Assert.assertEquals(e.getPath(), EventType.NodeDataChanged, e.getType());
             Assert.assertEquals("/watchtest/child", e.getPath());
@@ -297,7 +301,7 @@
             // why waste the time on poll
         }
 
-        if (!ClientCnxn.getDisableAutoResetWatch()) {
+        if (!disableAutoWatchReset) {
             e = localWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
             // The create will trigger the get children and the exist
             // watches
@@ -308,7 +312,7 @@
             // why waste the time on poll
         }
 
-        if (!ClientCnxn.getDisableAutoResetWatch()) {
+        if (!disableAutoWatchReset) {
             e = localWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
             Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
             Assert.assertEquals("/watchtest", e.getPath());
@@ -323,11 +327,11 @@
         try {
             try {
                 localWatcher.waitForDisconnected(500);
-                if (!isGlobal && !ClientCnxn.getDisableAutoResetWatch()) {
+                if (!isGlobal && !disableAutoWatchReset) {
                     Assert.fail("Got an event when I shouldn't have");
                 }
             } catch(TimeoutException toe) {
-                if (ClientCnxn.getDisableAutoResetWatch()) {
+                if (disableAutoWatchReset) {
                     Assert.fail("Didn't get an event when I should have");
                 }
                 // Else what we are expecting since there are no outstanding watches
@@ -368,14 +372,14 @@
         localWatcher.waitForDisconnected(500);
         startServer();
         globalWatcher.waitForConnected(TIMEOUT);
-        if (!isGlobal && !ClientCnxn.getDisableAutoResetWatch()) {
+        if (!isGlobal && !disableAutoWatchReset) {
             localWatcher.waitForConnected(500);
         }
 
         zk.delete("/watchtest/child", -1);
         zk.delete("/watchtest", -1);
 
-        if (!ClientCnxn.getDisableAutoResetWatch()) {
+        if (!disableAutoWatchReset) {
             e = localWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
             Assert.assertEquals(EventType.NodeDeleted, e.getType());
             Assert.assertEquals("/watchtest/child", e.getPath());
@@ -387,7 +391,6 @@
         // Make sure nothing is straggling!
         Thread.sleep(1000);
         Assert.assertTrue(localWatcher.events.isEmpty());
-
     }
 
 }