SAMOA-51: Update Flink Module to v0.10.1
Fix #44
diff --git a/pom.xml b/pom.xml
index 71b131f..af8fe98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
<miniball.version>1.0.3</miniball.version>
<s4.version>0.6.0-incubating</s4.version>
<samza.version>0.7.0</samza.version>
- <flink.version>0.9.0</flink.version>
+ <flink.version>0.10.1</flink.version>
<slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
<slf4j-simple.version>1.7.5</slf4j-simple.version>
<maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
index f0266fa..5575643 100644
--- a/samoa-flink/pom.xml
+++ b/samoa-flink/pom.xml
@@ -70,7 +70,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-core</artifactId>
+ <artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
index cd0b82c..7805371 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/FlinkDoTask.java
@@ -21,16 +21,9 @@
*/
import com.github.javacliparser.ClassOption;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.samoa.flink.helpers.CircleDetection;
-import org.apache.samoa.flink.helpers.Utils;
import org.apache.samoa.flink.topology.impl.FlinkComponentFactory;
-import org.apache.samoa.flink.topology.impl.FlinkProcessingItem;
-import org.apache.samoa.flink.topology.impl.FlinkStream;
import org.apache.samoa.flink.topology.impl.FlinkTopology;
import org.apache.samoa.tasks.Task;
import org.slf4j.Logger;
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
similarity index 85%
rename from samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
rename to samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
index 400e49c..9aedb25 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CircleDetection.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/CycleDetection.java
@@ -26,12 +26,12 @@
import java.util.Stack;
/**
- * This class contains all logic needed in order to mark circles in job graphs explicitly such as
- * in the case of Apache Flink. A circle is defined as a list of node ids ordered in topological
+ * This class contains all logic needed in order to mark cycles in job graphs explicitly such as
+ * in the case of Apache Flink. A cycle is defined as a list of node ids ordered in topological
* (DFS) order.
*
*/
-public class CircleDetection {
+public class CycleDetection {
private int[] index;
private int[] lowLink;
private int counter;
@@ -40,12 +40,12 @@
List<Integer>[] graph;
- public CircleDetection() {
- stack = new Stack<Integer>();
+ public CycleDetection() {
+ stack = new Stack<>();
scc = new ArrayList<>();
}
- public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
+ public List<List<Integer>> getCycles(List<Integer>[] adjacencyList) {
graph = adjacencyList;
index = new int[adjacencyList.length];
lowLink = new int[adjacencyList.length];
@@ -82,7 +82,7 @@
w = stack.pop();
sccComponent.add(w);
} while (neighbor != w);
- //add neighbor again, just in case it is a member of another circle
+ //add neighbor again, just in case it is a member of another cycle
stack.add(neighbor);
scc.add(sccComponent);
}
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
index 38b4bdc..ce01567 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/helpers/Utils.java
@@ -32,8 +32,6 @@
import org.apache.samoa.flink.topology.impl.SamoaType;
import org.apache.samoa.utils.PartitioningScheme;
-import java.util.List;
-
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
public class Utils {
@@ -45,7 +43,7 @@
case BROADCAST:
return stream.broadcast();
case GROUP_BY_KEY:
- return stream.groupBy(new KeySelector<SamoaType, String>() {
+ return stream.keyBy(new KeySelector<SamoaType, String>() {
@Override
public String getKey(SamoaType samoaType) throws Exception {
return samoaType.f0;
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
index 28701df..9e3c880 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
@@ -22,20 +22,20 @@
import com.google.common.collect.Lists;
-
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.Processor;
import org.apache.samoa.flink.helpers.Utils;
import org.apache.samoa.topology.ProcessingItem;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.utils.PartitioningScheme;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,9 +60,8 @@
private int parallelism;
private static int numberOfPIs = 0;
private int piID;
- private List<Integer> circleId; //check if we can refactor this
+ private List<Integer> cycleId; //check if we can refactor this
private boolean onIteration;
- //private int circleId; //check if we can refactor this
public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) {
this(env, proc, 1);
@@ -79,8 +78,8 @@
this.processor = proc;
this.parallelism = parallelism;
this.piID = numberOfPIs++;
- this.circleId = new ArrayList<Integer>() {
- }; // if size equals 0, then it is part of no circle
+ this.cycleId = new ArrayList<Integer>() {
+ }; // if size equals 0, then it is part of no cycle
}
public Stream createStream() {
@@ -90,12 +89,12 @@
}
public void putToStream(ContentEvent data, Stream targetStream) {
- output.collect(SamoaType.of(data, targetStream.getStreamId()));
+ output.collect(new StreamRecord<>(SamoaType.of(data, targetStream.getStreamId())));
}
-
+
@Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
+ public void open() throws Exception {
+ super.open();
this.processor.onCreate(getComponentId());
}
@@ -148,8 +147,13 @@
}
@Override
- public void processElement(SamoaType samoaType) throws Exception {
- fun.processEvent(samoaType.f1);
+ public void processElement(StreamRecord<SamoaType> streamRecord) throws Exception {
+ fun.processEvent(streamRecord.getValue().f1);
+ }
+
+ @Override
+ public void processWatermark(Watermark watermark) throws Exception {
+
}
@Override
@@ -175,10 +179,6 @@
return parallelism;
}
- public void setParallelism(int parallelism) {
- this.parallelism = parallelism;
- }
-
public List<FlinkStream> getOutputStreams() {
return outputStreams;
}
@@ -187,28 +187,24 @@
return this.outStream;
}
- public void setOutStream(DataStream outStream) {
- this.outStream = outStream;
- }
-
@Override
public int getComponentId() {
return piID;
}
- public boolean isPartOfCircle() {
- return this.circleId.size() > 0;
+ public boolean isPartOfCycle() {
+ return this.cycleId.size() > 0;
}
- public List<Integer> getCircleIds() {
- return circleId;
+ public List<Integer> getCycleIds() {
+ return cycleId;
}
- public void addPItoLoop(int piId) {
- this.circleId.add(piId);
+ public void addPItoCycle(int piId) {
+ this.cycleId.add(piId);
}
- public DataStream<SamoaType> getInStream() {
+ public DataStream<SamoaType> getDataStream() {
return inStream;
}
@@ -219,6 +215,7 @@
public void setOnIteration(boolean onIteration) {
this.onIteration = onIteration;
}
+
static class SamoaDelegateFunction implements Function, Serializable {
private final Processor proc;
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
index 31617a7..286802c 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkStream.java
@@ -36,46 +36,44 @@
public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable {
private static int outputCounter = 0;
- private FlinkComponent procItem;
- private transient DataStream<SamoaType> dataStream;
- private int sourcePiId;
- private String flinkStreamId;
+ private FlinkComponent sourceComponent;
+ private transient DataStream<SamoaType> filteredStream;
+ private String filterID;
public FlinkStream(FlinkComponent sourcePi) {
- this.procItem = sourcePi;
- this.sourcePiId = sourcePi.getComponentId();
+ this.sourceComponent = sourcePi;
setStreamId("stream-" + Integer.toString(outputCounter));
- flinkStreamId = "stream-" + Integer.toString(outputCounter);
+ filterID = "stream-" + Integer.toString(outputCounter);
outputCounter++;
}
@Override
public void initialise() {
- if (procItem instanceof FlinkProcessingItem) {
- dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId()))
- .setParallelism(((FlinkProcessingItem) procItem).getParallelism());
+ if (sourceComponent instanceof FlinkProcessingItem) {
+ filteredStream = sourceComponent.getOutStream().filter(Utils.getFilter(getStreamId()))
+ .setParallelism(((FlinkProcessingItem) sourceComponent).getParallelism());
} else
- dataStream = procItem.getOutStream();
+ filteredStream = sourceComponent.getOutStream();
}
@Override
public boolean canBeInitialised() {
- return procItem.isInitialised();
+ return sourceComponent.isInitialised();
}
@Override
public boolean isInitialised() {
- return dataStream != null;
+ return filteredStream != null;
}
@Override
public DataStream getOutStream() {
- return dataStream;
+ return filteredStream;
}
@Override
public void put(ContentEvent event) {
- ((FlinkProcessingItem) procItem).putToStream(event, this);
+ ((FlinkProcessingItem) sourceComponent).putToStream(event, this);
}
@Override
@@ -84,11 +82,11 @@
}
public int getSourcePiId() {
- return sourcePiId;
+ return sourceComponent.getComponentId();
}
@Override
public String getStreamId() {
- return flinkStreamId;
+ return filterID;
}
}
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
index 65c52c6..a09ba71 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkTopology.java
@@ -21,17 +21,15 @@
*/
-
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.samoa.flink.helpers.CircleDetection;
-import org.apache.samoa.flink.helpers.Utils;
+import org.apache.samoa.flink.helpers.CycleDetection;
import org.apache.samoa.topology.AbstractTopology;
import org.apache.samoa.topology.EntranceProcessingItem;
import org.apache.samoa.utils.PartitioningScheme;
@@ -43,144 +41,148 @@
/**
* A SAMOA topology on Apache Flink
- *
+ * <p/>
* A Samoa-Flink Streaming Topology is DAG of ProcessingItems encapsulated within custom operators.
- * Streams are tagged and filtered in each operator's output so they can be routed to the right
+ * Streams are tagged and filtered in each operator's output so they can be routed to the right
* operator respectively. Building a Flink topology from a Samoa task involves invoking all these
- * stream transformations and finally, marking and initiating loops in the graph. We have to do that
- * since Flink only allows explicit loops in the topology started with 'iterate()' and closed with
- * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the
- * sources, mark loops and initialize them with explicit iterations.
- *
+ * stream transformations and finally, marking and initiating cycles in the graph. We have to do that
+ * since Flink only allows explicit cycles in the topology started with 'iterate()' and closed with
+ * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the
+ * sources, mark cycles and initialize them with explicit iterations.
*/
public class FlinkTopology extends AbstractTopology {
- private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
- public static StreamExecutionEnvironment env;
- public List<List<FlinkProcessingItem>> topologyLoops = new ArrayList<>();
- public List<Integer> backEdges = new ArrayList<Integer>();
+ private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
+ public static StreamExecutionEnvironment env;
+ public List<List<FlinkProcessingItem>> cycles = new ArrayList<>();
+ public List<Integer> backEdges = new ArrayList<Integer>();
- public FlinkTopology(String name, StreamExecutionEnvironment env) {
- super(name);
- this.env = env;
- }
+ public FlinkTopology(String name, StreamExecutionEnvironment env) {
+ super(name);
+ this.env = env;
+ }
- public StreamExecutionEnvironment getEnvironment() {
- return env;
- }
-
- public void build() {
- markCircles();
- for (EntranceProcessingItem src : getEntranceProcessingItems()) {
- ((FlinkEntranceProcessingItem) src).initialise();
- }
- initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)));
- }
+ public StreamExecutionEnvironment getEnvironment() {
+ return env;
+ }
- private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) {
- if (flinkComponents.isEmpty()) return;
+ public void build() {
+ markCycles();
+ for (EntranceProcessingItem src : getEntranceProcessingItems()) {
+ ((FlinkEntranceProcessingItem) src).initialise();
+ }
+ initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)));
+ }
- for (FlinkProcessingItem comp : flinkComponents) {
- if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCircle()) {
- comp.initialise();
- comp.initialiseStreams();
+ private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) {
+ if (flinkComponents.isEmpty()) return;
- }//if component is part of one or more circle
- else if (comp.isPartOfCircle() && !comp.isInitialised()) {
- for (Integer circle : comp.getCircleIds()) {
- //check if circle can be initialized
- if (checkCircleReady(circle)) {
- logger.debug("Circle: " + circle + " can be initialised");
- initialiseCircle(circle);
- } else {
- logger.debug("Circle cannot be initialised");
- }
- }
- }
+ for (FlinkProcessingItem comp : flinkComponents) {
+ if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCycle()) {
+ comp.initialise();
+ comp.initialiseStreams();
- }
- initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() {
- @Override
- public boolean apply(FlinkProcessingItem flinkComponent) {
- return !flinkComponent.isInitialised();
- }
- })));
- }
+ }//if component is part of one or more cycle
+ else if (comp.isPartOfCycle() && !comp.isInitialised()) {
+ for (Integer cycle : comp.getCycleIds()) {
+ //check if cycle can be initialized
+ if (completenessCheck(cycle)) {
+ logger.debug("Cycle: " + cycle + " can be initialised");
+ initializeCycle(cycle);
+ } else {
+ logger.debug("Cycle cannot be initialised");
+ }
+ }
+ }
+ }
+ initComponents(ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() {
+ @Override
+ public boolean apply(FlinkProcessingItem flinkComponent) {
+ return !flinkComponent.isInitialised();
+ }
+ })));
+ }
- private void markCircles(){
- List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
- List<Integer>[] graph = new List[pis.size()];
- FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()];
+ /**
+ * Detects and marks all cycles and backedges needed to construct a Flink topology
+ */
+ private void markCycles() {
+ List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
+ List<Integer>[] graph = new List[pis.size()];
+ FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()];
- for (int i=0;i<pis.size();i++) {
- graph[i] = new ArrayList<Integer>();
- }
- //construct the graph of the topology for the Processing Items (No entrance pi is included)
- for (FlinkProcessingItem pi: pis) {
- processingItems[pi.getComponentId()] = pi;
- for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
- if (is.f2 != -1) graph[is.f2].add(pi.getComponentId());
- }
- }
- for (int g=0;g<graph.length;g++)
- logger.debug(graph[g].toString());
+ for (int i = 0; i < pis.size(); i++) {
+ graph[i] = new ArrayList<>();
+ }
+ //construct the graph of the topology for the Processing Items (No entrance pi is included)
+ for (FlinkProcessingItem pi : pis) {
+ processingItems[pi.getComponentId()] = pi;
+ for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
+ if (is.f2 != -1) graph[is.f2].add(pi.getComponentId());
+ }
+ }
+ for (int g = 0; g < graph.length; g++)
+ logger.debug(graph[g].toString());
- CircleDetection detCircles = new CircleDetection();
- List<List<Integer>> circles = detCircles.getCircles(graph);
+ CycleDetection detCycles = new CycleDetection();
+ List<List<Integer>> graphCycles = detCycles.getCycles(graph);
- //update PIs, regarding being part of a circle.
- for (List<Integer> c : circles){
- List<FlinkProcessingItem> circle = new ArrayList<>();
- for (Integer it : c){
- circle.add(processingItems[it]);
- processingItems[it].addPItoLoop(topologyLoops.size());
- }
- topologyLoops.add(circle);
- backEdges.add(circle.get(0).getComponentId());
- }
- logger.debug("Circles detected in the topology: " + circles);
- }
-
+ //update PIs, regarding being part of a cycle.
+ for (List<Integer> c : graphCycles) {
+ List<FlinkProcessingItem> cycle = new ArrayList<>();
+ for (Integer it : c) {
+ cycle.add(processingItems[it]);
+ processingItems[it].addPItoCycle(cycles.size());
+ }
+ cycles.add(cycle);
+ backEdges.add(cycle.get(0).getComponentId());
+ }
+ logger.debug("Cycles detected in the topology: " + graphCycles);
+ }
- private boolean checkCircleReady(int circleId) {
- List<Integer> circleIds = new ArrayList<>();
+ private boolean completenessCheck(int cycleId) {
- for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
- circleIds.add(pi.getComponentId());
- }
- //check that all incoming to the circle streams are initialised
- for (FlinkProcessingItem procItem : topologyLoops.get(circleId)) {
- for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) {
- //if a inputStream is not initialized AND source of inputStream is not in the circle or a tail of other circle
- if ((!inputStream.f0.isInitialised()) && (!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
- return false;
- }
- }
- return true;
- }
+ List<Integer> cycleIDs = new ArrayList<>();
- private void initialiseCircle(int circleId) {
- //get the head and tail of circle
- FlinkProcessingItem tail = topologyLoops.get(circleId).get(0);
- FlinkProcessingItem head = topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1);
+ for (FlinkProcessingItem pi : cycles.get(cycleId)) {
+ cycleIDs.add(pi.getComponentId());
+ }
+ //check that all incoming to the cycle streams are initialised
+ for (FlinkProcessingItem procItem : cycles.get(cycleId)) {
+ for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) {
+ //if a inputStream is not initialized AND source of inputStream is not in the cycle or a tail of other cycle
+ if ((!inputStream.f0.isInitialised()) && (!cycleIDs.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
+ return false;
+ }
+ }
+ return true;
+ }
- //initialise source stream of the iteration, so as to use it for the iteration starting point
- if (!head.isInitialised()) {
- head.setOnIteration(true);
- head.initialise();
- head.initialiseStreams();
- }
+ private void initializeCycle(int cycleID) {
+ //get the head and tail of cycle
+ FlinkProcessingItem tail = cycles.get(cycleID).get(0);
+ FlinkProcessingItem head = cycles.get(cycleID).get(cycles.get(cycleID).size() - 1);
- //initialise all nodes after head
- for (int node = topologyLoops.get(circleId).size() - 2; node >= 0; node--) {
- topologyLoops.get(circleId).get(node).initialise();
- topologyLoops.get(circleId).get(node).initialiseStreams();
- }
+ //initialise source stream of the iteration, so as to use it for the iteration starting point
+ if (!head.isInitialised()) {
+ head.setOnIteration(true);
+ head.initialise();
+ head.initialiseStreams();
+ }
- ((IterativeDataStream) head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream());
- }
+ //initialise all nodes after head
+ for (int node = cycles.get(cycleID).size() - 2; node >= 0; node--) {
+ FlinkProcessingItem processingItem = cycles.get(cycleID).get(node);
+ processingItem.initialise();
+ processingItem.initialiseStreams();
+ }
+
+ SingleOutputStreamOperator backedge = (SingleOutputStreamOperator) head.getInputStreamBySourceID(tail.getComponentId()).getOutStream();
+ backedge.setParallelism(head.getParallelism());
+ ((IterativeStream) head.getDataStream()).closeWith(backedge);
+ }
}