APEXCORE-810 Fix race condition between publisher and subscriber teardowns
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index 3e8846d..af5db09 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -115,12 +115,7 @@
    */
   public void removeChannel(WriteOnlyClient client)
   {
-    for (PhysicalNode pn : physicalNodes) {
-      if (pn.getClient() == client) {
-        physicalNodes.remove(pn);
-        break;
-      }
-    }
+    physicalNodes.removeIf(node -> node.getClient().equals(client));
   }
 
   /**
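The removeChannel() change above collapses the old find-then-remove loop into a single Collection.removeIf call, so there is no window between locating the matching PhysicalNode and removing it while another thread mutates the set. Below is a minimal, self-contained sketch of that pattern; the Channel and Node types and the ConcurrentHashMap.newKeySet() backing set are illustrative assumptions, not the actual buffer server types.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RemoveIfSketch
{
  // Hypothetical stand-ins for WriteOnlyClient and PhysicalNode.
  static final class Channel
  {
  }

  static final class Node
  {
    private final Channel client;

    Node(Channel client)
    {
      this.client = client;
    }

    Channel getClient()
    {
      return client;
    }
  }

  // Assumption: a concurrent set, which tolerates removal while other threads add or iterate.
  private final Set<Node> nodes = ConcurrentHashMap.newKeySet();

  void removeChannel(Channel client)
  {
    // One pass: the predicate is evaluated and matches are removed without an explicit
    // iterator, mirroring the removeIf call introduced in LogicalNode.removeChannel().
    nodes.removeIf(node -> node.getClient().equals(client));
  }

  public static void main(String[] args)
  {
    RemoveIfSketch sketch = new RemoveIfSketch();
    Channel client = new Channel();
    sketch.nodes.add(new Node(client));
    sketch.removeChannel(client);
    System.out.println("remaining nodes: " + sketch.nodes.size()); // prints 0
  }
}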
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index c5700f2..6332a18 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -24,9 +24,6 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -117,9 +114,11 @@
   @Override
   public void unregistered(SelectionKey key)
   {
+    logger.debug("Unregistered {}", this);
     for (LogicalNode ln : subscriberGroups.values()) {
       ln.boot();
     }
+    super.unregistered(key);
     /*
      * There may be un-register tasks scheduled to run on the event loop that use serverHelperExecutor.
      */
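The comment above notes that un-register tasks may still be queued on serverHelperExecutor when the server itself unregisters. As one way to picture that ordering concern, here is a sketch (assumed names, not code from this patch) of letting a single-threaded helper executor drain by submitting a sentinel task: tasks run in submission order, so once the sentinel completes, everything queued before it has run.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DrainSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    // Hypothetical stand-in for serverHelperExecutor.
    ExecutorService helper = Executors.newSingleThreadExecutor();
    helper.submit(() -> System.out.println("previously scheduled un-register task"));

    // Sentinel task: a single-threaded executor runs tasks in submission order, so once
    // the latch trips, every task submitted before it has completed.
    CountDownLatch drained = new CountDownLatch(1);
    helper.submit(drained::countDown);

    if (drained.await(5, TimeUnit.SECONDS)) {
      System.out.println("helper executor drained; safe to finish teardown");
    }
    helper.shutdown();
  }
}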
@@ -860,41 +859,32 @@
       }
       torndown = true;
 
-      /*
-       * if the publisher unregistered, all the downstream guys are going to be unregistered anyways
-       * in our world. So it makes sense to kick them out proactively. Otherwise these clients since
-       * are not being written to, just stick around till the next publisher shows up and eat into
-       * the data it's publishing for the new subscribers.
-       */
+      serverHelperExecutor.submit(() ->
+      {
+        /*
+         * If the publisher unregistered, all the downstream subscribers are going to be unregistered
+         * anyway in our world, so it makes sense to kick them out proactively. Otherwise these clients,
+         * since they are not being written to, just stick around until the next publisher shows up and
+         * eat into the data it is publishing for the new subscribers.
+         */
 
-      /**
-       * since the publisher server died, the queue which it was using would stop pumping the data unless
-       * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node
-       * with the same identifier as the one which just died.
-       */
-      if (publisherChannels.containsValue(this)) {
-        final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator();
-        while (i.hasNext()) {
-          if (i.next().getValue() == this) {
-            i.remove();
-            break;
+        /*
+         * Since the publisher server died, the queue which it was using would stop pumping the data unless
+         * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up
+         * a new node with the same identifier as the one which just died.
+         */
+        String publisherIdentifier = datalist.getIdentifier();
+        if (!publisherChannels.remove(publisherIdentifier, Publisher.this)) {
+          logger.warn("{} could not be removed from channels", Publisher.this);
+        }
+
+        subscriberGroups.forEach((type, ln) -> {
+          if (publisherIdentifier.equals(ln.getUpstream())) {
+            logger.debug("Booting logical node {} from publisher", ln);
+            ln.boot();
           }
-        }
-      }
-
-      ArrayList<LogicalNode> list = new ArrayList<>();
-      String publisherIdentifier = datalist.getIdentifier();
-      Iterator<LogicalNode> iterator = subscriberGroups.values().iterator();
-      while (iterator.hasNext()) {
-        LogicalNode ln = iterator.next();
-        if (publisherIdentifier.equals(ln.getUpstream())) {
-          list.add(ln);
-        }
-      }
-
-      for (LogicalNode ln : list) {
-        ln.boot();
-      }
+        });
+      });
     }
 
   }
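The Publisher teardown now runs on serverHelperExecutor instead of the event-loop thread, and it replaces the entry-set iteration with the two-argument ConcurrentHashMap.remove(key, value), which removes the mapping only if the identifier still maps to this publisher. That is the guard against a replacement publisher that re-registers under the same identifier before the old one finishes tearing down. A minimal sketch of that atomic check-and-remove follows; the map name and publisher objects are illustrative, not the buffer server's types.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConditionalRemoveSketch
{
  public static void main(String[] args)
  {
    // Hypothetical stand-in for publisherChannels, keyed by publisher identifier.
    ConcurrentMap<String, Object> channels = new ConcurrentHashMap<>();
    Object oldPublisher = new Object();
    Object newPublisher = new Object();

    channels.put("operator.1.out", oldPublisher);
    // A replacement publisher registers under the same identifier before the old one
    // finishes tearing down -- the interleaving this change guards against.
    channels.put("operator.1.out", newPublisher);

    // remove(key, value) is atomic and only removes the mapping if it still points at
    // oldPublisher, so the late teardown cannot clobber the new registration.
    boolean removed = channels.remove("operator.1.out", oldPublisher);
    System.out.println("removed by old publisher's teardown: " + removed); // false
    System.out.println("new publisher still registered: "
        + (channels.get("operator.1.out") == newPublisher)); // true
  }
}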