Add getters for partition id and partition count in App class
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
index d7c8cee..59409a7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
@@ -47,7 +47,8 @@
import com.google.inject.name.Names;
/**
- * Default configuration module for the communication layer. Parameterizable through a configuration file.
+ * Default configuration module for the communication layer. Parameterizable / overridable through custom modules
+ * and/or a configuration file (for string literals).
*
*/
public class DefaultCommModule extends AbstractModule {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 936d225..673d0bb 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -28,6 +28,7 @@
import org.apache.s4.base.KeyFinder;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.topology.RemoteStreams;
import org.apache.s4.core.ft.CheckpointingFramework;
import org.apache.s4.core.window.AbstractSlidingWindowPE;
@@ -41,7 +42,7 @@
/**
* Container base class to hold all processing elements.
- *
+ *
* It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
*/
public abstract class App {
@@ -57,7 +58,7 @@
final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
/* Pes indexed by name. */
- Map<String, ProcessingElement> peByName = Maps.newHashMap();
+ final Map<String, ProcessingElement> peByName = Maps.newHashMap();
private ClockType clockType = ClockType.WALL_CLOCK;
private int id = -1;
@@ -68,17 +69,20 @@
private Receiver receiver;
@Inject
- RemoteSenders remoteSenders;
+ private Cluster cluster;
@Inject
- Hasher hasher;
+ private RemoteSenders remoteSenders;
@Inject
- RemoteStreams remoteStreams;
+ private Hasher hasher;
+
+ @Inject
+ private RemoteStreams remoteStreams;
@Inject
@Named("s4.cluster.name")
- String clusterName;
+ private String clusterName;
// default is NoOpCheckpointingFramework
@Inject
@@ -256,6 +260,25 @@
}
/**
+ *
+ * Returns the id of the partition assigned to the current node.
+ *
+ * NOTE: This method will block until the current node is assigned a partition.
+ *
+ */
+ public int getPartitionId() {
+ return getReceiver().getPartitionId();
+ }
+
+ /**
+ *
+ * Returns the total number of partitions of the cluster this node belongs to.
+ */
+ public int getPartitionCount() {
+ return cluster.getPhysicalCluster().getPartitionCount();
+ }
+
+ /**
* @return the sender object
*/
public Sender getSender() {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 5701640..0c42f48 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -41,8 +41,7 @@
import com.google.inject.name.Names;
/**
- * Temporary module allowing assignment from ZK, communication through Netty, and distributed deployment management,
- * until we have a better way to customize node configuration
+ * Default module allowing assignment from ZK, communication through Netty, and distributed deployment management
*
*/
public class DefaultCoreModule extends AbstractModule {
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
index 6c0b19c..68840c0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java
@@ -67,7 +67,7 @@
streams = new MapMaker().makeMap();
}
- int getPartition() {
+ int getPartitionId() {
return listener.getPartitionId();
}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
index 5b0b03d..b5dcf6e 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java
@@ -24,6 +24,8 @@
import org.apache.s4.base.Hasher;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.ClusterChangeListener;
import org.apache.s4.comm.topology.ClusterNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,16 +40,17 @@
* Details on how the cluster is partitioned and how events are serialized and transmitted to its destination are hidden
* from the application developer.
*/
-public class Sender {
+public class Sender implements ClusterChangeListener {
private static Logger logger = LoggerFactory.getLogger(Sender.class);
final private Emitter emitter;
final private SerializerDeserializer serDeser;
final private Hasher hasher;
+ final private Cluster cluster;
- Assignment assignment;
- private int localPartitionId = -1;
+ final private Assignment assignment;
+ private volatile int localPartitionId = -1;
/**
*
@@ -59,11 +62,14 @@
* a hashing function to map keys to partition IDs.
*/
@Inject
- public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment) {
+ public Sender(Emitter emitter, SerializerDeserializer serDeser, Hasher hasher, Assignment assignment,
+ Cluster cluster) {
this.emitter = emitter;
this.serDeser = serDeser;
this.hasher = hasher;
this.assignment = assignment;
+ this.cluster = cluster;
+ this.cluster.addListener(this);
}
@Inject
@@ -121,4 +127,9 @@
}
}
+ @Override
+ public void onChange() {
+ resolveLocalPartitionId();
+ }
+
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
index 71bae7a..6a60ec5 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java
@@ -20,6 +20,8 @@
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.PhysicalCluster;
import org.apache.s4.core.Receiver;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.NoOpDeploymentManager;
@@ -47,5 +49,8 @@
bind(Emitter.class).toInstance(Mockito.mock(Emitter.class));
bind(Listener.class).toInstance(Mockito.mock(Listener.class));
bind(Receiver.class).toInstance(Mockito.mock(Receiver.class));
+ Cluster clusterMock = Mockito.mock(Cluster.class);
+ Mockito.when(clusterMock.getPhysicalCluster()).thenReturn(new PhysicalCluster(1));
+ bind(Cluster.class).toInstance(clusterMock);
}
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
index c9d8635..a124db3 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCounterPE.java
@@ -23,39 +23,45 @@
import org.apache.s4.core.Stream;
public class WordCounterPE extends ProcessingElement {
-
+
int wordCounter;
transient Stream<WordCountEvent> wordClassifierStream;
- private WordCounterPE() {}
-
+ private WordCounterPE() {
+ }
+
public WordCounterPE(App app) {
super(app);
}
-
+
public void setWordClassifierStream(Stream<WordCountEvent> stream) {
this.wordClassifierStream = stream;
}
- public void onEvent(WordSeenEvent event) {
+ public void onEvent(WordSeenEvent event) {
+
wordCounter++;
System.out.println("seen word " + event.getWord());
- // NOTE: it seems the id is the key for now...
+ // NOTE: it seems the id is the key for now...
wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
+
+ // add some tests for partition count and id
+ if (!((getApp().getPartitionCount() == 1) && (getApp().getPartitionId() == 0))) {
+ throw new RuntimeException("Invalid partitioning");
+ }
+
}
@Override
protected void onCreate() {
// TODO Auto-generated method stub
-
+
}
@Override
protected void onRemove() {
// TODO Auto-generated method stub
-
- }
-
+ }
}