Fix local communication optimization
- by resolving local partition id upon sender initialization
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 10da9a2..87cf7bf 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -5,6 +5,10 @@
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 
@@ -17,13 +21,14 @@
  * from the application developer.
  */
 public class Sender {
+
+    private static Logger logger = LoggerFactory.getLogger(Sender.class);
+
     final private Emitter emitter;
     final private SerializerDeserializer serDeser;
     final private Hasher hasher;
 
-    /*
-     * If the local partition id is not initialized, always use the comm layer to send events.
-     */
+    Assignment assignment;
     private int localPartitionId = -1;
 
     /**
@@ -36,33 +41,42 @@
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher) {
+    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
         this.emitter = emitter;
         this.serDeser = serDeser;
         this.hasher = hasher;
+        this.assignment = assignment;
+    }
+
+    @Inject
+    private void resolveLocalPartitionId() {
+        ClusterNode node = assignment.assignClusterNode();
+        if (node != null) {
+            localPartitionId = node.getPartition();
+        }
     }
 
     /**
      * This method attempts to send an event to a remote partition. If the destination is local, the method does not
-     * send the event and returns true. The caller is expected to put the event in a local queue instead.
+     * send the event and returns false. <b>The caller is then expected to put the event in a local queue instead.</b>
      * 
      * @param hashKey
      *            the string used to map the value of a key to a specific partition.
      * @param event
      *            the event to be delivered to a {@link ProcessingElement} instance.
-     * @return true if the event is not sent because the destination is local.
+     * @return true if the event was sent because the destination is <b>not</b> local.
      * 
      */
-    public boolean sendAndCheckIfLocal(String hashKey, Event event) {
+    public boolean checkAndSendIfNotLocal(String hashKey, Event event) {
         int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount());
 
         if (partition == localPartitionId) {
             /* Hey we are in the same JVM, don't use the network. */
-            return true;
+            return false;
         }
         send(partition,
                 new EventMessage(String.valueOf(event.getAppId()), event.getStreamName(), serDeser.serialize(event)));
-        return false;
+        return true;
     }
 
     private void send(int partition, EventMessage event) {
@@ -89,7 +103,4 @@
         }
     }
 
-    void setPartition(int partitionId) {
-        localPartitionId = partitionId;
-    }
 }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index e01191b..a7522bd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -165,7 +165,7 @@
                  * We send to a specific PE instance using the key but we don't know if the target partition is remote
                  * or local. We need to ask the sender.
                  */
-                if (sender.sendAndCheckIfLocal(key.get((T) event), event)) {
+                if (!sender.checkAndSendIfNotLocal(key.get((T) event), event)) {
 
                     /*
                      * Sender checked and decided that the target is local so we simply put the event in the queue and
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
index 8639e9a..c7b9923 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/comm/BareCommModule.java
@@ -4,6 +4,8 @@
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.ClusterNode;
 import org.apache.s4.comm.topology.Clusters;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.RemoteSenders;
@@ -34,6 +36,9 @@
         bind(RemoteEmitters.class).toInstance(Mockito.mock(RemoteEmitters.class));
         bind(RemoteEmitterFactory.class).toInstance(Mockito.mock(RemoteEmitterFactory.class));
         bind(Clusters.class).toInstance(Mockito.mock(Clusters.class));
+        Assignment mockedAssignment = Mockito.mock(Assignment.class);
+        Mockito.when(mockedAssignment.assignClusterNode()).thenReturn(new ClusterNode(0, 0, "machine", "Task-0"));
+        bind(Assignment.class).toInstance(mockedAssignment);
         Names.bindProperties(binder(), ImmutableMap.of("cluster.name", "testCluster"));
     }
 
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
index 70f88cd..b131c3f 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/BareCoreModule.java
@@ -2,7 +2,6 @@
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
-import org.apache.s4.core.Receiver;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
 import org.mockito.Mockito;