[FLINK-31173] TailOperator should have only one input
This closes #216.
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 8f71677..8f8fc3e 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
import org.apache.flink.iteration.operator.HeadOperator;
import org.apache.flink.iteration.operator.HeadOperatorFactory;
@@ -38,6 +39,7 @@
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
@@ -476,13 +478,24 @@
return new DataStreamList(
map(
dataStreams,
- (index, dataStream) ->
- ((DataStream<IterationRecord<?>>) dataStream)
- .transform(
- "tail-" + dataStream.getTransformation().getName(),
- new IterationRecordTypeInfo(dataStream.getType()),
- new TailOperator(iterationId, startIndex + index))
- .setParallelism(dataStream.getParallelism())));
+ (index, dataStream) -> {
+ Transformation<?> inputTransformation = dataStream.getTransformation();
+ if (!(inputTransformation instanceof PhysicalTransformation)
+ && inputTransformation.getInputs().size() > 1) {
+ // TODO: Support epoch watermark alignment for TailOperator.
+ throw new UnsupportedOperationException(
+ "Tail operator should have only one input. Please check whether operator \""
+ + inputTransformation.getName()
+ + "\" contains multiple inputs.");
+ }
+
+ return ((DataStream<IterationRecord<?>>) dataStream)
+ .transform(
+ "tail-" + dataStream.getTransformation().getName(),
+ new IterationRecordTypeInfo(dataStream.getType()),
+ new TailOperator(iterationId, startIndex + index))
+ .setParallelism(dataStream.getParallelism());
+ }));
}
@SuppressWarnings({"unchecked", "rawtypes"})