APEXCORE-810 Fixing race condition between publisher and subscriber teardowns
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 3e8846d..af5db09 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -115,12 +115,7 @@
*/
public void removeChannel(WriteOnlyClient client)
{
- for (PhysicalNode pn : physicalNodes) {
- if (pn.getClient() == client) {
- physicalNodes.remove(pn);
- break;
- }
- }
+ physicalNodes.removeIf(node -> node.getClient() == client); // identity match, as before; null-safe
}
/**
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c5700f2..6332a18 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -24,9 +24,6 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -117,9 +114,11 @@
@Override
public void unregistered(SelectionKey key)
{
+ logger.debug("Unregistered {}", this);
for (LogicalNode ln : subscriberGroups.values()) {
ln.boot();
}
+ super.unregistered(key);
/*
* There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor.
*/
@@ -860,41 +859,32 @@
}
torndown = true;
- /*
- * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
- * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
- * are not being written to, just stick around till the next publisher shows up and eat into
- * the data it's publishing for the new subscribers.
- */
+ serverHelperExecutor.submit(() ->
+ {
+ /*
+ * If the publisher unregistered, all the downstream subscribers are going to be unregistered anyway
+ * in our world, so it makes sense to kick them out proactively. Otherwise these clients, since they
+ * are not being written to, just stick around until the next publisher shows up and eat into
+ * the data it is publishing for the new subscribers.
+ */
- /**
- * since the publisher server died, the queue which it was using would stop pumping the data unless
- * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
- * with the same identifier as the one which just died.
- */
- if (publisherChannels.containsValue(this)) {
- final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();
- while (i.hasNext()) {
- if (i.next().getValue() == this) {
- i.remove();
- break;
+ /**
+ * since the publisher server died, the queue which it was using would stop pumping the data unless
+ * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
+ * with the same identifier as the one which just died.
+ */
+ String publisherIdentifier = datalist.getIdentifier();
+ if (!publisherChannels.remove(publisherIdentifier, Publisher.this)) {
+ logger.warn("{} could not be removed from channels", Publisher.this);
+ }
+
+ subscriberGroups.forEach((type, ln) -> {
+ if (publisherIdentifier.equals(ln.getUpstream())) {
+ logger.debug("Booting logical node {} from publisher", ln);
+ ln.boot();
}
- }
- }
-
- ArrayList<LogicalNode> list = new ArrayList<>();
- String publisherIdentifier = datalist.getIdentifier();
- Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
- while (iterator.hasNext()) {
- LogicalNode ln = iterator.next();
- if (publisherIdentifier.equals(ln.getUpstream())) {
- list.add(ln);
- }
- }
-
- for (LogicalNode ln : list) {
- ln.boot();
- }
+ });
+ });
}
}