blob: 2bb3a828d0adfacc8636637120b43f17d3b99b05 [file] [log] [blame]
package org.apache.s4.wordcount;
import org.apache.s4.core.App;
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.core.Stream;
public class WordCounterPE extends ProcessingElement {
int wordCounter;
transient Stream<WordCountEvent> wordClassifierStream;
private WordCounterPE() {}
public WordCounterPE(App app) {
super(app);
}
public void setWordClassifierStream(Stream<WordCountEvent> stream) {
this.wordClassifierStream = stream;
}
public void onEvent(WordSeenEvent event) {
wordCounter++;
System.out.println("seen word " + event.getWord());
// NOTE: it seems the id is the key for now...
wordClassifierStream.put(new WordCountEvent(getId(), wordCounter));
}
@Override
protected void onCreate() {
// TODO Auto-generated method stub
}
@Override
protected void onRemove() {
// TODO Auto-generated method stub
}
}