SAMOA-16: Add an adapter for Apache Flink-Streaming (senorcarbone)
Fix #11
diff --git a/.gitignore b/.gitignore
index 3cf0208..294c718 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,5 @@
#intellij
.idea/
-.iml
+*.iml
+*.iws
diff --git a/bin/samoa b/bin/samoa
index b34f65b..0ace74b 100755
--- a/bin/samoa
+++ b/bin/samoa
@@ -4,7 +4,7 @@
# #%L
# SAMOA
# %%
-# Copyright (C) 2013 Yahoo! Inc.
+# Copyright (C) 2014 - 2015 Apache Software Foundation
# %%
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -281,6 +281,66 @@
--kryo_register=$BASE_DIR/$KRYO_REGISTER_FILE --pi_per_container=$PI_PER_CONTAINER \
--samoa_hdfs_dir=$HDFS_SAMOA_HOME
+elif [ $PLATFORM = 'FLINK' ]; then
+
+  # Deploy the SAMOA task through the Flink command-line client ($FLINK_HOME/bin/flink).
+  echo "Deploying to $PLATFORM"
+  # Quoted so that a value containing spaces does not break the test.
+  if [ -z "$FLINK_HOME" ];then
+    echo "FLINK_HOME is not set!"
+    echo "Please set FLINK_HOME to point to your Flink installation"
+    exit -1
+  fi
+
+  # Quoted so that a missing jar argument is reported instead of slipping through the test.
+  if [ ! -f "$2" ];then
+    echo "$2 does not exist!"
+    echo "Please use a valid jar file for Flink execution"
+    exit -1
+  fi
+
+  FLINK_EXEC="$FLINK_HOME/bin/flink"
+
+  SAMOA_FLINK_PROPERTIES="samoa-flink.properties"
+  MODE_OPTION="samoa.flink.mode"
+#  NUM_WORKER_OPTION="samoa.flink.numWorker"
+
+  # getvalue KEY: stores in VALUE the trimmed text following the first '=' of the last
+  # non-comment line of $BASE_DIR/$SAMOA_FLINK_PROPERTIES that matches KEY.
+  VALUE=""
+  getvalue()
+  {
+    VALUE=`sed '/^\#/d' "$BASE_DIR/$SAMOA_FLINK_PROPERTIES" | grep "$1" | tail -n 1 | cut -d "=" -f2- | sed 's/^[[:space:]]*//;s/[[:space:]]*$//'`
+  }
+
+#  getvalue "$NUM_WORKER_OPTION"
+#  NUM_WORKER="$VALUE"
+
+  getvalue "$MODE_OPTION"
+  MODE_ARG="$VALUE"
+
+  # Everything after the first two script arguments is forwarded to the Flink job.
+  COMPLETE_ARG=""
+  COUNTER=0
+  for var in "$@"
+  do
+    COUNTER=`expr $COUNTER + 1`
+    if [ $COUNTER -gt 2 ];then
+      COMPLETE_ARG="$COMPLETE_ARG $var"
+    fi
+  done
+
+  DEPLOYABLE=$JAR_PATH
+  echo "$DEPLOYABLE"
+  if [ "$MODE_ARG" = "cluster" ]; then
+    FLINK_MASTER_OPTION="samoa.flink.flinkMaster"
+    PORT_OPTION="samoa.flink.port"
+
+    getvalue "$FLINK_MASTER_OPTION"
+    FLINK_MASTER_OPTION="$VALUE"
+
+    getvalue "$PORT_OPTION"
+    PORT_OPTION="$VALUE"
+    # COMPLETE_ARG is deliberately left unquoted so that it splits back into separate arguments.
+    "$FLINK_EXEC" run -m "$FLINK_MASTER_OPTION:$PORT_OPTION" "$DEPLOYABLE" $COMPLETE_ARG
+
+  elif [ "$MODE_ARG" = "local" ]; then
+    "$FLINK_EXEC" run "$DEPLOYABLE" $COMPLETE_ARG
+  else
+    # Previously an unknown or missing mode made the script do nothing without any message.
+    echo "Unsupported $MODE_OPTION value '$MODE_ARG' in $SAMOA_FLINK_PROPERTIES"
+    echo "Please set it to either 'local' or 'cluster'"
+    exit -1
+  fi
+
elif [ $PLATFORM = 'THREADS' ]; then
echo "Deploying to LOCAL with MULTITHREADING."
diff --git a/bin/samoa-flink.properties b/bin/samoa-flink.properties
new file mode 100644
index 0000000..b9f56c0
--- /dev/null
+++ b/bin/samoa-flink.properties
@@ -0,0 +1,35 @@
+###
+# #%L
+# SAMOA
+# %%
+# Copyright (C) 2014 - 2015 Apache Software Foundation
+# %%
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# #L%
+###
+
+# SAMOA Flink properties file
+# This file contains specific configurations for SAMOA deployment in Flink platform
+
+# samoa.flink.mode corresponds to the execution mode of a Task in Flink
+# possible values:
+# 1. local - to run the task in local StreamingEnvironment
+# 2. cluster - to run the task in the specified cluster
+samoa.flink.mode=cluster
+
+# If samoa.flink.mode is "cluster", the following parameters must also be set:
+# @samoa.flink.flinkMaster: the IP address of the Flink master of the cluster
+# @samoa.flink.port : the port of the Flink master
+samoa.flink.flinkMaster=127.0.0.1
+samoa.flink.port=6123
+
diff --git a/pom.xml b/pom.xml
index 622d952..819a13c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,15 @@
</modules>
</profile>
<profile>
+ <id>flink</id>
+ <modules>
+ <module>samoa-instances</module>
+ <module>samoa-api</module>
+ <module>samoa-flink</module>
+ <module>samoa-test</module>
+ </modules>
+ </profile>
+ <profile>
<id>samza</id>
<modules>
<module>samoa-instances</module>
@@ -104,6 +113,7 @@
<module>samoa-local</module>
<module>samoa-threads</module>
<module>samoa-storm</module>
+ <module>samoa-flink</module>
<module>samoa-s4</module>
<module>samoa-samza</module>
<module>samoa-test</module>
@@ -130,6 +140,7 @@
<miniball.version>1.0.3</miniball.version>
<s4.version>0.6.0-incubating</s4.version>
<samza.version>0.7.0</samza.version>
+ <flink.version>0.9.0-milestone-1</flink.version>
<slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
<slf4j-simple.version>1.7.5</slf4j-simple.version>
<maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
@@ -211,6 +222,7 @@
<root>samoa-local</root>
<root>samoa-storm</root>
<root>samoa-s4</root>
+ <root>samoa-flink</root>
<root>samoa-samza</root>
<root>bin</root>
</roots>
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
new file mode 100644
index 0000000..f00fe3c
--- /dev/null
+++ b/samoa-flink/pom.xml
@@ -0,0 +1,134 @@
+<!--
+ #%L
+ SAMOA
+ %%
+ Copyright (C) 2013 Yahoo! Inc.
+ %%
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ #L%
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <properties>
+   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+   <!-- Kryo is declared explicitly with this version; the Kryo pulled in by samoa-api is excluded. -->
+   <kryo.version>2.24.0</kryo.version>
+ </properties>
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+ <name>samoa-flink</name>
+ <description>Flink engine for SAMOA</description>
+
+ <artifactId>samoa-flink</artifactId>
+ <parent>
+ <groupId>com.yahoo.labs.samoa</groupId>
+ <artifactId>samoa</artifactId>
+ <version>0.3.0-SNAPSHOT</version>
+ </parent>
+
+
+
+ <dependencies>
+ <dependency>
+ <groupId>com.yahoo.labs.samoa</groupId>
+ <artifactId>samoa-api</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j-log4j12.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${flink.version}</version>
+ <!--<scope>provided</scope>-->
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>${kryo.version}</version>
+ </dependency>
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <!-- Flink assembly -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven-assembly-plugin.version}</version>
+ <configuration>
+ <finalName>SAMOA-Flink-${project.version}</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ <attach>false</attach>
+ <outputDirectory>../target</outputDirectory>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifestEntries>
+ <!--<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>-->
+ <Bundle-Description>${project.description}</Bundle-Description>
+ <Implementation-Version>${project.version}</Implementation-Version>
+ <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
+ <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+ </manifestEntries>
+ <manifest>
+ <addClasspath>true</addClasspath>
+ <mainClass>com.yahoo.labs.flink.FlinkDoTask</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id> <!-- this is used for inheritance merges -->
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ <configuration>
+ <argLine>-Xmx1G</argLine>
+ <redirectTestOutputToFile>false</redirectTestOutputToFile>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
new file mode 100644
index 0000000..6069de9
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/FlinkDoTask.java
@@ -0,0 +1,87 @@
+package com.yahoo.labs.flink;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.flink.topology.impl.FlinkComponentFactory;
+import com.yahoo.labs.flink.topology.impl.FlinkProcessingItem;
+import com.yahoo.labs.flink.topology.impl.FlinkStream;
+import com.yahoo.labs.flink.topology.impl.FlinkTopology;
+import com.yahoo.labs.samoa.tasks.Task;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+
+/**
+ * Command-line entry point that runs a SAMOA task on Apache Flink.
+ * <p>
+ * The program arguments are joined into one CLI string, turned into a {@link Task}, wired to a
+ * {@link FlinkComponentFactory} and executed on the {@link StreamExecutionEnvironment}
+ * obtained from Flink.
+ */
+public class FlinkDoTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlinkDoTask.class);
+
+    /**
+     * Instantiates the task described by {@code args}, builds its Flink topology and submits
+     * the job. If the task cannot be instantiated, the failure is logged and printed and the
+     * method returns without submitting anything.
+     *
+     * @param args the tokens of the SAMOA task description
+     * @throws Exception if initialising the task, building the topology or running the job fails
+     */
+    public static void main(String[] args) throws Exception {
+        // Rebuild the CLI string; every token is preceded by a single space.
+        StringBuilder cli = new StringBuilder();
+        for (String token : args) {
+            cli.append(" ").append(token);
+        }
+        final String cliString = cli.toString();
+        logger.debug("Command line string = {}", cliString);
+        System.out.println("Command line string = " + cliString);
+
+        // Init Task
+        Task task;
+        try {
+            task = ClassOption.cliStringToObject(cliString, Task.class, null);
+            logger.debug("Successfully instantiating {}", task.getClass().getCanonicalName());
+        } catch (Exception e) {
+            logger.error("Failed to initialize the task: ", e);
+            System.out.println("Failed to initialize the task: " + e);
+            return;
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        task.setFactory(new FlinkComponentFactory(env));
+        task.init();
+
+        logger.debug("Building Flink topology...");
+        FlinkTopology topology = (FlinkTopology) task.getTopology();
+        topology.build();
+
+        logger.debug("Submitting the job...");
+        env.execute();
+    }
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
new file mode 100644
index 0000000..a832ee9
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/CircleDetection.java
@@ -0,0 +1,99 @@
+package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * Detects circles (cycles) in a directed job graph so that they can be marked explicitly, as
+ * needed for example by Apache Flink. The graph is an adjacency list indexed by node id. A
+ * circle is reported as a list of node ids in the order they are popped from the DFS stack:
+ * it starts with the node whose outgoing edge closes the circle and ends with the node that
+ * edge points to.
+ */
+public class CircleDetection {
+    // DFS visit number of each node, -1 while the node is unvisited
+    private int[] index;
+    // smallest visit number found so far for each node, -1 while the node is unvisited
+    private int[] lowLink;
+    // next visit number to hand out
+    private int counter;
+    // nodes of the current DFS path (modified while circles are extracted, see findSCC)
+    private Stack<Integer> stack;
+    // circles collected by the current getCircles call
+    private List<List<Integer>> scc;
+    List<Integer>[] graph;
+
+
+    public CircleDetection() {
+        stack = new Stack<Integer>();
+        scc = new ArrayList<>();
+    }
+
+    /**
+     * Finds the circles of the given graph.
+     *
+     * @param adjacencyList for every node id {@code v}, the ids of the nodes {@code v} points to
+     * @return the circles found, each as a list of node ids; empty if there is none
+     */
+    public List<List<Integer>> getCircles(List<Integer>[] adjacencyList) {
+        graph = adjacencyList;
+        index = new int[adjacencyList.length];
+        lowLink = new int[adjacencyList.length];
+        counter = 0;
+        // Start from a clean state so that circles found by an earlier call on this instance
+        // are not reported a second time.
+        stack.clear();
+        scc = new ArrayList<>();
+
+        //initialize index and lowLink as "undefined"(=-1)
+        for (int j = 0; j < graph.length; j++) {
+            index[j] = -1;
+            lowLink[j] = -1;
+        }
+        for (int v = 0; v < graph.length; v++) {
+            if (index[v] == -1) { //undefined.
+                findSCC(v);
+            }
+        }
+        return scc;
+    }
+
+    /**
+     * Depth-first visit of {@code node}. Whenever an edge to a node that is still on the stack
+     * is found, the stack is popped down to that node and the popped ids are recorded as one
+     * circle.
+     * <p>
+     * NOTE(review): the nodes popped for a circle are not pushed back (only the edge target
+     * is), so a further back edge found later on the same DFS path sees a shortened stack and
+     * can be reported with nodes missing, or not at all. Confirm that topologies with
+     * overlapping circles are not expected here.
+     */
+    private void findSCC(int node) {
+        index[node] = counter;
+        lowLink[node] = counter;
+        counter++;
+        stack.push(node);
+
+        for (int neighbor : graph[node]) {
+            if (index[neighbor] == -1) {
+                findSCC(neighbor);
+                lowLink[node] = Math.min(lowLink[node], lowLink[neighbor]);
+            } else if (stack.contains(neighbor)) { //if neighbor has been already visited
+                lowLink[node] = Math.min(lowLink[node], index[neighbor]);
+                List<Integer> sccComponent = new ArrayList<Integer>();
+                int w;
+                do {
+                    w = stack.pop();
+                    sccComponent.add(w);
+                } while (neighbor != w);
+                //add neighbor again, just in case it is a member of another circle
+                stack.add(neighbor);
+                scc.add(sccComponent);
+            }
+
+        }
+        // node is the root of its component: remove it (and anything above it) from the stack
+        if (lowLink[node] == index[node]) {
+            int w;
+            do {
+                w = stack.pop();
+            } while (node != w);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
new file mode 100644
index 0000000..fe1b960
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/com/yahoo/labs/flink/helpers/Utils.java
@@ -0,0 +1,69 @@
+package com.yahoo.labs.flink.com.yahoo.labs.flink.helpers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.flink.topology.impl.SamoaType;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.util.List;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+/**
+ * Static helpers shared by the Flink topology classes: the type information used for
+ * {@link SamoaType} streams, stream partitioning and per-stream filtering.
+ */
+public class Utils {
+
+    /** Type information for {@link SamoaType}: a tuple of a String, a {@link ContentEvent} and a String. */
+    public static TypeInformation<SamoaType> tempTypeInfo = new TupleTypeInfo(SamoaType.class, STRING_TYPE_INFO, TypeExtractor.getForClass(ContentEvent.class), STRING_TYPE_INFO);
+
+    /**
+     * Applies the requested partitioning to a stream.
+     *
+     * @param stream       the stream to partition
+     * @param partitioning BROADCAST broadcasts, GROUP_BY_KEY groups by field {@code f0};
+     *                     SHUFFLE and any other value shuffle
+     * @return the partitioned stream
+     */
+    public static DataStream subscribe(DataStream<SamoaType> stream, PartitioningScheme partitioning) {
+        final DataStream partitioned;
+        switch (partitioning) {
+            case BROADCAST:
+                partitioned = stream.broadcast();
+                break;
+            case GROUP_BY_KEY:
+                partitioned = stream.groupBy(new KeyFieldSelector());
+                break;
+            case SHUFFLE:
+            default:
+                partitioned = stream.shuffle();
+                break;
+        }
+        return partitioned;
+    }
+
+    /**
+     * Creates a filter that only lets through the elements whose field {@code f2} equals the
+     * given stream id.
+     *
+     * @param streamID the stream id to keep
+     * @return the filter function
+     */
+    public static FilterFunction<SamoaType> getFilter(final String streamID) {
+        return new StreamIdFilter(streamID);
+    }
+
+    /** Key selector that uses field {@code f0} of a {@link SamoaType} as the grouping key. */
+    static class KeyFieldSelector implements KeySelector<SamoaType, String> {
+        @Override
+        public String getKey(SamoaType samoaType) throws Exception {
+            return samoaType.f0;
+        }
+    }
+
+    /** Filter that compares field {@code f2} of each element with a fixed stream id. */
+    static class StreamIdFilter implements FilterFunction<SamoaType> {
+        private final String expectedStreamId;
+
+        StreamIdFilter(String expectedStreamId) {
+            this.expectedStreamId = expectedStreamId;
+        }
+
+        @Override
+        public boolean filter(SamoaType o) throws Exception {
+            return o.f2.equals(expectedStreamId);
+        }
+    }
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
new file mode 100644
index 0000000..70a7838
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponent.java
@@ -0,0 +1,68 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Contract shared by the Flink-backed SAMOA components (FlinkEntranceProcessingItem,
+ * FlinkProcessingItem and FlinkStream): each one wraps a Flink DataStream that is created
+ * when the component is initialised.
+ */
+public interface FlinkComponent {
+
+    /**
+     * Initialises the node: creates the right invokables and applies the appropriate stream
+     * transformations.
+     */
+    void initialise();
+
+    /**
+     * Tells whether all requirements for initialising this component (DataStream) are
+     * satisfied. The check is necessary in this integration because Flink Streaming generates
+     * datastreams eagerly, based on transformations.
+     *
+     * @return true if {@link #initialise()} can be called now
+     */
+    boolean canBeInitialised();
+
+    /**
+     * Tells whether this component has been initialised.
+     *
+     * @return true once the component is initialised
+     */
+    boolean isInitialised();
+
+    /**
+     * The wrapped Flink DataStream generated by this component. The component has to be
+     * initialised first in order to have a generated DataStream.
+     *
+     * @return the generated DataStream
+     */
+    DataStream<SamoaType> getOutStream();
+
+    /**
+     * A unique component id.
+     *
+     * @return the id of this component
+     */
+    int getComponentId();
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
new file mode 100644
index 0000000..fca0c1a
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkComponentFactory.java
@@ -0,0 +1,66 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.*;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * An implementation of SAMOA's ComponentFactory for Apache Flink
+ */
+public class FlinkComponentFactory implements ComponentFactory {
+
+ private StreamExecutionEnvironment env;
+
+ public FlinkComponentFactory(StreamExecutionEnvironment env) {
+ this.env = env;
+ }
+
+ @Override
+ public ProcessingItem createPi(Processor processor) {
+ return new FlinkProcessingItem(env, processor);
+ }
+
+ @Override
+ public ProcessingItem createPi(Processor processor, int parallelism) {
+ return new FlinkProcessingItem(env, processor, parallelism);
+ }
+
+ @Override
+ public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) {
+ return new FlinkEntranceProcessingItem(env, entranceProcessor);
+ }
+
+ @Override
+ public Stream createStream(IProcessingItem sourcePi) {
+ if (sourcePi instanceof FlinkProcessingItem)
+ return ((FlinkProcessingItem) sourcePi).createStream();
+ else return new FlinkStream((FlinkComponent) sourcePi);
+ }
+
+ @Override
+ public Topology createTopology(String topologyName) {
+ return new FlinkTopology(topologyName, env);
+ }
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
new file mode 100644
index 0000000..5dca509
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkEntranceProcessingItem.java
@@ -0,0 +1,101 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * Entrance processing item for Flink. On initialisation it registers a Flink source that
+ * pulls events from the wrapped SAMOA {@link EntranceProcessor} and emits them, tagged with
+ * the id of this item's output stream, as {@link SamoaType} elements.
+ */
+public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem
+        implements FlinkComponent, Serializable {
+
+    private transient StreamExecutionEnvironment env;
+    // typed like FlinkComponent#getOutStream(); null until initialise() has run
+    private transient DataStream<SamoaType> outStream;
+
+
+    public FlinkEntranceProcessingItem(StreamExecutionEnvironment env, EntranceProcessor proc) {
+        super(proc);
+        this.env = env;
+    }
+
+    /**
+     * Adds the source to the execution environment and initialises the output stream of this
+     * item on top of it.
+     */
+    @Override
+    public void initialise() {
+        final EntranceProcessor proc = getProcessor();
+        final String streamId = getOutputStream().getStreamId();
+        final int compID = getComponentId();
+
+
+        outStream = env.addSource(new RichSourceFunction<SamoaType>() {
+            // set by cancel(), read by the emitting loop in run()
+            volatile boolean canceled;
+            EntranceProcessor entrProc = proc;
+            String id = streamId;
+
+            @Override
+            public void open(Configuration parameters) throws Exception {
+                super.open(parameters);
+                entrProc.onCreate(compID);
+            }
+
+            @Override
+            public void run(Collector<SamoaType> collector) throws Exception {
+                // emit until the processor has no more events or the source is cancelled
+                while (!canceled && entrProc.hasNext()) {
+                    collector.collect(SamoaType.of(entrProc.nextEvent(), id));
+                }
+            }
+
+            @Override
+            public void cancel() {
+                canceled = true;
+            }
+        }, Utils.tempTypeInfo);
+
+        ((FlinkStream) getOutputStream()).initialise();
+    }
+
+
+    /** An entrance item has no inputs, so it can always be initialised. */
+    @Override
+    public boolean canBeInitialised() {
+        return true;
+    }
+
+    /** @return true once {@link #initialise()} has created the source stream */
+    @Override
+    public boolean isInitialised() {
+        return outStream != null;
+    }
+
+    @Override
+    public int getComponentId() {
+        return -1; // dummy number shows that it comes from an Entrance PI
+    }
+
+    /** @return the source stream, or null if the item is not initialised yet */
+    @Override
+    public DataStream<SamoaType> getOutStream() {
+        return outStream;
+    }
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
new file mode 100644
index 0000000..f92182e
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkProcessingItem.java
@@ -0,0 +1,248 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class);
+ public static final int MAX_WAIT_TIME_MILLIS = 10000;
+
+ private final Processor processor;
+ private final transient StreamExecutionEnvironment env;
+ private final SamoaDelegateFunction fun;
+ private transient DataStream<SamoaType> inStream;
+ private transient DataStream<SamoaType> outStream;
+ private transient List<FlinkStream> outputStreams = Lists.newArrayList();
+ private transient List<Tuple3<FlinkStream, PartitioningScheme, Integer>> inputStreams = Lists.newArrayList();
+ private int parallelism;
+ private static int numberOfPIs = 0;
+ private int piID;
+ private List<Integer> circleId; //check if we can refactor this
+ private boolean onIteration;
+ //private int circleId; //check if we can refactor this
+
+ public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc) {
+ this(env, proc, 1);
+ }
+
+ public FlinkProcessingItem(StreamExecutionEnvironment env, Processor proc, int parallelism) {
+ this(env, new SamoaDelegateFunction(proc), proc, parallelism);
+ }
+
+ public FlinkProcessingItem(StreamExecutionEnvironment env, SamoaDelegateFunction fun, Processor proc, int parallelism) {
+ super(fun);
+ this.env = env;
+ this.fun = fun;
+ this.processor = proc;
+ this.parallelism = parallelism;
+ this.piID = numberOfPIs++;
+ this.circleId = new ArrayList<Integer>() {
+ }; // if size equals 0, then it is part of no circle
+ }
+
+ public Stream createStream() {
+ FlinkStream generatedStream = new FlinkStream(this);
+ outputStreams.add(generatedStream);
+ return generatedStream;
+ }
+
+ public void putToStream(ContentEvent data, Stream targetStream) {
+ collector.collect(SamoaType.of(data, targetStream.getStreamId()));
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ this.processor.onCreate(getComponentId());
+ }
+
+ @Override
+ public void initialise() {
+ for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
+ if (inputStream.f0.isInitialised()) { //if input stream is initialised
+ try {
+ DataStream toBeMerged = Utils.subscribe(inputStream.f0.getOutStream(), inputStream.f1);
+ if (inStream == null) {
+ inStream = toBeMerged;
+ } else {
+ inStream = inStream.merge(toBeMerged);
+ }
+ } catch (RuntimeException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ }
+
+ if (onIteration) {
+ inStream = inStream.iterate(MAX_WAIT_TIME_MILLIS);
+ }
+ outStream = inStream.transform("samoaProcessor", Utils.tempTypeInfo, this).setParallelism(parallelism);
+ }
+
+ public void initialiseStreams() {
+ for (FlinkStream stream : this.getOutputStreams()) {
+ stream.initialise();
+ }
+ }
+
+ @Override
+ public boolean canBeInitialised() {
+ for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : inputStreams) {
+ if (!inputStream.f0.isInitialised()) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isInitialised() {
+ return outStream != null;
+ }
+
+ @Override
+ public Processor getProcessor() {
+ return processor;
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ while (readNext() != null) {
+ SamoaType t = nextObject;
+ fun.processEvent(t.f1);
+ }
+ }
+
+ @Override
+ public ProcessingItem connectInputShuffleStream(Stream inputStream) {
+ inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.SHUFFLE, ((FlinkStream) inputStream).getSourcePiId()));
+ return this;
+ }
+
+ @Override
+ public ProcessingItem connectInputKeyStream(Stream inputStream) {
+ inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.GROUP_BY_KEY, ((FlinkStream) inputStream).getSourcePiId()));
+ return this;
+ }
+
+ @Override
+ public ProcessingItem connectInputAllStream(Stream inputStream) {
+ inputStreams.add(new Tuple3<>((FlinkStream) inputStream, PartitioningScheme.BROADCAST, ((FlinkStream) inputStream).getSourcePiId()));
+ return this;
+ }
+
+ @Override
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ public void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ public List<FlinkStream> getOutputStreams() {
+ return outputStreams;
+ }
+
+ public DataStream<SamoaType> getOutStream() {
+ return this.outStream;
+ }
+
+ public void setOutStream(DataStream outStream) {
+ this.outStream = outStream;
+ }
+
+ @Override
+ public int getComponentId() {
+ return piID;
+ }
+
+ public boolean isPartOfCircle() {
+ return this.circleId.size() > 0;
+ }
+
+ public List<Integer> getCircleIds() {
+ return circleId;
+ }
+
+ public void addPItoLoop(int piId) {
+ this.circleId.add(piId);
+ }
+
+ public DataStream<SamoaType> getInStream() {
+ return inStream;
+ }
+
+ public List<Tuple3<FlinkStream, PartitioningScheme, Integer>> getInputStreams() {
+ return inputStreams;
+ }
+
+ public void setOnIteration(boolean onIteration) {
+ this.onIteration = onIteration;
+ }
+
+ public boolean isOnIteration() {
+ return onIteration;
+ }
+
+ static class SamoaDelegateFunction implements Function, Serializable {
+ private final Processor proc;
+
+ SamoaDelegateFunction(Processor proc) {
+ this.proc = proc;
+ }
+
+ public void processEvent(ContentEvent event) {
+ proc.process(event);
+ }
+ }
+
+ public FlinkStream getInputStreamBySourceID(int sourceID) {
+ for (Tuple3<FlinkStream, PartitioningScheme, Integer> fstreams : inputStreams) {
+ if (fstreams.f2 == sourceID) {
+ return fstreams.f0;
+ }
+ }
+ return null;
+ }
+
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
new file mode 100644
index 0000000..c5cb0ed
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkStream.java
@@ -0,0 +1,94 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.io.Serializable;
+
+
+/**
+ * A stream for SAMOA based on Apache Flink's DataStream
+ */
+public class FlinkStream extends AbstractStream implements FlinkComponent, Serializable {
+    private static int outputCounter = 0; // shared by all instances; numeric suffix of the next stream id
+    private FlinkComponent procItem; // component whose output feeds this stream
+    private transient DataStream<SamoaType> dataStream; // assigned only by initialise()
+    private int sourcePiId; // component id of procItem, captured at construction
+    private String flinkStreamId; // same text that is passed to setStreamId
+    /** Creates the stream produced by sourcePi and names it "stream-" followed by the current counter value. */
+    public FlinkStream(FlinkComponent sourcePi) {
+        this.procItem = sourcePi;
+        this.sourcePiId = sourcePi.getComponentId();
+        final String id = "stream-" + Integer.toString(outputCounter);
+        setStreamId(id);
+        this.flinkStreamId = id;
+        outputCounter++;
+    }
+    /** Derives this stream's DataStream from the output of the source component. */
+    @Override
+    public void initialise() {
+        if (!(procItem instanceof FlinkProcessingItem)) {
+            dataStream = procItem.getOutStream(); // any other source: its output is used as it is
+        } else {
+            dataStream = procItem.getOutStream().filter(Utils.getFilter(getStreamId())).setParallelism(((FlinkProcessingItem) procItem).getParallelism());
+        }
+    }
+    /** Reports whether the source component has been initialised. */
+    @Override
+    public boolean canBeInitialised() {
+        return procItem.isInitialised();
+    }
+    /** True once initialise() has assigned a non-null data stream. */
+    @Override
+    public boolean isInitialised() {
+        return null != dataStream;
+    }
+    /** Returns the data stream assigned by initialise(), or null before that. */
+    @Override
+    public DataStream getOutStream() {
+        return this.dataStream;
+    }
+    /** Hands the event to the source processing item together with this stream. */
+    @Override
+    public void put(ContentEvent event) {
+        ((FlinkProcessingItem) procItem).putToStream(event, this); // ClassCastException if the source is not a FlinkProcessingItem
+    }
+    /** Streams have no component id of their own. */
+    @Override
+    public int getComponentId() {
+        return -1; //dummy number shows that it comes from a Stream
+    }
+    /** Returns the component id of the source, as captured by the constructor. */
+    public int getSourcePiId() {
+        return this.sourcePiId;
+    }
+    /** Returns the id string assigned by the constructor. */
+    @Override
+    public String getStreamId() {
+        return this.flinkStreamId;
+    }
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
new file mode 100644
index 0000000..f04d792
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/FlinkTopology.java
@@ -0,0 +1,185 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.CircleDetection;
+import com.yahoo.labs.flink.com.yahoo.labs.flink.helpers.Utils;
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A SAMOA topology on Apache Flink
+ *
+ * A Samoa-Flink Streaming Topology is a DAG of ProcessingItems encapsulated within custom operators.
+ * Streams are tagged and filtered in each operator's output so they can be routed to the right
+ * operator respectively. Building a Flink topology from a Samoa task involves invoking all these
+ * stream transformations and finally, marking and initiating loops in the graph. We have to do that
+ * since Flink only allows explicit loops in the topology started with 'iterate()' and closed with
+ * 'closeWith()'. Thus, when we build a flink topology we have to do it incrementally from the
+ * sources, mark loops and initialize them with explicit iterations.
+ *
+ */
+public class FlinkTopology extends AbstractTopology {
+    private static final Logger logger = LoggerFactory.getLogger(FlinkTopology.class);
+    public static StreamExecutionEnvironment env; // static: replaced by every constructor call
+    public List<List<FlinkProcessingItem>> topologyLoops = new ArrayList<>(); // PIs of each detected circle; list position is the circle id
+    public List<Integer> backEdges = new ArrayList<Integer>(); // per circle: component id of its first PI
+
+    public FlinkTopology(String name, StreamExecutionEnvironment env) {
+        super(name);
+        FlinkTopology.env = env; // the field is static, so assign it through the class, not through this
+    }
+
+    public StreamExecutionEnvironment getEnvironment() {
+        return env;
+    }
+
+    /** Marks the circles of the topology, initialises all entrance PIs and then all remaining PIs. */
+    public void build() {
+        markCircles();
+        for (EntranceProcessingItem src : getEntranceProcessingItems()) {
+            ((FlinkEntranceProcessingItem) src).initialise();
+        }
+        initComponents(ImmutableList.copyOf(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class)));
+    }
+
+    /** Initialises the given PIs pass by pass; throws IllegalStateException when a whole pass initialises nothing. */
+    private void initComponents(ImmutableList<FlinkProcessingItem> flinkComponents) {
+        if (flinkComponents.isEmpty()) return;
+        for (FlinkProcessingItem comp : flinkComponents) {
+            if (comp.canBeInitialised() && !comp.isInitialised() && !comp.isPartOfCircle()) {
+                comp.initialise();
+                comp.initialiseStreams();
+            }//if component is part of one or more circle
+            else if (comp.isPartOfCircle() && !comp.isInitialised()) {
+                for (Integer circle : comp.getCircleIds()) {
+                    //check if circle can be initialized
+                    if (checkCircleReady(circle)) {
+                        logger.debug("Circle: " + circle + " can be initialised");
+                        initialiseCircle(circle);
+                    } else {
+                        logger.debug("Circle cannot be initialised");
+                    }
+                }
+            }
+        }
+        ImmutableList<FlinkProcessingItem> pending = ImmutableList.copyOf(Iterables.filter(flinkComponents, new Predicate<FlinkProcessingItem>() {
+            @Override
+            public boolean apply(FlinkProcessingItem flinkComponent) {
+                return !flinkComponent.isInitialised();
+            }
+        }));
+        //a pass that initialised nothing would repeat forever: fail here instead of recursing until the stack overflows
+        if (pending.size() == flinkComponents.size()) {
+            throw new IllegalStateException("Topology cannot be initialised: no progress on the remaining " + pending.size() + " processing item(s)");
+        }
+        initComponents(pending);
+    }
+
+    /** Finds the circles among the PIs and records them in topologyLoops, in backEdges and in each member PI. */
+    private void markCircles(){
+        List<FlinkProcessingItem> pis = Lists.newArrayList(Iterables.filter(getProcessingItems(), FlinkProcessingItem.class));
+        List<Integer>[] graph = new List[pis.size()];
+        FlinkProcessingItem[] processingItems = new FlinkProcessingItem[pis.size()];
+        for (int i=0;i<pis.size();i++) {
+            graph[i] = new ArrayList<Integer>();
+        }
+        //construct the graph of the topology for the Processing Items (No entrance pi is included)
+        //component ids are used as array indices, so they are assumed to lie in [0, pis.size()) - TODO confirm
+        for (FlinkProcessingItem pi: pis) {
+            processingItems[pi.getComponentId()] = pi;
+            for (Tuple3<FlinkStream, PartitioningScheme, Integer> is : pi.getInputStreams()) {
+                if (is.f2 != -1) graph[is.f2].add(pi.getComponentId());
+            }
+        }
+        for (int g=0;g<graph.length;g++)
+            logger.debug(graph[g].toString());
+
+        CircleDetection detCircles = new CircleDetection();
+        List<List<Integer>> circles = detCircles.getCircles(graph);
+
+        //update PIs, regarding being part of a circle.
+        for (List<Integer> c : circles){
+            List<FlinkProcessingItem> circle = new ArrayList<>();
+            for (Integer it : c){
+                circle.add(processingItems[it]);
+                processingItems[it].addPItoLoop(topologyLoops.size());
+            }
+            topologyLoops.add(circle);
+            backEdges.add(circle.get(0).getComponentId());
+        }
+        logger.debug("Circles detected in the topology: " + circles);
+    }
+
+    /** Returns false only if a PI of the circle has an uninitialised input whose source is neither in the circle nor listed in backEdges. */
+    private boolean checkCircleReady(int circleId) {
+        List<Integer> circleIds = new ArrayList<>();
+        for (FlinkProcessingItem pi : topologyLoops.get(circleId)) {
+            circleIds.add(pi.getComponentId());
+        }
+        //check that all incoming to the circle streams are initialised
+        for (FlinkProcessingItem procItem : topologyLoops.get(circleId)) {
+            for (Tuple3<FlinkStream, PartitioningScheme, Integer> inputStream : procItem.getInputStreams()) {
+                //if an inputStream is not initialized AND source of inputStream is not in the circle or a tail of other circle
+                if ((!inputStream.f0.isInitialised()) && (!circleIds.contains(inputStream.f2)) && (!backEdges.contains(inputStream.f2)))
+                    return false;
+            }
+        }
+        return true;
+    }
+
+    /** Initialises the PIs of one circle, from its last PI (head) to its first (tail), then closes the Flink iteration. */
+    private void initialiseCircle(int circleId) {
+        //get the head and tail of circle
+        FlinkProcessingItem tail = topologyLoops.get(circleId).get(0);
+        FlinkProcessingItem head = topologyLoops.get(circleId).get(topologyLoops.get(circleId).size() - 1);
+
+        //initialise source stream of the iteration, so as to use it for the iteration starting point
+        if (!head.isInitialised()) {
+            head.setOnIteration(true);
+            head.initialise();
+            head.initialiseStreams();
+        }
+
+        //initialise all nodes after head
+        for (int node = topologyLoops.get(circleId).size() - 2; node >= 0; node--) {
+            topologyLoops.get(circleId).get(node).initialise();
+            topologyLoops.get(circleId).get(node).initialiseStreams();
+        }
+        //close the iteration with the head's input stream that originates from the tail
+        ((IterativeDataStream) head.getInStream()).closeWith(head.getInputStreamBySourceID(tail.getComponentId()).getOutStream());
+    }
+}
diff --git a/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
new file mode 100644
index 0000000..16d050a
--- /dev/null
+++ b/samoa-flink/src/main/java/com/yahoo/labs/flink/topology/impl/SamoaType.java
@@ -0,0 +1,42 @@
+package com.yahoo.labs.flink.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+public class SamoaType extends Tuple3<String, ContentEvent, String> { // tuple fields: (key, event, stream id)
+    /** No-argument constructor; delegates to the no-argument Tuple3 constructor. */
+    public SamoaType() { super(); }
+    /** Builds the tuple from its three fields; reachable only through of(). */
+    private SamoaType(String key, ContentEvent event, String streamId) {
+        super(key, event, streamId);
+    }
+    /** Wraps event for the given stream, keyed by event.getKey(), or by "none" when that key is null. */
+    public static SamoaType of(ContentEvent event, String streamId) {
+        String key = "none";
+        if (event.getKey() != null) key = event.getKey();
+        return new SamoaType(key, event, streamId);
+    }
+}
\ No newline at end of file