SAMOA-36: update flink adapter to flink 0.9.0 release
diff --git a/pom.xml b/pom.xml
index 1599c3f..0dedc02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
         <miniball.version>1.0.3</miniball.version>
         <s4.version>0.6.0-incubating</s4.version>
         <samza.version>0.7.0</samza.version>
-        <flink.version>0.9.0-milestone-1</flink.version>
+        <flink.version>0.9.0</flink.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
         <slf4j-simple.version>1.7.5</slf4j-simple.version>
         <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
index f56ac70..1ca3af5 100644
--- a/samoa-flink/pom.xml
+++ b/samoa-flink/pom.xml
@@ -1,15 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   #%L
   SAMOA
   %%
-  Copyright (C) 2013 Yahoo! Inc.
+  Copyright (C) 2014 - 2015 Apache Software Foundation
   %%
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at
-
+  
        http://www.apache.org/licenses/LICENSE-2.0
-
+  
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
index e00874b..45f791a 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
@@ -24,11 +24,10 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.util.Collector;
 import org.apache.samoa.core.EntranceProcessor;
 import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
 import java.io.Serializable;
 
@@ -51,8 +50,23 @@
 		final int compID = getComponentId();
 
 		
-		outStream = env.addSource(new RichSourceFunction<SamoaType>() {
-			volatile boolean canceled;
+		outStream = env.addSource(new RichSourceFunction() {
+
+			private volatile boolean isCancelled;
+			
+			@Override
+			public void run(SourceContext sourceContext) throws Exception {
+				while(!isCancelled && entrProc.hasNext())
+				{
+					sourceContext.collect(SamoaType.of(entrProc.nextEvent(), id));
+				}
+			}
+
+			@Override
+			public void cancel() {
+				isCancelled = true;
+			}
+
 			EntranceProcessor entrProc = proc;
 			String id = streamId;
 
@@ -61,19 +75,8 @@
 				super.open(parameters);
 				entrProc.onCreate(compID);
 			}
-
-			@Override
-			public void run(Collector<SamoaType> collector) throws Exception {
-				while (!canceled && entrProc.hasNext()) {
-					collector.collect(SamoaType.of(entrProc.nextEvent(), id));
-				}
-			}
-
-			@Override
-			public void cancel() {
-				canceled = true;
-			}
-		},Utils.tempTypeInfo);
+			
+		}).returns(Utils.tempTypeInfo);
 
 		((FlinkStream) getOutputStream()).initialise();
 	}
diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
index 3f5431c..28701df 100644
--- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
+++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
@@ -28,13 +28,14 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.core.Processor;
 import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.topology.ProcessingItem;
 import org.apache.samoa.topology.Stream;
 import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,8 @@
 import java.util.List;
 
 
-public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable {
+public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, FlinkProcessingItem.SamoaDelegateFunction> 
+		implements OneInputStreamOperator<SamoaType, SamoaType>, ProcessingItem, FlinkComponent, Serializable {
 
 	private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class);
 	public static final int MAX_WAIT_TIME_MILLIS = 10000;
@@ -88,7 +90,7 @@
 	}
 
 	public void putToStream(ContentEvent data, Stream targetStream) {
-		collector.collect(SamoaType.of(data, targetStream.getStreamId()));
+		output.collect(SamoaType.of(data, targetStream.getStreamId()));
 	}
 
 	@Override
@@ -106,7 +108,7 @@
 					if (inStream == null) {
 						inStream = toBeMerged;
 					} else {
-						inStream = inStream.merge(toBeMerged);
+						inStream = inStream.union(toBeMerged);
 					}
 				} catch (RuntimeException e) {
 					e.printStackTrace();
@@ -146,11 +148,8 @@
 	}
 
 	@Override
-	public void invoke() throws Exception {
-		while (readNext() != null) {
-			SamoaType t = nextObject;
-			fun.processEvent(t.f1);
-		}
+	public void processElement(SamoaType samoaType) throws Exception {
+		fun.processEvent(samoaType.f1);
 	}
 
 	@Override
@@ -221,10 +220,6 @@
 		this.onIteration = onIteration;
 	}
 
-	public boolean isOnIteration() {
-		return onIteration;
-	}
-
 	static class SamoaDelegateFunction implements Function, Serializable {
 		private final Processor proc;