Retry tunnel setup when it fails (#3299)

* Retry tunnel setup when it fails

* add delay between retries

* fix bug
diff --git a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index fe097e5..78129cc 100644
--- a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -63,6 +63,8 @@
 
 public class CuratorStateManager extends FileSystemStateManager {
   private static final Logger LOG = Logger.getLogger(CuratorStateManager.class.getName());
+  private static final int TUNNEL_SETUP_RETRY = 0;  // 0 means no retry
+  private static final int TUNNEL_SETUP_RETRY_SLEEP_SEC = 5;
 
   private CuratorFramework client;
   private String connectionString;
@@ -83,17 +85,29 @@
         NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.STATE_MANAGER);
 
     if (tunnelConfig.isTunnelNeeded()) {
-      Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
+      for (int setupCount = 0;; ++setupCount) {
+        Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
+        String newConnectionString = tunneledResults.first;
 
-      String newConnectionString = tunneledResults.first;
-      if (newConnectionString.isEmpty()) {
-        throw new IllegalArgumentException("Failed to connect to tunnel host '"
-            + tunnelConfig.getTunnelHost() + "'");
+        // If tunnel can't be setup correctly. Retry or bail.
+        if (!newConnectionString.isEmpty()) {
+          // Success, use the new connection string
+          connectionString = newConnectionString;
+          tunnelProcesses.addAll(tunneledResults.second);
+          break;
+        } else {
+          if (setupCount < TUNNEL_SETUP_RETRY) {
+            try {
+              TimeUnit.SECONDS.sleep(TUNNEL_SETUP_RETRY_SLEEP_SEC);
+            } catch (InterruptedException ex) {
+              Thread.currentThread().interrupt();
+            }
+          } else {
+            throw new IllegalArgumentException("Failed to connect to tunnel host '"
+                + tunnelConfig.getTunnelHost() + "'");
+          }
+        }
       }
-
-      // Use the new connection string
-      connectionString = newConnectionString;
-      tunnelProcesses.addAll(tunneledResults.second);
     }
 
     // Start it