Setting stream codec on all the input ports of the stream
diff --git a/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java b/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java
index 7259ce2..5d76264 100644
--- a/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java
+++ b/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java
@@ -23,12 +23,15 @@
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
import com.datatorrent.lib.math.RangeKeyVal;
import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
+import com.datatorrent.lib.util.BaseKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.demos.frauddetect.operator.HdfsStringOutputOperator;
import com.datatorrent.demos.frauddetect.operator.MongoDBOutputOperator;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
import java.net.URI;
/**
@@ -172,6 +175,8 @@
return oper;
}
+ public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable {}
+
/**
* Create the DAG
*/
@@ -265,6 +270,12 @@
dag.addStream("bankInfoCount", smsOperator.integerSum, binSampler.txCountInputPort);
dag.addStream("filteredTransactions", txBucketOperator.txOutputPort, rangeOperator.data, smaOperator.data, avgAlertingOperator.txInputPort);
+
+ KeyPartitionCodec<MerchantKey, Long> txCodec = new KeyPartitionCodec<MerchantKey, Long>();
+ dag.setInputPortAttribute(rangeOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+ dag.setInputPortAttribute(smaOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+ dag.setInputPortAttribute(avgAlertingOperator.txInputPort, Context.PortContext.STREAM_CODEC, txCodec);
+
dag.addStream("creditCardData", txBucketOperator.ccAlertOutputPort, ccSamplerOperator.inputPort);
dag.addStream("txnSummaryData", txBucketOperator.summaryTxnOutputPort, txSummaryWsOutput.input);