SAMOA-16: Add an adapter for Apache Flink-Streaming (senorcarbone)
Fix #11
diff --git a/.gitignore b/.gitignore
index 3cf0208..294c718 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,5 @@
 
 #intellij
 .idea/
-.iml
+*.iml
+*.iws
diff --git a/bin/samoa b/bin/samoa
index b34f65b..0ace74b 100755
--- a/bin/samoa
+++ b/bin/samoa
@@ -4,7 +4,7 @@
 # #%L
 # SAMOA
 # %%
-# Copyright (C) 2013 Yahoo! Inc.
+# Copyright (C) 2014 - 2015 Apache Software Foundation
 # %%
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -281,6 +281,66 @@
                --kryo_register=$BASE_DIR/$KRYO_REGISTER_FILE --pi_per_container=$PI_PER_CONTAINER \
 	       --samoa_hdfs_dir=$HDFS_SAMOA_HOME
 
+elif [ $PLATFORM = 'FLINK' ]; then
+
+        echo "Deploying to $PLATFORM"
+        # Operands are quoted so that an empty value or a path containing spaces cannot defeat the tests.
+        if [ -z "$FLINK_HOME" ];then
+            echo "FLINK_HOME is not set!"
+            echo "Please set FLINK_HOME to point to your Flink installation"
+            exit -1
+        fi
+
+        if [ ! -f "$2" ];then
+            echo "$2 does not exist!"
+            echo "Please use a valid jar file for Flink execution"
+            exit -1
+        fi
+
+        FLINK_EXEC="$FLINK_HOME/bin/flink"
+
+        SAMOA_FLINK_PROPERTIES="samoa-flink.properties"
+        MODE_OPTION="samoa.flink.mode"
+
+        # getvalue KEY: sets VALUE to the trimmed value of the last non-comment line matching KEY.
+        VALUE=""
+        getvalue()
+        {
+            VALUE=`sed '/^\#/d' "$BASE_DIR/$SAMOA_FLINK_PROPERTIES" | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
+        }
+
+        getvalue "$MODE_OPTION"
+        MODE_ARG="$VALUE"
+
+        # Everything after the first two script arguments is forwarded to the Flink job.
+        COMPLETE_ARG=""
+        COUNTER=0
+        for var in "$@"
+        do
+            COUNTER=`expr $COUNTER + 1`
+            if [ $COUNTER -gt 2 ];then
+                COMPLETE_ARG="$COMPLETE_ARG $var"
+            fi
+        done
+
+        DEPLOYABLE=$JAR_PATH
+        echo "$DEPLOYABLE"
+        if [ "$MODE_ARG" = "cluster" ]; then
+                FLINK_MASTER_OPTION="samoa.flink.flinkMaster"
+                PORT_OPTION="samoa.flink.port"
+                getvalue "$FLINK_MASTER_OPTION"
+                FLINK_MASTER_OPTION="$VALUE"
+
+                getvalue "$PORT_OPTION"
+                PORT_OPTION="$VALUE"
+                "$FLINK_EXEC" run -m "$FLINK_MASTER_OPTION:$PORT_OPTION" "$DEPLOYABLE" $COMPLETE_ARG
+        elif [ "$MODE_ARG" = "local" ]; then
+                "$FLINK_EXEC" run "$DEPLOYABLE" $COMPLETE_ARG
+        else
+                echo "Unsupported samoa.flink.mode '$MODE_ARG' in $SAMOA_FLINK_PROPERTIES (expected 'local' or 'cluster')"
+                exit -1
+        fi
+
 elif [ $PLATFORM = 'THREADS' ]; then
 	
 	echo "Deploying to LOCAL with MULTITHREADING."
diff --git a/bin/samoa-flink.properties b/bin/samoa-flink.properties
new file mode 100644
index 0000000..b9f56c0
--- /dev/null
+++ b/bin/samoa-flink.properties
@@ -0,0 +1,35 @@
+###
+# #%L
+# SAMOA
+# %%
+# Copyright (C) 2014 - 2015 Apache Software Foundation
+# %%
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# #L%
+###
+
+# SAMOA Flink properties file
+# This file contains specific configurations for SAMOA deployment in Flink platform
+
+# samoa.flink.mode corresponds to the execution mode of a Task in Flink
+# possible values:
+# 1. local   - to run the task in local StreamingEnvironment
+# 2. cluster - to run the task in the specified cluster
+samoa.flink.mode=cluster
+
+# If samoa.flink.mode is "cluster", the following parameters must also be set:
+# samoa.flink.flinkMaster: the address of the Flink master (passed to "flink run -m")
+# samoa.flink.port: the port of the Flink master
+samoa.flink.flinkMaster=127.0.0.1
+samoa.flink.port=6123
+
diff --git a/pom.xml b/pom.xml
index 622d952..819a13c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,15 @@
             </modules>
         </profile>
         <profile>
+            <id>flink</id>
+            <modules>
+                <module>samoa-instances</module>
+                <module>samoa-api</module>
+                <module>samoa-flink</module>
+                <module>samoa-test</module>
+            </modules>
+        </profile>
+        <profile>
             <id>samza</id>
             <modules>
                 <module>samoa-instances</module>
@@ -104,6 +113,7 @@
                 <module>samoa-local</module>
                 <module>samoa-threads</module>
                 <module>samoa-storm</module>
+                <module>samoa-flink</module>
                 <module>samoa-s4</module>
                 <module>samoa-samza</module>
                 <module>samoa-test</module>
@@ -130,6 +140,7 @@
         <miniball.version>1.0.3</miniball.version>
         <s4.version>0.6.0-incubating</s4.version>
         <samza.version>0.7.0</samza.version>
+        <flink.version>0.9.0-milestone-1</flink.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
         <slf4j-simple.version>1.7.5</slf4j-simple.version>
         <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
@@ -211,6 +222,7 @@
                         <root>samoa-local</root>
                         <root>samoa-storm</root>
                         <root>samoa-s4</root>
+                        <root>samoa-flink</root>
                         <root>samoa-samza</root>
                         <root>bin</root>
                     </roots>
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
new file mode 100644
index 0000000..f00fe3c
--- /dev/null
+++ b/samoa-flink/pom.xml
@@ -0,0 +1,134 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!-- Kryo version declared explicitly below; the kryo dependency of samoa-api is excluded. -->
+        <kryo.version>2.24.0</kryo.version>
+    </properties>
+      <repositories>
+                <repository>
+                        <id>apache.snapshots</id>
+                        <name>Apache Development Snapshot Repository</name>
+                        <url>https://repository.apache.org/content/repositories/snapshots/</url>
+                        <releases>
+                                <enabled>false</enabled>
+                        </releases>
+                        <snapshots>
+                                <enabled>true</enabled>
+                        </snapshots>
+                </repository>
+        </repositories>
+    <name>samoa-flink</name>
+    <description>Flink engine for SAMOA</description>
+
+    <artifactId>samoa-flink</artifactId>
+    <parent>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    
+    
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.esotericsoftware.kryo</groupId>
+                    <artifactId>kryo</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j-log4j12.version}</version>
+        </dependency>
+  		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${flink.version}</version>
+            <!--<scope>provided</scope>-->
+		</dependency>
+        <dependency>
+            <groupId>com.esotericsoftware.kryo</groupId>
+            <artifactId>kryo</artifactId>
+            <version>${kryo.version}</version>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <!-- Flink assembly -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven-assembly-plugin.version}</version>
+                <configuration>
+                    <finalName>SAMOA-Flink-${project.version}</finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                    <outputDirectory>../target</outputDirectory>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifestEntries>
+                            <!--<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>-->
+                            <Bundle-Description>${project.description}</Bundle-Description>
+                            <Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
+                            <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+                        </manifestEntries>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <mainClass>com.yahoo.labs.flink.FlinkDoTask</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven-surefire-plugin.version}</version>
+                <configuration>
+                    <argLine>-Xmx1G</argLine>
+                    <redirectTestOutputToFile>false</redirectTestOutputToFile>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    
+</project>
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
new file mode 100644
index 0000000..6069de9
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
@@ -0,0 +1,87 @@
+package com.yahoo.labs.flink;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.flink.topology.impl.FlinkComponentFactory;
+import com.yahoo.labs.flink.topology.impl.FlinkProcessingItem;
+import com.yahoo.labs.flink.topology.impl.FlinkStream;
+import com.yahoo.labs.flink.topology.impl.FlinkTopology;
+import com.yahoo.labs.samoa.tasks.Task;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+/**
+ * Main class to run a SAMOA on Apache Flink
+ */
+public class FlinkDoTask {
+
+	private static final Logger logger = LoggerFactory.getLogger(FlinkDoTask.class);
+
+	/**
+	 * Parses the command line into a SAMOA {@link Task}, builds its Flink topology and executes it.
+	 *
+	 * @param args the task description; the arguments are re-joined and parsed as one CLI string
+	 * @throws Exception if initialising the task, building the topology or running the job fails
+	 */
+	public static void main(String[] args) throws Exception {
+		// Init Task
+		// Re-join the arguments into the single string handed to the CLI parser.
+		StringBuilder cliBuilder = new StringBuilder();
+		for (String arg : args) {
+			cliBuilder.append(" ").append(arg);
+		}
+		String cliString = cliBuilder.toString();
+		logger.debug("Command line string = {}", cliString);
+		System.out.println("Command line string = " + cliString);
+
+		Task task;
+		try {
+			task = ClassOption.cliStringToObject(cliString, Task.class, null);
+			logger.debug("Successfully instantiating {}", task.getClass().getCanonicalName());
+		} catch (Exception e) {
+			// A task that cannot be instantiated is reported and main simply returns.
+			logger.error("Failed to initialize the task: ", e);
+			System.out.println("Failed to initialize the task: " + e);
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		task.setFactory(new FlinkComponentFactory(env));
+		task.init();
+
+		logger.debug("Building Flink topology...");
+		((FlinkTopology) task.getTopology()).build();
+
+		logger.debug("Submitting the job...");
+		env.execute();
+	}
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
new file mode 100644
index 0000000..a832ee9
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
@@ -0,0 +1,99 @@
+package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * Detects circles (cycles) in a job graph so that they can be marked explicitly, as needed for
+ * Apache Flink. A circle is reported as the list of node ids popped from the DFS stack: it starts
+ * at the node whose edge closes the circle and ends at the node that edge points to.
+ * Not safe for concurrent use: the traversal state lives in instance fields.
+ */
+public class CircleDetection {
+	private int[] index;
+	private int[] lowLink;
+	private int counter;
+	private Stack<Integer> stack;
+	private List<List<Integer>> scc;
+	List<Integer>[] graph;
+
+	/**
+	 * Detects the circles of the given graph; all traversal state is reset on every call.
+	 *
+	 * @param adjacencyList for every node id (array index), the ids its outgoing edges lead to
+	 * @return the circles found in this graph only, each as a list of node ids
+	 */
+	public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
+		graph = adjacencyList;
+		index = new int[adjacencyList.length];
+		lowLink = new int[adjacencyList.length];
+		counter = 0;
+		stack = new Stack<Integer>();
+		scc = new ArrayList<>();
+		//initialize index and lowLink as "undefined"(=-1)
+		for (int j = 0; j < graph.length; j++) {
+			index[j] = -1;
+			lowLink[j] = -1;
+		}
+		for (int v = 0; v < graph.length; v++) {
+			if (index[v] == -1) { //undefined.
+				findSCC(v);
+			}
+		}
+		return scc;
+	}
+
+	/** Depth-first visit of {@code node}; every edge to a node still on the stack yields a circle. */
+	private void findSCC(int node) {
+		index[node] = counter;
+		lowLink[node] = counter;
+		counter++;
+		stack.push(node);
+
+		for (int neighbor : graph[node]) {
+			if (index[neighbor] == -1) {
+				findSCC(neighbor);
+				lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]);
+			} else if (stack.contains(neighbor)) { //neighbor already visited and still on the stack
+				lowLink[node] = Math.min(lowLink[node], index[neighbor]);
+				List<Integer> sccComponent = new ArrayList<Integer>();
+				int w;
+				do {
+					w = stack.pop();
+					sccComponent.add(w);
+				} while (neighbor != w);
+				//add neighbor again, just in case it is a member of another circle
+				stack.add(neighbor);
+				scc.add(sccComponent);
+			}
+		}
+		if (lowLink[node] == index[node]) {
+			int w;
+			do {
+				w = stack.pop();
+			} while (node != w);
+		}
+	}
+}
\ No newline at end of file
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
new file mode 100644
index 0000000..fe1b960
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
@@ -0,0 +1,69 @@
+package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.flink.topology.impl.SamoaType;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+public class Utils {
+	/** Flink type information for SamoaType, declared as a tuple of (String, ContentEvent, String). */
+	public static TypeInformation<SamoaType> tempTypeInfo = new TupleTypeInfo(SamoaType.class, STRING_TYPE_INFO, TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO);
+	/** Applies the partitioning for the scheme: broadcast, group by field f0, or shuffle (also the default). */
+	public static DataStream subscribe(DataStream<SamoaType> stream, PartitioningScheme partitioning) {
+		switch (partitioning) {
+			case BROADCAST:
+				return stream.broadcast();
+			case GROUP_BY_KEY:
+				return stream.groupBy(new KeySelector<SamoaType, String>() {
+					@Override
+					public String getKey(SamoaType samoaType) throws Exception {
+						return samoaType.f0;
+					}
+				});
+			case SHUFFLE:
+			default:
+				return stream.shuffle();
+		}
+	}
+	/** Returns a filter that keeps only the elements whose f2 field equals the given stream id. */
+	public static FilterFunction<SamoaType> getFilter(final String streamID) {
+		return new FilterFunction<SamoaType>() {
+			@Override
+			public boolean filter(SamoaType o) throws Exception {
+				return o.f2.equals(streamID);
+			}
+		};
+	}
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
new file mode 100644
index 0000000..70a7838
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
@@ -0,0 +1,68 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Common interface of FlinkEntranceProcessingItem and FlinkProcessingItem
+ */
+public interface FlinkComponent {
+
+	/**
+	 * Initialises the node. Implementations should create the right invokables and apply the
+	 * appropriate stream transformations.
+	 */
+	public void initialise();
+
+	/**
+	 * This check is needed in order to determine whether all requirements for a Flink Component
+	 * (DataStream) are satisfied in order to initialise it. This is necessary in this integration
+	 * since Flink Streaming applies eager datastream generation based on transformations.
+	 *
+	 * @return true if the requirements are satisfied and the component can be initialised
+	 */
+	public boolean canBeInitialised();
+
+	/**
+	 * Tells whether this component has already been initialised.
+	 * @return true once the component has been initialised
+	 */
+	public boolean isInitialised();
+
+	/**
+	 * The wrapped Flink DataStream generated by this Flink component. Mind that the component
+	 * should first be initialised in order to have a generated DataStream.
+	 *
+	 * @return the DataStream generated by this component
+	 */
+	public DataStream<SamoaType> getOutStream();
+
+	/**
+	 * The id of this component (FlinkEntranceProcessingItem always reports the dummy id -1).
+	 *
+	 * @return the component id
+	 */
+	public int getComponentId();
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
new file mode 100644
index 0000000..fca0c1a
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
@@ -0,0 +1,66 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.*;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * An implementation of SAMOA's ComponentFactory for Apache Flink
+ */
+public class FlinkComponentFactory implements ComponentFactory {
+	private final StreamExecutionEnvironment env;
+
+	public FlinkComponentFactory(StreamExecutionEnvironment env) {
+		this.env = env;
+	}
+
+	@Override
+	public ProcessingItem createPi(Processor processor) {
+		return new FlinkProcessingItem(env, processor);
+	}
+
+	@Override
+	public ProcessingItem createPi(Processor processor, int parallelism) {
+		return new FlinkProcessingItem(env, processor, parallelism);
+	}
+
+	@Override
+	public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
+		return new FlinkEntranceProcessingItem(env, entranceProcessor);
+	}
+	/** A FlinkProcessingItem creates its own stream; any other source is wrapped in a new FlinkStream. */
+	@Override
+	public Stream createStream(IProcessingItem sourcePi) {
+		if (sourcePi instanceof FlinkProcessingItem) {
+			return ((FlinkProcessingItem) sourcePi).createStream();
+		}
+		return new FlinkStream((FlinkComponent) sourcePi);
+	}
+
+	@Override
+	public Topology createTopology(String topologyName) {
+		return new FlinkTopology(topologyName, env);
+	}
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
new file mode 100644
index 0000000..5dca509
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
@@ -0,0 +1,101 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem
+		implements FlinkComponent, Serializable {
+	// Entrance item of a SAMOA topology on Flink: initialise() registers its EntranceProcessor as a Flink source.
+	private transient StreamExecutionEnvironment env;
+	private transient DataStream outStream;
+
+	/** Stores the environment and the processor; the Flink source itself is only created by initialise(). */
+	public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, EntranceProcessor proc) {
+		super(proc);
+		this.env = env;
+	}
+
+	@Override
+	public void initialise() {
+		final EntranceProcessor proc = getProcessor();
+		final String streamId = getOutputStream().getStreamId();
+		final int compID = getComponentId();
+
+		// Source that emits the processor's events, each wrapped with the output stream id, until exhausted or cancelled.
+		outStream = env.addSource(new RichSourceFunction<SamoaType>() {
+			volatile boolean canceled; // set by cancel() to stop the loop in run()
+			EntranceProcessor entrProc = proc;
+			String id = streamId;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				super.open(parameters);
+				entrProc.onCreate(compID);
+			}
+
+			@Override
+			public void run(Collector<SamoaType> collector) throws Exception {
+				while (!canceled && entrProc.hasNext()) {
+					collector.collect(SamoaType.of(entrProc.nextEvent(), id));
+				}
+			}
+
+			@Override
+			public void cancel() {
+				canceled = true;
+			}
+		},Utils.tempTypeInfo);
+
+		((FlinkStream) getOutputStream()).initialise();
+	}
+
+	/** Returns true unconditionally: an entrance item can always be initialised. */
+	@Override
+	public boolean canBeInitialised() {
+		return true;
+	}
+
+	@Override
+	public boolean isInitialised() {
+		return outStream != null;
+	}
+
+	@Override
+	public int getComponentId() {
+		return -1; // dummy number shows that it comes from an Entrance PI
+	}
+
+	@Override
+	public DataStream getOutStream() {
+		return outStream;
+	}
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
new file mode 100644
index 0000000..f92182e
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
@@ -0,0 +1,300 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * SAMOA processing item that runs as a custom operator in a Flink streaming job.
+ * <p>
+ * The item is its own {@link StreamInvokable}: {@link #invoke()} passes the event carried by each
+ * incoming {@link SamoaType} to the wrapped {@link Processor}, and
+ * {@link #putToStream(ContentEvent, Stream)} emits events tagged with the id of their target stream.
+ */
+public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable {
+
+	private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class);
+	/** Argument passed to {@code iterate(...)} when this item starts an iteration. */
+	public static final int MAX_WAIT_TIME_MILLIS = 10000;
+
+	private final Processor processor;
+	private final transient StreamExecutionEnvironment env;
+	private final SamoaDelegateFunction fun;
+	private transient DataStream<SamoaType> inStream;
+	private transient DataStream<SamoaType> outStream;
+	private transient List<FlinkStream> outputStreams = Lists.newArrayList();
+	private transient List<Tuple3<FlinkStream, PartitioningScheme, Integer>> inputStreams = Lists.newArrayList();
+	private int parallelism;
+	/** Source of {@link #piID}; static, so it is shared by every instance in the JVM. */
+	private static int numberOfPIs = 0;
+	private int piID;
+	/** Ids of the circles (loops) this item belongs to; empty when it belongs to none. */
+	private List<Integer> circleId; //check if we can refactor this
+	private boolean onIteration;
+
+	/** Creates a processing item for {@code proc} with a parallelism of 1. */
+	public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) {
+		this(env, proc, 1);
+	}
+
+	/** Creates a processing item whose delegate function wraps {@code proc}. */
+	public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc, int parallelism) {
+		this(env, new SamoaDelegateFunction(proc), proc, parallelism);
+	}
+
+	/**
+	 * Creates a processing item and assigns it the next id from the static counter.
+	 *
+	 * @param env         execution environment stored with the item
+	 * @param fun         delegate passed to the {@link StreamInvokable} constructor and used by {@link #invoke()}
+	 * @param proc        processor returned by {@link #getProcessor()}
+	 * @param parallelism parallelism applied to the operator in {@link #initialise()}
+	 */
+	public FlinkProcessingItem(StreamExecutionEnvironment env, SamoaDelegateFunction fun, Processor proc, int parallelism) {
+		super(fun);
+		this.env = env;
+		this.fun = fun;
+		this.processor = proc;
+		this.parallelism = parallelism;
+		this.piID = numberOfPIs++;
+		// Plain list rather than an anonymous subclass, which would carry a reference to this item.
+		this.circleId = new ArrayList<Integer>(); // if size equals 0, then it is part of no circle
+	}
+
+	/** Creates a new output stream sourced by this item and adds it to {@link #getOutputStreams()}. */
+	public Stream createStream() {
+		FlinkStream generatedStream = new FlinkStream(this);
+		outputStreams.add(generatedStream);
+		return generatedStream;
+	}
+
+	/** Emits {@code data} through the operator's collector, tagged with the id of {@code targetStream}. */
+	public void putToStream(ContentEvent data, Stream targetStream) {
+		collector.collect(SamoaType.of(data, targetStream.getStreamId()));
+	}
+
+	/** Opens the operator and then calls {@code onCreate} on the processor with this item's id. */
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.processor.onCreate(getComponentId());
+	}
+
+	/**
+	 * Subscribes to every initialised input stream, merges them into one input and applies this
+	 * item to it as the "samoaProcessor" transformation.
+	 *
+	 * @throws IllegalStateException if none of the connected input streams is initialised
+	 * @throws RuntimeException      whatever subscribing to or merging an input stream throws; it is
+	 *                               logged and rethrown instead of terminating the JVM
+	 */
+	@Override
+	public void initialise() {
+		for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
+			if (!inputStream.f0.isInitialised()) {
+				continue; // inputs that are not ready yet are left out
+			}
+			try {
+				DataStream toBeMerged = Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1);
+				if (inStream == null) {
+					inStream = toBeMerged;
+				} else {
+					inStream = inStream.merge(toBeMerged);
+				}
+			} catch (RuntimeException e) {
+				logger.error("Could not subscribe processing item " + piID + " to " + inputStream.f0.getStreamId(), e);
+				throw e;
+			}
+		}
+
+		// Fail with a clear message instead of a NullPointerException on the calls below.
+		if (inStream == null) {
+			throw new IllegalStateException("Processing item " + piID + " has no initialised input stream");
+		}
+		if (onIteration) {
+			inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS);
+		}
+		outStream = inStream.transform("samoaProcessor", Utils.tempTypeInfo, this).setParallelism(parallelism);
+	}
+
+	/** Initialises every output stream created through {@link #createStream()}. */
+	public void initialiseStreams() {
+		for (FlinkStream stream : this.getOutputStreams()) {
+			stream.initialise();
+		}
+	}
+
+	/** Returns {@code true} when every connected input stream is initialised (also when there are none). */
+	@Override
+	public boolean canBeInitialised() {
+		for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
+			if (!inputStream.f0.isInitialised()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/** Returns {@code true} once the output stream has been set. */
+	@Override
+	public boolean isInitialised() {
+		return outStream != null;
+	}
+
+	@Override
+	public Processor getProcessor() {
+		return processor;
+	}
+
+	/** Reads records until {@code readNext()} returns {@code null} and passes each event to the delegate. */
+	@Override
+	public void invoke() throws Exception {
+		while (readNext() != null) {
+			SamoaType t = nextObject;
+			fun.processEvent(t.f1);
+		}
+	}
+
+	@Override
+	public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+		return connect(inputStream, PartitioningScheme.SHUFFLE);
+	}
+
+	@Override
+	public ProcessingItem connectInputKeyStream(Stream inputStream) {
+		return connect(inputStream, PartitioningScheme.GROUP_BY_KEY);
+	}
+
+	@Override
+	public ProcessingItem connectInputAllStream(Stream inputStream) {
+		return connect(inputStream, PartitioningScheme.BROADCAST);
+	}
+
+	/** Records {@code inputStream} (which must be a {@link FlinkStream}) as an input with the given scheme. */
+	private ProcessingItem connect(Stream inputStream, PartitioningScheme scheme) {
+		FlinkStream flinkStream = (FlinkStream) inputStream;
+		inputStreams.add(new Tuple3<>(flinkStream, scheme, flinkStream.getSourcePiId()));
+		return this;
+	}
+
+	@Override
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public void setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+	}
+
+	public List<FlinkStream> getOutputStreams() {
+		return outputStreams;
+	}
+
+	public DataStream<SamoaType> getOutStream() {
+		return this.outStream;
+	}
+
+	public void setOutStream(DataStream outStream) {
+		this.outStream = outStream;
+	}
+
+	@Override
+	public int getComponentId() {
+		return piID;
+	}
+
+	public boolean isPartOfCircle() {
+		return this.circleId.size() > 0;
+	}
+
+	public List<Integer> getCircleIds() {
+		return circleId;
+	}
+
+	/** Adds {@code piId} to the ids returned by {@link #getCircleIds()}. */
+	public void addPItoLoop(int piId) {
+		this.circleId.add(piId);
+	}
+
+	public DataStream<SamoaType> getInStream() {
+		return inStream;
+	}
+
+	public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> getInputStreams() {
+		return inputStreams;
+	}
+
+	public void setOnIteration(boolean onIteration) {
+		this.onIteration = onIteration;
+	}
+
+	public boolean isOnIteration() {
+		return onIteration;
+	}
+
+	/** Serializable function object that forwards events to the wrapped {@link Processor}. */
+	static class SamoaDelegateFunction implements Function, Serializable {
+		private final Processor proc;
+
+		SamoaDelegateFunction(Processor proc) {
+			this.proc = proc;
+		}
+
+		public void processEvent(ContentEvent event) {
+			proc.process(event);
+		}
+	}
+
+	/**
+	 * Looks up the input stream whose source processing item has the given id.
+	 *
+	 * @return the first matching input stream, or {@code null} when there is none
+	 */
+	public FlinkStream getInputStreamBySourceID(int sourceID) {
+		for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams : inputStreams) {
+			if (fstreams.f2 == sourceID) {
+				return fstreams.f0;
+			}
+		}
+		return null;
+	}
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
new file mode 100644
index 0000000..c5cb0ed
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
@@ -0,0 +1,114 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.io.Serializable;
+
+
+/**
+ * SAMOA stream implemented on top of an Apache Flink {@link DataStream}.
+ * <p>
+ * Every instance receives an id of the form {@code stream-N}, where N comes from a static counter.
+ */
+public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable {
+
+	private static int outputCounter = 0;
+	private FlinkComponent procItem;
+	private transient DataStream<SamoaType> dataStream;
+	private int sourcePiId;
+	private String flinkStreamId;
+
+	/**
+	 * Creates a stream fed by {@code sourcePi} and gives it the next {@code stream-N} id.
+	 *
+	 * @param sourcePi component whose output this stream carries
+	 */
+	public FlinkStream(FlinkComponent sourcePi) {
+		this.procItem = sourcePi;
+		this.sourcePiId = sourcePi.getComponentId();
+		final String id = "stream-" + Integer.toString(outputCounter);
+		setStreamId(id);
+		this.flinkStreamId = id;
+		outputCounter++;
+	}
+
+	/**
+	 * Binds this stream to the output of its source component. Output of a
+	 * {@link FlinkProcessingItem} is filtered by this stream's id and given the item's parallelism;
+	 * output of any other component is used as it is.
+	 */
+	@Override
+	public void initialise() {
+		if (!(procItem instanceof FlinkProcessingItem)) {
+			dataStream = procItem.getOutStream();
+			return;
+		}
+		dataStream = procItem.getOutStream()
+				.filter(Utils.getFilter(getStreamId()))
+				.setParallelism(((FlinkProcessingItem) procItem).getParallelism());
+	}
+
+	/** A stream can be initialised as soon as its source component is initialised. */
+	@Override
+	public boolean canBeInitialised() {
+		return procItem.isInitialised();
+	}
+
+	/** Reports whether the underlying data stream has been set (is non-null). */
+	@Override
+	public boolean isInitialised() {
+		return dataStream != null;
+	}
+
+	@Override
+	public DataStream getOutStream() {
+		return dataStream;
+	}
+
+	/** Emits {@code event} on this stream through the source item, which must be a {@link FlinkProcessingItem}. */
+	@Override
+	public void put(ContentEvent event) {
+		final FlinkProcessingItem source = (FlinkProcessingItem) procItem;
+		source.putToStream(event, this);
+	}
+
+	@Override
+	public int getComponentId() {
+		return -1; //dummy number shows that it comes from a Stream
+	}
+
+	/** Returns the component id of the source, as read in the constructor. */
+	public int getSourcePiId() {
+		return sourcePiId;
+	}
+
+	@Override
+	public String getStreamId() {
+		return flinkStreamId;
+	}
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
new file mode 100644
index 0000000..f04d792
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
@@ -0,0 +1,228 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A SAMOA topology on Apache Flink.
+ * <p>
+ * A Samoa-Flink streaming topology is a graph of ProcessingItems encapsulated within custom operators.
+ * Streams are tagged and filtered in each operator's output so they can be routed to the right
+ * operator respectively. Building a Flink topology from a Samoa task involves invoking all these
+ * stream transformations and finally, marking and initiating loops in the graph. We have to do that
+ * since Flink only allows explicit loops in the topology started with 'iterate()' and closed with
+ * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the
+ * sources, mark loops and initialize them with explicit iterations.
+ */
+public class FlinkTopology extends AbstractTopology {
+
+	private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
+	/** Selects the components that still have to be initialised. */
+	private static final Predicate<FlinkProcessingItem> NOT_INITIALISED = new Predicate<FlinkProcessingItem>() {
+		@Override
+		public boolean apply(FlinkProcessingItem flinkComponent) {
+			return !flinkComponent.isInitialised();
+		}
+	};
+
+	/** Static, so every topology created in the JVM shares (and overwrites) this environment. */
+	public static StreamExecutionEnvironment env;
+	/** Detected loops; an item's circle ids are indexes into this list. */
+	public List<List<FlinkProcessingItem>> topologyLoops = new ArrayList<>();
+	/** Component id of the first item of each detected loop. */
+	public  List<Integer> backEdges = new ArrayList<Integer>();
+
+	public FlinkTopology(String name, StreamExecutionEnvironment env) {
+		super(name);
+		// env is static: assign it through the class name instead of through "this".
+		FlinkTopology.env = env;
+	}
+
+	public StreamExecutionEnvironment getEnvironment() {
+		return env;
+	}
+
+	/**
+	 * Builds the Flink job: marks the loops, initialises the entrance items and then initialises
+	 * the processing items as their inputs become available.
+	 *
+	 * @throws IllegalStateException if some processing items can never be initialised
+	 */
+	public void build() {
+		markCircles();
+		for (EntranceProcessingItem src : getEntranceProcessingItems()) {
+			((FlinkEntranceProcessingItem) src).initialise();
+		}
+		initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)));
+	}
+
+	/**
+	 * Initialises the given components in passes until none is left. A pass initialises every
+	 * component, or whole circle, whose inputs are ready and keeps the rest for the next pass.
+	 *
+	 * @throws IllegalStateException if a pass initialises nothing; every later pass would see the
+	 *                               same state, so retrying would never end
+	 */
+	private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) {
+		ImmutableList<FlinkProcessingItem> pending = flinkComponents;
+		while (!pending.isEmpty()) {
+			for (FlinkProcessingItem comp : pending) {
+				if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCircle()) {
+					comp.initialise();
+					comp.initialiseStreams();
+				} else if (comp.isPartOfCircle() && !comp.isInitialised()) {
+					//the component is part of one or more circles
+					for (Integer circle : comp.getCircleIds()) {
+						//check if circle can be initialized
+						if (checkCircleReady(circle)) {
+							logger.debug("Circle: " + circle + " can be initialised");
+							initialiseCircle(circle);
+						} else {
+							logger.debug("Circle cannot be initialised");
+						}
+					}
+				}
+			}
+
+			ImmutableList<FlinkProcessingItem> remaining = ImmutableList.copyOf(Iterables.filter(pending, NOT_INITIALISED));
+			if (remaining.size() == pending.size()) {
+				throw new IllegalStateException(remaining.size()
+						+ " processing item(s) cannot be initialised; check that all their inputs are connected");
+			}
+			pending = remaining;
+		}
+	}
+
+	/**
+	 * Detects the loops among the processing items and records them in {@link #topologyLoops} and
+	 * {@link #backEdges}; every item of a loop is given the index of that loop.
+	 */
+	private void markCircles(){
+		List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
+		List<Integer>[] graph = new List[pis.size()];
+		FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()];
+
+		for (int i=0;i<pis.size();i++) {
+			graph[i] = new ArrayList<Integer>();
+		}
+		//construct the graph of the topology for the Processing Items (No entrance pi is included)
+		// NOTE(review): both arrays are indexed by component id, which assumes the ids of this
+		// topology's items are exactly 0..n-1 - confirm for JVMs that build several topologies.
+		for (FlinkProcessingItem pi: pis) {
+			processingItems[pi.getComponentId()] = pi;
+			for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
+				if (is.f2 != -1) graph[is.f2].add(pi.getComponentId());
+			}
+		}
+		for (int g=0;g<graph.length;g++)
+			logger.debug(graph[g].toString());
+
+		CircleDetection detCircles = new CircleDetection();
+		List<List<Integer>> circles = detCircles.getCircles(graph);
+
+		//update PIs, regarding being part of a circle.
+		for (List<Integer> c : circles){
+			List<FlinkProcessingItem> circle = new ArrayList<>();
+			for (Integer it : c){
+				circle.add(processingItems[it]);
+				processingItems[it].addPItoLoop(topologyLoops.size());
+			}
+			topologyLoops.add(circle);
+			backEdges.add(circle.get(0).getComponentId());
+		}
+		logger.debug("Circles detected in the topology: " + circles);
+	}
+
+	/**
+	 * Checks whether a circle can be initialised: every input stream of its items must already be
+	 * initialised unless its source is an item of the circle itself or one of the {@link #backEdges}.
+	 */
+	private boolean checkCircleReady(int circleId) {
+
+		List<Integer> circleIds = new ArrayList<>();
+
+		for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
+			circleIds.add(pi.getComponentId());
+		}
+		//check that all incoming to the circle streams are initialised
+		for (FlinkProcessingItem procItem : topologyLoops.get(circleId)) {
+			for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) {
+				//if a inputStream is not initialized AND source of inputStream is not in the circle or a tail of other circle
+				if ((!inputStream.f0.isInitialised()) && (!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
+					return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Initialises the items of a circle and closes the iteration. The last item of the circle is
+	 * the head, where the iteration starts, and the first item is the tail that feeds back into it.
+	 *
+	 * @throws IllegalStateException if the head has no input stream whose source is the tail
+	 */
+	private void initialiseCircle(int circleId) {
+		List<FlinkProcessingItem> circle = topologyLoops.get(circleId);
+		//get the head and tail of circle
+		FlinkProcessingItem tail = circle.get(0);
+		FlinkProcessingItem head = circle.get(circle.size() - 1);
+
+		//initialise source stream of the iteration, so as to use it for the iteration starting point
+		if (!head.isInitialised()) {
+			head.setOnIteration(true);
+			head.initialise();
+			head.initialiseStreams();
+		}
+
+		//initialise all nodes after head
+		for (int node = circle.size() - 2; node >= 0; node--) {
+			circle.get(node).initialise();
+			circle.get(node).initialiseStreams();
+		}
+
+		FlinkStream feedback = head.getInputStreamBySourceID(tail.getComponentId());
+		if (feedback == null) {
+			throw new IllegalStateException("Circle " + circleId + ": head " + head.getComponentId()
+					+ " has no input stream from tail " + tail.getComponentId());
+		}
+		((IterativeDataStream) head.getInStream()).closeWith(feedback.getOutStream());
+	}
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
new file mode 100644
index 0000000..16d050a
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
@@ -0,0 +1,57 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * Tuple exchanged between SAMOA operators on Flink: {@code (key, event, streamId)}.
+ */
+public class SamoaType extends Tuple3<String, ContentEvent, String> {
+
+	/** No-argument constructor; delegates to the no-argument {@link Tuple3} constructor. */
+	public SamoaType() {
+		super();
+	}
+
+	private SamoaType(String key, ContentEvent event, String streamId) {
+		super(key, event, streamId);
+	}
+
+	/**
+	 * Wraps {@code event} for the stream {@code streamId}.
+	 *
+	 * @return a tuple keyed by the event's key, or by {@code "none"} when the event has no key
+	 */
+	public static SamoaType of(ContentEvent event, String streamId) {
+		final String key;
+		if (event.getKey() != null) {
+			key = event.getKey();
+		} else {
+			key = "none";
+		}
+		return new SamoaType(key, event, streamId);
+	}
+}
\ No newline at end of file