[FLINK-31173] TailOperator should have only one input

This closes #216.
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
index 8f71677..8f8fc3e 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
 import org.apache.flink.iteration.operator.HeadOperator;
 import org.apache.flink.iteration.operator.HeadOperatorFactory;
@@ -38,6 +39,7 @@
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
 import org.apache.flink.util.Collector;
 
 import java.util.ArrayList;
@@ -476,13 +478,24 @@
         return new DataStreamList(
                 map(
                         dataStreams,
-                        (index, dataStream) ->
-                                ((DataStream<IterationRecord<?>>) dataStream)
-                                        .transform(
-                                                "tail-" + dataStream.getTransformation().getName(),
-                                                new IterationRecordTypeInfo(dataStream.getType()),
-                                                new TailOperator(iterationId, startIndex + index))
-                                        .setParallelism(dataStream.getParallelism())));
+                        (index, dataStream) -> {
+                            Transformation<?> inputTransformation = dataStream.getTransformation();
+                            if (!(inputTransformation instanceof PhysicalTransformation)
+                                    && inputTransformation.getInputs().size() > 1) {
+                                // TODO: Support epoch watermark alignment for TailOperator.
+                                throw new UnsupportedOperationException(
+                                        "Tail operator should have only one input. Please check whether operator \""
+                                                + inputTransformation.getName()
+                                                + "\" contains multiple inputs.");
+                            }
+
+                            return ((DataStream<IterationRecord<?>>) dataStream)
+                                    .transform(
+                                            "tail-" + dataStream.getTransformation().getName(),
+                                            new IterationRecordTypeInfo(dataStream.getType()),
+                                            new TailOperator(iterationId, startIndex + index))
+                                    .setParallelism(dataStream.getParallelism());
+                        }));
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})