GOSSIP-77 better send()
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index e3dcb21..f53419d 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -45,9 +45,18 @@
public class GossipCore implements GossipCoreConstants {
+ class LatchAndBase {
+ private final CountDownLatch latch;
+ private volatile Base base;
+
+ LatchAndBase(){
+ latch = new CountDownLatch(1);
+ }
+
+ }
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private final GossipManager gossipManager;
- private ConcurrentHashMap<String, Base> requests;
+ private ConcurrentHashMap<String, LatchAndBase> requests;
private ThreadPoolExecutor service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
@@ -224,46 +233,30 @@
}
final Trackable t;
+ LatchAndBase latchAndBase = null;
if (message instanceof Trackable){
t = (Trackable) message;
+ latchAndBase = new LatchAndBase();
+ requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
} else {
t = null;
}
sendInternal(message, uri);
- if (t == null){
+ if (latchAndBase == null){
return null;
- }
- final Future<Response> response = service.submit( new Callable<Response>(){
- @Override
- public Response call() throws Exception {
- while(true){
- Base b = requests.remove(t.getUuid() + "/" + t.getUriFrom());
- if (b != null){
- return (Response) b;
- }
- try {
- Thread.sleep(0, 555555);
- } catch (InterruptedException e) {
-
- }
- }
- }
- });
-
+ }
+
try {
- //TODO this needs to be a setting base on attempts/second
- return response.get(1, TimeUnit.SECONDS);
+ boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
+ if (complete){
+ return (Response) latchAndBase.base;
+ } else{
+ return null;
+ }
} catch (InterruptedException e) {
throw new RuntimeException(e);
- } catch (ExecutionException e) {
- LOGGER.debug(e.getMessage(), e);
- return null;
- } catch (TimeoutException e) {
- boolean cancelled = response.cancel(true);
- LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
- return null;
} finally {
- if (t != null){
+ if (latchAndBase != null){
requests.remove(t.getUuid() + "/" + t.getUriFrom());
}
}
@@ -302,8 +295,10 @@
}
}
- public void addRequest(String k, Base v) {
- requests.put(k, v);
+ public void handleResponse(String k, Base v) {
+ LatchAndBase latch = requests.get(k);
+ latch.base = v;
+ latch.latch.countDown();
}
/**
diff --git a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
index 36102d5..2f33b01 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java
@@ -27,7 +27,7 @@
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (base instanceof Trackable) {
Trackable t = (Trackable) base;
- gossipCore.addRequest(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
+ gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
}
}
}