SAMOA-51: Update Flink Module to v0.10.1
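
Porting notes (summary of the diff below):

- flink-streaming-core was renamed to flink-streaming-java in Flink 0.10.
- DataStream.groupBy(KeySelector) is replaced by keyBy(KeySelector).
- IterativeDataStream is renamed to IterativeStream; the back edge's
  parallelism is now aligned with the iteration head before closeWith().
- Stream operators now implement open() without a Configuration argument
  and process StreamRecord-wrapped elements (processElement and the new
  processWatermark).
- CircleDetection and all "circle" naming are renamed to CycleDetection
  and "cycle", the accurate graph-theoretic term.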
Fix #44
diff --git a/pom.xml b/pom.xml
index 71b131f..af8fe98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
         <miniball.version>1.0.3</miniball.version>
         <s4.version>0.6.0-incubating</s4.version>
         <samza.version>0.7.0</samza.version>
-        <flink.version>0.9.0</flink.version>
+        <flink.version>0.10.1</flink.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
         <slf4j-simple.version>1.7.5</slf4j-simple.version>
         <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
index f0266fa..5575643 100644
--- a/samoa-flink/pom.xml
+++ b/samoa-flink/pom.xml
@@ -70,7 +70,7 @@
         </dependency>
   		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
+			<artifactId>flink-streaming-java</artifactId>
 			<version>${flink.version}</version>
             <!--<scope>provided</scope>-->
 		</dependency>
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
index cd0b82c..7805371 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
@@ -21,16 +21,9 @@
  */
 
 import com.github.javacliparser.ClassOption;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.samoa.flink.helpers.CircleDetection;
-import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.flink.topology.impl.FlinkComponentFactory;
-import org.apache.samoa.flink.topology.impl.FlinkProcessingItem;
-import org.apache.samoa.flink.topology.impl.FlinkStream;
 import org.apache.samoa.flink.topology.impl.FlinkTopology;
 import org.apache.samoa.tasks.Task;
 import org.slf4j.Logger;
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
similarity index 85%
rename from samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
rename to samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
index 400e49c..9aedb25 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
@@ -26,12 +26,12 @@
 import java.util.Stack;
 
 /**
- * This class contains all logic needed in order to mark circles in job graphs explicitly such as 
- * in the case of Apache Flink. A circle is defined as a list of node ids ordered in topological 
+ * This class contains the logic needed to explicitly mark cycles in job graphs, as required
+ * by Apache Flink. A cycle is defined as a list of node ids ordered in topological
  * (DFS) order.
  * 
  */
-public class CircleDetection {
+public class CycleDetection {
 	private int[] index;
 	private int[] lowLink;
 	private int counter;
@@ -40,12 +40,12 @@
 	List<Integer>[] graph;
 
 
-	public CircleDetection() {
-		stack = new Stack<Integer>();
+	public CycleDetection() {
+		stack = new Stack<>();
 		scc = new ArrayList<>();
 	}
 
-	public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
+	public List<List<Integer>> getCycles(List<Integer>[] adjacencyList) {
 		graph = adjacencyList;
 		index = new int[adjacencyList.length];
 		lowLink = new int[adjacencyList.length];
@@ -82,7 +82,7 @@
 					w = stack.pop();
 					sccComponent.add(w);
 				} while (neighbor != w);
-				//add neighbor again, just in case it is a member of another circle 
+				//add neighbor again, just in case it is a member of another cycle 
 				stack.add(neighbor); 
 				scc.add(sccComponent);
 			}
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
index 38b4bdc..ce01567 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
@@ -32,8 +32,6 @@
 import org.apache.samoa.flink.topology.impl.SamoaType;
 import org.apache.samoa.utils.PartitioningScheme;
 
-import java.util.List;
-
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 
 public class Utils {
@@ -45,7 +43,7 @@
 			case BROADCAST:
 				return stream.broadcast();
 			case GROUP_BY_KEY:
-				return stream.groupBy(new KeySelector<SamoaType, String>() {
+				return stream.keyBy(new KeySelector<SamoaType, String>() {
 					@Override
 					public String getKey(SamoaType samoaType) throws Exception {
 						return samoaType.f0;
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
index 28701df..9e3c880 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
@@ -22,20 +22,20 @@
 
 
 import com.google.common.collect.Lists;
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.core.Processor;
 import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.topology.ProcessingItem;
 import org.apache.samoa.topology.Stream;
 import org.apache.samoa.utils.PartitioningScheme;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,9 +60,8 @@
 	private int parallelism;
 	private static int numberOfPIs = 0;
 	private int piID;
-	private List<Integer> circleId; //check if we can refactor this
+	private List<Integer> cycleId; //check if we can refactor this
 	private boolean onIteration;
-	//private int circleId; //check if we can refactor this
 
 	public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) {
 		this(env, proc, 1);
@@ -79,8 +78,8 @@
 		this.processor = proc;
 		this.parallelism = parallelism;
 		this.piID = numberOfPIs++;
-		this.circleId = new ArrayList<Integer>() {
-		}; // if size equals 0, then it is part of no circle
+		this.cycleId = new ArrayList<>();
+		// if size equals 0, then this PI is part of no cycle
 	}
 
 	public Stream createStream() {
@@ -90,12 +89,12 @@
 	}
 
 	public void putToStream(ContentEvent data, Stream targetStream) {
-		output.collect(SamoaType.of(data, targetStream.getStreamId()));
+		output.collect(new StreamRecord<>(SamoaType.of(data, targetStream.getStreamId())));
 	}
-
+
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
+	public void open() throws Exception {
+		super.open();
 		this.processor.onCreate(getComponentId());
 	}
 
@@ -148,8 +147,13 @@
 	}
 
 	@Override
-	public void processElement(SamoaType samoaType) throws Exception {
-		fun.processEvent(samoaType.f1);
+	public void processElement(StreamRecord<SamoaType> streamRecord) throws Exception {
+		fun.processEvent(streamRecord.getValue().f1);
+	}
+
+	@Override
+	public void processWatermark(Watermark watermark) throws Exception {
+		// SAMOA components do not use event-time watermarks, so they are ignored here
 	}
 
 	@Override
@@ -175,10 +179,6 @@
 		return parallelism;
 	}
 
-	public void setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-	}
-
 	public List<FlinkStream> getOutputStreams() {
 		return outputStreams;
 	}
@@ -187,28 +187,24 @@
 		return this.outStream;
 	}
 
-	public void setOutStream(DataStream outStream) {
-		this.outStream = outStream;
-	}
-
 	@Override
 	public int getComponentId() {
 		return piID;
 	}
 
-	public boolean isPartOfCircle() {
-		return this.circleId.size() > 0;
+	public boolean isPartOfCycle() {
+		return this.cycleId.size() > 0;
 	}
 
-	public List<Integer> getCircleIds() {
-		return circleId;
+	public List<Integer> getCycleIds() {
+		return cycleId;
 	}
 
-	public void addPItoLoop(int piId) {
-		this.circleId.add(piId);
+	public void addPItoCycle(int piId) {
+		this.cycleId.add(piId);
 	}
 
-	public DataStream<SamoaType> getInStream() {
+	public DataStream<SamoaType> getDataStream() {
 		return inStream;
 	}
 
@@ -219,6 +215,7 @@
 	public void setOnIteration(boolean onIteration) {
 		this.onIteration = onIteration;
 	}
+
 
 	static class SamoaDelegateFunction implements Function, Serializable {
 		private final Processor proc;
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
index 31617a7..286802c 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
@@ -36,46 +36,44 @@
 public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable {
 
 	private static int outputCounter = 0;
-	private FlinkComponent procItem;
-	private transient DataStream<SamoaType> dataStream;
-	private int sourcePiId;
-	private String flinkStreamId;
+	private FlinkComponent sourceComponent;
+	private transient DataStream<SamoaType> filteredStream;
+	private String filterID;
 
 	public FlinkStream(FlinkComponent sourcePi) {
-		this.procItem = sourcePi;
-		this.sourcePiId = sourcePi.getComponentId();
+		this.sourceComponent = sourcePi;
 		setStreamId("stream-" + Integer.toString(outputCounter));
-		flinkStreamId = "stream-" + Integer.toString(outputCounter);
+		filterID = "stream-" + Integer.toString(outputCounter);
 		outputCounter++;
 	}
 
 	@Override
 	public void initialise() {
-		if (procItem instanceof FlinkProcessingItem) {
-			dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId()))
-			.setParallelism(((FlinkProcessingItem) procItem).getParallelism());
+		if (sourceComponent instanceof FlinkProcessingItem) {
+			filteredStream = sourceComponent.getOutStream().filter(Utils.getFilter(getStreamId()))
+			.setParallelism(((FlinkProcessingItem) sourceComponent).getParallelism());
 		} else
-			dataStream = procItem.getOutStream();
+			filteredStream = sourceComponent.getOutStream();
 	}
 
 	@Override
 	public boolean canBeInitialised() {
-		return procItem.isInitialised();
+		return sourceComponent.isInitialised();
 	}
 
 	@Override
 	public boolean isInitialised() {
-		return dataStream != null;
+		return filteredStream != null;
 	}
 
 	@Override
 	public DataStream getOutStream() {
-		return dataStream;
+		return filteredStream;
 	}
 
 	@Override
 	public void put(ContentEvent event) {
-		((FlinkProcessingItem) procItem).putToStream(event, this);
+		((FlinkProcessingItem) sourceComponent).putToStream(event, this);
 	}
 
 	@Override
@@ -84,11 +82,11 @@
 	}
 
 	public int getSourcePiId() {
-		return sourcePiId;
+		return sourceComponent.getComponentId();
 	}
 
 	@Override
 	public String getStreamId() {
-		return flinkStreamId;
+		return filterID;
 	}
 }
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
index 65c52c6..a09ba71 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
@@ -21,17 +21,15 @@
  */
 
 
-
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.samoa.flink.helpers.CircleDetection;
-import org.apache.samoa.flink.helpers.Utils;
+import org.apache.samoa.flink.helpers.CycleDetection;
 import org.apache.samoa.topology.AbstractTopology;
 import org.apache.samoa.topology.EntranceProcessingItem;
 import org.apache.samoa.utils.PartitioningScheme;
@@ -43,144 +41,148 @@
 
 /**
  * A SAMOA topology on Apache Flink
- * 
+ * <p/>
  * A Samoa-Flink Streaming Topology is DAG of ProcessingItems encapsulated within custom operators.
- * Streams are tagged and filtered in each operator's output so they can be routed to the right 
+ * Streams are tagged and filtered in each operator's output so they can be routed to the right
  * operator respectively. Building a Flink topology from a Samoa task involves invoking all these
- * stream transformations and finally, marking and initiating loops in the graph. We have to do that
- * since Flink only allows explicit loops in the topology started with 'iterate()' and closed with
- * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the 
- * sources, mark loops and initialize them with explicit iterations.
- * 
+ * stream transformations and, finally, marking and initializing cycles in the graph. We have to
+ * do that since Flink only allows explicit cycles in the topology, opened with 'iterate()' and
+ * closed with 'closeWith()'. Thus, when we build a Flink topology we have to do it incrementally
+ * from the sources, mark cycles and initialize them with explicit iterations.
  */
 public class FlinkTopology extends AbstractTopology {
 
-	private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
-	public static StreamExecutionEnvironment env;
-	public List<List<FlinkProcessingItem>> topologyLoops = new ArrayList<>();
-	public  List<Integer> backEdges = new ArrayList<Integer>();
+    private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
+    public static StreamExecutionEnvironment env;
+    public List<List<FlinkProcessingItem>> cycles = new ArrayList<>();
+    public List<Integer> backEdges = new ArrayList<>();
 
-	public FlinkTopology(String name, StreamExecutionEnvironment env) {
-		super(name);
-		this.env = env;
-	}
+    public FlinkTopology(String name, StreamExecutionEnvironment env) {
+        super(name);
+        this.env = env;
+    }
 
-	public StreamExecutionEnvironment getEnvironment() {
-		return env;
-	}
-	
-	public void build() {
-		markCircles();
-		for (EntranceProcessingItem src : getEntranceProcessingItems()) {
-			((FlinkEntranceProcessingItem) src).initialise();
-		}
-		initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)));
-	}
+    public StreamExecutionEnvironment getEnvironment() {
+        return env;
+    }
 
-	private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) {
-		if (flinkComponents.isEmpty()) return;
+    public void build() {
+        markCycles();
+        for (EntranceProcessingItem src : getEntranceProcessingItems()) {
+            ((FlinkEntranceProcessingItem) src).initialise();
+        }
+        initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)));
+    }
 
-		for (FlinkProcessingItem comp : flinkComponents) {
-			if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCircle()) {
-				comp.initialise();
-				comp.initialiseStreams();
+    private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) {
+        if (flinkComponents.isEmpty()) return;
 
-			}//if component is part of one or more circle
-			else if (comp.isPartOfCircle() && !comp.isInitialised()) {
-				for (Integer circle : comp.getCircleIds()) {
-					//check if circle can be initialized
-					if (checkCircleReady(circle)) {
-						logger.debug("Circle: " + circle + " can be initialised");
-						initialiseCircle(circle);
-					} else {
-						logger.debug("Circle cannot be initialised");
-					}
-				}
-			}
+        for (FlinkProcessingItem comp : flinkComponents) {
+            if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCycle()) {
+                comp.initialise();
+                comp.initialiseStreams();
 
-		}
-		initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() {
-			@Override
-			public boolean apply(FlinkProcessingItem flinkComponent) {
-				return !flinkComponent.isInitialised();
-			}
-		})));
-	}
+            }//if component is part of one or more cycles
+            else if (comp.isPartOfCycle() && !comp.isInitialised()) {
+                for (Integer cycle : comp.getCycleIds()) {
+                    //check if cycle can be initialized
+                    if (completenessCheck(cycle)) {
+                        logger.debug("Cycle: " + cycle + " can be initialised");
+                        initializeCycle(cycle);
+                    } else {
+                        logger.debug("Cycle cannot be initialised");
+                    }
+                }
+            }
+        }
+        initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() {
+            @Override
+            public boolean apply(FlinkProcessingItem flinkComponent) {
+                return !flinkComponent.isInitialised();
+            }
+        })));
+    }
 
-	private void markCircles(){
-		List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
-		List<Integer>[] graph = new List[pis.size()];
-		FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()];
+    /**
+     * Detects and marks all cycles and back edges needed to construct a Flink topology.
+     */
+    private void markCycles() {
+        List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
+        List<Integer>[] graph = new List[pis.size()];
+        FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()];
 
 
-		for (int i=0;i<pis.size();i++) {
-			graph[i] = new ArrayList<Integer>();
-		}
-		//construct the graph of the topology for the Processing Items (No entrance pi is included)
-		for (FlinkProcessingItem pi: pis) {
-			processingItems[pi.getComponentId()] = pi;
-			for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
-				if (is.f2 != -1) graph[is.f2].add(pi.getComponentId());
-			}
-		}
-		for (int g=0;g<graph.length;g++)
-			logger.debug(graph[g].toString());
+        for (int i = 0; i < pis.size(); i++) {
+            graph[i] = new ArrayList<>();
+        }
+        //construct the graph of the topology for the Processing Items (no entrance PI is included)
+        for (FlinkProcessingItem pi : pis) {
+            processingItems[pi.getComponentId()] = pi;
+            for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
+                if (is.f2 != -1) graph[is.f2].add(pi.getComponentId());
+            }
+        }
+        for (int g = 0; g < graph.length; g++)
+            logger.debug(graph[g].toString());
 
-		CircleDetection detCircles = new CircleDetection();
-		List<List<Integer>> circles = detCircles.getCircles(graph);
+        CycleDetection detCycles = new CycleDetection();
+        List<List<Integer>> graphCycles = detCycles.getCycles(graph);
 
-		//update PIs, regarding being part of a circle.
-		for (List<Integer> c : circles){
-			List<FlinkProcessingItem> circle = new ArrayList<>();
-			for (Integer it : c){
-				circle.add(processingItems[it]);
-				processingItems[it].addPItoLoop(topologyLoops.size());
-			}
-			topologyLoops.add(circle);
-			backEdges.add(circle.get(0).getComponentId());
-		}
-		logger.debug("Circles detected in the topology: " + circles);
-	}
-	
+        //update each PI with the cycles it is part of.
+        for (List<Integer> c : graphCycles) {
+            List<FlinkProcessingItem> cycle = new ArrayList<>();
+            for (Integer it : c) {
+                cycle.add(processingItems[it]);
+                processingItems[it].addPItoCycle(cycles.size());
+            }
+            cycles.add(cycle);
+            backEdges.add(cycle.get(0).getComponentId());
+        }
+        logger.debug("Cycles detected in the topology: " + graphCycles);
+    }
 
-	private boolean checkCircleReady(int circleId) {
 
-		List<Integer> circleIds = new ArrayList<>();
+    private boolean completenessCheck(int cycleId) {
 
-		for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
-			circleIds.add(pi.getComponentId());
-		}
-		//check that all incoming to the circle streams are initialised
-		for (FlinkProcessingItem procItem : topologyLoops.get(circleId)) {
-			for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) {
-				//if a inputStream is not initialized AND source of inputStream is not in the circle or a tail of other circle
-				if ((!inputStream.f0.isInitialised()) && (!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
-					return false;
-			}
-		}
-		return true;
-	}
+        List<Integer> cycleIDs = new ArrayList<>();
 
-	private void initialiseCircle(int circleId) {
-		//get the head and tail of circle
-		FlinkProcessingItem tail = topologyLoops.get(circleId).get(0);
-		FlinkProcessingItem head = topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1);
+        for (FlinkProcessingItem pi : cycles.get(cycleId)) {
+            cycleIDs.add(pi.getComponentId());
+        }
+        //check that all streams coming into the cycle are initialised
+        for (FlinkProcessingItem procItem : cycles.get(cycleId)) {
+            for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) {
+                //if an inputStream is not initialised AND its source is neither in the cycle nor the tail of another cycle
+                if ((!inputStream.f0.isInitialised()) && (!cycleIDs.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
+                    return false;
+            }
+        }
+        return true;
+    }
 
-		//initialise source stream of the iteration, so as to use it for the iteration starting point
-		if (!head.isInitialised()) {
-			head.setOnIteration(true);
-			head.initialise();
-			head.initialiseStreams();
-		}
+    private void initializeCycle(int cycleID) {
+        //get the head and tail of the cycle
+        FlinkProcessingItem tail = cycles.get(cycleID).get(0);
+        FlinkProcessingItem head = cycles.get(cycleID).get(cycles.get(cycleID).size() - 1);
 
-		//initialise all nodes after head
-		for (int node = topologyLoops.get(circleId).size() - 2; node >= 0; node--) {
-			topologyLoops.get(circleId).get(node).initialise();
-			topologyLoops.get(circleId).get(node).initialiseStreams();
-		}
+        //initialise the head of the cycle first, so it can serve as the starting point of the iteration
+        if (!head.isInitialised()) {
+            head.setOnIteration(true);
+            head.initialise();
+            head.initialiseStreams();
+        }
 
-		((IterativeDataStream) head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream());
-	}
+        //initialise all nodes after head
+        for (int node = cycles.get(cycleID).size() - 2; node >= 0; node--) {
+            FlinkProcessingItem processingItem = cycles.get(cycleID).get(node);
+            processingItem.initialise();
+            processingItem.initialiseStreams();
+        }
+
+        SingleOutputStreamOperator backedge = (SingleOutputStreamOperator) head.getInputStreamBySourceID(tail.getComponentId()).getOutStream();
+        backedge.setParallelism(head.getParallelism());
+        ((IterativeStream) head.getDataStream()).closeWith(backedge);
+    }
 
 
 }
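
Reviewer note: the cycle wiring in initializeCycle() above relies on Flink's
explicit iteration API. Below is a minimal, self-contained sketch of that
pattern against Flink 0.10 (illustrative only; the class name and data are
made up and are not part of this patch):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> input = env.fromElements(16L, 9L, 5L);

        // iterate() marks the head of the cycle, mirroring what FlinkTopology
        // does for the head ProcessingItem.
        IterativeStream<Long> head = input.iterate();

        // The loop body; in SAMOA this is the chain of ProcessingItems.
        DataStream<Long> body = head.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value / 2;
            }
        });

        // closeWith() wires the back edge from tail to head, as in
        // initializeCycle().
        head.closeWith(body.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) {
                return value > 1;
            }
        }));

        // Elements leaving the cycle.
        body.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) {
                return value <= 1;
            }
        }).print();

        env.execute("Explicit iteration sketch");
    }
}

As in the patch, the feedback stream handed to closeWith() has to run with the
same parallelism as the iteration head (hence the setParallelism() call on the
back edge in initializeCycle()); the sketch keeps default parallelism
everywhere, which satisfies that constraint.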