Synchronize cluster updates
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 69ec5f1..f88d9b7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -9,6 +9,8 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.EventMessage;
@@ -111,6 +113,9 @@
     @Inject
     SerializerDeserializer serDeser;
 
+    // lock for synchronizing between cluster updates callbacks and other code
+    private final Lock lock;
+
     @Inject
     public TCPEmitter(Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
             @Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
@@ -120,6 +125,7 @@
         this.nettyTimeout = timeout;
         this.bufferCapacity = bufferSize;
         this.topology = topology;
+        this.lock = new ReentrantLock();
 
         // Initialize data structures
         int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
@@ -161,8 +167,8 @@
 
     @Inject
     private void init() {
-        this.topology.addListener(this);
         refreshCluster();
+        this.topology.addListener(this);
     }
 
     private class Message implements ChannelFutureListener {
@@ -346,7 +352,7 @@
         if (clusterNode == null) {
 
             logger.error("No ClusterNode exists for partitionId " + partitionId);
-            onChange();
+            refreshCluster();
             return false;
         }
 
@@ -465,18 +471,23 @@
     }
 
     private void refreshCluster() {
-        for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
-            Integer partition = clusterNode.getPartition();
-            if (partition == null) {
-                logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
-                return;
-            }
+        lock.lock();
+        try {
+            for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
+                Integer partition = clusterNode.getPartition();
+                if (partition == null) {
+                    logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
+                    return;
+                }
 
-            ClusterNode oldNode = partitionNodeMap.remove(partition);
-            if (oldNode != null && !oldNode.equals(clusterNode)) {
-                removeChannel(partition);
+                ClusterNode oldNode = partitionNodeMap.remove(partition);
+                if (oldNode != null && !oldNode.equals(clusterNode)) {
+                    removeChannel(partition);
+                }
+                partitionNodeMap.forcePut(partition, clusterNode);
             }
-            partitionNodeMap.forcePut(partition, clusterNode);
+        } finally {
+            lock.unlock();
         }
     }