[STORM-3725] make DRPCSpout support real async connections (#3364)

diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
index 4c35c67..c1edacc 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -19,14 +19,12 @@
 package org.apache.storm.drpc;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.storm.Config;
@@ -54,12 +52,13 @@
     public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
     //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
     private static final long serialVersionUID = 2387848310969237877L;
+    private static final int clientConstructionRetryIntervalSec = 120;
     private final String function;
     private final String localDrpcId;
     private SpoutOutputCollector collector;
-    private List<DRPCInvocationsClient> clients = new ArrayList<>();
-    private transient LinkedList<Future<Void>> futures = null;
+    private List<DRPCInvocationsClient> clients = Collections.synchronizedList(new ArrayList<>());
     private transient ExecutorService background = null;
+    private transient Map<String, CompletableFuture<Void>> futuresMap = null;  // server : future
 
     public DRPCSpout(String function) {
         this.function = function;
@@ -81,13 +80,25 @@
     }
 
     private void reconnectAsync(final DRPCInvocationsClient client) {
-        futures.add(background.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                client.reconnectClient();
-                return null;
-            }
-        }));
+        String remote = client.getHost();
+        CompletableFuture<Void> future = futuresMap.get(remote);
+        if (future.isDone()) {
+            LOG.warn("DRPCInvocationsClient [{}:{}] connection failed, no pending reconnection. Try reconnecting...",
+                client.getHost(), client.getPort());
+            CompletableFuture<Void> newFuture =
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        client.reconnectClient();
+                        LOG.info("Reconnected to remote {}:{}. ",
+                            client.getHost(), client.getPort());
+                    } catch (Exception e) {
+                        collector.reportError(e);
+                        LOG.warn("Failed to reconnect to remote {}:{}. ",
+                            client.getHost(), client.getPort(), e);
+                    }
+                }, background);
+            futuresMap.put(remote, newFuture);
+        }
     }
 
     private void reconnectSync(DRPCInvocationsClient client) {
@@ -99,21 +110,6 @@
         }
     }
 
-    private void checkFutures() {
-        Iterator<Future<Void>> i = futures.iterator();
-        while (i.hasNext()) {
-            Future<Void> f = i.next();
-            if (f.isDone()) {
-                i.remove();
-            }
-            try {
-                f.get();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
     @Override
     public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
         this.collector = collector;
@@ -121,8 +117,7 @@
             background = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                         60L, TimeUnit.SECONDS,
                                                         new SynchronousQueue<Runnable>());
-            futures = new LinkedList<>();
-
+            futuresMap = new HashMap<>();
             int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
             int index = context.getThisTaskIndex();
 
@@ -132,16 +127,28 @@
                 throw new RuntimeException("No DRPC servers configured for topology");
             }
 
+            List<DRPCClientBuilder> clientBuilders = new ArrayList<>();
             if (numTasks < servers.size()) {
                 for (String s : servers) {
-                    futures.add(background.submit(new Adder(s, port, conf)));
+                    clientBuilders.add(new DRPCClientBuilder(s, port, conf));
                 }
             } else {
                 int i = index % servers.size();
-                futures.add(background.submit(new Adder(servers.get(i), port, conf)));
+                clientBuilders.add(new DRPCClientBuilder(servers.get(i), port, conf));
             }
+            establishConnections(clientBuilders);
         }
+    }
 
+    protected void establishConnections(List<DRPCClientBuilder> clientBuilders) {
+        int numOfClients = clientBuilders.size();
+
+        for (int i = 0; i < numOfClients; i++) {
+            DRPCClientBuilder builder = clientBuilders.get(i);
+            String server = builder.getServer();
+            CompletableFuture<Void> future = CompletableFuture.runAsync(builder, background);
+            futuresMap.put(server, future);
+        }
     }
 
     @Override
@@ -149,22 +156,18 @@
         for (DRPCInvocationsClient client : clients) {
             client.close();
         }
+        if (background != null) {
+            background.shutdownNow();
+        }
     }
 
     @Override
     public void nextTuple() {
         if (localDrpcId == null) {
-            int size = 0;
-            synchronized (clients) {
-                size = clients.size(); //This will only ever grow, so no need to worry about falling off the end
-            }
-            for (int i = 0; i < size; i++) {
-                DRPCInvocationsClient client;
-                synchronized (clients) {
-                    client = clients.get(i);
-                }
+            //This will only ever grow and at least one client has been up
+            for (int i = 0; i < clients.size(); i++) {
+                DRPCInvocationsClient client = clients.get(i);
                 if (!client.isConnected()) {
-                    LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
                     reconnectAsync(client);
                     continue;
                 }
@@ -180,16 +183,15 @@
                         break;
                     }
                 } catch (AuthorizationException aze) {
+                    LOG.error("Not authorized to fetch DRPC request from DRPC server [{}:{}]",
+                        client.getHost(), client.getPort(), aze);
                     reconnectAsync(client);
-                    LOG.error("Not authorized to fetch DRPC request from DRPC server", aze);
-                } catch (TException e) {
-                    reconnectAsync(client);
-                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                 } catch (Exception e) {
-                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
+                    LOG.error("Failed to fetch DRPC request from DRPC server [{}:{}]",
+                        client.getHost(), client.getPort(), e);
+                    reconnectAsync(client);
                 }
             }
-            checkFutures();
         } else {
             DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(localDrpcId);
             if (drpc != null) { // can happen during shutdown of drpc while topology is still up
@@ -264,24 +266,47 @@
         }
     }
 
-    private class Adder implements Callable<Void> {
+    private class DRPCClientBuilder implements Runnable {
         private String server;
         private int port;
         private Map<String, Object> conf;
 
-        Adder(String server, int port, Map<String, Object> conf) {
+        DRPCClientBuilder(String server, int port, Map<String, Object> conf) {
             this.server = server;
             this.port = port;
             this.conf = conf;
         }
 
         @Override
-        public Void call() throws Exception {
-            DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port);
-            synchronized (clients) {
-                clients.add(c);
+        public void run() {
+            DRPCInvocationsClient c = null;
+            while (c == null) {
+                try {
+                    // DRPCInvocationsClient has backoff retry logic
+                    c = new DRPCInvocationsClient(conf, server, port);
+                } catch (Exception e) {
+                    collector.reportError(e);
+                    LOG.error("Failed to create DRPCInvocationsClient for remote {}:{}. Retrying after {} secs.",
+                        server, port, clientConstructionRetryIntervalSec, e);
+                    try {
+                        Thread.sleep(clientConstructionRetryIntervalSec * 1000);
+                    } catch (InterruptedException ex) {
+                        LOG.warn("DRPCInvocationsClient creation retry sleep interrupted.");
+                        break;
+                    }
+                }
             }
-            return null;
+            if (c != null) {
+                LOG.info("Successfully created DRPCInvocationsClient for remote {}:{}.", server, port);
+                clients.add(c);
+            } else {
+                LOG.warn("DRPCInvocationsClient creation retry for remote {}:{} interrupted.",
+                    server, port);
+            }
+        }
+
+        public String getServer() {
+            return server;
         }
     }
 }