Add getters for partition id and partition count in App class
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index d7c8cee..59409a7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -47,7 +47,8 @@
 import com.google.inject.name.Names;
 
 /**
- * Default configuration module for the communication layer. Parameterizable through a configuration file.
+ * Default configuration module for the communication layer. Parameterizable / overridable through custom modules and /
+ * or configuration file (for string literals).
  * 
  */
 public class DefaultCommModule extends AbstractModule {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 936d225..673d0bb 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -28,6 +28,7 @@
 import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Cluster;
 import org.apache.s4.comm.topology.RemoteStreams;
 import org.apache.s4.core.ft.CheckpointingFramework;
 import org.apache.s4.core.window.AbstractSlidingWindowPE;
@@ -41,7 +42,7 @@
 
 /**
  * Container base class to hold all processing elements.
- *
+ * 
  * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
  */
 public abstract class App {
@@ -57,7 +58,7 @@
     final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
 
     /* Pes indexed by name. */
-    Map<String, ProcessingElement> peByName = Maps.newHashMap();
+    final Map<String, ProcessingElement> peByName = Maps.newHashMap();
 
     private ClockType clockType = ClockType.WALL_CLOCK;
     private int id = -1;
@@ -68,17 +69,20 @@
     private Receiver receiver;
 
     @Inject
-    RemoteSenders remoteSenders;
+    private Cluster cluster;
 
     @Inject
-    Hasher hasher;
+    private RemoteSenders remoteSenders;
 
     @Inject
-    RemoteStreams remoteStreams;
+    private Hasher hasher;
+
+    @Inject
+    private RemoteStreams remoteStreams;
 
     @Inject
     @Named("s4.cluster.name")
-    String clusterName;
+    private String clusterName;
 
     // default is NoOpCheckpointingFramework
     @Inject
@@ -256,6 +260,25 @@
     }
 
     /**
+     * 
+     * Returns the id of the partition assigned to the current node.
+     * 
+     * NOTE: This method will block until the current node gets assigned a partition
+     * 
+     */
+    public int getPartitionId() {
+        return getReceiver().getPartitionId();
+    }
+
+    /**
+     * 
+     * Returns the total number of partitions of the cluster this nodes belongs to.
+     */
+    public int getPartitionCount() {
+        return cluster.getPhysicalCluster().getPartitionCount();
+    }
+
+    /**
      * @return the sender object
      */
     public Sender getSender() {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..0c42f48 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -41,8 +41,7 @@
 import com.google.inject.name.Names;
 
 /**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
+ * Default module allowing assignment from ZK, communication through Netty, and distributed deployment management
  * 
  */
 public class DefaultCoreModule extends AbstractModule {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 6c0b19c..68840c0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -67,7 +67,7 @@
         streams = new MapMaker().makeMap();
     }
 
-    int getPartition() {
+    int getPartitionId() {
         return listener.getPartitionId();
     }
 
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 5b0b03d..b5dcf6e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -24,6 +24,8 @@
 import org.apache.s4.base.Hasher;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterChangeListener;
 import org.apache.s4.comm.topology.ClusterNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +40,17 @@
  * Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
  * from the application developer.
  */
-public class Sender {
+public class Sender implements ClusterChangeListener {
 
     private static Logger logger = LoggerFactory.getLogger(Sender.class);
 
     final private Emitter emitter;
     final private SerializerDeserializer serDeser;
     final private Hasher hasher;
+    final private Cluster cluster;
 
-    Assignment assignment;
-    private int localPartitionId = -1;
+    final private Assignment assignment;
+    private volatile int localPartitionId = -1;
 
     /**
      * 
@@ -59,11 +62,14 @@
      *            a hashing function to map keys to partition IDs.
      */
     @Inject
-    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
+    public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
+            Cluster cluster) {
         this.emitter = emitter;
         this.serDeser = serDeser;
         this.hasher = hasher;
         this.assignment = assignment;
+        this.cluster = cluster;
+        this.cluster.addListener(this);
     }
 
     @Inject
@@ -121,4 +127,9 @@
         }
     }
 
+    @Override
+    public void onChange() {
+        resolveLocalPartitionId();
+    }
+
 }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 71bae7a..6a60ec5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,6 +20,8 @@
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Listener;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.PhysicalCluster;
 import org.apache.s4.core.Receiver;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.NoOpDeploymentManager;
@@ -47,5 +49,8 @@
         bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
         bind(Listener.class).toInstance(Mockito.mock(Listener.class));
         bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+        Cluster clusterMock = Mockito.mock(Cluster.class);
+        Mockito.when(clusterMock.getPhysicalCluster()).thenReturn(new PhysicalCluster(1));
+        bind(Cluster.class).toInstance(clusterMock);
     }
 }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
index c9d8635..a124db3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
@@ -23,39 +23,45 @@
 import org.apache.s4.core.Stream;
 
 public class WordCounterPE extends ProcessingElement {
-    
+
     int wordCounter;
     transient Stream<WordCountEvent> wordClassifierStream;
 
-    private WordCounterPE() {}
-    
+    private WordCounterPE() {
+    }
+
     public WordCounterPE(App app) {
         super(app);
     }
-    
+
     public void setWordClassifierStream(Stream<WordCountEvent> stream) {
         this.wordClassifierStream = stream;
     }
 
-    public void onEvent(WordSeenEvent event) { 
+    public void onEvent(WordSeenEvent event) {
+
         wordCounter++;
         System.out.println("seen word " + event.getWord());
-        // NOTE: it seems the id is the key for now...     
+        // NOTE: it seems the id is the key for now...
         wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+
+        // add some tests for partition count and id
+        if (!((getApp().getPartitionCount() == 1) && (getApp().getPartitionId() == 0))) {
+            throw new RuntimeException("Invalid partitioning");
+        }
+
     }
 
     @Override
     protected void onCreate() {
         // TODO Auto-generated method stub
-        
+
     }
 
     @Override
     protected void onRemove() {
         // TODO Auto-generated method stub
-        
-    }
 
-   
+    }
 
 }