[hotfix] Discard watermarks when feed a datastream into iteration body
This closes #223.
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 6e6ff9d..8f71677 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -85,6 +85,9 @@
* <p>The limitation of constructing the subgraph inside the iteration body could be refer in {@link
* IterationBody}.
*
+ * <p>Note that the iteration framework cannot deal with watermarks correctly for now. It should be
+ * resolved by FLINK-31373.
+ *
* <p>An example of the iteration is like:
*
* <pre>{@code
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
index b6908ff..604715f 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
@@ -22,6 +22,7 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/** Input operator that wraps the user record into {@link IterationRecord}. */
@@ -46,4 +47,9 @@
reusable.getValue().setValue(streamRecord.getValue());
output.collect(reusable);
}
+
+ @Override
+ public void processWatermark(Watermark mark) {
+ // TODO: FLINK-31373 Support processing watermarks in iterations.
+ }
}