APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 75af448..23c519f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -352,6 +352,11 @@
 
         for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
 
+          if (consumerRecord.offset() >= currentOffset) {
+            crossedBoundary = true;
+            break;
+          }
+
           if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
             continue;
           }
@@ -365,10 +370,6 @@
             partialWindowTuples.put(value, 1);
           }
 
-          if (consumerRecord.offset() >= currentOffset) {
-            crossedBoundary = true;
-            break;
-          }
         }
 
         if (crossedBoundary) {