Retry tunnel setup when it fails (#3299)
* Retry tunnel setup when it fails
* add delay between retries
* fix bug
diff --git a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
index fe097e5..78129cc 100644
--- a/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
+++ b/heron/statemgrs/src/java/org/apache/heron/statemgr/zookeeper/curator/CuratorStateManager.java
@@ -63,6 +63,8 @@
public class CuratorStateManager extends FileSystemStateManager {
private static final Logger LOG = Logger.getLogger(CuratorStateManager.class.getName());
+ private static final int TUNNEL_SETUP_RETRY = 0; // 0 means no retry
+ private static final int TUNNEL_SETUP_RETRY_SLEEP_SEC = 5;
private CuratorFramework client;
private String connectionString;
@@ -83,17 +85,29 @@
NetworkUtils.TunnelConfig.build(config, NetworkUtils.HeronSystem.STATE_MANAGER);
if (tunnelConfig.isTunnelNeeded()) {
- Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
+ for (int setupCount = 0;; ++setupCount) {
+ Pair<String, List<Process>> tunneledResults = setupZkTunnel(tunnelConfig);
+ String newConnectionString = tunneledResults.first;
- String newConnectionString = tunneledResults.first;
- if (newConnectionString.isEmpty()) {
- throw new IllegalArgumentException("Failed to connect to tunnel host '"
- + tunnelConfig.getTunnelHost() + "'");
+ // If tunnel can't be setup correctly. Retry or bail.
+ if (!newConnectionString.isEmpty()) {
+ // Success, use the new connection string
+ connectionString = newConnectionString;
+ tunnelProcesses.addAll(tunneledResults.second);
+ break;
+ } else {
+ if (setupCount < TUNNEL_SETUP_RETRY) {
+ try {
+ TimeUnit.SECONDS.sleep(TUNNEL_SETUP_RETRY_SLEEP_SEC);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ throw new IllegalArgumentException("Failed to connect to tunnel host '"
+ + tunnelConfig.getTunnelHost() + "'");
+ }
+ }
}
-
- // Use the new connection string
- connectionString = newConnectionString;
- tunnelProcesses.addAll(tunneledResults.second);
}
// Start it