Amendment to OOZIE-1847
diff --git a/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java
index 1406b6a..a5d22c0 100644
--- a/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java
+++ b/core/src/main/java/org/apache/oozie/event/listener/ZKConnectionListener.java
@@ -31,6 +31,8 @@
 public class ZKConnectionListener implements ConnectionStateListener {
 
     private XLog LOG = XLog.getLog(getClass());
+    private static ConnectionState connectionState;
+    public static final String CONF_SHUTDOWN_ON_TIMEOUT = "oozie.zookeeper.server.shutdown.ontimeout";
 
     public ZKConnectionListener() {
         LOG.info("ZKConnectionListener started");
@@ -38,13 +40,14 @@
 
     @Override
     public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+        connectionState = newState;
         LOG.trace("ZK connection status  = " + newState.toString());
-//        if (newState == ConnectionState.CONNECTED) {
-//             ZK connected
-//        }
+        // if (newState == ConnectionState.CONNECTED) {
+        // ZK connected
+        // }
         if (newState == ConnectionState.SUSPENDED) {
             LOG.warn("ZK connection is suspended, waiting for reconnect. If connection doesn't reconnect before "
-                    + ZKUtils.getZKConnectionTimeout() + " Oozie server will shutdown itself");
+                    + ZKUtils.getZKConnectionTimeout() + " (sec) Oozie server will shutdown itself");
         }
 
         if (newState == ConnectionState.RECONNECTED) {
@@ -53,10 +56,16 @@
         }
 
         if (newState == ConnectionState.LOST) {
-            LOG.fatal("ZK is connection is not reconnected in " + ZKUtils.getZKConnectionTimeout()
-                    + ", shutting down Oozie server");
-            Services.get().destroy();
-            System.exit(1);
+            LOG.fatal("ZK is not reconnected in " + ZKUtils.getZKConnectionTimeout());
+            if (Services.get().getConf().getBoolean(CONF_SHUTDOWN_ON_TIMEOUT, true)) {
+                LOG.fatal("Shutting down Oozie server");
+                Services.get().destroy();
+                System.exit(1);
+            }
         }
     }
+
+    public static ConnectionState getZKConnectionState() {
+        return connectionState;
+    }
 }
diff --git a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
index 96fb373..a64f613 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKJobsConcurrencyService.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.service;
 
 import java.util.ArrayList;
@@ -26,9 +25,11 @@
 import java.util.regex.Pattern;
 
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.event.listener.ZKConnectionListener;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
@@ -82,7 +83,7 @@
      */
     @Override
     public void destroy() {
-        if (leaderLatch != null) {
+        if (leaderLatch != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
             IOUtils.closeSafely(leaderLatch);
         }
         if (zk != null) {
diff --git a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
index 36c00b9..6f333c8 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKLocksService.java
@@ -15,24 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.service;
 
 import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.util.Instrumentable;
 import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.event.listener.ZKConnectionListener;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.ZKUtils;
+
 import java.io.IOException;
 import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.curator.framework.recipes.locks.ChildReaper;
 import org.apache.curator.framework.recipes.locks.Reaper;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.utils.ThreadUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -78,7 +83,7 @@
      */
     @Override
     public void destroy() {
-        if (reaper != null) {
+        if (reaper != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
             try {
                 reaper.close();
             }
@@ -86,7 +91,6 @@
                 LOG.error("Error closing childReaper", e);
             }
         }
-
         if (zk != null) {
             zk.unregister(this);
         }
diff --git a/core/src/main/java/org/apache/oozie/util/ZKUtils.java b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
index ec32092..ce38bf6 100644
--- a/core/src/main/java/org/apache/oozie/util/ZKUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/ZKUtils.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.util;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -35,6 +34,7 @@
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
 import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.curator.x.discovery.ServiceCache;
@@ -163,7 +163,9 @@
         // If there are no more classes using ZooKeeper, we should teardown everything.
         users.remove(user);
         if (users.isEmpty() && zk != null) {
-            zk.teardown();
+            if (ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
+                zk.teardown();
+            }
             zk = null;
         }
     }
@@ -172,7 +174,7 @@
         // Connect to the ZooKeeper server
         RetryPolicy retryPolicy = ZKUtils.getRetryPloicy();
         String zkConnectionString = Services.get().getConf().get(ZK_CONNECTION_STRING, "localhost:2181");
-        String zkNamespace = Services.get().getConf().get(ZK_NAMESPACE, "oozie");
+        String zkNamespace = getZKNameSpace();
         zkConnectionTimeout = Services.get().getConf().getInt(ZK_CONNECTION_TIMEOUT, 180);
 
         ACLProvider aclProvider;
@@ -405,6 +407,14 @@
     public static RetryPolicy getRetryPloicy() {
         return new ExponentialBackoffRetry(1000, 3);
     }
+
+    /**
+     * Returns configured zk namesapces
+     * @return oozie.zookeeper.namespace
+     */
+    public static String getZKNameSpace() {
+        return Services.get().getConf().get(ZK_NAMESPACE, "oozie");
+    }
     /**
      * Return ZK connection timeout
      * @return
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 06693bb..26eb7e0 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2069,7 +2069,16 @@
         <value>180</value>
         <description>
         Default ZK connection timeout (in sec). If connection is lost for more than timeout, then Oozie server will shutdown
-        itself.
+        itself if oozie.zookeeper.server.shutdown.ontimeout is true.
+        </description>
+    </property>
+
+    <property>
+        <name>oozie.zookeeper.server.shutdown.ontimeout</name>
+        <value>true</value>
+        <description>
+            If true, Oozie server will shutdown itself on ZK
+            connection timeout.
         </description>
     </property>
 
diff --git a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
index 5697199..84d6e5d 100644
--- a/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/ZKXTestCase.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.oozie.test;
 
 import java.io.IOException;
@@ -35,6 +34,7 @@
 import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
 import org.apache.curator.x.discovery.ServiceInstance;
 import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.oozie.event.listener.ZKConnectionListener;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.FixedJsonInstanceSerializer;
 import org.apache.oozie.util.ZKUtils;
@@ -89,6 +89,7 @@
         zkServer = setupZKServer();
         Services.get().getConf().set("oozie.zookeeper.connection.string", zkServer.getConnectString());
         Services.get().getConf().set("oozie.instance.id", ZK_ID);
+        Services.get().getConf().setBoolean(ZKConnectionListener.CONF_SHUTDOWN_ON_TIMEOUT, false);
         createClient();
         createServiceDiscovery();
     }