APEXMALHAR-2412 Provide emitTuple overriding functionality for user in kinesis Input operator
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 18a6399..30ceadb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -158,6 +158,14 @@
    */
   public abstract T getTuple(Record rc);
 
+  /**
+   * Any concrete class derived from AbstractKinesisInputOperator may implement this method to emit tuples to an output port.
+   */
+  public void emitTuple(Pair<String, Record> data)
+  {
+    outputPort.emit(getTuple(data.getSecond()));
+  }
+
   @Override
   public void partitioned(Map<Integer, Partition<AbstractKinesisInputOperator>> partitions)
   {
@@ -465,7 +473,7 @@
           List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getSecond(),
               rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getFirst());
           for (Record record : records) {
-            outputPort.emit(getTuple(record));
+            emitTuple(new Pair<String, Record>(rc.getKey(), record));
             shardPosition.put(rc.getKey(), record.getSequenceNumber());
           }
         } catch(Exception e)
@@ -569,8 +577,7 @@
       Pair<String, Record> data = consumer.pollRecord();
       String shardId = data.getFirst();
       String recordId = data.getSecond().getSequenceNumber();
-      T tuple = getTuple(data.getSecond());
-      outputPort.emit(tuple);
+      emitTuple(data);
       if(!currentWindowRecoveryState.containsKey(shardId))
       {
         currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));