APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 75af448..23c519f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -352,6 +352,11 @@
for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
+ if (consumerRecord.offset() >= currentOffset) {
+ crossedBoundary = true;
+ break;
+ }
+
if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
continue;
}
@@ -365,10 +370,6 @@
partialWindowTuples.put(value, 1);
}
- if (consumerRecord.offset() >= currentOffset) {
- crossedBoundary = true;
- break;
- }
}
if (crossedBoundary) {