[STORM-3725] make DRPCSpout support real async connections (#3364)
diff --git a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
index 4c35c67..c1edacc 100644
--- a/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
@@ -19,14 +19,12 @@
package org.apache.storm.drpc;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.storm.Config;
@@ -54,12 +52,13 @@
public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
//ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
private static final long serialVersionUID = 2387848310969237877L;
+ private static final int clientConstructionRetryIntervalSec = 120;
private final String function;
private final String localDrpcId;
private SpoutOutputCollector collector;
- private List<DRPCInvocationsClient> clients = new ArrayList<>();
- private transient LinkedList<Future<Void>> futures = null;
+ private List<DRPCInvocationsClient> clients = Collections.synchronizedList(new ArrayList<>());
private transient ExecutorService background = null;
+ private transient Map<String, CompletableFuture<Void>> futuresMap = null; // server : future
public DRPCSpout(String function) {
this.function = function;
@@ -81,13 +80,31 @@
}
+ /**
+ * Starts an asynchronous reconnect of {@code client} on the background executor, unless the
+ * previous connection attempt recorded for the same host is still running. A failed attempt
+ * is reported to the collector and logged rather than rethrown.
+ */
private void reconnectAsync(final DRPCInvocationsClient client) {
- futures.add(background.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- client.reconnectClient();
- return null;
- }
- }));
+ String remote = client.getHost();
+ CompletableFuture<Void> future = futuresMap.get(remote);
+ // No entry for this host means nothing is pending either; guard against the null instead of throwing.
+ if (future == null || future.isDone()) {
+ LOG.warn("DRPCInvocationsClient [{}:{}] connection failed, no pending reconnection. Try reconnecting...",
+ client.getHost(), client.getPort());
+ CompletableFuture<Void> newFuture =
+ CompletableFuture.runAsync(() -> {
+ try {
+ client.reconnectClient();
+ LOG.info("Reconnected to remote {}:{}. ",
+ client.getHost(), client.getPort());
+ } catch (Exception e) {
+ collector.reportError(e);
+ LOG.warn("Failed to reconnect to remote {}:{}. ",
+ client.getHost(), client.getPort(), e);
+ }
+ }, background);
+ futuresMap.put(remote, newFuture);
+ }
}
private void reconnectSync(DRPCInvocationsClient client) {
@@ -99,21 +110,6 @@
}
}
- private void checkFutures() {
- Iterator<Future<Void>> i = futures.iterator();
- while (i.hasNext()) {
- Future<Void> f = i.next();
- if (f.isDone()) {
- i.remove();
- }
- try {
- f.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
@@ -121,8 +117,7 @@
background = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
- futures = new LinkedList<>();
-
+ futuresMap = new HashMap<>();
int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
int index = context.getThisTaskIndex();
@@ -132,16 +127,28 @@
throw new RuntimeException("No DRPC servers configured for topology");
}
+ List<DRPCClientBuilder> clientBuilders = new ArrayList<>();
if (numTasks < servers.size()) {
for (String s : servers) {
- futures.add(background.submit(new Adder(s, port, conf)));
+ clientBuilders.add(new DRPCClientBuilder(s, port, conf));
}
} else {
int i = index % servers.size();
- futures.add(background.submit(new Adder(servers.get(i), port, conf)));
+ clientBuilders.add(new DRPCClientBuilder(servers.get(i), port, conf));
}
+ establishConnections(clientBuilders);
}
+ }
+ protected void establishConnections(List<DRPCClientBuilder> clientBuilders) {
+ // Launch every builder on the background executor without waiting for completion,
+ // and record the returned future under the builder's server name so that it can be
+ // looked up per server later.
+ for (DRPCClientBuilder clientBuilder : clientBuilders) {
+ String drpcServer = clientBuilder.getServer();
+ CompletableFuture<Void> pending = CompletableFuture.runAsync(clientBuilder, background);
+ futuresMap.put(drpcServer, pending);
+ }
}
@Override
@@ -149,22 +156,18 @@
for (DRPCInvocationsClient client : clients) {
client.close();
}
+ if (background != null) {
+ background.shutdownNow();
+ }
}
@Override
public void nextTuple() {
if (localDrpcId == null) {
- int size = 0;
- synchronized (clients) {
- size = clients.size(); //This will only ever grow, so no need to worry about falling off the end
- }
- for (int i = 0; i < size; i++) {
- DRPCInvocationsClient client;
- synchronized (clients) {
- client = clients.get(i);
- }
+ //This list only ever grows; it may still be empty while the initial connections are being established
+ for (int i = 0; i < clients.size(); i++) {
+ DRPCInvocationsClient client = clients.get(i);
if (!client.isConnected()) {
- LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
reconnectAsync(client);
continue;
}
@@ -180,16 +183,15 @@
break;
}
} catch (AuthorizationException aze) {
+ LOG.error("Not authorized to fetch DRPC request from DRPC server [{}:{}]",
+ client.getHost(), client.getPort(), aze);
reconnectAsync(client);
- LOG.error("Not authorized to fetch DRPC request from DRPC server", aze);
- } catch (TException e) {
- reconnectAsync(client);
- LOG.error("Failed to fetch DRPC request from DRPC server", e);
} catch (Exception e) {
- LOG.error("Failed to fetch DRPC request from DRPC server", e);
+ LOG.error("Failed to fetch DRPC request from DRPC server [{}:{}]",
+ client.getHost(), client.getPort(), e);
+ reconnectAsync(client);
}
}
- checkFutures();
} else {
DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(localDrpcId);
if (drpc != null) { // can happen during shutdown of drpc while topology is still up
@@ -264,24 +266,54 @@
}
}
- private class Adder implements Callable<Void> {
+ /**
+ * Background task that creates the DRPCInvocationsClient for one DRPC server and adds it to
+ * {@code clients}. Construction is retried every {@code clientConstructionRetryIntervalSec}
+ * seconds until it succeeds or the retry sleep is interrupted.
+ */
+ private class DRPCClientBuilder implements Runnable {
private String server;
private int port;
private Map<String, Object> conf;
- Adder(String server, int port, Map<String, Object> conf) {
+ DRPCClientBuilder(String server, int port, Map<String, Object> conf) {
this.server = server;
this.port = port;
this.conf = conf;
}
@Override
- public Void call() throws Exception {
- DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port);
- synchronized (clients) {
- clients.add(c);
+ public void run() {
+ DRPCInvocationsClient c = null;
+ while (c == null) {
+ try {
+ // DRPCInvocationsClient has backoff retry logic
+ c = new DRPCInvocationsClient(conf, server, port);
+ } catch (Exception e) {
+ collector.reportError(e);
+ LOG.error("Failed to create DRPCInvocationsClient for remote {}:{}. Retrying after {} secs.",
+ server, port, clientConstructionRetryIntervalSec, e);
+ try {
+ Thread.sleep(clientConstructionRetryIntervalSec * 1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("DRPCInvocationsClient creation retry sleep interrupted.");
+ // Restore the interrupt status instead of swallowing it (close() interrupts via shutdownNow()).
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
}
- return null;
+ if (c != null) {
+ LOG.info("Successfully created DRPCInvocationsClient for remote {}:{}.", server, port);
+ clients.add(c);
+ } else {
+ LOG.warn("DRPCInvocationsClient creation retry for remote {}:{} interrupted.",
+ server, port);
+ }
+ }
+
+ public String getServer() {
+ return server;
}
}
}