Fix local communication optimization
- by resolving local partition id upon sender initialization
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 10da9a2..87cf7bf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -5,6 +5,10 @@
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
@@ -17,13 +21,14 @@
* from the application developer.
*/
public class Sender {
+
+ private static Logger logger = LoggerFactory.getLogger(Sender.class);
+
final private Emitter emitter;
final private SerializerDeserializer serDeser;
final private Hasher hasher;
- /*
- * If the local partition id is not initialized, always use the comm layer to send events.
- */
+ Assignment assignment;
private int localPartitionId = -1;
/**
@@ -36,33 +41,42 @@
* a hashing function to map keys to partition IDs.
*/
@Inject
- public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher) {
+ public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
this.emitter = emitter;
this.serDeser = serDeser;
this.hasher = hasher;
+ this.assignment = assignment;
+ }
+
+ @Inject
+ private void resolveLocalPartitionId() {
+ ClusterNode node = assignment.assignClusterNode();
+ if (node != null) {
+ localPartitionId = node.getPartition();
+ }
}
/**
* This method attempts to send an event to a remote partition. If the destination is local, the method does not
- * send the event and returns true. The caller is expected to put the event in a local queue instead.
+ * send the event and returns false. <b>The caller is then expected to put the event in a local queue instead.</b>
*
* @param hashKey
* the string used to map the value of a key to a specific partition.
* @param event
* the event to be delivered to a {@link ProcessingElement} instance.
- * @return true if the event is not sent because the destination is local.
+ * @return true if the event was sent because the destination is <b>not</b> local.
*
*/
- public boolean sendAndCheckIfLocal(String hashKey, Event event) {
+ public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
if (partition == localPartitionId) {
/* Hey we are in the same JVM, don't use the network. */
- return true;
+ return false;
}
send(partition,
new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser.serialize(event)));
- return false;
+ return true;
}
private void send(int partition, EventMessage event) {
@@ -89,7 +103,4 @@
}
}
- void setPartition(int partitionId) {
- localPartitionId = partitionId;
- }
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index e01191b..a7522bd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -165,7 +165,7 @@
* We send to a specific PE instance using the key but we don't know if the target partition is remote
* or local. We need to ask the sender.
*/
- if (sender.sendAndCheckIfLocal(key.get((T) event), event)) {
+ if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
/*
* Sender checked and decided that the target is local so we simply put the event in the queue and
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
index 8639e9a..c7b9923 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
@@ -4,6 +4,8 @@
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
import org.apache.s4.comm.topology.Clusters;
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.core.RemoteSenders;
@@ -34,6 +36,9 @@
bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
+ Assignment mockedAssignment = Mockito.mock(Assignment.class);
+ Mockito.when(mockedAssignment.assignClusterNode()).thenReturn(new ClusterNode(0, 0, "machine", "Task-0"));
+ bind(Assignment.class).toInstance(mockedAssignment);
Names.bindProperties(binder(), ImmutableMap.of("cluster.name", "testCluster"));
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
index 70f88cd..b131c3f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
@@ -2,7 +2,6 @@
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
-import org.apache.s4.core.Receiver;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.NoOpDeploymentManager;
import org.mockito.Mockito;