Setting stream codec on all the input ports of the stream
diff --git a/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java b/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java
index 7259ce2..5d76264 100644
--- a/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java
+++ b/demos/src/main/java/com/datatorrent/demos/frauddetect/Application.java
@@ -23,12 +23,15 @@
 import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
 import com.datatorrent.lib.math.RangeKeyVal;
 import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
+import com.datatorrent.lib.util.BaseKeyValueOperator;
 import com.datatorrent.lib.util.KeyValPair;
 import com.datatorrent.demos.frauddetect.operator.HdfsStringOutputOperator;
 import com.datatorrent.demos.frauddetect.operator.MongoDBOutputOperator;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
 import java.net.URI;
 
 /**
@@ -172,6 +175,8 @@
     return oper;
   }
 
+  public static class KeyPartitionCodec<K, V> extends BaseKeyValueOperator.DefaultPartitionCodec<K,V> implements Serializable {}
+
   /**
    * Create the DAG
    */
@@ -265,6 +270,12 @@
       dag.addStream("bankInfoCount", smsOperator.integerSum, binSampler.txCountInputPort);
 
       dag.addStream("filteredTransactions", txBucketOperator.txOutputPort, rangeOperator.data, smaOperator.data, avgAlertingOperator.txInputPort);
+
+      KeyPartitionCodec<MerchantKey, Long> txCodec = new KeyPartitionCodec<MerchantKey, Long>();
+      dag.setInputPortAttribute(rangeOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+      dag.setInputPortAttribute(smaOperator.data, Context.PortContext.STREAM_CODEC, txCodec);
+      dag.setInputPortAttribute(avgAlertingOperator.txInputPort, Context.PortContext.STREAM_CODEC, txCodec);
+
       dag.addStream("creditCardData", txBucketOperator.ccAlertOutputPort, ccSamplerOperator.inputPort);
       dag.addStream("txnSummaryData", txBucketOperator.summaryTxnOutputPort, txSummaryWsOutput.input);