APEXMALHAR-2412 Provide emitTuple overriding functionality for user in kinesis Input operator
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 18a6399..30ceadb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -158,6 +158,14 @@
*/
public abstract T getTuple(Record rc);
+ /**
+ * Any concrete class derived from AbstractKinesisInputOperator may implement this method to emit tuples to an output port.
+ */
+ public void emitTuple(Pair<String, Record> data)
+ {
+ outputPort.emit(getTuple(data.getSecond()));
+ }
+
@Override
public void partitioned(Map<Integer, Partition<AbstractKinesisInputOperator>> partitions)
{
@@ -465,7 +473,7 @@
List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getSecond(),
rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getFirst());
for (Record record : records) {
- outputPort.emit(getTuple(record));
+ emitTuple(new Pair<String, Record>(rc.getKey(), record));
shardPosition.put(rc.getKey(), record.getSequenceNumber());
}
} catch(Exception e)
@@ -569,8 +577,7 @@
Pair<String, Record> data = consumer.pollRecord();
String shardId = data.getFirst();
String recordId = data.getSecond().getSequenceNumber();
- T tuple = getTuple(data.getSecond());
- outputPort.emit(tuple);
+ emitTuple(data);
if(!currentWindowRecoveryState.containsKey(shardId))
{
currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));