Synchronize cluster updates
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 69ec5f1..f88d9b7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -9,6 +9,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.EventMessage;
@@ -111,6 +113,9 @@
@Inject
SerializerDeserializer serDeser;
+ // lock for synchronizing between cluster updates callbacks and other code
+ private final Lock lock;
+
@Inject
public TCPEmitter(Cluster topology, @Named("tcp.partition.queue_size") int bufferSize,
@Named("comm.retries") int retries, @Named("comm.retry_delay") int retryDelay,
@@ -120,6 +125,7 @@
this.nettyTimeout = timeout;
this.bufferCapacity = bufferSize;
this.topology = topology;
+ this.lock = new ReentrantLock();
// Initialize data structures
int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
@@ -161,8 +167,8 @@
@Inject
private void init() {
- this.topology.addListener(this);
refreshCluster();
+ this.topology.addListener(this);
}
private class Message implements ChannelFutureListener {
@@ -346,7 +352,7 @@
if (clusterNode == null) {
logger.error("No ClusterNode exists for partitionId " + partitionId);
- onChange();
+ refreshCluster();
return false;
}
@@ -465,18 +471,23 @@
}
private void refreshCluster() {
- for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
- Integer partition = clusterNode.getPartition();
- if (partition == null) {
- logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
- return;
- }
+ lock.lock();
+ try {
+ for (ClusterNode clusterNode : topology.getPhysicalCluster().getNodes()) {
+ Integer partition = clusterNode.getPartition();
+ if (partition == null) {
+ logger.error("onChange(): Illegal partition for clusterNode - " + clusterNode);
+ return;
+ }
- ClusterNode oldNode = partitionNodeMap.remove(partition);
- if (oldNode != null && !oldNode.equals(clusterNode)) {
- removeChannel(partition);
+ ClusterNode oldNode = partitionNodeMap.remove(partition);
+ if (oldNode != null && !oldNode.equals(clusterNode)) {
+ removeChannel(partition);
+ }
+ partitionNodeMap.forcePut(partition, clusterNode);
}
- partitionNodeMap.forcePut(partition, clusterNode);
+ } finally {
+ lock.unlock();
}
}